In [None]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [None]:
import os
import json 
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from datetime import datetime

SEED = 1234
tf.random.set_seed(SEED)  

# Get current working directory
cwd = os.getcwd()

In [None]:
def create_csv(results, results_dir='./'):

    csv_fname = 'results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:

        f.write('Id,Category\n')

        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')


In [None]:
# Run this cell only if you are using Colab with Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Run this cell only if you are using Colab with Drive
!unzip '/content/drive/My Drive/DeepLearning/kaggle/data/artificial-neural-networks-and-deep-learning-2020.zip'

In [None]:
labels_dir = os.path.join(cwd, 'MaskDataset')
training_dir = os.path.join(cwd, 'MaskDataset/training')

bs = 8
img_h = img_w = 256

with open(os.path.join(labels_dir, "train_gt.json")) as f:
  dic = json.load(f)
labels = pd.DataFrame(dic.items())
labels.rename(columns = {0:'filename', 1:'class'}, inplace = True)
labels["class"] = labels["class"].astype(str)

train_data_gen = ImageDataGenerator(rescale=1./255)
train_generator = data_gen.flow_from_dataframe(labels,
                                               training_dir,
                                               batch_size=bs,
                                               class_mode='categorical',
                                               shuffle=True,
                                               seed=SEED,
                                               target_size=(img_h, img_w)
                                               )



In [None]:
test_dir = os.path.join(cwd, 'MaskDataset/test')

# same data generator as training set (normalization only)
test_data_gen = ImageDataGenerator(rescale=1./255)
test_generator = test_data_gen.flow_from_directory( test_dir,
                                               batch_size=bs,
                                               class_mode='categorical',
                                               shuffle=True,
                                               seed=SEED,
                                               target_size=(img_h, img_w)
                                               )

In [None]:
train_gen, valid_gen = train_test_split(train_generator, test_size=0.33, random_state=SEED)

In [None]:
num_classes = 3

#img_h, img_w ??
train_dataset = tf.data.Dataset.from_generator(lambda: train_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes])
                                               )
train_dataset = train_dataset.repeat()

valid_dataset = tf.data.Dataset.from_generator(lambda: valid_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes])
                                               )
valid_dataset = valid_dataset.repeat()

In [None]:
# Let's test data augmentation
# ----------------------------
import time
import matplotlib.pyplot as plt

%matplotlib inline
    
iterator = iter(train_dataset)

In [None]:
augmented_img, target = next(iterator)
augmented_img = np.array(augmented_img[0])   # First element
augmented_img = augmented_img * 255  # denormalize
   
plt.imshow(np.uint8(augmented_img))
# fig.canvas.draw()
plt.plot()

In [None]:
decide_class_indices = False
if decide_class_indices:
    classes = ['NO_MASK',     # 0 
               'ALL_MASK',    # 1
               'SOME_MASK']   # 2

else:
    classes=None

In [None]:
# Check how keras assigned the labels
train_generator.class_indices

In [None]:
# Architecture: Features extraction -> Classifier

start_f = 8
depth = 5

model = tf.keras.Sequential()

# Features extraction
for i in range(depth):

    if i == 0:
        input_shape = [img_h, img_w, 3]
    else:
        input_shape=[None]

    # Conv block: Conv2D -> Activation -> Pooling
    model.add(tf.keras.layers.Conv2D(filters=start_f, 
                                     kernel_size=(3, 3),
                                     strides=(1, 1),
                                     padding='same',
                                     input_shape=input_shape))
    model.add(tf.keras.layers.ReLU())
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

    start_f *= 2
    
# Classifier
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(units=512, activation='relu'))
model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))

In [None]:
# In this cell we create the same model of the cell before, but we implement it
# by inheriting from tf.keras.Model. This is to provide you with an example of how
# we can create customized models. DO NOT run this cell if you have already created 
# the model in the cell above.

start_f = 8
depth = 5

# Create convolutional block
class ConvBlock(tf.keras.Model):
    def __init__(self, num_filters):
        super(ConvBlock, self).__init__()
        self.conv2d = tf.keras.layers.Conv2D(filters=num_filters,
                                             kernel_size=(3, 3),
                                             strides=(1, 1), 
                                             padding='same')
        self.activation = tf.keras.layers.ReLU()  # we can specify the activation function directly in Conv2D
        self.pooling = tf.keras.layers.MaxPool2D(pool_size=(2, 2))
        
    def call(self, inputs):
        x = self.conv2d(inputs)
        x = self.activation(x)
        x = self.pooling(x)
        return x

class CNNClassifier(tf.keras.Model):
    def __init__(self, depth, start_f, num_classes):
        super(CNNClassifier, self).__init__()
        
        self.feature_extractor = tf.keras.Sequential()
    
        for i in range(depth):
            self.feature_extractor.add(ConvBlock(num_filters=start_f))
            start_f *= 2
            
        self.flatten = tf.keras.layers.Flatten()
        self.classifier = tf.keras.Sequential()
        self.classifier.add(tf.keras.layers.Dense(units=512, activation='relu'))
        self.classifier.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))
        
    def call(self, inputs):
        x = self.feature_extractor(inputs)
        x = self.flatten(x)
        x = self.classifier(x)
        return x
    
# Create Model instance
model = CNNClassifier(depth=depth,
                      start_f=start_f,
                      num_classes=num_classes)
# Build Model (Required)
model.build(input_shape=(None, img_h, img_w, 3))

In [None]:
# Visualize created model as a table
model.summary()

# Visualize initialized weights
model.weights

In [None]:
# Optimization params
# -------------------

# Loss
loss = tf.keras.losses.CategoricalCrossentropy()

# learning rate
lr = 1e-4
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# -------------------

# Validation metrics
# ------------------

metrics = ['accuracy']
# ------------------

# Compile Model
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
%load_ext tensorboard
%tensorboard --logdir /content/drive/My\ Drive/Keras3/classification_experiments/

In [None]:
import os
from datetime import datetime


cwd = os.getcwd()

exps_dir = os.path.join('/content/drive/My Drive/Keras3/', 'classification_experiments')
if not os.path.exists(exps_dir):
    os.makedirs(exps_dir)

now = datetime.now().strftime('%b%d_%H-%M-%S')

model_name = 'CNN'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
if not os.path.exists(exp_dir):
    os.makedirs(exp_dir)
    
callbacks = []

# Model checkpoint
# ----------------
ckpt_dir = os.path.join(exp_dir, 'ckpts')
if not os.path.exists(ckpt_dir):
    os.makedirs(ckpt_dir)

ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'), 
                                                   save_weights_only=True)  # False to save the model directly
callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# ---------------------------------
tb_dir = os.path.join(exp_dir, 'tb_logs')
if not os.path.exists(tb_dir):
    os.makedirs(tb_dir)
    
# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)  # if 1 shows weights histograms
callbacks.append(tb_callback)

# Early Stopping
# --------------
early_stop = True
if early_stop:
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
    callbacks.append(es_callback)

In [None]:
# Run this cell only if you want to plot the confusion matrix in tensorboard
import io
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

file_writer_cm = tf.summary.create_file_writer(tb_dir + '/cm')

# Function to convert input figure to png tf image 
# (for plotting the confusion matrix in tensorboard)
def plot_to_image(figure):
  # Save the plot to a PNG in memory.
  buf = io.BytesIO()
  plt.savefig(buf, format='png')
  # Closing the figure prevents it from being displayed directly inside
  # the notebook.
  plt.close(figure)
  buf.seek(0)
  # Convert PNG buffer to TF image
  image = tf.image.decode_png(buf.getvalue(), channels=4)
  # Add the batch dimension
  image = tf.expand_dims(image, 0)
  return image

# Function to compute the confusion matrix (using sklearn) and to save it 
# in tensorboard
def log_confusion_matrix(epoch, logs):
  Y_prediction = model.predict_generator(valid_gen, len(valid_gen))
  # Convert predictions classes to one hot vectors 
  Y_pred_classes = np.argmax(Y_prediction,axis = 1) 
  # Convert validation observations to one hot vectors
  Y_true = valid_gen.classes
  # compute the confusion matrix
  confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)
  fig = plt.figure(figsize=(10,8))
  sns.heatmap(confusion_mtx, annot=True, fmt="d");
  
  cm_image = plot_to_image(fig)

  with file_writer_cm.as_default():
    tf.summary.image("Confusion Matrix", cm_image, step=epoch)

cm_callback = tf.keras.callbacks.LambdaCallback(on_epoch_end=log_confusion_matrix)
callbacks.append(cm_callback)

In [None]:
model.fit(x=train_dataset,
          epochs=10,  #### set repeat in training dataset
          steps_per_epoch=len(train_gen),
          validation_data=valid_dataset,
          validation_steps=len(valid_gen), 
          #callbacks=callbacks
          )

# How to visualize Tensorboard

# 1. tensorboard --logdir EXPERIMENTS_DIR --port PORT     <- from terminal
# 2. localhost:PORT   <- in your browser

In [None]:
# Let's visualize the activations of our network
from PIL import Image

test_iter = iter(valid_dataset)

# Get a test image
test_img = next(test_iter)[0]
test_img = test_img[0]

# Visualize the image
Image.fromarray(np.uint8(np.array(test_img)*255.))

In [None]:
test_dir = os.path.join(cwd, 'MaskDataset/test')
image_filenames = next(os.walk(test_dir))[2]

results = {}
for image_name in image_filenames:

   img = Image.open('.../test/img_name').convert('RGB')
   img_array = np.array(img)
   img_array = np.expand_dims(img_array, 0) 
   #....
   data_normalization...
   #....
   prediction = argmax(softmax)   # predicted class
   #.....
   results[image_name] = prediction

create_csv(results)


In [None]:
# Get the activations (the output of each ReLU layer)
layer_outputs = [layer.output for layer in model.layers if isinstance(layer, tf.keras.layers.ReLU)]
print(layer_outputs)
# We can do it by creating a new model (activation_model) with model.input as input 
# and all the ReLU activations as output
activation_model = tf.keras.Model(inputs=model.input, outputs=layer_outputs)
# Finally we get the output values given the imput test image
activations = activation_model.predict(tf.expand_dims(test_img, 0))

In [None]:
# Use this instead of the previous cell if you have implemented the custom classes

# Get the activations (the output of each ReLU layer in the feature_extractor)
activations = []
x = tf.expand_dims(test_img, 0)
for conv_block in model.feature_extractor.layers:
  for layer in conv_block.layers:
    x = layer(x)
    if isinstance(layer, tf.keras.layers.ReLU):
      activations.append(x)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
def display_activation(activations, act_index): 
    # activations: list of all the activations
    # act_index: the layer we want to visualize (an int in [0, network depth) )
    activation = activations[act_index]
    activation = tf.image.resize(activation, size=[128, 128])
    col_size = activations[0].shape[-1]
    row_size = 1 + act_index
    activation_index=0
    fig, ax = plt.subplots(row_size, col_size, figsize=(8*2.5, 8*1.5), squeeze=False)
    for row in range(0,row_size):
        for col in range(0,col_size):
            ax[row][col].imshow(activation[0, :, :, activation_index], cmap='gray')
            activation_index += 1

In [None]:
display_activation(activations, 1)

In [None]:
# Print Confusion Matrix and Classification Report (Precision, Recall, and F1-score)
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

Y_prediction = model.predict_generator(valid_gen, len(valid_gen))
# Convert predictions classes to one hot vectors 
Y_pred_classes = np.argmax(Y_prediction,axis = 1) 
# Convert validation observations to one hot vectors
Y_true = valid_gen.classes
# compute the confusion matrix
confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)
class_report = classification_report(Y_true, Y_pred_classes, 
                                     target_names=valid_gen.class_indices.keys())  # target_names must be ordered depending on the class labels
print('Confusion Matrix:')
print(confusion_mtx)
print()
print('Classification Report:')
print(class_report)