<a href="https://colab.research.google.com/github/chayaphon/DADS7202_DL/blob/main/CNN_Sushi/ResNet50_FineTune.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# List all available NVIDIA GPUs.
!nvidia-smi -L

In [25]:
import sys
import numpy as np
import sklearn
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from IPython.display import Image, display
import matplotlib.cm as cmp
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
print( f"Python {sys.version}\n" )
print( f"NumPy {np.__version__}\n" )
%matplotlib inline
print(f'The scikit-learn version is {sklearn.__version__}')
print( f"TensorFlow {tf.__version__}" )
print( f"tf.keras.backend.image_data_format() = {tf.keras.backend.image_data_format()}" )
gpus = tf.config.list_physical_devices('GPU')
print( f"TensorFlow detected { len(gpus) } GPU(s):" )
for i, gpu in enumerate(gpus):
  print( f".... GPU No. {i}: Name = {gpu.name} , Type = {gpu.device_type}" )

# Set Seed

In [None]:
# Master seed for the notebook
# (a random alternative would be np.random.randint(1000, 99999)).
seed_value = 123456
print(seed_value)

# Seed the NumPy and TensorFlow global generators; the two calls are independent.
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

# Prep Data

In [None]:
def load_class_images(npz_path, key='x'):
    """Return the array stored under `key` in an ``.npz`` archive.

    The archive is opened in a ``with`` block so its file handle is closed
    as soon as the array has been read, instead of staying open for the
    lifetime of the kernel.

    Args:
        npz_path: Path to the ``.npz`` file.
        key: Name of the array inside the archive (defaults to ``'x'``).

    Returns:
        The NumPy array stored under `key`.
    """
    with np.load(npz_path) as archive:
        return archive[key]


# One image array per sushi class.
x1 = load_class_images('./Images Numpy/x_sushi_aburi_224x224.npz')
x2 = load_class_images('./Images Numpy/x_sushi_ebi_224x224.npz')
x3 = load_class_images('./Images Numpy/x_sushi_maguro_224x224.npz')
x4 = load_class_images('./Images Numpy/x_sushi_salmon_224x224.npz')

# Integer class ids (0..3), one per image. The label count is taken from
# the data itself so x and y can never get out of sync if a class file
# changes size (it was previously hard-coded to 172).
y1 = np.full(len(x1), 0)
y2 = np.full(len(x2), 1)
y3 = np.full(len(x3), 2)
y4 = np.full(len(x4), 3)

print(f'x1 : {x1.shape} | y1: {y1.shape}')
print(f'x2 : {x2.shape} | y2: {y2.shape}')
print(f'x3 : {x3.shape} | y3: {y3.shape}')
print(f'x4 : {x4.shape} | y4: {y4.shape}')

In [None]:
# Stack the four per-class arrays along the first axis (class order 0, 1, 2, 3).
x = np.concatenate([x1, x2, x3, x4], axis=0)
y = np.concatenate([y1, y2, y3, y4], axis=0)
for stacked in (x, y):
    print(stacked.shape)

# Train Test Split

In [None]:
test_size = 0.2

# Stratified hold-out split. `random_state` is the notebook's master seed
# rather than a draw from the global NumPy RNG, so re-running this cell
# (or running cells out of order) always produces the same split.
x_train, x_test, y_train, y_test = train_test_split(
    x, y,
    test_size=test_size,
    random_state=seed_value,
    stratify=y,
)

print(x_train.shape)
print(y_train.shape)
print(x_test.shape)
print(y_test.shape)

# Visualize the first 10 images of x_train and x_test

In [None]:
# Class names indexed by integer class id.
label = ['sushi_aburi','sushi_ebi','sushi_maguro','sushi_salmon']

# 2 x 5 grid with the first ten training images and their classes.
plt.figure(figsize = (15,5))
plt.suptitle('Training Set', y=1.02, fontsize=16, weight='bold')
for idx in range(10):
  cls = y_train[idx]
  panel = plt.subplot(2, 5, idx + 1)
  panel.set_title(f'class {cls} : ({label[cls]})')
  plt.imshow(x_train[idx])

plt.tight_layout()
plt.show()

In [None]:
# 2 x 5 grid with the first ten test images and their classes.
plt.figure(figsize = (15,5))
plt.suptitle('Test Set', y=1.02, fontsize=16, weight='bold')
for idx in range(10):
  cls = y_test[idx]
  panel = plt.subplot(2, 5, idx + 1)
  panel.set_title(f'class {cls} : ({label[cls]})')
  plt.imshow(x_test[idx])

plt.tight_layout()
plt.show()

# Select Pre-Trained Model ResNet50

In [None]:
# ImageNet-pretrained ResNet50 without its classification head.
x_extractor = tf.keras.applications.ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
)
x_extractor.summary()

In [None]:
# Draw the base ResNet50 graph to a PNG file.
file_diagram_model = './out/ResNet50_Model.png'
base_diagram_options = dict(
    show_shapes=True,
    show_dtype=False,
    show_layer_names=True,
    rankdir='TB',
    expand_nested=True,
    dpi=90,
)
tf.keras.utils.plot_model(x_extractor, to_file=file_diagram_model, **base_diagram_options)

# Preprocessing

In [None]:
# Apply the ResNet50 input preprocessing to *copies* of the splits.
# Keras documents that `preprocess_input` may write its result over a
# floating-point input array; copying first keeps `x_train` / `x_test`
# as the original images, which later cells still display.
x_train_resnet = tf.keras.applications.resnet50.preprocess_input(np.copy(x_train))
x_test_resnet = tf.keras.applications.resnet50.preprocess_input(np.copy(x_test))

print(x_train_resnet.shape)
print(x_test_resnet.shape)

# Data Augmentation

In [None]:
def custom_augmentation(image):
    """Add Gaussian noise and a random contrast change to one image.

    Intended as the `preprocessing_function` of an `ImageDataGenerator`,
    which is documented to pass a rank-3 NumPy array and to expect a NumPy
    array of the same shape back.

    Args:
        image: A single image (array-like); it is cast to float32.

    Returns:
        The augmented image as a float32 NumPy array when running eagerly
        (otherwise the TensorFlow tensor is returned unchanged).
    """
    image = tf.cast(image, tf.float32)
    # Additive zero-mean Gaussian noise (stddev 0.1, in the image's own value scale).
    noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=0.1, dtype=tf.float32)
    image = image + noise
    # Random contrast factor drawn from [0.8, 1.2].
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    # Hand a NumPy array back to the generator instead of a tf.Tensor.
    return image.numpy() if hasattr(image, 'numpy') else image

# Training-time generator: random augmentation + per-image standardization.
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    preprocessing_function=custom_augmentation,  # noise + contrast augmentation
    channel_shift_range=0.1,
    # Keras forces `samplewise_center=True` (with a warning) whenever
    # `samplewise_std_normalization=True`, so it is set explicitly here to
    # state what is actually applied.
    samplewise_center=True,
    samplewise_std_normalization=True,  # divide each image by its own std deviation
    rotation_range=30,
    height_shift_range=5.0,
    width_shift_range=4.0,
    shear_range=2.0,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True,
    validation_split=0.2
)

# Test-time generator: the same per-image standardization, no augmentation.
test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    samplewise_center=True,
    samplewise_std_normalization=True
)

# Notes:
# * ImageDataGenerator has no `seed` attribute; reproducible augmentation
#   requires passing `seed=` to `.flow()`.
# * `.fit()` is only needed for featurewise statistics (featurewise_center,
#   featurewise_std_normalization, zca_whitening). None of those are enabled,
#   so no `.fit()` call is made here.

In [None]:
# Preview augmented training batches.
preview_batch_size = 5
train_generator = train_datagen.flow(
    x_train, y_train,
    batch_size=preview_batch_size,
    subset='training',
    seed=seed_value,  # reproducible shuffling / augmentation
)
n_show = 1   # Show 'n_show' batches of generated data (1 batch includes up to 5 images)
for i, (x_batch, y_batch) in zip(range(n_show), train_generator):
    print(f"===== Train batch no. {i+1}/{n_show} =====")
    plt.figure(figsize=(15, 5))
    for j in range(len(x_batch)):  # a batch may hold fewer images than requested
        img = x_batch[j]
        lo, hi = img.min(), img.max()
        plt.subplot(1, preview_batch_size, j+1).set_title(y_batch[j])
        # Generator output is standardized (not in [0, 1]); min-max rescale
        # for display only so imshow does not clip it.
        plt.imshow((img - lo) / (hi - lo) if hi > lo else np.zeros_like(img))
        plt.axis("off")  # remove all tick marks
    plt.show()

In [None]:
# Preview batches from the validation subset of the training generator.
preview_batch_size = 5
validation_generator = train_datagen.flow(
    x_train, y_train,
    batch_size=preview_batch_size,
    subset='validation',
    seed=seed_value,  # reproducible shuffling / augmentation
)
n_show = 1   # Show 'n_show' batches of generated data (1 batch includes up to 5 images)
for i, (x_batch, y_batch) in zip(range(n_show), validation_generator):
    print(f"===== Validation batch no. {i+1}/{n_show} =====")
    plt.figure(figsize=(15, 5))
    for j in range(len(x_batch)):  # a batch may hold fewer images than requested
        img = x_batch[j]
        lo, hi = img.min(), img.max()
        plt.subplot(1, preview_batch_size, j+1).set_title(y_batch[j])
        # Generator output is standardized (not in [0, 1]); min-max rescale
        # for display only so imshow does not clip it.
        plt.imshow((img - lo) / (hi - lo) if hi > lo else np.zeros_like(img))
        plt.axis("off")  # remove all tick marks
    plt.show()

In [None]:
# Preview standardized (non-augmented) test batches.
preview_batch_size = 5
test_generator = test_datagen.flow(
    x_test, y_test,
    batch_size=preview_batch_size,
    seed=seed_value,  # reproducible shuffling
)
n_show = 1   # Show 'n_show' batches of generated data (1 batch includes up to 5 images)
for i, (x_batch, y_batch) in zip(range(n_show), test_generator):
    print(f"===== Test batch no. {i+1}/{n_show} =====")
    plt.figure(figsize=(15, 5))
    for j in range(len(x_batch)):  # a batch may hold fewer images than requested
        img = x_batch[j]
        lo, hi = img.min(), img.max()
        plt.subplot(1, preview_batch_size, j+1).set_title(y_batch[j])
        # Generator output is standardized (not in [0, 1]); min-max rescale
        # for display only so imshow does not clip it.
        plt.imshow((img - lo) / (hi - lo) if hi > lo else np.zeros_like(img))
        plt.axis("off")  # remove all tick marks
    plt.show()

# Make all layers of the base model trainable (full fine-tuning)

In [None]:
# Mark the whole base model as trainable, then list every layer's flag.
x_extractor.trainable = True
for idx in range(len(x_extractor.layers)):
  layer = x_extractor.layers[idx]
  print(f'Layer {idx}: Name = {layer.name}, Trainable = {layer.trainable}')

# Unfreeze some layers (optional; currently disabled)

In [41]:
# layer_list = ['conv5_block1_out', 'conv5_block2_', 'conv5_block3_']

# for i, layer in enumerate(x_extractor.layers):
#     if any(part in layer.name for part in layer_list):
#         layer.trainable = True

# for i, layer in enumerate(x_extractor.layers):
#   print(f'Layer {i}: Name = {layer.name}, Trainable = {layer.trainable}')

# Train Model

In [None]:
def _conv_bn_relu(tensor, filters, kernel_size, conv_name, bn_name, relu_name):
    """Apply Conv2D(padding='same') -> BatchNormalization -> ReLU with explicit layer names."""
    tensor = tf.keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, padding='same', name=conv_name)(tensor)
    tensor = tf.keras.layers.BatchNormalization(name=bn_name)(tensor)
    return tf.keras.layers.Activation('relu', name=relu_name)(tensor)


def _reuse_resnet_bottleneck(base, block_name, block_input):
    """Re-apply the existing layers of one ResNet50 bottleneck block of `base` to `block_input`.

    The layers are looked up by name in `base`, so the new graph shares
    their weights with the base model.
    """
    residual = block_input
    for suffix in ('1_conv', '1_bn', '1_relu', '2_conv', '2_bn', '2_relu', '3_conv', '3_bn'):
        residual = base.get_layer(f'{block_name}_{suffix}')(residual)
    summed = base.get_layer(f'{block_name}_add')([block_input, residual])
    return base.get_layer(f'{block_name}_out')(summed)


# ----- Custom convolutional branch, starting at 'conv2_block3_out' -----
# Each stage is a list of (filters, kernel_size) conv blocks followed by a
# 2x2 max-pool.
custom_stages = [
    ('custom1', [(256, 7), (128, 7), (128, 7)]),
    ('custom2', [(128, 5), (128, 5)]),
    ('custom3', [(128, 3), (256, 3), (1024, 3)]),
]
custom = x_extractor.get_layer('conv2_block3_out').output
for stage, convs in custom_stages:
    for n, (filters, kernel_size) in enumerate(convs, start=1):
        custom = _conv_bn_relu(custom, filters, kernel_size,
                               f'{stage}_conv{n}', f'{stage}_bn{n}', f'{stage}_relu{n}')
    custom = tf.keras.layers.MaxPooling2D(pool_size=2, name=f'{stage}_max_pool')(custom)

# Final 1x1 conv so the branch has 2048 channels, as required by the Add below.
custom = _conv_bn_relu(custom, 2048, 1, 'adjust_channels', 'adjust_bn', 'adjust_relu')

# ----- Merge with the ResNet path -----
# Output of the 'conv5_block1_add' layer from the base model
conv5_block1_add = x_extractor.get_layer('conv5_block1_add').output

# Element-wise addition of the ResNet path and the custom branch.
# (Alternative: tf.keras.layers.Concatenate(name='concat_layer').)
combined = tf.keras.layers.Add(name='add_layer')([conv5_block1_add, custom])

# Continue with the ResNet50 layers from 'conv5_block1_out' onwards,
# applied to the merged tensor.
conv5_block1_out = x_extractor.get_layer('conv5_block1_out')(combined)
conv5_block2_out = _reuse_resnet_bottleneck(x_extractor, 'conv5_block2', conv5_block1_out)
conv5_block3_out = _reuse_resnet_bottleneck(x_extractor, 'conv5_block3', conv5_block2_out)

# ----- Classifier head -----
# A dedicated name (`head`) is used so the dataset array `x` defined in the
# data-preparation cells is not overwritten by a Keras tensor.
head = tf.keras.layers.GlobalAveragePooling2D(name='global_avg_pool')(conv5_block3_out)
# Flatten is kept so the layer list of the model stays the same as before.
head = tf.keras.layers.Flatten(name='flatten')(head)
head = tf.keras.layers.Dense(1024, activation='relu', name='fc1')(head)
head = tf.keras.layers.Dropout(0.5, name='dropout1')(head)
head = tf.keras.layers.Dense(512, activation='relu', name='fc2')(head)
head = tf.keras.layers.Dropout(0.5, name='dropout2')(head)

# Output layer with 4 classes (multi-class classification)
new_outputs = tf.keras.layers.Dense(4, activation='softmax', name='output_layer')(head)

# Construct the modified model
model = tf.keras.models.Model(inputs=x_extractor.inputs, outputs=new_outputs)

# Show the model summary
model.summary()

In [None]:
# List every layer of the assembled model with its trainable flag.
for idx in range(len(model.layers)):
  lyr = model.layers[idx]
  print(f'Layer {idx}: Name = {lyr.name}, Trainable = {lyr.trainable}')

In [44]:
# Adam with a small learning rate; integer labels -> sparse categorical cross-entropy.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
model.compile(optimizer=optimizer, loss=loss_fn, metrics=['acc'])

In [None]:
# Write the fine-tuning model's diagram to its own file so it does not
# overwrite the base ResNet50 diagram saved at `file_diagram_model`.
file_diagram_finetune = './out/ResNet50_FineTune_Model.png'
tf.keras.utils.plot_model(
    model,
    to_file=file_diagram_finetune,
    show_shapes=True,
    show_dtype=False,
    show_layer_names=True,
    dpi=90,
    rankdir='TB',
    expand_nested=True
)

# Fitting Model

In [None]:
t_batch_size = 64
v_batch_size = 128

# Stop when validation loss has not improved for 10 epochs and roll back
# to the weights of the best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',    # monitor 'val_loss', 'val_acc'
    patience=10,           # Number of epochs with no improvement after which training will be stopped
    restore_best_weights=True  # Restore model weights from the epoch with the best validation performance
)

# Training / validation iterators over the two subsets of the training split.
# `seed=` is passed explicitly: it is the iterator's seed that controls
# shuffling and the random transforms, so this makes runs reproducible.
# NOTE(review): the validation subset comes from `train_datagen`, so it
# presumably receives the same random augmentation as the training subset.
train_flow = train_datagen.flow(
    x_train_resnet, y_train,
    batch_size=t_batch_size, subset='training', seed=seed_value)
val_flow = train_datagen.flow(
    x_train_resnet, y_train,
    batch_size=v_batch_size, subset='validation', seed=seed_value)

# Fit the model with early stopping
history = model.fit(
    train_flow,
    epochs=50,
    verbose=2,
    validation_data=val_flow,
    callbacks=[early_stopping]  # Include early stopping callback
)

In [None]:
# Evaluate on the held-out test split, fed through the test generator.
test_flow = test_datagen.flow(x_test_resnet, y_test, batch_size = 64)
results = model.evaluate(test_flow)
print(f'{model.metrics_names}: {results}')

# Save Model

In [48]:
# # save model
# model.save('./save/ResNet50_Base_model.h5')

# # save weight
# model.save_weights('./save/ResNet50_Base_weight.h5')

# Summarize history of accuracy

In [None]:
# Training vs. validation accuracy per epoch.
plt.figure(figsize = (15,5))
for metric_key in ('acc', 'val_acc'):
    plt.plot(history.history[metric_key])
plt.title('Train accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'val'], loc = 'upper left')
plt.grid()
plt.show()

# Summarize history for loss

In [None]:
# Training vs. validation loss per epoch.
plt.figure(figsize = (15,5))
for metric_key in ('loss', 'val_loss'):
    plt.plot(history.history[metric_key])
plt.title('Train loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['train', 'val'], loc = 'upper right')
plt.grid()
plt.show()

# Evaluate Model

In [None]:
# Get predictions for x_test_resnet.
# The model was trained and evaluated on generator output, i.e. on images
# that went through the generators' per-image standardization. The same
# standardization is applied here (on float32 copies, because
# `standardize` works in place) so these predictions are consistent with
# `model.evaluate`.
x_test_std = np.stack([
    test_datagen.standardize(np.array(img, dtype='float32'))
    for img in x_test_resnet
])
# One batched predict call instead of one call per image.
y_pred = np.argmax(model.predict(x_test_std, batch_size=64), axis=1)

# Confusion Matrix
cm = confusion_matrix(y_test, y_pred)
ax = sns.heatmap(cm, annot=True, fmt='g', xticklabels=label, yticklabels=label, linewidths=.5)
ax.set(xlabel='Predicted', ylabel='Actual')
plt.show()

# Classification Report
report = classification_report(y_test, y_pred, target_names=label)
print(report)

# Test Prediction

In [None]:
# Predict the first five test images one by one and show each image with
# its predicted and actual class.
for i in range(5):
  # Standardize a float32 copy the same way `test_datagen` does when it
  # feeds the model (`standardize` works in place, hence the copy).
  x_input = test_datagen.standardize(np.array(x_test_resnet[i], dtype='float32'))[np.newaxis, ...]
  # Kept in its own name so the `y_pred` vector computed for the
  # confusion matrix is not overwritten.
  probs = model.predict(x_input)
  pred_class = int(np.argmax(probs))
  plt.imshow(x_test[i])
  plt.title(f'x_test[{i}]: predict = {pred_class} ({label[pred_class]}) , actual = {y_test[i]} ({label[int(y_test[i])]})')
  plt.show()

# Grad-CAM

In [53]:
# Grad-CAM settings.
last_conv_layer_name = "conv5_block3_out"   # layer whose activations are visualized
preprocess_input = tf.keras.applications.resnet.preprocess_input
img_size = (224, 224)

In [None]:
def _layer_output_candidates(layer):
    """Return the output tensor of every inbound node of `layer`, newest node first."""
    outputs = []
    if hasattr(layer, 'get_output_at'):
        node_index = 0
        while True:
            try:
                outputs.append(layer.get_output_at(node_index))
            except (ValueError, RuntimeError):
                break  # ran past the last inbound node
            node_index += 1
    if not outputs:
        outputs.append(layer.output)
    return outputs[::-1]


def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for one input.

    Args:
        img_array: Model input with a leading batch dimension.
        model: Keras model to explain.
        last_conv_layer_name: Name of the layer whose activations are weighted.
        pred_index: Class index to explain; defaults to the top prediction.

    Returns:
        NumPy array: the ReLU-ed, max-normalized heatmap (all zeros when the
        layer has no positive evidence for the class).

    Raises:
        ValueError: if the prediction does not depend on any output of the
            named layer.
    """
    target_layer = model.get_layer(last_conv_layer_name)
    grads = None
    # A layer reused from another model has several inbound nodes, and
    # `layer.output` would pick only the first one, which may not lie on the
    # path to `model.output`. Try each node's output (newest first) and keep
    # the first one the prediction actually depends on.
    for conv_tensor in _layer_output_candidates(target_layer):
        grad_model = tf.keras.models.Model(model.inputs, [conv_tensor, model.output])
        with tf.GradientTape() as tape:
            last_conv_layer_output, preds = grad_model(img_array)
            if pred_index is None:
                pred_index = tf.argmax(preds[0])
            class_channel = preds[:, pred_index]
        grads = tape.gradient(class_channel, last_conv_layer_output)
        if grads is not None:
            break
    if grads is None:
        raise ValueError(
            f"The model output does not depend on layer '{last_conv_layer_name}'."
        )

    # Channel importance = gradient averaged over batch and spatial axes.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    # Keep positive evidence only and scale so the peak is 1; divide_no_nan
    # yields zeros instead of NaN when the whole map is <= 0.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap.numpy()
    
def save_and_display_gradcam(img_path, heatmap, cam_path="./out/cam.jpg", alpha=0.8):
      """Overlay a Grad-CAM heatmap on an image, save it and display it.

      Args:
          img_path: The image itself as an array (despite the name, not a
              file path); its first two dimensions set the overlay size.
          heatmap: 2-D heatmap, expected to lie in [0, 1].
          cam_path: Where the superimposed image is written.
          alpha: Weight of the colorized heatmap in the overlay.
      """
      img = img_path

      # Rescale heatmap to a range 0-255 (NaNs and out-of-range values are
      # sanitized first so the uint8 cast is well defined).
      heatmap = np.uint8(255 * np.clip(np.nan_to_num(heatmap), 0.0, 1.0))

      # Use jet colormap to colorize heatmap. `pyplot.get_cmap` is used
      # because `matplotlib.cm.get_cmap` is not available in newer
      # Matplotlib releases.
      jet = plt.get_cmap("jet")

      # Use RGB values of the colormap
      jet_colors = jet(np.arange(256))[:, :3]
      jet_heatmap = jet_colors[heatmap]

      # Create an image with RGB colorized heatmap, resized to the input image
      jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
      jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
      jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)

      # Superimpose the heatmap on original image
      superimposed_img = jet_heatmap * alpha + img
      superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)

      # Save the superimposed image
      superimposed_img.save(cam_path)

      # Display Grad CAM
      display(Image(cam_path))

# Grad-CAM works on the raw class scores, so the final activation is
# removed for the duration of this loop only and restored afterwards
# (otherwise re-running earlier evaluation cells would silently use a
# model without its softmax).
output_layer = model.layers[-1]
original_activation = output_layer.activation
output_layer.activation = None
try:
  for i in range(5):
    # `randint` excludes the upper bound, so use len(x_test) to make the
    # last test image selectable as well.
    c = np.random.randint(0, len(x_test))
    img_path = x_test[c]
    # Preprocess a float32 copy so `x_test` itself is never modified, then
    # apply the same per-image standardization the generators apply.
    img_array = preprocess_input(np.array(img_path, dtype='float32'))
    img_array = test_datagen.standardize(img_array)
    batch = img_array[np.newaxis, ...]
    preds = model.predict(batch)
    print(f"Predicted: {label[np.argmax(preds)]} | Actual: {label[y_test[c]]} ")
    heatmap = make_gradcam_heatmap(batch, model, last_conv_layer_name)
    save_and_display_gradcam(img_path, heatmap)
finally:
  output_layer.activation = original_activation