# Import dependencies.

In [None]:
import warnings

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from keras.datasets import mnist
from keras.utils import to_categorical
from keras.applications.densenet import DenseNet121
from keras.layers import Dense, GlobalAveragePooling2D
from keras.models import Model
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, TerminateOnNaN, ProgbarLogger, TensorBoard, LearningRateScheduler

# For runtime estimation
import time

from datetime import datetime
from skimage.transform import resize

Mute warnings

In [None]:
# # Suppress specific warning message
# warnings.filterwarnings(
#     action='ignore',
#     message='The name*',
# )
# 
# warnings.filterwarnings(
#     action='ignore',
#     message='tensorflow*',
# )
# 
# warnings.filterwarnings(
#     action='ignore',
#     message='WARNING*',
# )
# 
# warnings.filterwarnings("ignore")
# disable_interactive_logging();

In [None]:
# Set random seeds for reproducibility.
# np.random.seed alone only fixes NumPy's global generator (used for the digit
# sampling below). Keras' set_random_seed seeds Python's `random`, NumPy and
# the backend in one call, which also covers weight initialisation and data
# shuffling during training.
try:
    from keras.utils import set_random_seed
except ImportError:
    # Keras build without set_random_seed: keep the NumPy-only seeding.
    np.random.seed(42)
else:
    set_random_seed(42)

# Getting data, observations
## Get dataset

In [None]:
# Fetch MNIST; load_data() returns one (images, labels) pair per split
train_split, test_split = mnist.load_data()
train_images, train_labels = train_split
test_images, test_labels = test_split

Show original images

In [None]:
# Sample 25 MNIST digits (with their labels) from the train dataset
indexes = np.random.randint(0, train_images.shape[0], size=25)
images = train_images[indexes]
labels = train_labels[indexes]

# Plot the sampled digits on a 5x5 grid
plt.figure(figsize=(5, 5))

for position, image in enumerate(images, start=1):
    plt.subplot(5, 5, position)
    plt.imshow(image, cmap='gray')
    plt.axis('off')

# Save BEFORE showing: once a figure has been shown (notably with the inline
# notebook backend) it may already be closed, and a later savefig would write
# a blank image.
plt.savefig("mnist-samples.png")
plt.show()
plt.close('all')

## Preprocessing
Resize images

In [None]:
# Resize images to 32x32 so they match the input shape given to DenseNet.
# preserve_range=True keeps the original 0-255 intensities: by default
# skimage's resize converts integer images to floats in [0, 1], and the
# division by 255 in the normalisation step would then shrink the data a
# second time (to roughly [0, 0.004]).
train_images_resized = np.array(
    [resize(img, (32, 32), preserve_range=True) for img in train_images]
)
test_images_resized = np.array(
    [resize(img, (32, 32), preserve_range=True) for img in test_images]
)

Normalize pixel values

In [None]:
# Cast both splits to float32 and divide by 255
# (maps 0-255 pixel intensities onto the 0-1 range)
train_images_resized, test_images_resized = (
    split_images.astype('float32') / 255
    for split_images in (train_images_resized, test_images_resized)
)

Replicate channels

In [None]:
# DenseNet is built with a 3-channel input here, so append a channel axis
# and copy the single grayscale channel into all three positions
train_images_resized, test_images_resized = (
    np.repeat(np.expand_dims(split_images, axis=-1), repeats=3, axis=-1)
    for split_images in (train_images_resized, test_images_resized)
)

Make labels

In [None]:
# Convert the integer class labels of both splits to one-hot vectors
train_labels, test_labels = (
    to_categorical(split_labels)
    for split_labels in (train_labels, test_labels)
)

# Transfer learning
## Get and modify pre-trained model

Get pre-trained keras model

In [None]:
# DenseNet stands for "Densely Connected Convolutional Networks": it connects
# every layer to every other layer in a feedforward fashion.
# Load the pre-trained DenseNet121 without its top (classification) layer.
densenet_options = {
    'weights': 'imagenet',
    'include_top': False,
    'input_shape': (32, 32, 3),
}
base_model = DenseNet121(**densenet_options)

Show model.

In [None]:
# Print the layer-by-layer summary of the pre-trained base model
base_model.summary()

Show model blocks.

In [None]:
# List the name of every layer in the base model
# (bare expression: the notebook displays the resulting list)
[layer.name for layer in base_model.layers]

In [None]:
# Report how many layers the base model contains
print(f'Model has: {len(base_model.layers)} layers')

Add custom classification layer.

In [None]:
# Get output tensor of the last layer in the base model;
# the custom classification layers are attached to it
output_tensor = base_model.output

# Bare expression: the notebook displays the tensor as the cell output
output_tensor

Add layer to the end of model

In [None]:
# Build the three layers of the custom head first, then wire them up.

# Global average pooling operation for spatial data
pooling_layer = GlobalAveragePooling2D()

# Fully connected (dense) layer. ReLU introduces non-linearity by mapping
# negative values to zero and leaving positive values unchanged.
hidden_layer = Dense(units=256, activation='relu')

# Final layer with 10 units, one per output class. Softmax normalizes the
# outputs into class probabilities that sum to 1.0.
classifier_layer = Dense(units=10, activation='softmax')

# Chain: base model output -> pooling -> dense(256) -> dense(10)
output_tensor = hidden_layer(pooling_layer(output_tensor))
predictions = classifier_layer(output_tensor)

Combine model.

In [None]:
# Freeze every layer of the pre-trained base so only the new head is trained
for pretrained_layer in base_model.layers:
    pretrained_layer.trainable = False

# Full network: base model input through to the custom softmax predictions
model = Model(inputs=base_model.input, outputs=predictions)

Compile model

In [None]:
# Adam optimizer with an initial learning rate of 0.01
adam_optimizer = Adam(learning_rate=0.01)

# Configure training: categorical cross-entropy loss (labels are one-hot
# encoded) and accuracy as the reported metric
model.compile(
    loss='categorical_crossentropy',
    optimizer=adam_optimizer,
    metrics=['accuracy'],
)

Add callbacks.

In [None]:
# Timestamped log directory, so each run writes to its own folder under logs/
logs = "logs/" + datetime.now().strftime("%Y%m%d-%H%M%S")

# TensorBoard callback: writes weight histograms every epoch and profiles
# batches 500 to 520
tboard_callback = TensorBoard(
    log_dir=logs,
    histogram_freq=1,
    profile_batch='500,520',
)

# Early stopping callback
early_stopping = EarlyStopping(
    monitor='val_loss',  # Monitor validation loss
    patience=2,          # Number of epochs with no improvement after which training will be stopped
    # Restore weights from the epoch with the best validation loss.
    # This was False, which contradicted the stated intent and left the
    # model with the weights of the last (not the best) epoch.
    restore_best_weights=True,
)

# Stop training as soon as a NaN loss appears
terminate_on_nan = TerminateOnNaN()

# Progress bar with metrics. The metric name matches the one used in
# model.compile(metrics=['accuracy']); the previous 'acc' named no metric.
progbar_logger = ProgbarLogger(
    count_mode="samples",
    stateful_metrics=['accuracy'],
)

# Define a learning rate scheduler function
def lr_scheduler(epoch, lr, decay_rate=0.01, decay_step=1):
    """Return the learning rate to use for ``epoch``.

    The rate is multiplied by ``decay_rate`` on every ``decay_step``-th epoch,
    except epoch 0, which always keeps the incoming rate. With the defaults
    the rate is therefore multiplied by 0.01 at every epoch after the first.

    Args:
        epoch: Zero-based index of the epoch about to start.
        lr: Current learning rate.
        decay_rate: Multiplicative factor applied on decay epochs.
        decay_step: Number of epochs between two decays; must be >= 1.

    Returns:
        The (possibly decayed) learning rate.

    Raises:
        ValueError: If ``decay_step`` is smaller than 1.
    """
    if decay_step < 1:
        raise ValueError(f"decay_step must be >= 1, got {decay_step!r}")
    # `epoch` is falsy for 0, so the very first epoch is never decayed.
    if epoch and epoch % decay_step == 0:
        return lr * decay_rate
    return lr

# Define a learning rate scheduler callback that wraps lr_scheduler,
# so it can be passed to model.fit via the callbacks list
lr_scheduler_callback = LearningRateScheduler(lr_scheduler)

Train model

In [None]:
# Wall-clock start, used to report the total training time afterwards
start_time = time.time()

# Train the model
history = model.fit(
    train_images_resized,
    train_labels,
    epochs=5,
    batch_size=32,  # Hyperparameter: trades training time against accuracy
    validation_data=(test_images_resized, test_labels),
    callbacks=[
        # tboard_callback was defined but never registered, so nothing was
        # written to the log directory that TensorBoard is later pointed at.
        tboard_callback,
        early_stopping,
        terminate_on_nan,
        progbar_logger,
        lr_scheduler_callback,
    ],
)


Get runtime.

In [None]:
# Seconds elapsed since start_time was recorded, right before training
elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

Load TensorBoard.

In [None]:
# Load the TensorBoard notebook extension.
%load_ext tensorboard

# Launch TensorBoard on the "logs" directory and navigate to the Profile tab to view the performance profile.
%tensorboard --logdir=logs --port=6012

# NOTE: if a message like 'Reusing TensorBoard on port XXXX' appears, change the --port value above.

# Get graph
## We can get loss as a trend.

In [None]:
# Draw one loss curve per history entry: training loss first, then validation
# loss, each plotted against a 1-based epoch number
for history_key, curve_label in (('loss', 'Train'), ('val_loss', 'Test')):
    loss_values = history.history[history_key]
    sns.lineplot(
        x=range(1, len(loss_values) + 1),
        y=loss_values,
        label=curve_label,
    )

# Title, axis labels and legend
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(loc='upper right')

# Measure loss and accuracy on the test split, then report the accuracy
test_loss, test_acc = model.evaluate(
    x=test_images_resized,
    y=test_labels,
)
print(f'Test accuracy: {test_acc: .2f}')

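When the training loss is much lower than the test loss, it typically indicates that the model is overfitting the training data. A training loss that is higher than the test loss usually points instead to underfitting, or to regularization (for example dropout) that is active only during training.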