In [None]:
"""
zalando_transfer_20230604.py
David Nilsson - Prime Fitness Studio AB
2023-06-01
"""

In [None]:
# Importing libraries  
import tensorflow as tf
import tensorboard as tb
import subprocess
from tensorflow.keras.callbacks import TensorBoard
import tensorflow_datasets as tfds
import tensorflow.keras.utils
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils  import to_categorical
import keras_tuner as kt
import tensorflow.keras as keras
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D
# from tensorboard.plugins.profile import profiler_v2 as profiler # Does not work locally neither in Colab
import matplotlib.pyplot as plt
import numpy as np
import os
from datetime import datetime
from packaging import version
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
"""
To easier optimize the hyperparameters the function build_model() could be used.

"""
# Defining a Keras model to search optimized hyper parameters
def build_model(hp):
    model = keras.Sequential()
    model.add(layers.Flatten())
    # Tune the number of layers.
    for i in range(hp.Int("num_layers", 1, 3)):
        model.add(
            layers.Dense(
                # Tune number of units separately.
                units=hp.Int(f"units_{i}", min_value=32, max_value=512, step=32),
                activation=hp.Choice("activation", ["relu", "tanh"]),
            )
        )
    if hp.Boolean("dropout"):
        model.add(layers.Dropout(rate=0.25))
    model.add(layers.Dense(10, activation="softmax"))
    learning_rate = hp.Float("lr", min_value=1e-4, max_value=1e-2, sampling="log")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

In [None]:
# Creating a folder to store the logs
log_dir = "logs/fit/" + datetime.now().strftime("%Y%m%d-%H%M%S")

In [None]:
# Creating a callback for the logs
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

Evaluating at: http://localhost:6006 by command: tensorboard --logdir logs/fit

In [None]:
print("tf.__version__: ", tf.__version__)
print("tf.__version__: ", tb.__version__)

In [None]:
"""
Does not work locally neither in Colab
"""
# Starting the profiling session to scan for memory losses
#tf.profiler.experimental.start('./PythonEnv')

In [None]:
# Load the TensorBoard notebook extension.
%load_ext tensorboard

In [None]:
# Launch TensorBoard and navigate to the Profile tab to view performance profile
%tensorboard --logdir=logs

subprocess.call("tensorboard --logdir logs", shell=True)

In [None]:
# Loading the dataset to use
# tfds.disable_progress_bar()
"""
train_images, val_images, test_images = tfds.load(
    "fashion_mnist",
    # Reserve 10% for validation and 10% for test
    split=["train[:40%]", "train[40%:50%]", "train[50%:60%]"],
    as_supervised=True,  # Include labels
)
"""

In [None]:
# Get Fashion-MNIST training and test data from Keras database (https://keras.io/datasets/)
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()

In [None]:
# Print som basic information of data set sizes and data sizes
train_no,x,y = train_images.shape
print('No training images:',train_no, ' with image size:',x,'x',y)
label_no = len(train_labels)
if (label_no != train_no) : 
  print('# labels do not match # training images')

In [None]:
test_no,x,y = test_images.shape
label_no = len(test_labels)
print('No test images:',test_no)
if (label_no != test_no) : 
  print('# labels do not match # test images')

In [None]:
"""
val_no,x,y = val_images.shape
label_no = len(val_labels)
print('No val images:',val_no)
if (label_no != val_no) : 
  print('# labels do not match # val images')
"""

In [None]:
classes = np.unique(train_labels)
num_classes = len(classes)
print('Training labels:', np.unique(train_labels), "; That is,", num_classes,"classes." )

In [None]:
# Transforming the grayscale images to RGB images with 3 channels
#train_ds = train_images.map(lambda x, y: (tf.repeat(x, 3, axis=-1), y))
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_ds = train_ds.map(lambda x, y: (tf.repeat(x, 3, axis=-1), y))

In [None]:
#validation_ds = val_images.map(lambda x, y: (tf.repeat(x, 3, axis=-1), y))
#test_ds = test_images.map(lambda x, y: (tf.repeat(x, 3, axis=-1), y))
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_ds = test_ds.map(lambda x, y: (tf.repeat(x, 3, axis=-1), y))

In [None]:
print("Number of training samples: %d" % tf.data.experimental.cardinality(train_ds))
#print("Number of validation samples: %d" % tf.data.experimental.cardinality(validation_ds))
print("Number of test samples: %d" % tf.data.experimental.cardinality(test_ds))

In [None]:
# Printing out the first 9 pictures of the dataset
plt.figure(figsize=(10, 10))
for i, (image, label) in enumerate(train_ds.take(9)):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(image)
    plt.title(int(label))
    plt.axis("off")

In [None]:
# Transforming the grayscale images to RGB images with 3 channels
train_images = tf.repeat(train_images[..., tf.newaxis], 3, -1)
test_images = tf.repeat(test_images[..., tf.newaxis], 3, -1)

In [None]:
# Creating train and test datasets
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))

In [None]:
# Standardizing the loaded dataset, originally 150*150
size = (32, 32)

In [None]:
train_ds = train_ds.map(lambda x, y: (tf.image.resize(x, size), y))
#validation_ds = validation_ds.map(lambda x, y: (tf.image.resize(x, size), y))
test_ds = test_ds.map(lambda x, y: (tf.image.resize(x, size), y))

In [None]:
# Loading the data by batches to optimize speed of training and validation
batch_size = 1

In [None]:
train_ds = train_ds.cache().batch(batch_size).prefetch(buffer_size=10)
#validation_ds = validation_ds.cache().batch(batch_size).prefetch(buffer_size=10)
test_ds = test_ds.cache().batch(batch_size).prefetch(buffer_size=10)

In [None]:
# Randomizing the dataset and increasing size by augmentation
data_augmentation = keras.Sequential(
    [layers.RandomFlip("horizontal"), layers.RandomRotation(0.2),]  # 0.2 before
)

In [None]:
# Visualizing the first data-batch after augmentation
for images, labels in train_ds.take(1):
    plt.figure(figsize=(10, 10))
    first_image = images[0]
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        augmented_image = data_augmentation(
            tf.expand_dims(first_image, 0), training=True
        )
        plt.imshow(augmented_image[0].numpy().astype("int32"))
        plt.title(int(labels[0]))
        plt.axis("off")

In [None]:
# Creating a base-model / Xception and VGG16 is to big to train
base_model = keras.applications.MobileNet(
    weights="imagenet",  # Load weights pre-trained on ImageNet.
    input_shape=(32, 32, 3),
    include_top=False,
)  # Do not include the ImageNet classifier at the top.

In [None]:
# Freeze/unfreeze the base_model - Normally False
base_model.trainable = False

In [None]:
# Create new model on top
inputs = keras.Input(shape=(32, 32, 3))
# x = data_augmentation(inputs)  # Apply random data augmentation
x = inputs

In [None]:
# Pre-trained VGG16 weights requires that input be scaled
# from (0, 255) to a range of (-1., +1.), the rescaling layer
# outputs: `(inputs * scale) + offset`
scale_layer = keras.layers.Rescaling(scale=1 / 127.5, offset=-1)
x = scale_layer(x)

In [None]:
# The base model contains batchnorm layers. We want to keep them in inference mode
# when we unfreeze the base model for fine-tuning, so we make sure that the
# base_model is running in inference mode here.
x = base_model(x, training=False)
x = keras.layers.GlobalAveragePooling2D()(x)
x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout
# outputs = keras.layers.Dense(1)(x)
outputs = keras.layers.Dense(10, activation="softmax")(x)  # Modified output layer
model = keras.Model(inputs, outputs)

In [None]:
model.summary()
#saver.save(session, LOG_DIR/model.ckpt, step)

In [None]:
# Training the top-layer
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-2),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
epochs = 7
history = model.fit(train_ds, epochs=epochs, validation_data=test_ds, callbacks=[tensorboard_callback])

In [None]:
model.summary()
#saver.save(session, LOG_DIR/model.ckpt, step)

Unfreeze the base_model. Note that it keeps running in inference mode
since we passed `training=False` when calling it. This means that
the batchnorm layers will not update their batch statistics.
This prevents the batchnorm layers from undoing all the training
we've done so far.

In [None]:
# Unfreeze the last 1 layers of the base_model
for layer in base_model.layers[-1:]:
    layer.trainable = True
# base_model.trainable = True
model.summary()

In [None]:
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),  # Low learning rate, try also RMSprop
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
epochs = 7
history = model.fit(train_ds, epochs=epochs, validation_data=test_ds, callbacks=[tensorboard_callback])

In [None]:
model.summary()
#saver.save(session, LOG_DIR/model.ckpt, step)

In [None]:
x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout
# outputs = keras.layers.Dense(1)(x)
outputs = keras.layers.Dense(10, activation="softmax")(x)  # Modified output layer
model = keras.Model(inputs, outputs)

In [None]:
model.summary()

In [None]:
# Training the top-layer
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),  # Low learning rate, try also RMSprop
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
epochs = 7
history = model.fit(train_ds, epochs=epochs, validation_data=test_ds, callbacks=[tensorboard_callback])

In [None]:
model.summary()
#saver.save(session, LOG_DIR/model.ckpt, step)

In [None]:
"""
# Unfreeze the base_model. Note that it keeps running in inference mode
# since we passed `training=False` when calling it. This means that
# the batchnorm layers will not update their batch statistics.
# This prevents the batchnorm layers from undoing all the training
# we've done so far.

# Unfreeze the last 3 layers of the base_model
for layer in base_model.layers[-3:]:
    layer.trainable = True
# base_model.trainable = True
model.summary()

model.compile(
    optimizer=keras.optimizers.Adam(1e-4),  # Low learning rate
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

epochs = 15
history = model.fit(train_ds, epochs=epochs, validation_data=test_ds, callbacks=[tensorboard_callback])

model.summary()
#saver.save(session, LOG_DIR/model.ckpt, step)
"""

In [None]:
# Initializing a Keras tuner based on random search for the model
tuner = kt.RandomSearch(
    build_model,
    objective='val_loss',
    max_trials=5)

In [None]:
# Starting the search for the optimum hyperparameters for the model
#tuner.search(x_train, y_train, epochs=10, validation_data=(x_val, y_val))
tuner.search(train_ds, to_categorical(train_labels), epochs=10, validation_data=(test_ds, to_categorical(test_labels)))

In [None]:
best_model = tuner.get_best_models()[0]
best_model.build()
best_model.summary()

In [None]:
build_model(keras_tuner.HyperParameters())

In [None]:
"""
Evaluating and plotting the performance of the model
"""
epochrange = range(1, epochs + 1)
train_acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

In [None]:
train_loss = history.history['loss']
val_loss = history.history['val_loss']

In [None]:
plt.plot(epochrange, train_acc, 'bo', label='Training acc')
plt.plot(epochrange, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy (model 1)')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
plt.plot(epochrange, train_loss, 'bo', label='Training loss')
plt.plot(epochrange, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss (model 1)')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Evaluating the model
test_loss, test_acc = model.evaluate(test_ds)
print('Test accuracy: %.3f' % test_acc)