# Justin Zarkovacki 2/28/2023

# Transfer Learning Notebook

# Prepare imports

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used further down
# in this notebook live under that mount point. Requires a Colab runtime,
# since the `google.colab` package is imported here.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import random
from glob import glob
import os

import matplotlib
from matplotlib import pyplot as plt

import tensorflow as tf
from keras.models import Sequential
from keras.layers import Conv2D, Dropout, AveragePooling2D, Flatten, Dense, Rescaling
from keras import Input, models, backend as K
from tensorflow.keras import layers, models

from sklearn.metrics import confusion_matrix
from tqdm import tqdm

print("Done!")

Done!


# Function Definitions and Variables

In [None]:
# Epoch count shared by the notebook.
epochs = 12

# Image dimensions (pixels): height first, then width.
img_rows = 28
img_cols = 28

def load(f):
    """Return the array stored under ``'arr_0'`` in an ``.npz`` archive.

    Parameters
    ----------
    f : str, path-like or file object
        Anything ``np.load`` accepts that refers to an ``.npz`` archive.

    Returns
    -------
    numpy.ndarray
        The array saved under the key ``'arr_0'``.

    Raises
    ------
    KeyError
        If the archive contains no ``'arr_0'`` entry.
    """
    # np.load returns a lazy NpzFile that keeps the underlying archive open.
    # Reading inside the context manager materialises the array and then
    # closes the archive instead of leaking the file handle.
    with np.load(f) as archive:
        return archive['arr_0']
    
def initialize_data(train_im_file, test_im_file, train_lb_file, test_lb_file):
    """Load one image/label dataset and prepare the images for a Keras model.

    The four files are read with ``load``. Both image arrays are reshaped to
    carry a single channel axis (placed according to
    ``K.image_data_format()``), cast to ``float32`` and divided by 255.
    Label arrays are returned exactly as loaded.

    Parameters
    ----------
    train_im_file, test_im_file : path to the train / test image archives.
    train_lb_file, test_lb_file : path to the train / test label archives.

    Returns
    -------
    tuple
        ``(train_images, test_images, train_labels, test_labels, input_shape)``
        where ``input_shape`` is the per-sample shape including the channel axis.
    """
    raw_train = load(train_im_file)
    raw_test = load(test_im_file)
    train_labels = load(train_lb_file)
    test_labels = load(test_lb_file)

    # The channel axis goes first or last depending on the backend setting.
    if K.image_data_format() == 'channels_first':
        input_shape = (1, img_rows, img_cols)
    else:
        input_shape = (img_rows, img_cols, 1)

    def _prepare(images):
        # Add the channel axis, then scale the pixel values by 1/255 as float32.
        shaped = images.reshape((images.shape[0],) + input_shape)
        return shaped.astype('float32') / 255

    train_images = _prepare(raw_train)
    test_images = _prepare(raw_test)
    print('{} train samples, {} test samples'.format(len(train_images), len(test_images)))

    return (train_images, test_images, train_labels, test_labels, input_shape)

# Helper to create the graphics
def create_visuals(graph_title, model_hist, test_images, test_labels):
    """Plot train vs. validation accuracy per epoch and print the final values.

    Parameters
    ----------
    graph_title : title placed on the plot.
    model_hist : object whose ``history`` mapping holds the ``'accuracy'`` and
        ``'val_accuracy'`` sequences (one value per epoch).
    test_images, test_labels : accepted but not used by this function.

    The curves are drawn on the current pyplot figure; the y-axis runs from
    just below the smallest plotted accuracy up to 1.
    """
    history = model_hist.history
    train_curve = history['accuracy']
    val_curve = history['val_accuracy']

    # Start the y-axis slightly below the lowest point of either curve.
    y_floor = min(min(train_curve), min(val_curve)) - 0.01

    for curve, legend_text in ((train_curve, 'Train Accuracy'),
                               (val_curve, 'Validation Accuracy')):
        plt.plot(curve, label=legend_text)

    plt.title(graph_title)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.ylim([y_floor, 1])
    plt.legend(loc='lower right')

    # Final-epoch accuracy: training first, then validation.
    for curve in (train_curve, val_curve):
        print(curve[-1])

print("Done!")

Done!


## Load Data

In [None]:
prefix = '/content/drive/MyDrive/datasets//'

# Keys are "<dataset>_<split>_<kind>":
#   dataset: km (Kuzushiji-MNIST) or k49 (Kuzushiji-49)
#   split:   tr (train) or te (test)
#   kind:    i (imgs) or l (labels)
# Each value is the full path of the matching .npz file under `prefix`.
datasets = {
    f"{ds_key}_{split_key}_{kind_key}": prefix + f"{ds_name} - {split_name}-{kind_name}.npz"
    for ds_key, ds_name in (("km", "Kuzushiji-MNIST"), ("k49", "Kuzushiji-49"))
    for kind_key, kind_name in (("i", "imgs"), ("l", "labels"))
    for split_key, split_name in (("tr", "train"), ("te", "test"))
}

In [None]:
# Load KMNIST data: images are reshaped/scaled by initialize_data, labels are
# returned as stored.
dataset = initialize_data(
    datasets["km_tr_i"],
    datasets["km_te_i"],
    datasets["km_tr_l"],
    datasets["km_te_l"],
)
(
    kmnist_train_images,
    kmnist_test_images,
    kmnist_train_labels,
    kmnist_test_labels,
    kmnist_input_shape,
) = dataset

# Number of output classes and training epochs used for KMNIST.
kmnist_classes = 10
kmnist_epochs = 15

In [None]:
# Load Kuzushiji-49 data: images are reshaped/scaled by initialize_data,
# labels are returned as stored.
dataset = initialize_data(
    datasets["k49_tr_i"],
    datasets["k49_te_i"],
    datasets["k49_tr_l"],
    datasets["k49_te_l"],
)
(
    k49_train_images,
    k49_test_images,
    k49_train_labels,
    k49_test_labels,
    k49_input_shape,
) = dataset

# Number of output classes and training epochs used for Kuzushiji-49.
k49_classes = 49
k49_epochs = 15

# Shared settings: batch size and number of trials.
batches = 128
num_trials = 15

print("Done!")

In [None]:
# Load Kanji data
kanji_classes = 150
kanji_epochs = 15
kanji_path = '/content/drive/MyDrive/datasets/final_dataset'

# `im_size` and `bs` were referenced here without being defined anywhere in the
# notebook, which raises NameError on a fresh kernel. Define them from the
# settings that already exist above.
# NOTE(review): assumes the Kanji images should use the same 28x28 size and the
# shared batch size of the other datasets -- confirm.
im_size = (img_rows, img_cols)
bs = batches

# Both subsets must be created with the same seed and validation_split so the
# training and validation partitions do not overlap.
kanji_split_options = dict(
    validation_split=0.3,
    seed=222,
    color_mode="grayscale",
    image_size=im_size,
    batch_size=bs,
)

kanji_train = tf.keras.utils.image_dataset_from_directory(
    kanji_path, subset="training", **kanji_split_options)

kanji_val = tf.keras.utils.image_dataset_from_directory(
    kanji_path, subset="validation", **kanji_split_options)

print("Done!")

In [None]:
# Normalize data: a Rescaling layer that multiplies its input by 1/255.
normalization_layer = tf.keras.layers.Rescaling(1./255)


def _rescale_images(images, labels):
    """Run the images through ``normalization_layer``; labels pass through unchanged."""
    return normalization_layer(images), labels


# Rescale Kanji training and validation data.
# NOTE(review): the caching cell that follows operates on `kanji_train` /
# `kanji_val`, not on these normalized datasets -- confirm that is intended.
normalized_kanji_train = kanji_train.map(_rescale_images)
normalized_kanji_val = kanji_val.map(_rescale_images)

In [None]:
# Optimize Kanji dataset: cache each split, then prefetch with a buffer size
# chosen by tf.data.
AUTOTUNE = tf.data.AUTOTUNE

# Kanji Optimization (rebinds the same two names to the cached/prefetched datasets)
kanji_train, kanji_val = (
    split.cache().prefetch(buffer_size=AUTOTUNE)
    for split in (kanji_train, kanji_val)
)

print("Done!")

# Define the Ensemble Model Design

In [None]:
# Define class
class EnsembleClassifier:
    """Ensemble that sums the outputs of every saved model found in a directory.

    Each model is asked to predict on the same input, the raw outputs are
    added element-wise, and the index of the largest summed value is the
    ensemble's prediction.

    Instance attributes
    -------------------
    trained_models : dict mapping the model's path (as returned by ``glob``)
        to the loaded model.
    missclassifications : dict mapping a predicted label to the number of
        times that label was predicted incorrectly during ``validate``.
    y_true, y_pred : lists of true / predicted label names collected by
        ``validate``; they keep growing across calls on the same instance.
    """

    model_names = ['angel_kanji_model', 'john_kanji_model', 'justin_kanji_model'] # names are gonna have to change
    model_name_pattern = '*model*'

    def __init__(self, dir) -> None:                                              # pass in list of model names instead of hard coded like ^
        """Load every entry in ``dir`` whose name matches ``model_name_pattern``.

        ``dir`` is resolved relative to the current working directory. A model
        that fails to load (or to print its summary) is reported with
        ``print`` and loading continues with the next one.
        """
        # Per-instance state. Keeping these as class-level mutables would make
        # every ensemble share the same models and accumulate each other's
        # predictions.
        self.trained_models = {}
        self.missclassifications = {}
        self.y_true = []
        self.y_pred = []

        working_dir = os.path.join(os.getcwd(), dir)                              # All os calls are probably gonna have to go

        # glob already yields paths that start with working_dir, so each match
        # can be handed to load_model directly.
        for name in glob(os.path.join(working_dir, self.model_name_pattern)):
            try:
                self.trained_models[name] = models.load_model(name)
                print(f'LOADING MODEL: {name}\n')
                self.trained_models[name].summary()
                print('')
            except Exception as e:
                # Best effort: report the problem and keep loading the rest.
                print(e)

    def predict(self, element):
        """Return the index of the largest value in the summed model outputs.

        Parameters
        ----------
        element : input passed unchanged to each model's ``predict``.

        Returns
        -------
        Integer index from ``np.argmax`` over the summed outputs. ``argmax``
        is taken over the flattened sum, so this is a class index only when
        ``element`` holds a single sample (assumed by the callers here).

        Raises
        ------
        RuntimeError
            If no model was loaded; summing nothing would otherwise silently
            yield index 0.
        """
        if not self.trained_models:
            raise RuntimeError('EnsembleClassifier has no loaded models to predict with.')

        # Use each individual model to predict, then add the outputs together.
        per_model = np.array([model.predict(element)
                              for model in self.trained_models.values()])
        summed = np.sum(per_model, axis=0)

        return np.argmax(summed)

    def validate(self, validation_data):
        """Predict every element of ``validation_data`` and plot a confusion matrix.

        Parameters
        ----------
        validation_data : iterable of ``(element, label)`` pairs that also
            exposes ``class_names``. Only ``label[0]`` is read, so a batch
            size of 1 is assumed -- TODO confirm against callers.

        Side effects: appends to ``y_true`` / ``y_pred``, updates
        ``missclassifications`` and shows a heatmap.
        """
        class_names = validation_data.class_names

        for element, label in tqdm(validation_data, ncols=100, desc='Validation Progress'):
            true_label = class_names[label[0].numpy()]
            pred_label = class_names[self.predict(element)]

            self.y_true.append(true_label)
            self.y_pred.append(pred_label)

            if pred_label != true_label:
                # Count how often each label was predicted when it was wrong.
                self.missclassifications[pred_label] = (
                    self.missclassifications.get(pred_label, 0) + 1)

        # Pass the full label list so the matrix always has one row/column per
        # class, even when a class never occurs in y_true or y_pred; without it
        # the DataFrame below fails on a shape mismatch.
        conf_matrix = confusion_matrix(self.y_true, self.y_pred, labels=class_names)
        df_cm = pd.DataFrame(conf_matrix,
                             index=list(class_names),
                             columns=list(class_names))

        plt.figure(figsize=(40, 40))
        # vmax=8 caps the colour scale at 8.
        ax = sns.heatmap(df_cm, annot=True, vmax=8)
        ax.set(xlabel="Predicted", ylabel="True", title=f'Ensemble Model Confusion Matrix for: {len(validation_data.class_names)} classes')
        ax.xaxis.tick_top()
        plt.xticks(rotation=90)
        plt.show()
        print('')

    def demo(self, validation_data):
        """Print and plot predictions for the first elements of ``validation_data``.

        The loop stops after the element with index 10 has been shown, i.e. at
        most 11 elements are displayed. Like ``validate``, only ``label[0]`` is
        read from each element.
        """
        for i, (element, label) in enumerate(validation_data):
            true_label = validation_data.class_names[label[0].numpy()]
            prediction = self.predict(element)
            pred_label = validation_data.class_names[prediction]
            print(f'PRED: {pred_label}')
            print(f'TRUE: {true_label}')
            if pred_label != true_label:
                print(f'Model confused: {pred_label} for {true_label}')

            self.plot_image(element.numpy()[0].astype("uint8"), true_label)

            if i >= 10:
                break

    def plot_image(self, image, label):
        """Show ``image`` in grayscale on a 0-255 scale with ``label`` as the title."""
        plt.figure(figsize=(3, 3))
        plt.imshow(image, cmap='gray', vmin=0, vmax=255)
        plt.title(label)
        plt.axis("off")
        plt.show()

    def plot_missclassifications(self):
        """Bar chart of how often each label was predicted incorrectly."""
        plt.bar(list(self.missclassifications.keys()),
                list(self.missclassifications.values()),
                1.0, color='r')
        plt.show()



# Run KMNIST Ensemble

In [None]:
if __name__ == "__main__":

    # Validation split for this run. batch_size=1 because the ensemble reads
    # only the first label of each batch.
    kmnist_val = tf.keras.utils.image_dataset_from_directory(
                                        './Code/datasets/class_final_dataset',    # this is gonna have to change
                                        validation_split=0.3,
                                        subset="validation",
                                        seed=132,
                                        image_size=(28, 28),
                                        color_mode = "grayscale",
                                        batch_size=1)

    my_ensemble_model = EnsembleClassifier('./Code/testing_models')               # so is this
    # Uncomment to print and plot a handful of individual predictions first:
    # my_ensemble_model.demo(kmnist_val)
    # print()
    # Validate on the dataset loaded above; the name previously passed here
    # (`new_kkanji_class_final_dataset_val`) is not defined in this notebook.
    my_ensemble_model.validate(kmnist_val)
    # 96.13% accuracy on validation data


# Run K49 Ensemble

In [None]:
if __name__ == "__main__":

    # NOTE(review): this cell sits under the K49 heading but still uses the
    # variable name, dataset path and model directory of the KMNIST run cell --
    # confirm the intended K49 paths.
    # batch_size=1 because the ensemble reads only the first label of each batch.
    kmnist_val = tf.keras.utils.image_dataset_from_directory(
                                        './Code/datasets/class_final_dataset',    # this is gonna have to change
                                        validation_split=0.3,
                                        subset="validation",
                                        seed=132,
                                        image_size=(28, 28),
                                        color_mode = "grayscale",
                                        batch_size=1)

    my_ensemble_model = EnsembleClassifier('./Code/testing_models')               # so is this
    # Uncomment to print and plot a handful of individual predictions first:
    # my_ensemble_model.demo(kmnist_val)
    # print()
    # Validate on the dataset loaded above; the name previously passed here
    # (`new_kkanji_class_final_dataset_val`) is not defined in this notebook.
    my_ensemble_model.validate(kmnist_val)
    # 96.13% accuracy on validation data


# Run Kanji Ensemble

In [None]:
if __name__ == "__main__":

    # NOTE(review): this cell sits under the Kanji heading but still uses the
    # variable name, dataset path and model directory of the KMNIST run cell --
    # confirm the intended Kanji paths.
    # batch_size=1 because the ensemble reads only the first label of each batch.
    kmnist_val = tf.keras.utils.image_dataset_from_directory(
                                        './Code/datasets/class_final_dataset',    # this is gonna have to change
                                        validation_split=0.3,
                                        subset="validation",
                                        seed=132,
                                        image_size=(28, 28),
                                        color_mode = "grayscale",
                                        batch_size=1)

    my_ensemble_model = EnsembleClassifier('./Code/testing_models')               # so is this
    # Uncomment to print and plot a handful of individual predictions first:
    # my_ensemble_model.demo(kmnist_val)
    # print()
    # Validate on the dataset loaded above; the name previously passed here
    # (`new_kkanji_class_final_dataset_val`) is not defined in this notebook.
    my_ensemble_model.validate(kmnist_val)
    # 96.13% accuracy on validation data
