# Part 2: Image Processing & Transfer learning
This notebook contains the models for basic modelling and for investigating activation functions, loss functions, skip connections, autoencoders and transfer learning.

## Data Import

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Add, Concatenate, UpSampling2D, Dropout
from keras.models import Model, Sequential, load_model
from keras.callbacks import EarlyStopping
from keras.losses import SparseCategoricalCrossentropy
from keras.utils import to_categorical

In [None]:
# Block 1 training split: keep the raw array, plus a float32 copy divided by 255
x_block1 = np.load("p2_data/block1/train/x_train.npy")
x_train_block1 = x_block1.astype('float32') / 255
y_block1 = np.load("p2_data/block1/train/y_train.npy")

# Block 1 validation split: images are converted and divided by 255 as they are loaded
x_val_block1 = np.load("p2_data/block1/val/x_val.npy").astype('float32') / 255
y_val_block1 = np.load("p2_data/block1/val/y_val.npy")

## Global
Variables and functions used throughout this notebook.

In [None]:
# function for base cnn model
def create_model(activation_func, input_shape=(32,32,3), num_classes=50, output_activation="softmax"):
    """Build the (uncompiled) baseline CNN used throughout this notebook.

    The network is three 3x3 convolutions (16, 32, 64 filters, max-pooling
    after the first two), followed by three dense layers (512, 256, 128
    units), each with 50% dropout, and a final dense output layer.

    Parameters
    ----------
    activation_func : str, callable or None
        Activation applied to every hidden Conv2D/Dense layer. Pass Python
        ``None`` (not the string ``"None"``) for no activation.
    input_shape : tuple, optional
        Shape of one input image. Defaults to ``(32, 32, 3)``.
    num_classes : int, optional
        Number of units in the output layer. Defaults to 50.
    output_activation : str, callable or None, optional
        Activation of the output layer. Defaults to ``"softmax"``; use
        ``None`` to emit raw logits.

    Returns
    -------
    keras.models.Sequential
        A freshly initialised model; the caller is responsible for compiling it.
    """
    model = Sequential([
        Conv2D(16, kernel_size=(3,3), activation=activation_func, input_shape=input_shape),
        MaxPooling2D((2,2)),
        Conv2D(32, kernel_size=(3,3), activation=activation_func),
        MaxPooling2D((2,2)),
        Conv2D(64, kernel_size=(3,3), activation=activation_func),
        Flatten(),
        Dense(512, activation=activation_func),
        Dropout(0.5),
        Dense(256, activation=activation_func),
        Dropout(0.5),
        Dense(128, activation=activation_func),
        Dropout(0.5),
        Dense(num_classes, activation=output_activation)
    ])
    return model

# early-stopping callback passed to the classifier fits below: it monitors the
# validation loss with a patience of 5 epochs and restores the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# function to plot accuracy & loss trend during training
def training_results(result):
    """Plot training vs. validation accuracy and loss side by side.

    Parameters
    ----------
    result : object with a ``history`` mapping
        Must provide the keys 'accuracy', 'val_accuracy', 'loss' and
        'val_loss', each holding one value per epoch.
    """
    fig, axs = plt.subplots(1, 2, figsize=(12, 6))
    history = result.history

    # (axis, metric key, y-axis label, fixed y-range, legend position)
    panels = (
        (axs[0], 'accuracy', 'Accuracy', [0.0, 1.0], 'lower right'),
        (axs[1], 'loss', 'Loss', [0.0, 5.0], 'upper right'),
    )
    for ax, metric, y_label, y_range, legend_loc in panels:
        val_metric = 'val_' + metric
        ax.plot(history[metric], label=metric)
        ax.plot(history[val_metric], label=val_metric)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.set_ylim(y_range)
        ax.legend(loc=legend_loc)

    plt.show()


## Basic Modelling - Block 1
Classify images using CNN with 5 hidden layers, investigate use of activation functions, different losses & skip connections

### Base CNN model

In [None]:
# create base model
activ_func = "relu"
base_model = create_model(activ_func)

# compile & fit model (validation_split holds out 10% of the training arrays for per-epoch validation)
base_model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
result = base_model.fit(x_train_block1, y_block1, epochs=10, batch_size=47, validation_split=0.1, callbacks=[early_stopping])

# evaluate model on the separately loaded Block 1 validation arrays
test_loss, test_acc = base_model.evaluate(x_val_block1, y_val_block1)
print(test_acc)

# plot results
training_results(result)

# Save in the format implied by the ".keras" suffix. The explicit save_format='tf'
# contradicted the file extension (and is deprecated in Keras 3), so it is dropped
# to match how the skip-connection model is saved.
base_model.save("models_p2/base.keras")

### Investigating activation functions
Applying different activation functions to the hidden layers of the base cnn model, and to the output layer of the model

In [None]:
# Model 1 - No activation function hidden
# Python None (not the string "None") tells Keras to apply no activation (linear output)
activ_func1 = None
# build with this cell's own setting so the hidden layers really have no activation
model1 = create_model(activ_func1)

model1.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
result1 = model1.fit(x_train_block1, y_block1, epochs=10, batch_size=47, validation_split=0.1, callbacks=[early_stopping])

test_loss1, test_acc1 = model1.evaluate(x_val_block1, y_val_block1, verbose=2)
print(test_acc1)
training_results(result1)


In [None]:
# Model 2 - tanh hidden
activ_func2 = "tanh"
# build with this cell's own setting so the hidden layers really use tanh
model2 = create_model(activ_func2)

model2.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
result2 = model2.fit(x_train_block1, y_block1, epochs=10, batch_size=47, validation_split=0.1, callbacks=[early_stopping])

test_loss2, test_acc2 = model2.evaluate(x_val_block1, y_val_block1, verbose=2)
print(test_acc2)
training_results(result2)


In [None]:
# Model 3 - sigmoid hidden
activ_func3 = "sigmoid"
# build with this cell's own setting so the hidden layers really use sigmoid
model3 = create_model(activ_func3)

model3.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
result3 = model3.fit(x_train_block1, y_block1, epochs=10, batch_size=47, validation_split=0.1, callbacks=[early_stopping])

test_loss3, test_acc3 = model3.evaluate(x_val_block1, y_val_block1, verbose=2)
print(test_acc3)
training_results(result3)

In [None]:
# Model 4 - sigmoid activation output
# Same layer stack as the base CNN (relu hidden layers); only the output activation differs.
model4 = Sequential()

# convolutional feature extractor
model4.add(Conv2D(16, kernel_size=(3,3), activation="relu", input_shape=(32,32,3)))
model4.add(MaxPooling2D((2,2)))
model4.add(Conv2D(32, kernel_size=(3,3), activation="relu"))
model4.add(MaxPooling2D((2,2)))
model4.add(Conv2D(64, kernel_size=(3,3), activation="relu"))
model4.add(Flatten())

# dense head: each hidden layer is followed by 50% dropout
for units in (512, 256, 128):
    model4.add(Dense(units, activation="relu"))
    model4.add(Dropout(0.5))

# output layer under test: sigmoid instead of softmax
model4.add(Dense(50, activation="sigmoid"))

model4.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
result4 = model4.fit(
    x_train_block1,
    y_block1,
    epochs=10,
    batch_size=47,
    validation_split=0.1,
    callbacks=[early_stopping],
)

test_loss4, test_acc4 = model4.evaluate(x_val_block1, y_val_block1, verbose=2)
print(test_acc4)
training_results(result4)

In [None]:
# Model 5 - no activation/raw logits output
# Same layer stack as the base CNN (relu hidden layers); the output layer has no activation.
model5 = Sequential()

# convolutional feature extractor
model5.add(Conv2D(16, kernel_size=(3,3), activation="relu", input_shape=(32,32,3)))
model5.add(MaxPooling2D((2,2)))
model5.add(Conv2D(32, kernel_size=(3,3), activation="relu"))
model5.add(MaxPooling2D((2,2)))
model5.add(Conv2D(64, kernel_size=(3,3), activation="relu"))
model5.add(Flatten())

# dense head: each hidden layer is followed by 50% dropout
for units in (512, 256, 128):
    model5.add(Dense(units, activation="relu"))
    model5.add(Dropout(0.5))

# output layer under test: raw logits
model5.add(Dense(50))

# the loss is told it receives logits, since the model no longer applies softmax itself
model5.compile(
    optimizer="adam",
    loss=SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)
result5 = model5.fit(
    x_train_block1,
    y_block1,
    epochs=10,
    batch_size=47,
    validation_split=0.1,
    callbacks=[early_stopping],
)

test_loss5, test_acc5 = model5.evaluate(x_val_block1, y_val_block1, verbose=2)
print(test_acc5)
training_results(result5)


### Investigating different loss functions
Testing base cnn model with different loss functions during model compilation

In [None]:
# one-hot encode labels (categorical cross-entropy expects one-hot targets)
y_block1_ohc = to_categorical(y_block1, num_classes=50)
y_val_block1_ohc = to_categorical(y_val_block1, num_classes=50)

# Model 6 - Categorical cross entropy
# Build a fresh, untrained copy of the base architecture. Aliasing base_model would
# continue training the already-fitted baseline (and modify it), so the loss
# comparison would not start from the same point as the other experiments.
model6 = create_model("relu")
model6.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

result6 = model6.fit(x_train_block1, y_block1_ohc, epochs=10, batch_size=47, validation_split=0.1, callbacks=[early_stopping])

test_loss6, test_acc6 = model6.evaluate(x_val_block1, y_val_block1_ohc, verbose=2)
print(test_acc6)
training_results(result6)

### Investigating skip connections
Adding residual blocks (skip connections) to the base CNN model

In [None]:
# residual block: two 3x3 convolutions whose output is added back onto the block input
def residual_block(x, filters, activation='relu'):
    """Apply two same-padded 3x3 convolutions and add the input back on.

    Parameters
    ----------
    x : Keras tensor
        Block input. It is added element-wise to the convolution output, so
        its shape must match that output (same-padding keeps the spatial
        size, so the channel count has to equal ``filters``).
    filters : int
        Number of filters in both convolutions.
    activation : str or callable, optional
        Activation of the first convolution only; the second has none.

    Returns
    -------
    Keras tensor
        ``x`` plus the output of the two convolutions.
    """
    first_conv = Conv2D(filters, kernel_size=(3, 3), activation=activation, padding='same')
    second_conv = Conv2D(filters, kernel_size=(3, 3), activation=None, padding='same')
    branch = second_conv(first_conv(x))
    return Add()([x, branch])

# function for model that will contain residual block
def model_skip_connect():
    """Build the (uncompiled) CNN variant with two residual blocks.

    Each of the two stages is convolution -> max-pooling -> residual block
    (16 filters, then 32). The head is Dense(512) and Dense(256), each
    followed by 50% dropout, and a 50-unit softmax output.

    Returns
    -------
    keras.models.Model
        Functional model taking 32x32x3 inputs.
    """
    inputs = Input(shape=(32, 32, 3))

    # stage 1: 16 filters
    stage1 = Conv2D(16, kernel_size=(3,3), activation="relu")(inputs)
    stage1 = MaxPooling2D((2,2))(stage1)
    stage1 = residual_block(stage1, filters=16)

    # stage 2: 32 filters
    stage2 = Conv2D(32, kernel_size=(3,3), activation="relu")(stage1)
    stage2 = MaxPooling2D((2,2))(stage2)
    stage2 = residual_block(stage2, filters=32)

    # classification head
    head = Flatten()(stage2)
    for units in (512, 256):
        head = Dense(units, activation="relu")(head)
        head = Dropout(0.5)(head)
    outputs = Dense(50, activation="softmax")(head)

    return Model(inputs, outputs)

# Model 7 - skip-connection CNN: build and compile
model7 = model_skip_connect()
model7.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# train with the same settings as the other Block 1 classifiers
result7 = model7.fit(
    x_train_block1,
    y_block1,
    epochs=10,
    batch_size=47,
    validation_split=0.1,
    callbacks=[early_stopping],
)

# score on the Block 1 validation arrays and plot the training curves
test_loss7, test_acc7 = model7.evaluate(x_val_block1, y_val_block1, verbose=2)
print(test_acc7)
training_results(result7)

# uncomment to inspect the architecture:
#model7.summary()

# persist the model; the transfer-learning section loads this file
model7.save("models_p2/skip_connect.keras")

## Autoencoder modelling - Block 2
Construct autoencoder model to attempt to reproduce images from CIFAR100 dataset

In [None]:
# function to build autoencoder
def build_autoencoder(input_shape, output_activation='sigmoid'):
    """Build an (uncompiled) convolutional autoencoder.

    The encoder is two same-padded 3x3 convolutions (16, then 32 filters),
    each followed by 2x2 max-pooling. The decoder mirrors it with 2x2
    up-sampling and ends in a 3-filter convolution.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input image, e.g. ``(32, 32, 3)``. The two 2x2
        poolings are undone by two 2x2 up-samplings, so height and width
        should be divisible by 4 for the output to match the input size.
    output_activation : str, callable or None, optional
        Activation of the final reconstruction layer. Defaults to
        ``'sigmoid'`` so reconstructions stay in [0, 1], matching inputs
        divided by 255 and the probabilities expected by a binary
        cross-entropy loss. An unbounded ReLU output suits neither.

    Returns
    -------
    keras.models.Model
        Model mapping an image to its reconstruction.
    """
    encoder_input = Input(shape=input_shape)
    
    # encoder
    x = Conv2D(16, kernel_size=(3,3), activation='relu', padding="same")(encoder_input)
    x = MaxPooling2D((2, 2))(x)
    x = Conv2D(32, kernel_size=(3,3), activation='relu', padding="same")(x)
    encoded = MaxPooling2D((2, 2))(x)
    
    # decoder
    x = Conv2D(32, kernel_size=(3,3), activation='relu', padding="same")(encoded)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(16, kernel_size=(3,3), activation='relu', padding="same")(x)
    x = UpSampling2D((2, 2))(x)
    # bounded output activation: pixel targets lie in [0, 1]
    decoded = Conv2D(3, kernel_size=(3,3), activation=output_activation, padding="same")(x)
    
    # combine encoder & decoder
    autoencoder = Model(encoder_input, decoded)
    return autoencoder

# build two identically structured autoencoders so two reconstruction losses can be compared
autoencode_model = build_autoencoder(input_shape=(32, 32, 3))
autoencode_model1 = build_autoencoder(input_shape=(32,32,3))

# compile: autoencode_model1 uses binary cross-entropy, autoencode_model uses mean squared error
autoencode_model1.compile(optimizer="adam", loss='binary_crossentropy')
autoencode_model.compile(optimizer="adam", loss='mean_squared_error')

# fit: the images serve as both input and target
result8 = autoencode_model.fit(x_train_block1, x_train_block1, epochs=5, batch_size=47, shuffle=True, validation_split=0.1)
result9 = autoencode_model1.fit(x_train_block1, x_train_block1, epochs=5, batch_size=47, shuffle=True, validation_split=0.1)

# evaluate the mean-squared-error model
evaluation_loss = autoencode_model.evaluate(x_val_block1, x_val_block1, verbose=2)
print("Evaluation Loss:", evaluation_loss)

# evaluate the binary-cross-entropy model
evaluation_loss1 = autoencode_model1.evaluate(x_val_block1, x_val_block1, verbose=2)
print("Evaluation Loss:", evaluation_loss1)

# Save the MSE model in the format implied by the ".keras" suffix. The explicit
# save_format='tf' contradicted the file extension (and is deprecated in Keras 3),
# so it is dropped to match how the skip-connection model is saved.
autoencode_model.save("models_p2/autoencode.keras")

In [None]:
# reconstruct images
def vis_reconstruct_img(model, original_img, n=3):
    """Show the first ``n`` images above their reconstructions.

    Parameters
    ----------
    model : object with a ``predict`` method
        Called on the selected images; its output is displayed as images.
    original_img : array-like of images
        Images to reconstruct; only the first ``n`` are used.
    n : int, optional
        Number of image pairs (columns) to draw. If fewer images are
        available, only those are drawn. Defaults to 3.
    """
    # honour the caller's n and only run the model on the images that are shown
    originals = original_img[:n]
    n = len(originals)
    if n == 0:
        # nothing to reconstruct or draw
        return

    reconstructed_images = model.predict(originals)

    plt.figure(figsize=(9, 4))
    for i in range(n):
        # og images (top row)
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(originals[i])
        plt.title("Original")
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # reconstructed images (bottom row)
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(reconstructed_images[i])
        plt.title("Reconstructed")
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()

# compare reconstructions from the MSE model and the binary-cross-entropy model
vis_reconstruct_img(autoencode_model, x_val_block1)
vis_reconstruct_img(autoencode_model1, x_val_block1)

## Transfer learning - Block 2
Using autoencoder model & skip connect model above, investigate use of transfer learning to build a network for Block 2 images - compare results with other models

In [None]:
# load train data - Block 2
x_train_block2 = np.load("p2_data/block2/train/x_train.npy")
y_train_block2 = np.load("p2_data/block2/train/y_train.npy")
x_val_block2 = np.load("p2_data/block2/val/x_val.npy")
y_val_block2 = np.load("p2_data/block2/val/y_val.npy")

# normalize pixel values (same /255 scaling as applied to the Block 1 images)
x_train_block2 = x_train_block2.astype('float32') / 255
x_val_block2 = x_val_block2.astype('float32') / 255

# load models saved earlier in this notebook (skip-connection classifier and autoencoder)
tf1_model = load_model("models_p2/skip_connect.keras")
tf2_model = load_model("models_p2/autoencode.keras")

# input layer shared by both branches of the transfer model
input_layer = Input(shape=(32, 32, 3))

# select convolutional layers of skip connect model
# Only the Conv2D layers are kept and they are applied one after another, so the
# pooling and Add (skip) layers of the saved model are not part of this branch.
# NOTE(review): these layers are not frozen here, unlike the autoencoder below - confirm this is intended.
tf1_layers = [layer for layer in tf1_model.layers if isinstance(layer, Conv2D)]
tf1_output = input_layer
for layer in tf1_layers: 
    tf1_output = layer(tf1_output)
tf1_flattened = Flatten()(tf1_output)

# select layers in second model
for layer in tf2_model.layers: # to freeze
    layer.trainable = False
# The whole loaded model is called on the input, so this branch flattens the
# model's final output.
# NOTE(review): presumably that is the reconstructed image rather than the encoded bottleneck - verify.
tf2_output = tf2_model(input_layer)
tf2_flattened = Flatten()(tf2_output)

# concat layers from the models
combined_output = Concatenate()([tf2_flattened, tf1_flattened])

# define transfer learning model (new trainable classification head)
x = Dense(256, activation="relu")(combined_output)
x = Dropout(0.2)(x)
outputs2 = Dense(50, activation="softmax")(x)

# create transfer model
transfer_model = Model(input_layer, outputs2)

# compile & fit transfer model
transfer_model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

# NOTE(review): the name result9 is also assigned in the autoencoder section; this assignment replaces that value.
result9 = transfer_model.fit(x_train_block2, y_train_block2, epochs=5, validation_split=0.1,batch_size=35)

# evaluate model on the Block 2 validation arrays
transfer_loss, transfer_accuracy = transfer_model.evaluate(x_val_block2, y_val_block2)

print("Test Loss:", transfer_loss)
print("Test Accuracy:", transfer_accuracy)

training_results(result9)