In [None]:
import numpy as np
from collections import Counter
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
import matplotlib.pyplot as plt
from sklearn.svm import LinearSVC
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import GridSearchCV
from keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from kerastuner.tuners import RandomSearch
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import TensorBoard
import pandas as pd

import matplotlib.pyplot as plt


RANDOM_STATE = 0

### Data loading, exploration and preprocessing
1. Shape and size overview
2. NaN check
3. Class similarity

In [None]:
# Read the pre-split train/test arrays from disk (features first, then labels).
X_train, X_test, y_train, y_test = (
    np.load(npy_path)
    for npy_path in (
        'dataset/Assignment2Data/X_train.npy',
        'dataset/Assignment2Data/X_test.npy',
        'dataset/Assignment2Data/y_train.npy',
        'dataset/Assignment2Data/y_test.npy',
    )
)

# Sample counts of both splits and the shape of a single image.
print(f"Train size: {X_train.shape[0]}\nTest size: {X_test.shape[0]}\nImage shape: {X_test.shape[1:]}\n")

# NaN detection helper
def check_nan(data) -> bool:
    """Tell whether `data` contains at least one NaN entry."""
    nan_mask = np.isnan(data)
    return nan_mask.any()

# NaN report for every loaded array.
print(f"Nan value exists detect\nX_train: {check_nan(X_train)}\ny_train: {check_nan(y_train)}\nX_test: {check_nan(X_test)}\ny_test: {check_nan(y_test)}\n")

# How many training samples each class label has.
cnt_class = Counter(y_train)
print(f"num of class: {cnt_class}")
classes = list(cnt_class)
counts = [cnt_class[label] for label in classes]

# Bar chart of the class distribution.
fig_dist, ax_dist = plt.subplots()
ax_dist.bar(classes, counts)
ax_dist.set_title('Class Distribution')
ax_dist.set_xlabel('Class')
ax_dist.set_ylabel('Count')
ax_dist.set_xticks(classes)
plt.show()

# Display one training image together with its label.
image_index = 3
image = X_train[image_index]

fig_img, ax_img = plt.subplots()
ax_img.imshow(image, cmap='gray')
ax_img.set_title(f"Label: {y_train[image_index]}")
ax_img.axis('off')
plt.show()



In [None]:
# Preprocess
# 1. train-valid split and data reshape (for FNN and CNN)
# 2. label one-hot encode (for FNN and CNN)

# Hold out 20% of the training data as a validation split.
X_train, X_val, y_train, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.2,
    random_state=RANDOM_STATE,
)

# Add an explicit single-channel axis to every split.
X_train_re, X_val_re, X_test_re = (
    split.reshape(-1, 28, 28, 1) for split in (X_train, X_val, X_test)
)

# One-hot encode the labels of every split over 11 classes.
y_train_encoded, y_val_encoded, y_test_encoded = (
    to_categorical(labels, 11) for labels in (y_train, y_val, y_test)
)


In [None]:
# Feature extraction (for LinearSVC & the class-similarity review).
# This CNN is only used as a feature extractor for LinearSVC and for the
# feature-similarity review; it is not relevant to the MLP and CNN experiments.
# NOTE: the model is never compiled or fitted in this notebook, so the features
# come from its initial weights.

input_shape = (28, 28, 1)

model = Sequential()

model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Flatten())

# Use the arrays that were explicitly reshaped to (N, 28, 28, 1) so the input
# matches `input_shape` instead of relying on implicit rank handling.
all_features = model.predict(X_train_re)
all_features_test = model.predict(X_test_re)

# Group the training features by class label, then average them per class.
feature_dict = {key: [] for key in cnt_class.keys()}
for label, feature in zip(y_train, all_features):
    feature_dict[label].append(feature)

class_avg_feature = {
    key: np.mean(features, axis=0) for key, features in feature_dict.items()
}

# Pairwise similarity between the class-average features (inverse Euclidean distance).
class_labels = sorted(class_avg_feature)
n_classes = len(class_labels)

# Start from NaN: the diagonal (a class compared with itself) has distance 0,
# for which the inverse distance is undefined; NaN cells are left blank in the heat map.
similarity_matrics = np.full((n_classes, n_classes), np.nan)

sigma = 1.0  # similarity function param (currently not used by the inverse-distance similarity)

for i, label_i in enumerate(class_labels):
    for j, label_j in enumerate(class_labels):
        distance = np.linalg.norm(class_avg_feature[label_i] - class_avg_feature[label_j])
        # Skip zero distances (always the diagonal) to avoid dividing by zero.
        if distance > 0:
            similarity_matrics[i, j] = 1 / distance

# Heat map of the similarity matrix; rows/columns follow the sorted class labels.
plt.imshow(similarity_matrics, cmap='hot', interpolation='nearest')
plt.colorbar()
plt.title('Similarity Matrix Heatmap')
plt.xticks(range(n_classes), class_labels)
plt.yticks(range(n_classes), class_labels)
plt.xlabel('Class')
plt.ylabel('Class')
plt.show()


### Model build
1. Linear SVC
2. MLP
3. CNN

#### 1.Linear SVC  
model  
params search  
best model

In [None]:
# Linear SVC baseline on the extracted CNN features.

# Build the classifier and fit it in one go (fit returns the estimator itself).
SVC_clf = LinearSVC(
    C=1.0,
    tol=1e-3,
    dual="auto",
    random_state=RANDOM_STATE,
    verbose=True,
).fit(all_features, y_train)

# Evaluate on the test features: accuracy plus macro-averaged precision, recall and F1.
y_pred = SVC_clf.predict(all_features_test)

acc_SVC = accuracy_score(y_test, y_pred)
pre_SVC, rec_SVC, f1_SVC = (
    macro_metric(y_test, y_pred, average='macro')
    for macro_metric in (precision_score, recall_score, f1_score)
)

cm_SVC = confusion_matrix(y_test, y_pred)

print(f"Acc: {acc_SVC}\nPre: {pre_SVC}\nRec: {rec_SVC}\nF1: {f1_SVC}\n")
print(f"Confusion Matrix(SVC): {cm_SVC}")




In [None]:
# Params search (LinearSVC)

# Candidate values for every searched parameter.
param_grid = dict(
    C=[0.1, 1],
    penalty=['l1', 'l2'],
    loss=['squared_hinge'],
    dual=['auto'],
    tol=[1e-3, 1e-4],
)

# Exhaustive 3-fold cross-validated search, scored by accuracy.
linear_svc = LinearSVC(random_state=RANDOM_STATE)
grid_search = GridSearchCV(
    estimator=linear_svc,
    param_grid=param_grid,
    scoring='accuracy',
    cv=3,
)
grid_search.fit(all_features, y_train)

# Winning parameter combination.
print("Best parameters found: ", grid_search.best_params_)

# Score the refitted best estimator on the held-out test features.
best_linear_svc = grid_search.best_estimator_
test_accuracy = best_linear_svc.score(all_features_test, y_test)
print("Test set accuracy: ", test_accuracy)

In [None]:
# Visualise the LinearSVC parameter search: one panel per searched parameter,
# each showing the mean CV score against C for every value of that parameter.
results = pd.DataFrame(grid_search.cv_results_)

param_names = list(grid_search.param_grid.keys())

# squeeze=False keeps `axes` indexable even if only one parameter is searched.
fig, axes = plt.subplots(
    nrows=len(param_names),
    ncols=1,
    figsize=(10, 5 * len(param_names)),
    squeeze=False,
)
axes = axes.ravel()

for i, param_name in enumerate(param_names):
    # cv_results_ stores each searched parameter under the column "param_<name>".
    column = f'param_{param_name}'

    for value in results[column].unique():
        subset = results[results[column] == value]
        # Several grid points share the same C; average them so every C value
        # contributes exactly one point to the curve.
        curve = subset.groupby(subset['param_C'].astype(float))['mean_test_score'].mean()
        axes[i].plot(curve.index, curve.values, marker='o', label=f'{param_name}={value}')

    axes[i].set_title(f'Grid Search Results for {param_name}')
    axes[i].set_xlabel('C')  # the x-axis is always the regularisation strength C
    axes[i].set_ylabel('Mean Test Score')
    axes[i].legend()

plt.tight_layout()

plt.show()

In [None]:
# Data augmentation (for both MLP and CNN)
train_datagen = ImageDataGenerator(
    rotation_range=20,  # random rotations of up to 20 degrees
    # Further augmentations, currently disabled:
    # width_shift_range=0.2,
    # height_shift_range=0.2,
    # featurewise_std_normalization=True,
    # zoom_range=0.2,
)

# Seed the generator so shuffling and the random transforms are reproducible,
# in line with the RANDOM_STATE used for the other stochastic steps.
train_generator = train_datagen.flow(
    X_train_re,
    y_train_encoded,
    batch_size=4,
    seed=RANDOM_STATE,
)

#### 2.MLP  
model  
params search  
best model

In [None]:
# Training setting (for MLP)
# NOTE(review): the three sizes are hard-coded and are presumably the lengths of the
# train / validation / test splits — keep them in sync with the data.
train_size = 15142
val_size = 3786
test_size = 4732
batch_size = 4
epochs = 8
lr = 0.01
# Number of full batches per pass over each split.
train_steps = train_size // batch_size
# Validation steps must be derived from the validation split, not the test split.
valid_steps = val_size // batch_size

In [None]:
# MLP model build

def FCNN():
    """Build and compile the fixed-architecture MLP used for single-model training.

    Layers: Flatten -> Dense(128, relu) -> Dropout(0.5) -> Dense(64, relu)
    -> Dropout(0.5) -> Dense(11, softmax).

    The Adam learning rate is read from the module-level ``lr``.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    FCNN_model = Sequential()
    FCNN_model.add(Flatten(input_shape=(28, 28, 1)))

    FCNN_model.add(Dense(128, activation='relu'))
    FCNN_model.add(Dropout(0.5))

    FCNN_model.add(Dense(64, activation='relu'))
    FCNN_model.add(Dropout(0.5))

    # Softmax rather than sigmoid: categorical cross-entropy expects the 11 outputs
    # to form a probability distribution over mutually exclusive classes.
    FCNN_model.add(Dense(11, activation='softmax'))

    optimizer = Adam(learning_rate=lr)

    # Use the 'accuracy' alias so the metric is logged as accuracy / val_accuracy,
    # which is the name monitored by the EarlyStopping callback and used by CNN().
    FCNN_model.compile(loss='categorical_crossentropy',
                       optimizer=optimizer,
                       metrics=['accuracy'])
    return FCNN_model

def FCNN_tuning(hp):
    """Build a compiled MLP from a Keras Tuner hyperparameter set (for params search).

    Searched hyperparameters:
        units: width of the hidden Dense layers, 32..512 in steps of 32. Both hidden
            layers request the same name ``'units'``, so they share one value.
        dropout: dropout rate after the first hidden layer, 0.0..0.5 in steps of 0.1.
        learning_rate: Adam learning rate, one of 1e-2, 1e-3, 1e-4.

    Args:
        hp: the Keras Tuner ``HyperParameters`` object supplied by the tuner.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = Sequential()
    # flatten
    model.add(Flatten(input_shape=(28, 28, 1)))

    # 1st dense
    model.add(Dense(
        units=hp.Int('units', min_value=32, max_value=512, step=32),
        activation='relu'))
    # drop out
    model.add(Dropout(rate=hp.Float('dropout', min_value=0.0, max_value=0.5, step=0.1)))

    # 2nd dense
    model.add(Dense(
        units=hp.Int('units', min_value=32, max_value=512, step=32),
        activation='relu'))

    # 3rd dense: output layer
    model.add(Dense(11, activation='softmax'))

    # compile
    # The metric must be logged as accuracy / val_accuracy because the tuner's
    # objective is 'val_accuracy'; 'categorical_accuracy' would be reported
    # under a different name and the objective would never be found.
    model.compile(
        optimizer=Adam(
            hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])),
        loss='categorical_crossentropy',
        metrics=['accuracy'])

    return model

In [None]:
# MLP train & test
# train
# Stop once the validation accuracy has not improved for 2 consecutive epochs.
callbacks = [EarlyStopping(monitor='val_accuracy', patience=2, verbose=1, mode='max')]

FCNN_model = FCNN()

# The validation data is an in-memory (x, y) array pair, so it is evaluated in
# full after every epoch; no validation_steps is needed (the previous value was
# derived from the test-set size and did not match the validation split).
FCNN_model.fit(
    train_generator,
    steps_per_epoch=train_steps,
    epochs=epochs,
    validation_data=(X_val_re, y_val_encoded),
    callbacks=callbacks,
)

# test
test_loss, test_accuracy = FCNN_model.evaluate(X_test_re, y_test_encoded, verbose=2)
y_pred_MLP = FCNN_model.predict(X_test_re)
# confusion_matrix needs class indices, not one-hot labels / probability vectors.
cm_MLP = confusion_matrix(
    np.argmax(y_test_encoded, axis=1),
    np.argmax(y_pred_MLP, axis=1),
)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}\nConfusion Matrix:{cm_MLP}")

In [None]:
# params tuning
FCNN_tuner = RandomSearch(
    FCNN_tuning,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=3,
    directory='dir_FCNN',
    project_name='FCNN')

tensorboard_callback = TensorBoard(log_dir='./FCNN_logs')

FCNN_tuner.search(X_train_re, y_train_encoded, epochs=10, validation_data=(X_test_re, y_test_encoded), callbacks=[tensorboard_callback])

best_hps = FCNN_tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The hyperparameter search is complete. The optimal number of units in the first densely-connected
layer is {best_hps.get('units')} and the optimal learning rate for the optimizer
is {best_hps.get('learning_rate')}.
""")

# model built on best params set
FCNN_tuning_model = FCNN_tuner.hypermodel.build(best_hps)

# train
history = FCNN_tuning_model.fit(X_train_re, y_train_encoded, epochs=10, validation_data=(X_test_re, y_test_encoded))

# assessment
test_loss, test_acc = FCNN_tuning_model.evaluate(X_test_re, y_test_encoded)
print(f"Test accuracy: {test_acc}")

%tensorboard --logdir=./FCNN_logs

#### 3.CNN  
model  
params search  
best model

In [None]:
# Training setting (for CNN)
# NOTE(review): the three sizes are hard-coded and are presumably the lengths of the
# train / validation / test splits — keep them in sync with the data.
train_size = 15142
val_size = 3786
test_size = 4732
batch_size = 4
epochs = 12
lr = 0.001
# Number of full batches per pass over each split.
train_steps = train_size // batch_size
# Validation steps must be derived from the validation split, not the test split.
valid_steps = val_size // batch_size

In [None]:
# CNN
def CNN():
    """Build and compile the fixed-architecture CNN for single-model training.

    Two Conv2D + MaxPooling2D stages (32 and 64 filters) feed a Dense(128)
    layer with dropout and an 11-way softmax output. The Adam learning rate is
    taken from the module-level ``lr``.
    """
    conv_stages = [
        Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(64, kernel_size=(3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
    ]
    classifier_head = [
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(11, activation='softmax'),
    ]

    network = Sequential(conv_stages + classifier_head)
    network.compile(
        optimizer=Adam(learning_rate=lr),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network

def CNN_tuning(hp):
    """Build a compiled CNN from a Keras Tuner hyperparameter set (for params tuning).

    Both convolution layers request the hyperparameters 'filters' and
    'kernel_size' under the same names; 'units' sizes the hidden Dense layer
    and 'learning_rate' configures Adam.
    """
    def tuned_conv(**extra_kwargs):
        # One convolution layer whose filter count and kernel size come from `hp`.
        return Conv2D(
            filters=hp.Int('filters', min_value=32, max_value=128, step=16),
            kernel_size=hp.Choice('kernel_size', values=[3, 5]),
            activation='relu',
            **extra_kwargs)

    network = Sequential()
    for layer in (
        tuned_conv(input_shape=(28, 28, 1)),
        MaxPooling2D(pool_size=2),
        tuned_conv(),
        MaxPooling2D(pool_size=2),
        Flatten(),
        Dense(
            units=hp.Int('units', min_value=64, max_value=512, step=64),
            activation='relu'),
        Dense(11, activation='softmax'),
    ):
        network.add(layer)

    # Compile with the tuned learning rate.
    tuned_adam = Adam(hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4]))
    network.compile(
        optimizer=tuned_adam,
        loss='categorical_crossentropy',
        metrics=['accuracy'])

    return network

In [None]:
# CNN train & test
# train
# Stop once the validation accuracy has not improved for 2 consecutive epochs.
callbacks = [EarlyStopping(monitor='val_accuracy', patience=2, verbose=1, mode='max')]

CNN_model = CNN()

# The validation data is an in-memory (x, y) array pair, so it is evaluated in
# full after every epoch; no validation_steps is needed (the previous value was
# derived from the test-set size and did not match the validation split).
CNN_model.fit(
    train_generator,
    steps_per_epoch=train_steps,
    epochs=epochs,
    validation_data=(X_val_re, y_val_encoded),
    callbacks=callbacks,
)

# test
test_loss, test_accuracy = CNN_model.evaluate(X_test_re, y_test_encoded, verbose=2)
# Predict on the test images (not on the labels).
y_pred_CNN = CNN_model.predict(X_test_re)
# confusion_matrix needs class indices, not one-hot labels / probability vectors.
cm_CNN = confusion_matrix(
    np.argmax(y_test_encoded, axis=1),
    np.argmax(y_pred_CNN, axis=1),
)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}\nConfusion Matrix: {cm_CNN}")

In [None]:
# params search
CNN_tuner = RandomSearch(
    CNN_tuning,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=3,
    directory='dir_CNN',
    project_name='CNN')

tensorboard_callback = TensorBoard(log_dir='./CNN_logs')

CNN_tuner.search(X_train_re, y_train_encoded, epochs=10, validation_data=(X_test_re, y_test_encoded), callbacks=[tensorboard_callback])

best_hps = CNN_tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The hyperparameter search is complete. The optimal number of units in the first densely-connected
layer is {best_hps.get('units')} and the optimal learning rate for the optimizer
is {best_hps.get('learning_rate')}.
""")

# model built on best params set
CNN_tuning_model = CNN_tuner.hypermodel.build(best_hps)

# train
history = CNN_tuning_model.fit(X_train_re, y_train_encoded, epochs=10, validation_data=(X_test_re, y_test_encoded))

# assessment
test_loss, test_acc = CNN_tuning_model.evaluate(X_test_re, y_test_encoded)
print(f"Test accuracy: {test_acc}")

%tensorboard --logdir=./CNN_logs