In [None]:
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ModelCheckpoint
from keras.models import Sequential
import pickle
from sklearn.metrics import accuracy_score

In [None]:
# Run configuration shared by the cells below.

# Despite its name, this value is used as the number of classes: it is the default
# for DataGenerator's n_classes and the width of the model's final Dense layer.
PICTURE_SIZE = 204
# Side length of one square patch; used for the generator's `dim` and the model's input_shape.
PATCH_SIZE = 180
# Number of epochs passed to model.fit.
EPOCHS = 1
# Prefix that is string-concatenated with `ID + '.npy'` when a patch is loaded,
# so it must keep its trailing slash.
FILE_PATH = r"C:/Users/lenovo/Desktop/DL_data/5/patches/patches/"
# Samples per batch produced by DataGenerator.
BATCH_SIZE = 32
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 0.0005
# Pickle holding the partition dict that is read with the keys 'train', 'validation' and 'test'.
PARTITION_PATH = 'C:/Users/lenovo/Desktop/DL_data/5/partition_data.pkl'
# Pickle holding the labels object that DataGenerator indexes by sample ID.
LABELS_PATH = 'C:/Users/lenovo/Desktop/DL_data/5/labels_data.pkl'

In [None]:
class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that loads ``.npy`` patches from disk one batch at a time.

    Every sample ID is resolved to ``data_dir + ID + '.npy'``. The loaded array gets a
    trailing channel axis and, when ``data_gen`` is true, one random
    ``ImageDataGenerator`` transform before it is written into the batch.

    Args:
        list_IDs: Indexable collection of sample IDs (file names without ``.npy``).
        labels: Object indexable by sample ID that yields the integer class label.
        batch_size: Number of samples per batch.
        dim: Spatial shape of a single patch.
        n_channels: Size of the last axis of the batch array.
        n_classes: Number of classes; stored on the instance but not used by the
            generator itself.
        shuffle: If true, the sample order is reshuffled in ``on_epoch_end``.
        data_gen: If true, each loaded patch is randomly augmented.
        data_dir: Prefix prepended to ``ID + '.npy'``. It is concatenated, not joined,
            so it must end with a path separator. Defaults to ``FILE_PATH``.
    """

    def __init__(self, list_IDs, labels, batch_size=BATCH_SIZE, dim=(PATCH_SIZE, PATCH_SIZE), n_channels=1,
                 n_classes=PICTURE_SIZE, shuffle=True, data_gen=True, data_dir=FILE_PATH):
        """Store the configuration, build the augmenter and create the initial sample order."""
        # Let the Keras base class initialise itself (newer Keras versions warn otherwise).
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.data_gen = data_gen
        self.data_dir = data_dir

        self.datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            horizontal_flip=True)

        # Build self.indexes only after every attribute is in place.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Return the ``(X, y)`` arrays for batch number ``index``."""
        # Positions (into list_IDs) that belong to this batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Translate positions into sample IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        return self.__data_generation(list_IDs_temp)

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load, optionally augment and stack the patches named in ``list_IDs_temp``.

        Returns:
            Tuple ``(X, y)`` with one row per ID in ``list_IDs_temp``.
        """
        # Size the arrays by the IDs actually received: sizing by batch_size would leave
        # uninitialised np.empty rows in the result whenever fewer IDs are passed.
        n_samples = len(list_IDs_temp)
        X = np.empty((n_samples, *self.dim, self.n_channels))
        y = np.empty(n_samples, dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            patch = np.load(self.data_dir + ID + '.npy')
            # Add the trailing channel axis expected by the model and the augmenter.
            patch = np.expand_dims(patch, axis=-1)
            if self.data_gen:
                patch = self.datagen.random_transform(patch)

            X[i,] = patch
            # Store class
            y[i] = self.labels[ID]

        return X, y

In [None]:
# Parameters shared by the training and validation generators
params = {'dim': (PATCH_SIZE, PATCH_SIZE),
          'batch_size': BATCH_SIZE,
          'n_classes': PICTURE_SIZE,
          'n_channels': 1,
          'shuffle': True}

# NOTE(security): pickle.load can execute arbitrary code while unpickling;
# only point these paths at files you created yourself.
with open(PARTITION_PATH, "rb") as file:
    partition = pickle.load(file)

with open(LABELS_PATH, "rb") as file:
    labels = pickle.load(file)

# Generators
# Training data is shuffled and randomly augmented.
training_generator = DataGenerator(partition['train'], labels, **params, data_gen=True)

# Validation data must be evaluated as-is: random augmentation and reshuffling would make
# val_loss / val_accuracy (and the checkpoint that monitors val_loss) noisy and not
# comparable between epochs.
validation_params = {**params, 'shuffle': False}
validation_generator = DataGenerator(partition['validation'], labels, **validation_params, data_gen=False)

In [None]:
# Patch classifier: four Conv2D(3x3, relu) + MaxPooling2D(2x2) stages with a growing
# number of filters, followed by a small dense head that outputs PICTURE_SIZE raw scores.
conv_filters = (32, 64, 128, 256)

model = tf.keras.models.Sequential()
for stage, n_filters in enumerate(conv_filters):
    # Only the very first layer declares the input shape (single-channel square patch).
    first_layer_kwargs = {'input_shape': (PATCH_SIZE, PATCH_SIZE, 1)} if stage == 0 else {}
    model.add(layers.Conv2D(n_filters, (3, 3), activation='relu', **first_layer_kwargs))
    model.add(layers.MaxPooling2D((2, 2)))

model.add(layers.Flatten())

for n_units in (128, 64):
    model.add(layers.Dense(n_units, activation='relu'))

# No activation on the output layer: the loss is configured with from_logits=True.
model.add(layers.Dense(PICTURE_SIZE))


In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:


# Checkpoint callback: the file name template contains the epoch number, and a file is
# written only when the monitored val_loss improves (save_best_only=True).
# NOTE(review): "/content/" looks like a Colab path while the data paths in this notebook
# are local Windows paths - confirm the intended output directory.
filepath = "/content/model_at_epoch_{epoch}.hdf5"

checkpoint_options = dict(
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    mode='auto',
    save_freq='epoch',
)
checkpoint = ModelCheckpoint(filepath, **checkpoint_options)

In [None]:
# Adam optimizer with the configured learning rate.
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
# Labels are integer class ids and the model outputs raw scores, hence the sparse
# categorical cross-entropy with from_logits=True.
logits_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

model.compile(
    optimizer=adam_optimizer,
    loss=logits_loss,
    metrics=['accuracy'],
)

In [None]:
# Train the model and keep the Keras History object for the accuracy plot further down.
# The checkpoint callback saves the model during training; to train without saving,
# call model.fit without the `callbacks` argument.
training_callbacks = [checkpoint]

history = model.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=EPOCHS,
    workers=1,
    callbacks=training_callbacks,
)

In [None]:


# Evaluate the model patch by patch on the test partition.
# partition['test'] is iterated as a list of per-image ID lists; the true label of an
# image (and of all of its patches) is taken to be its position in that list.
tests_predictions_lists_by_img = []   # raw model outputs, one array per image
tests_predictions_by_img = []         # argmax class per patch, one array per image
for sublist in partition['test']:
  tests_patches = [np.expand_dims(np.load(FILE_PATH + ID + '.npy'), axis=-1) for ID in sublist]

  predictions = model.predict(np.array(tests_patches))

  tests_predictions_lists_by_img.append(predictions)
  tests_predictions_by_img.append(np.argmax(predictions, axis=1))

# One label per test image: its index in partition['test'].
tests_lables_by_img = list(range(len(tests_predictions_by_img)))

# Flatten to one entry per patch so patch-level accuracy can be computed.
all_tests_predictions = [patch_class
                         for image_classes in tests_predictions_by_img
                         for patch_class in image_classes]
all_tests_labels = [image_idx
                    for image_idx, image_classes in enumerate(tests_predictions_by_img)
                    for _ in image_classes]

test_acc = accuracy_score(all_tests_labels, all_tests_predictions)

print('\nTest accuracy:', test_acc)

In [None]:
# Majority vote: the class of an image is the class predicted most often among its patches.
# The array is built in one step with an integer dtype; growing it with np.append copies
# the whole array on every iteration and silently turns the class ids into floats.
predictions_by_images = np.array(
    [np.bincount(patch_classes).argmax() for patch_classes in tests_predictions_by_img],
    dtype=int,
)


In [None]:
# Score-sum vote: add up the raw model outputs of all patches of an image and take the
# class with the largest total. The original author notes that this is more successful
# than the majority vote.
# The array is built in one step with an integer dtype; growing it with np.append copies
# the whole array on every iteration and silently turns the class ids into floats.
predictions_by_sum_method = np.array(
    [np.argmax(np.sum(patch_scores, axis=0), axis=0) for patch_scores in tests_predictions_lists_by_img],
    dtype=int,
)

# (true image index, predicted class) for every image whose prediction differs from its index.
misclassified_imgs1 = [
    (image_idx, int(predicted))
    for image_idx, predicted in enumerate(predictions_by_sum_method)
    if image_idx != int(predicted)
]

# Save to a pkl file
with open('misclassified_imgs.pkl', 'wb') as f:
    pickle.dump(misclassified_imgs1, f)

In [None]:

# Training vs. validation accuracy per epoch, taken from the History object of model.fit.
fig, ax = plt.subplots(figsize=(12, 6))
for metric_key in ('accuracy', 'val_accuracy'):
    ax.plot(history.history[metric_key])
ax.set_title('Model accuracy')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend(['Train', 'Validation'], loc='upper left')
ax.grid(True)
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

def _show_confusion_matrix(matrix, title, side):
    """Draw `matrix` as an annotated heatmap on a square figure of `side` x `side` inches."""
    plt.figure(figsize=(side, side))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.title(title, fontsize=16)
    plt.show()


# Image-level confusion matrix for the majority-vote predictions
most_common_cm = confusion_matrix(tests_lables_by_img, predictions_by_images)
_show_confusion_matrix(
    most_common_cm,
    "Confusion Matrix of the Most Common Prediction Method Accuracy Score",
    40)

# Image-level confusion matrix for the summed-score predictions
sum_predictions_cm = confusion_matrix(tests_lables_by_img, predictions_by_sum_method)
_show_confusion_matrix(
    sum_predictions_cm,
    "Confusion Matrix of the Sum Predictions Method Accuracy Score",
    40)

# Patch-level confusion matrix (one entry per test patch)
num_of_patches_cm = confusion_matrix(all_tests_labels, all_tests_predictions)
_show_confusion_matrix(
    num_of_patches_cm,
    "Confusion Matrix of Amount of incorrect and correct patches classified",
    60)


In [None]:
from sklearn.metrics import accuracy_score
# Echo the run configuration, then the three accuracy figures computed above.
for caption, setting in (('Picture Size: ', PICTURE_SIZE),
                         ('Epochs: ', EPOCHS),
                         ('Learning Rate: ', LEARNING_RATE)):
    print(caption, setting)
print('Batch Size: ', BATCH_SIZE, '\n')

print('Results: ')
# Patch-level accuracy
print('Test:  ', round(test_acc, 3))
# Image-level accuracy with majority voting over patches
majority_vote_acc = accuracy_score(tests_lables_by_img, predictions_by_images)
print('Accuracy Score', round(majority_vote_acc, 3))
# Image-level accuracy with summed patch scores
summed_scores_acc = accuracy_score(tests_lables_by_img, predictions_by_sum_method)
print('Sum Predictions Accuracy Score', round(summed_scores_acc, 3))

In [None]:
from keras import backend as K
from keras.models import Model

def deprocess_image(x):
    """Turn an arbitrary-range tensor into a displayable uint8 image tensor.

    The input is centred on its mean, divided by its standard deviation (plus 1e-5),
    rescaled to mean 0.5 / std 0.1, clipped to [0, 1], stretched to [0, 255] and cast
    to ``tf.uint8``.
    """
    centered = x - tf.reduce_mean(x)
    standardized = centered / (tf.math.reduce_std(centered) + 1e-5)
    unit_range = tf.clip_by_value(standardized * 0.1 + 0.5, 0, 1)
    pixel_range = tf.clip_by_value(unit_range * 255, 0, 255)
    return tf.cast(pixel_range, tf.uint8)

def generate_pattern(layer_name, filter_index, model_new, size=PATCH_SIZE):
    """Gradient-ascent image that increases the mean activation of one output channel.

    Args:
        layer_name: Not used inside this function.
        filter_index: Index into the last axis of ``model_new``'s output.
        model_new: Callable model whose output is indexed as ``[:, :, :, filter_index]``.
        size: Side length of the square, single-channel input image.

    Returns:
        The optimised image after ``deprocess_image`` (a ``tf.uint8`` tensor).
    """
    ascent_step = 1.
    n_iterations = 100

    # Start from uniform noise in [128, 148).
    canvas = tf.Variable(tf.random.uniform((1, size, size, 1)) * 20 + 128)

    for _ in range(n_iterations):
        with tf.GradientTape() as tape:
            tape.watch(canvas)
            activation = tf.reduce_mean(model_new(canvas)[:, :, :, filter_index])
        gradient = tape.gradient(activation, canvas)
        # Normalise by the RMS of the gradient so every step has a comparable size.
        gradient /= (tf.sqrt(tf.reduce_mean(tf.square(gradient))) + 1e-5)
        canvas.assign_add(gradient * ascent_step)

    return deprocess_image(canvas[0])

def plot_max_neuron_reaction(layer_name, neuron_num, fig_num=32):
  """Plot gradient-ascent patterns for the first channels of one layer of the global `model`.

  A sub-model ending at `layer_name` is built and, for each plotted channel,
  `generate_pattern` produces the image shown in one cell of an 8-column grid.

  The earlier version also ran a separate gradient ascent on the full classifier and built
  an unused loss tensor before plotting; neither result was ever used, so that work
  has been removed.

  Args:
      layer_name: Name of the layer in `model` whose channels are visualised.
      neuron_num: Unused; kept so existing calls keep working.
      fig_num: Maximum number of channels to plot, starting at channel 0.
  """
  layer_output = model.get_layer(layer_name).output
  model_new = Model(inputs=model.inputs, outputs=layer_output)

  # Never index past the last channel of the layer.
  n_channels = layer_output.shape[-1]
  n_plots = fig_num if n_channels is None else min(fig_num, int(n_channels))

  # Keep the 8x8 layout; add rows only when more than 64 plots are requested.
  n_cols = 8
  n_rows = max(8, -(-n_plots // n_cols))

  plt.figure(figsize=(20, 20))
  plt.suptitle(layer_name, fontsize=16)
  for i in range(n_plots):
    plt.subplot(n_rows, n_cols, i + 1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(generate_pattern(layer_name, i, model_new))
  plt.show()


# Visualise every layer whose name marks it as a convolution or max-pooling layer.
visualized_prefixes = ("conv2d", "max_pooling2d")
for layer in model.layers:
  if not layer.name.startswith(visualized_prefixes):
    continue
  plot_max_neuron_reaction(layer.name, neuron_num=32)
    