In [None]:
# import libraries
import os
import time
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, LayerNormalization, Input, Add, Conv2D, Reshape, GlobalAveragePooling1D, Dropout, Flatten
from tensorflow.keras.activations import gelu
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.datasets import mnist, cifar100
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from tensorflow.keras.applications import VGG16
from sklearn.model_selection import train_test_split
from vit_keras import vit, utils
import matplotlib.pyplot as plt
import numpy as np
import shutil
from PIL import Image
from sklearn.metrics import accuracy_score
from datasets import load_dataset

In [None]:
from helper_functions import *
from mlp import *

In [None]:
# setup file directory
# The project root can be overridden through the MLPM_PROJECT_DIR environment
# variable so the notebook is not tied to a single machine; when the variable
# is unset, the original absolute path is used unchanged.
import os

file_path = os.environ.get(
    "MLPM_PROJECT_DIR",
    "/home/ecbm4040/e4040-2023fall-project-mlpm-hb2776-dg3370-amp2365",
)

# Optional Google Colab setup (left disabled):
# from google.colab import drive
# drive.mount('/content/drive')

# Make the project root the current working directory.
# os.chdir raises (e.g. FileNotFoundError) if the directory does not exist.
os.chdir(file_path)

In [None]:
# check availability of GPU
print(tf.__version__)

# tf.test.is_gpu_available() is deprecated; query the list of physical GPU
# devices once and reuse it for both the availability check and the listing.
available_gpus = tf.config.list_physical_devices('GPU')

if available_gpus:
    print("GPU is available.")
    print("Available GPUs:")
    for gpu in available_gpus:
        print(gpu)
else:
    print("CPU is available.")

# Load the ImageNet-1000 (mini) dataset

In [None]:
# Define a function to load a dataset from folders using ImageDataGenerator
def load_dataset_from_folders(main_folder, image_size=(224, 224), batch_size=16):
    """Create a batch iterator over the images stored under ``main_folder``.

    Args:
        main_folder: Directory handed to ``flow_from_directory``.
        image_size: ``target_size`` passed to the iterator; defaults to (224, 224).
        batch_size: Number of images per batch; defaults to 16.

    Returns:
        The iterator returned by ``ImageDataGenerator.flow_from_directory``,
        configured with ``class_mode='sparse'``.
    """
    # Every image is multiplied by 1/255 before being yielded.
    rescaling_generator = ImageDataGenerator(rescale=1./255)

    flow_options = dict(
        target_size=image_size,
        batch_size=batch_size,
        class_mode='sparse',
    )
    return rescaling_generator.flow_from_directory(main_folder, **flow_options)

# Define the path to the training folder and load the training dataset
train_folder_path = os.path.join(file_path, "imagenet-mini", "train")
train_dataset = load_dataset_from_folders(train_folder_path)

# Display information about the training dataset
print("Number of batches:", len(train_dataset))
print("Batch shape:", train_dataset[0][0].shape)

# Define the path to the validation folder and load the validation dataset
val_folder_path = os.path.join(file_path, "imagenet-mini", "val")
val_dataset = load_dataset_from_folders(val_folder_path)

# Display information about the validation dataset
print("Number of batches:", len(val_dataset))
print("Batch shape:", val_dataset[0][0].shape)

# Labels are integer indices assigned per folder tree; if the two trees do not
# define the same class-to-index mapping, validation labels are misaligned.
if val_dataset.class_indices != train_dataset.class_indices:
    print("WARNING: training and validation folders define different class-to-index "
          "mappings; validation labels will not line up with the model outputs.")

# Define the input shape and number of classes for the ImageNet dataset
image_net_input_shape = (224, 224, 3)
# Take the class count from the training iterator rather than hard-coding it,
# so the classifier head always matches the labels actually present on disk.
image_net_num_classes = train_dataset.num_classes
print("Number of classes:", image_net_num_classes)

# Pretrain MLP-Mixer on ImageNet-1000 (mini)

In [None]:
# Function for gradient clipping
def clip_norm(gradients, clip_value):
    """Clamp ``gradients`` element-wise to ``[-clip_value, clip_value]``.

    Note that, despite the name, this clips by value (via ``K.clip``) rather
    than rescaling by a norm.

    Args:
        gradients: Tensor (or value accepted by ``K.clip``) to clamp.
        clip_value: Symmetric bound; the lower bound is its negation.

    Returns:
        The result of ``K.clip`` applied with the symmetric bounds.
    """
    lower_bound, upper_bound = -clip_value, clip_value
    return K.clip(gradients, lower_bound, upper_bound)


# Learning Rate Scheduler
class CosineAnnealingScheduler(Callback):
    """Callback that sets the optimizer learning rate on a cosine curve per epoch.

    At the start of epoch ``e`` the learning rate becomes
    ``eta_min + 0.5 * (eta_max - eta_min) * (1 + cos(pi * e / T_max))``.

    Args:
        T_max: Divisor of the epoch index inside the cosine term.
        eta_max: Learning rate produced at epoch 0.
        eta_min: Learning rate produced when ``epoch == T_max``; defaults to 0.
        verbose: When greater than 0, print the learning rate at each epoch start.
    """

    def __init__(self, T_max, eta_max, eta_min=0, verbose=0):
        super().__init__()
        self.T_max, self.eta_max, self.eta_min = T_max, eta_max, eta_min
        self.verbose = verbose

    def on_epoch_begin(self, epoch, logs=None):
        """Compute the cosine learning rate for ``epoch`` and write it to the optimizer.

        Raises:
            ValueError: If the model's optimizer has no ``lr`` attribute.
        """
        optimizer = self.model.optimizer
        if not hasattr(optimizer, 'lr'):
            raise ValueError('Optimizer must have a "lr" attribute.')

        # Half the distance between the two bounds, scaled by (1 + cos) in [0, 2].
        half_range = 0.5 * (self.eta_max - self.eta_min)
        lr = self.eta_min + half_range * (1 + np.cos(np.pi * epoch / self.T_max))
        K.set_value(optimizer.lr, lr)

        if self.verbose > 0:
            message = '\nEpoch %05d: CosineAnnealingScheduler setting learning rate to %s.' % (epoch + 1, lr)
            print(message)

class WarmUpLearningRateScheduler(Callback):
    """Callback that linearly ramps the learning rate over the first batches.

    For the first ``warmup_batches`` batches seen by this callback (counted
    across epochs), the learning rate of batch ``b`` (0-based) is set to
    ``(b + 1) * init_lr / warmup_batches``. The first batch therefore trains
    with a non-zero rate and the last warm-up batch uses exactly ``init_lr``.
    After that the callback no longer touches the learning rate.

    Args:
        warmup_batches: Number of warm-up batches. A value <= 0 disables the
            warm-up entirely (no learning-rate writes, no division by zero).
        init_lr: Learning rate reached at the end of the warm-up.
        verbose: When greater than 0, print the learning rate for each
            warm-up batch.
    """

    def __init__(self, warmup_batches, init_lr, verbose=0):
        # Warm-up learning rate scheduler
        super(WarmUpLearningRateScheduler, self).__init__()
        self.warmup_batches = warmup_batches
        self.init_lr = init_lr
        self.verbose = verbose
        # Global batch counter; deliberately not reset between epochs.
        self.current_batch = 0

    def on_batch_begin(self, batch, logs=None):
        """Set the warm-up learning rate for the upcoming batch, if still warming up."""
        # Strict '<' gives exactly `warmup_batches` ramp steps and skips the
        # division altogether when warmup_batches is 0 or negative.
        if self.current_batch < self.warmup_batches:
            # (current_batch + 1) avoids spending the very first step at lr == 0.
            lr = (self.current_batch + 1) * self.init_lr / self.warmup_batches
            K.set_value(self.model.optimizer.lr, lr)
            if self.verbose > 0:
                print('\nBatch %05d: WarmUpLearningRateScheduler setting learning rate to %s.' % (self.current_batch + 1, lr))
        self.current_batch += 1

# Set hyperparameters
init_lr = 0.001
warmup_batches = 500

# Create the callbacks for warm-up and cosine annealing
warmup_lr = WarmUpLearningRateScheduler(
    warmup_batches=warmup_batches,
    init_lr=init_lr,
    verbose=1,
)
# NOTE(review): T_max=100 while the later fit call trains for 30 epochs, so only
# the first part of the cosine curve is traversed — confirm this is intended.
cosine_annealing = CosineAnnealingScheduler(
    T_max=100,
    eta_max=0.001,
    eta_min=0.0001,
    verbose=1,
)

In [None]:
# Define hyperparameters and model architecture for MLP model
epsilon = 1e-3
input_shape, num_classes = image_net_input_shape, image_net_num_classes
number_of_mixers = 24
patch_size = 16
hidden_dims = 1024
token_mixing_num_mlps = 512
channel_mixing_num_mlps = 4096

# Display separator for clarity (blank-ish line, a row of 50 '+', blank-ish line)
print(" ", "+" * 50, " ", sep="\n")

# Create MLP model using specified parameters
# (makeModel is not defined in this notebook; presumably provided by the `mlp` star import — verify)
mlp_model = makeModel(
    num_classes=num_classes,
    input_shape=input_shape,
    patch_size=patch_size,
    hidden_dims=hidden_dims,
    number_of_mixers=number_of_mixers,
    token_mixing_num_mlps=token_mixing_num_mlps,
    channel_mixing_num_mlps=channel_mixing_num_mlps,
    dropout=0.5,
)

# Compile the model using the Adam optimizer with gradient-norm clipping
clip_value = 1.0
mlp_model.compile(
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=Adam(clipnorm=clip_value, learning_rate=0.001),
)

# Display model summary
mlp_model.summary()

# Train the MLP model using the specified datasets and callbacks;
# both learning-rate callbacks write to the optimizer during training.
mlp_history = mlp_model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=30,
    callbacks=[cosine_annealing, warmup_lr],
)

# Save the trained model and its history
# (save_data is not defined in this notebook; presumably provided by `helper_functions` — verify)
save_data(file_path, mlp_model, mlp_history, "mlp_imnet_mini")