In [None]:
#importing neccesary utils and libraries, to optimize the train time, please trained all models 
# in this notebook using GPU

import os 
from os.path import join
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Flatten, Dropout, MaxPooling2D, Conv2D, BatchNormalization, Activation
from tensorflow.keras.models import Sequential 
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report, confusion_matrix
import cv2
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import pandas as pd
from keras.utils import plot_model
from tqdm import tqdm
import seaborn as sns
from sklearn.model_selection import KFold, cross_val_score 
from sklearn.svm import SVC 
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.decomposition import PCA
from sklearn.model_selection import KFold
from sklearn.metrics import  ConfusionMatrixDisplay, accuracy_score, recall_score, precision_score, f1_score
from sklearn import preprocessing
from sklearn import metrics
import time


IMG_SIZE = 32
INPUT_SHAPE = (32, 32, 3)
BATCH_SIZE = 64
NUM_EPOCH  = 60 
NUM_CLASSES = 43 
NUM_FOLDS = 5

## Replace the path to your germanzip file in this section 
## e.g. : train = 'path/germanzip/2025_A2/train

train = '/kaggle/input/germanzip/2025_A2/train'
test  = '/kaggle/input/germanzip/2025_A2/test'
    

## Experiment Sections
1. Data Analysis
2. Data visualization and Augementation 
3. Support Vector Machine + Logisitc Regression 
4. Convolutional Neural Network
5. Ensemble Method

## 1. Data Analysis

In [None]:
# Label Overview
classes = { 0:'Speed limit (20km/h)',
            1:'Speed limit (30km/h)', 
            2:'Speed limit (50km/h)', 
            3:'Speed limit (60km/h)', 
            4:'Speed limit (70km/h)', 
            5:'Speed limit (80km/h)', 
            6:'End of speed limit (80km/h)', 
            7:'Speed limit (100km/h)', 
            8:'Speed limit (120km/h)', 
            9:'No passing', 
            10:'No passing veh over 3.5 tons', 
            11:'Right-of-way at intersection', 
            12:'Priority road', 
            13:'Yield', 
            14:'Stop', 
            15:'No vehicles', 
            16:'Veh > 3.5 tons prohibited', 
            17:'No entry', 
            18:'General caution', 
            19:'Dangerous curve left', 
            20:'Dangerous curve right', 
            21:'Double curve', 
            22:'Bumpy road', 
            23:'Slippery road', 
            24:'Road narrows on the right', 
            25:'Road work', 
            26:'Traffic signals', 
            27:'Pedestrians', 
            28:'Children crossing', 
            29:'Bicycles crossing', 
            30:'Beware of ice/snow',
            31:'Wild animals crossing', 
            32:'End speed + passing limits', 
            33:'Turn right ahead', 
            34:'Turn left ahead', 
            35:'Ahead only', 
            36:'Go straight or right', 
            37:'Go straight or left', 
            38:'Keep right', 
            39:'Keep left', 
            40:'Roundabout mandatory', 
            41:'End of no passing', 
            42:'End no passing veh > 3.5 tons' }

In [None]:
# Load train and test metadata
train_metadata = pd.read_csv(join(train, 'train_metadata.csv'))
test_metadata  = pd.read_csv(join(test,  'test_metadata.csv'))
train_metadata.head()


In [None]:
#Analysis on the distribution and reasoning for adding data 

num_instance = [0] * NUM_CLASSES
for _, row in tqdm(train_metadata.iterrows(), desc="Processing"):
    num_instance[row['ClassId']] += 1

frequency = np.array(num_instance)
labels = [classes[i] for i in range(NUM_CLASSES)]

plt.figure(figsize=(22, 10))
bars = plt.bar(range(NUM_CLASSES), frequency, tick_label=labels)
plt.xlabel('Class Label')
plt.ylabel('Frequency')
plt.title('Traffic Sign Class Distribution') 
plt.xticks(rotation=60)
plt.savefig('Class_distribution.png')


plt.tight_layout()
plt.show()

## 2. Image visualization and Augementation 

In [None]:
train_images, train_labels = [], []
for i, row in tqdm(train_metadata.iterrows(), desc=f"Processing"):
    img_path = os.path.join(train, row['image_path'])
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (32, 32))  # Resize to 32x32
    train_images.append(image)
    train_labels.append(row['ClassId'])


In [None]:
from tensorflow.keras.utils import to_categorical
train_images = np.array(train_images)
train_labels = to_categorical(train_labels)

In [None]:
#Data Augmentation
aug = ImageDataGenerator(
    rescale = 1.0 / 255.0, 
    rotation_range = 10, 
    zoom_range=0.15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.15,
    horizontal_flip=False,
    vertical_flip=False,
    fill_mode="nearest",
    brightness_range = [0.95, 2.95]
)
# for gamma 
aug_1 = ImageDataGenerator(
    rescale = 1.0 / 255.0, 
    rotation_range = 10, 
    zoom_range=0.15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.15,
    horizontal_flip=False,
    vertical_flip=False,
    fill_mode="nearest",
)



In [None]:
import matplotlib.pyplot as plt

trainGenerator = aug.flow(train_images[3:4], train_labels[3:4], batch_size=32) 
plt.figure(figsize=(25, 25))
for i in range(10):
    plt.subplot(5, 5, i+1)
    x_batch, _= next(trainGenerator)
    image = x_batch[0]
    plt.imshow(x_batch[0])
    plt.axis('off')

plt.tight_layout()
plt.show()


In [None]:
print("Train Images Shape:", train_images.shape)
print("Train Labels Shape:", train_labels.shape)

## 3. Feature Extraction and simple classifier SVM, Logistic Regression, KNN 

## 3.1. Image processing 

In [None]:
from skimage.feature import hog
def compute_hog(img):
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    _, hog_image = hog(gray, visualize = True)
    return hog_image

In [None]:
train_features, train_hog_features = [], [] 

for i, row in tqdm(train_metadata.iterrows(), desc=f"Processing"):
    img_path = os.path.join(train, row['image_path'])
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (32, 32))  # Resize to 32x32
    train_features.append(image)
    train_hog_features.append(compute_hog(image))

train_features     = np.array(train_features)
train_hog_features = np.array(train_hog_features)

train_features.resize(len(train_features), 3072)
train_hog_features.resize(len(train_features), 1024)
train_combine      = np.concatenate([train_features, train_hog_features], axis = 1)

In [None]:
test_features, test_hog_features = [], []
for i, row in tqdm(test_metadata.iterrows(),desc = f"Processing"): 
    img_path = os.path.join(test, row['image_path'])
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (32, 32))
    test_features.append(image)
    test_hog_features.append(compute_hog(image))

test_features = np.array(test_features)
test_hog_features = np.array(test_hog_features)

test_features.resize(len(test_features), 3072)
test_hog_features.resize(len(test_features), 1024)
test_combine     = np.concatenate([test_features, test_hog_features], axis = 1)

In [None]:
train_class = []
for i, row in tqdm(train_metadata.iterrows(), desc=f"Processing"):
    train_class.append(row['ClassId'])

train_class = np.array(train_class)

## 3.2 Evaluation using K-Fold

In [None]:
def evaluation_on_simple_full(train_attr_name, train_attrs, model_name, model): 
  kf = KFold(n_splits = NUM_FOLDS, shuffle = True, random_state = 42)
  print("=" * 80)  
  result = {'accuracy'  : 0.0, 'precision' : 0.0, 'recall' : 0.0, 'f1_score' : 0.0} 
  for train_index, validation_index in kf.split(train_attrs): 
       train_image, train_label = train_attrs[train_index], train_class[train_index]
       validation_image, validation_label = train_attrs[validation_index], train_class[validation_index]
       train_image = preprocessing.scale(train_image)
       validation_image = preprocessing.scale(validation_image)
       model.fit(train_image, train_label) 
       predictions = model.predict(validation_image)
       ground_truth = validation_label
      
       result['accuracy']  += accuracy_score(ground_truth, predictions)
       result['precision'] += precision_score(ground_truth, predictions, average = 'weighted')
       result['recall']    += recall_score(ground_truth, predictions,    average = 'weighted')
       result['f1_score']  += f1_score(ground_truth, predictions,        average = 'weighted')

  result['accuracy']  /= NUM_FOLDS
  result['precision'] /= NUM_FOLDS
  result['recall']    /= NUM_FOLDS
  result['f1_score']  /= NUM_FOLDS
  print(f"Result of training on {train_attr_name} using {model_name} with full features")
  print(result)



In [None]:
for dataset, name in [(train_combine, "Original + HOG"),(train_features, "Original"), (train_hog_features, "HOG")]:
    evaluation_on_simple_full(name, 
                              dataset,   
                              "Logistic Regression", 
                               LogisticRegression(max_iter=200, multi_class='multinomial', solver='lbfgs', C = NUM_CLASSES))
    evaluation_on_simple_full(name,  
                              dataset, 
                              "SVM", 
                              SVC(kernel = 'rbf', C = NUM_CLASSES))



In [None]:
# this may take a while to train 
def evaluation_on_simple(train_attr_name, train_attrs, model_name, model, num): 
    kf = KFold(n_splits = NUM_FOLDS, shuffle = True, random_state = 42)
    selectors = {
      "PCA" : PCA(n_components = num),
      "Mutual Info" : SelectKBest(score_func=mutual_info_classif, k = num)
    }  
    for selector_name, selector in selectors.items(): 
      print("=" * 80)  
      result = {'accuracy'  : 0.0, 'precision' : 0.0, 'recall' : 0.0, 'f1_score' : 0.0} 
      for train_index, validation_index in kf.split(train_attrs): 
           train_image, train_label = train_attrs[train_index], train_class[train_index]
           validation_image, validation_label = train_attrs[validation_index], train_class[validation_index]

           train_image = preprocessing.scale(train_image)
           validation_image = preprocessing.scale(validation_image)
          
           train_image = selector.fit_transform(train_image, train_label)
           validation_image = selector.transform(validation_image)
           model.fit(train_image, train_label) 
           predictions = model.predict(validation_image)
           ground_truth = validation_label
           result['accuracy']  += accuracy_score(ground_truth, predictions)
           result['precision'] += precision_score(ground_truth, predictions, average = 'weighted')
           result['recall']    += recall_score(ground_truth, predictions,    average = 'weighted')
           result['f1_score']  += f1_score(ground_truth, predictions,        average = 'weighted')
    
      result['accuracy']  /= NUM_FOLDS
      result['precision'] /= NUM_FOLDS
      result['recall']    /= NUM_FOLDS
      result['f1_score']  /= NUM_FOLDS
      print(f"Result of training on {train_attr_name} using {model_name} with {selector_name}")
      print(result)



In [None]:
for dataset, name in [(train_combine, "Original + HOG"), (train_features, "Original"), (train_hog_features, "HOG")]:
    evaluation_on_simple(name, 
                         dataset,   
                         "Logistic Regression", 
                         LogisticRegression(max_iter=200, multi_class='multinomial', solver='lbfgs', C = NUM_CLASSES),
                         300)
    evaluation_on_simple(name,  
                         dataset, 
                         "SVM", 
                         SVC(kernel = 'rbf', C = NUM_CLASSES), 
                         300)


## 3.3. Critical Analysis

In [None]:
#Evaluation 
from sklearn.model_selection import train_test_split
from sklearn.metrics import  confusion_matrix, ConfusionMatrixDisplay, accuracy_score, classification_report

svm = SVC(kernel = 'rbf', C = NUM_CLASSES)
selector = SelectKBest(score_func=mutual_info_classif, k = 300)

train_size = int(len(train_combine) * 0.8)
X_train, y_train = train_combine[0 : train_size], train_class[0 : train_size]
X_test,  y_test  = train_combine[train_size:],    train_class[train_size:]
X_train = preprocessing.scale(X_train)
X_test  = preprocessing.scale(X_test)
x_train_features = selector.fit_transform(X_train, y_train) 
x_test_features  = selector.transform(X_test)
svm.fit(x_train_features, y_train)
y_pred = svm.predict(x_test_features)


In [None]:
print("accuracy (SVM):", accuracy_score(y_pred, y_test))
cf = confusion_matrix(y_pred, y_test)
df_cm = pd.DataFrame(cf, index = classes,  columns = classes)

plt.figure(figsize = (20,20))
sns.heatmap(df_cm, annot=True)
plt.savefig('svm_mat.png')

In [None]:
### See the error on the image 
from sklearn.metrics import  classification_report
print(classification_report(y_test, y_pred))

In [None]:
wrong_pred = [] 
for i in range(len(y_test)): 
  if y_test[i] != y_pred[i] :
    img_path = os.path.join(train, f"img_00{train_size + i + 1}.jpg")
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (32, 32))  # Resize to 32x32
    wrong_pred.append([image, y_pred[i], y_test[i]])

plt.figure(figsize=(25, 25))
for i, (image, pred, real) in enumerate(wrong_pred[:25]): 
    plt.subplot(5, 5, i + 1)
    plt.imshow(image)
    plt.axis('off')  # optional: turn off axis
    plt.title(f"Pred: {pred}, Real: {real}")
    
plt.tight_layout()
plt.plot()

## 3.4. Export to test set

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import  confusion_matrix, ConfusionMatrixDisplay, accuracy_score, classification_report

def get_result(model, model_name, num, train_data, test_data): 
    selector    =  SelectKBest(score_func=mutual_info_classif, k = num)
    train_data = preprocessing.scale(train_data)
    test_data  = preprocessing.scale(test_data)
    train_image = selector.fit_transform(train_data, train_class)
    test_image = selector.transform(test_data)
    model.fit(train_image, train_class) 
    predictions = model.predict(test_image) 
    result = test_metadata.copy(deep = False)
    result = result.drop(columns = ['image_path'])
    for i, row in tqdm(result.iterrows(),desc = f"Processing"): 
        result.loc[i, 'ClassId'] = predictions[i] 
    result['ClassId'] = result['ClassId'].astype(int)   
    result.to_csv(f'Naive_{model_name}.csv', index = False)




In [None]:
get_result(LogisticRegression(max_iter=200, multi_class='multinomial', solver='lbfgs', C = NUM_CLASSES), 
           "LogisticRegression", 
           300, 
           train_combine, 
           test_combine)
get_result(SVC(kernel = 'rbf', C = NUM_CLASSES), 
           "SVM",
           300, 
           train_combine, 
           test_combine)

## 4. Convolutional Neuron Network Model (ALexNet Architecture)

## 4.1. Model architecture

In [None]:
def createAlexNet1(num_classes : int): 
  model = Sequential() 
  model.add(Conv2D(32, (3, 3), activation  = 'relu', input_shape = INPUT_SHAPE))
  model.add(Conv2D(64, (3, 3), activation  = 'relu'))  
  model.add(MaxPooling2D((2, 2)))    
  model.add(Conv2D(128, (3, 3), activation  = 'relu'))    
  model.add(Conv2D(256, (3, 3), activation  = 'relu'))
  model.add(MaxPooling2D((2, 2))) 
  model.add(Flatten())
  model.add(Dense(512, activation = 'relu'))   
  model.add(Dropout(0.5)) # Reduce overfitting 
  model.add(Dense(num_classes, activation = 'softmax'))
  return model 

model = createAlexNet1(NUM_CLASSES)
model.summary()

In [None]:
#This model is based on AlexNet Architecture but adding batch normalization layer 
def createAlexNet2(num_classes : int): 
  model = Sequential() 
  model.add(Conv2D(32, (3, 3), activation  = 'relu', input_shape = INPUT_SHAPE))  
  model.add(Conv2D(64, (3, 3), activation  = 'relu'))    
  model.add(MaxPooling2D((2, 2)))
  model.add(BatchNormalization(axis = -1))    
  model.add(Conv2D(128, (3, 3), activation  = 'relu'))     
  model.add(Conv2D(256, (3, 3), activation  = 'relu'))    
  model.add(MaxPooling2D((2, 2)))    
  model.add(BatchNormalization(axis = -1))    
  model.add(Flatten())
  model.add(Dense(512, activation = 'relu'))
  model.add(BatchNormalization())    
  model.add(Dropout(0.5)) # Reduce overfitting 
  model.add(Dense(num_classes, activation = 'softmax'))
  return model 

model = createAlexNet2(NUM_CLASSES)
model.summary()

## 4.2. Evaluation using K-Fold

In [None]:
def plot_training_history(history):
    """
    Plots training history including accuracy and loss.
    """
    plt.figure(figsize=(10, 5))
    
    # Accuracy plot
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'])
    
    # Loss plot
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'])
    
    plt.tight_layout()
    plt.show()

In [None]:
def train_and_evaluate_kfold(creator, data_augmented = True): 
    kf = KFold(n_splits = NUM_FOLDS, shuffle = True, random_state = 42)
    
    result = {'accuracy'  : 0.0, 'precision' : 0.0, 'recall' : 0.0, 'f1_score' : 0.0} 
    
    for train_index, validation_index in kf.split(train_images): 
       train_image, train_label = train_images[train_index], train_labels[train_index]
       validation_image, validation_label = train_images[validation_index], train_labels[validation_index] 
       cnn_model = creator(NUM_CLASSES)
       cnn_model.compile(optimizer = 'adam', 
                         loss = 'categorical_crossentropy', 
                         metrics=['accuracy'])
       if data_augmented: 
           trainGenerator = aug.flow(train_image, train_label, batch_size = BATCH_SIZE) 
           history = cnn_model.fit(trainGenerator, 
                                   steps_per_epoch = 100,
                                   batch_size = BATCH_SIZE, 
                                   epochs = NUM_EPOCH,      
                                   validation_data = (validation_image / 255.0, validation_label))
       else: 
           history = cnn_model.fit(x = train_image,  
                                   y = train_label, 
                                   batch_size = BATCH_SIZE, 
                                   epochs = NUM_EPOCH,      
                                   validation_data = (validation_image / 255.0, validation_label)) 
        
       predictions  = cnn_model.predict(validation_image / 255.0)
       predictions  = np.argmax(predictions, axis = -1)
       ground_truth = np.argmax(validation_label, axis = -1)
       result['accuracy']  += accuracy_score(ground_truth, predictions)
       result['precision'] += precision_score(ground_truth, predictions, average = 'weighted')
       result['recall']    += recall_score(ground_truth, predictions,    average = 'weighted')
       result['f1_score']  += f1_score(ground_truth, predictions,        average = 'weighted')
    
    result['accuracy']  /= NUM_FOLDS
    result['precision'] /= NUM_FOLDS
    result['recall']    /= NUM_FOLDS
    result['f1_score']  /= NUM_FOLDS
    return result
   

In [None]:
result = train_and_evaluate_kfold(createAlexNet1, False)
result 

In [None]:
result = train_and_evaluate_kfold(createAlexNet2, False)
result 

In [None]:
result = train_and_evaluate_kfold(createAlexNet1)
result 

In [None]:
result = train_and_evaluate_kfold(createAlexNet2)
result 

## 4.3 - Export to test set 

In [None]:
def prediction_test(creator, num): 
    trainGenerator = aug.flow(train_images, train_labels, batch_size=BATCH_SIZE) 
    cnn_model = creator(NUM_CLASSES)
    cnn_model.compile(optimizer = 'adam', 
                      loss   = 'categorical_crossentropy', 
                      metrics=['accuracy'])
    history = cnn_model.fit(trainGenerator, 
                            batch_size = BATCH_SIZE, 
                            steps_per_epoch=100,  
                            epochs = NUM_EPOCH)
    test_images = [] 
    for i, row in tqdm(test_metadata.iterrows(),desc = f"Processing"): 
        img_path = os.path.join(test, row['image_path'])
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (32, 32))
        test_images.append(image)
    
    test_images = np.array(test_images)
    predictions = cnn_model.predict(test_images / 255.0)
    result = test_metadata.copy(deep = False)
    result = result.drop(columns = ['image_path'])
    for i, row in tqdm(result.iterrows(),desc = f"Processing"): 
        result.loc[i, 'ClassId'] = np.argmax(predictions[i]) 
    result['ClassId'] = result['ClassId'].astype(int)   
    result.to_csv(f'new_resultAlexNet{num}.csv', index = False)

In [None]:
prediction_test(createAlexNet1, 1)

In [None]:
prediction_test(createAlexNet2, 2)

## 4.4. Critical Analysis 

In [None]:
from sklearn.model_selection import train_test_split

def evaluation_on_hold_out(creator, num = 1, data_augmentation=True):
    train_image,  validation_image, train_label, validation_label = train_test_split(train_images, train_labels, test_size = 0.2,random_state = 42)
    cnn_model = creator(NUM_CLASSES)
    cnn_model.compile(optimizer='adam', 
                      loss='categorical_crossentropy', 
                      metrics=['accuracy'])

    if data_augmentation:
        trainGenerator = aug.flow(train_image, train_label, batch_size=BATCH_SIZE) 
        history = cnn_model.fit(trainGenerator, 
                                steps_per_epoch=100,
                                epochs=NUM_EPOCH,      
                                validation_data=(validation_image / 255.0, validation_label))
    else:
        history = cnn_model.fit(x=train_image / 255.0, 
                                y=train_label, 
                                batch_size=BATCH_SIZE, 
                                epochs=NUM_EPOCH,      
                                validation_data=(validation_image / 255.0, validation_label)) 

    plot_training_history(history)

    predictions = cnn_model.predict(validation_image / 255.0)
    predictions = np.argmax(predictions, axis=-1)
    ground_truth = np.argmax(validation_label, axis=-1)

    misclassify_instance = [i for i in range(len(predictions)) if predictions[i] != ground_truth[i]]

    # Confusion matrix
    plt.figure(figsize = (15, 15))
    cf = confusion_matrix(ground_truth, predictions)
    df_cm = pd.DataFrame(cf, index=classes, columns=classes)
    plt.figure(figsize = (20,20))
    sns.heatmap(df_cm, annot=True)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.savefig(f"confusion_matrix_{num}.png")  
    plt.show()
    # Show misclassified images (up to 25)
    plt.figure(figsize=(12, 12))
    wrong_img = [] 
    for i in range(min(25, len(misclassify_instance))): 
        index = misclassify_instance[i]
        plt.subplot(5, 5, i + 1)
        plt.imshow(validation_image[index])
        plt.axis('off')
        plt.title(f"Pred: {predictions[index]}, Real: {ground_truth[index]}")
        wrong_img.append([validation_image[index], ground_truth[index]])
    plt.tight_layout()
    plt.show()
    return wrong_img

In [None]:
wrong_img1 = evaluation_on_hold_out(createAlexNet1, 1)

In [None]:
wrong_img2 = evaluation_on_hold_out(createAlexNet2, 2)

## 5. Ensemble Learning 

## 5.1. Image transformation 

In [None]:
def apply_filters(img):
    bilateral = cv2.bilateralFilter(img, 9, 75, 75)
    return bilateral


In [None]:
def smoothing(image): 
 kernel = np.ones((5 , 5), np.float32) / 25.0
 imgSmooth = cv2.filter2D(image,-1, kernel)
 return imgSmooth

In [None]:
import random

def adjust_gamma(image):
    gamma = random.uniform(1.5, 2.0)
    invGamma = 1.0 / gamma
    table = np.array([((i / 255.0) ** invGamma) * 255 for i in np.arange(0, 256)]).astype("uint8")
    return cv2.LUT(image, table)

In [None]:
from tensorflow.keras.utils import to_categorical

train_labels = []
train_images, bilateral_images, smoothing_images, gamma_images = [], [], [], [] 
test_images, test_bilateral_images, test_smoothing_images, test_gamma_images = [], [], [], [] 

for i, row in tqdm(train_metadata.iterrows(), desc=f"Processing"):
    img_path = os.path.join(train, row['image_path'])
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    bilateral_image = apply_filters(image)
    smooth_image = smoothing(image) 
    gamma_image  = adjust_gamma(image)
    
    image           = cv2.resize(image, (32, 32))
    bilateral_image = cv2.resize(bilateral_image, (32, 32))
    smooth_image    = cv2.resize(smooth_image, (32, 32))
    gamma_image     = cv2.resize(gamma_image, (32, 32))

    train_images.append(image)
    bilateral_images.append(bilateral_image)
    smoothing_images.append(smooth_image)
    gamma_images.append(gamma_image)
    train_labels.append(row['ClassId'])

# Convert lists to numpy arrays
train_images     = np.array(train_images)
bilateral_images = np.array(bilateral_images)
smoothing_images = np.array(smoothing_images)
gamma_images     = np.array(gamma_images)
train_labels = to_categorical(train_labels)

for i, row in tqdm(test_metadata.iterrows(),desc = f"Processing"): 
    img_path = os.path.join(test, row['image_path'])
    image = cv2.imread(img_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    bilateral_image = apply_filters(image)
    smooth_image = smoothing(image)
    gamma_image  = adjust_gamma(image)

    image           = cv2.resize(image, (32, 32))
    bilateral_image = cv2.resize(bilateral_image, (32, 32))
    smooth_image    = cv2.resize(smooth_image, (32, 32))
    gamma_image     = cv2.resize(gamma_image, (32, 32))
    
    test_images.append(image)
    test_bilateral_images.append(bilateral_image)
    test_smoothing_images.append(smooth_image)
    test_gamma_images.append(gamma_image)

test_images           = np.array(test_images)
test_bilateral_images = np.array(test_bilateral_images) 
test_smoothing_images = np.array(test_smoothing_images) 
test_gamma_images     = np.array(test_gamma_images)

## 5.2 Evaluation using K-Fold

In [None]:
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np

def ensemble_method(creator): 
    kf = KFold(n_splits=NUM_FOLDS, shuffle=True, random_state=42)
    result_log = {'accuracy'  : 0.0, 'precision' : 0.0, 'recall' : 0.0, 'f1_score' : 0.0} 
    result_svm = {'accuracy'  : 0.0, 'precision' : 0.0, 'recall' : 0.0, 'f1_score' : 0.0} 
    # Only use the selected image sets
    image_sets = [
        (train_images, aug),         
        (bilateral_images, aug),
        (gamma_images, aug_1),
    ]  
    for train_index, validation_index in kf.split(train_images): 
        train_label = train_labels[train_index]
        validation_label = train_labels[validation_index]

        base_train_result = [] 
        base_valid_result = [] 
        
        for (image_set, aug_method) in image_sets: 
            train_input = image_set[train_index]
            validation_input = image_set[validation_index]
            
            cnn_model = creator(NUM_CLASSES)
            cnn_model.compile(optimizer='adam', 
                              loss='categorical_crossentropy', 
                              metrics=['accuracy'])
            
            trainGenerator = aug_method.flow(train_input, train_label, batch_size=BATCH_SIZE)
            history = cnn_model.fit(trainGenerator,
                                    steps_per_epoch=100,
                                    epochs=NUM_EPOCH, 
                                    validation_data = (validation_input / 255.0, validation_label))
            
            base_train_result.append(cnn_model.predict(train_input      / 255.0))
            base_valid_result.append(cnn_model.predict(validation_input / 255.0))
        
        # Combine outputs from base models
        train_features = np.hstack(base_train_result)
        valid_features = np.hstack(base_valid_result)
        train_label_int = np.argmax(train_label, axis=1)
        valid_label_int = np.argmax(validation_label, axis=1)
        
        ground_truth = valid_label_int

        # Logistic Regression 
        log_reg = LogisticRegression(multi_class='multinomial', solver='lbfgs', C=NUM_CLASSES, max_iter=1000)
        log_reg.fit(train_features, train_label_int)
        predictions = log_reg.predict(valid_features)
        print("Accuracy of logistic regression:", accuracy_score(ground_truth, predictions))
        result_log['accuracy']  += accuracy_score(ground_truth, predictions)
        result_log['precision'] += precision_score(ground_truth, predictions, average = 'weighted')
        result_log['recall']    += recall_score(ground_truth, predictions,    average = 'weighted')
        result_log['f1_score']  += f1_score(ground_truth, predictions,        average = 'weighted')

        # Support vector machine
        
        svm_model = SVC(kernel = 'rbf', C = NUM_CLASSES)
        svm_model.fit(train_features, train_label_int)
        predictions = svm_model.predict(valid_features)
        print("Accuracy of SVM:", accuracy_score(ground_truth, predictions))
        result_svm['accuracy']  += accuracy_score(ground_truth, predictions)
        result_svm['precision'] += precision_score(ground_truth, predictions, average = 'weighted')
        result_svm['recall']    += recall_score(ground_truth, predictions,    average = 'weighted')
        result_svm['f1_score']  += f1_score(ground_truth, predictions,        average = 'weighted')
    result_log['accuracy']  /= NUM_FOLDS
    result_log['precision'] /= NUM_FOLDS
    result_log['recall']    /= NUM_FOLDS
    result_log['f1_score']  /= NUM_FOLDS
    result_svm['accuracy']  /= NUM_FOLDS
    result_svm['precision'] /= NUM_FOLDS
    result_svm['recall']    /= NUM_FOLDS
    result_svm['f1_score']  /= NUM_FOLDS
    return result_log, result_svm 

In [None]:
result_log, result_svm = ensemble_method(createAlexNet1)

In [None]:
result_log

In [None]:
result_svm

## 5.3. Export to Output 

In [None]:
image_sets = [(train_images, test_images, aug), 
              (gamma_images, test_gamma_images, aug_1), 
              (bilateral_images, test_bilateral_images, aug)] 

def train_and_test(creator, num = 1):
    base_train_result = [] 
    base_tests_result = [] 
    
    for (train_input, tests_input, aug_method) in image_sets: 
        cnn_model = creator(NUM_CLASSES)
        cnn_model.compile(optimizer='adam', 
                          loss='categorical_crossentropy', 
                          metrics=['accuracy'])
        trainGenerator = aug_method.flow(train_input, train_labels, batch_size = BATCH_SIZE)
        history = cnn_model.fit(trainGenerator,
                                steps_per_epoch = 100,
                                epochs = NUM_EPOCH)
        
        base_train_result.append(cnn_model.predict(train_input / 255.0))
        base_tests_result.append(cnn_model.predict(tests_input / 255.0))
    
    # Combine outputs from base models
    train_features = np.hstack(base_train_result)
    tests_features = np.hstack(base_tests_result)
    train_label_int = np.argmax(train_labels, axis=1)
    
    # Logistic Regression 
    log_reg = LogisticRegression(multi_class='multinomial', solver='lbfgs', C=NUM_CLASSES, max_iter=1000)
    log_reg.fit(train_features, train_label_int)
    predictions = log_reg.predict(tests_features)
    result = test_metadata.copy(deep = False)
    result = result.drop(columns = ['image_path'])
    for i, row in tqdm(result.iterrows(),desc = f"Processing"): 
        result.loc[i, 'ClassId'] = predictions[i]
    result['ClassId'] = result['ClassId'].astype(int)   
    result.to_csv(f'result_ensemble_AlexNet{num}_with_log.csv', index = False)
    # SVM 
    svm_model = SVC(kernel = 'rbf', C = NUM_CLASSES)
    svm_model.fit(train_features, train_label_int)
    predictions = svm_model.predict(tests_features)
    result = test_metadata.copy(deep = False)
    result = result.drop(columns = ['image_path'])
    for i, row in tqdm(result.iterrows(),desc = f"Processing"): 
        result.loc[i, 'ClassId'] = predictions[i]
    result['ClassId'] = result['ClassId'].astype(int)   
    result.to_csv(f'result_ensemble_AlexNet{num}_with_svm.csv', index = False)
    


In [None]:
train_and_test(createAlexNet1)

## 5.4. Critical Analysis 

In [None]:
from sklearn.model_selection import KFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np

def ensemble_method_evaluate(creator): 
    kf = KFold(n_splits=NUM_FOLDS, shuffle=True, random_state=42)

    # Only use the selected image sets
    image_sets = [
        (train_images, aug),        
        (bilateral_images, aug), 
        (gamma_images, aug_1)
    ]  
    for train_index, validation_index in kf.split(train_images):
        validation_image = train_images[validation_index]
        train_label = train_labels[train_index]
        validation_label = train_labels[validation_index]

        base_train_result = [] 
        base_valid_result = [] 
        
        for (image_set, aug_method) in image_sets: 
            train_input = image_set[train_index]
            validation_input = image_set[validation_index]
            
            cnn_model = creator(NUM_CLASSES)
            cnn_model.compile(optimizer='adam', 
                              loss='categorical_crossentropy', 
                              metrics=['accuracy'])
            
            trainGenerator = aug_method.flow(train_input, train_label, batch_size=BATCH_SIZE)
            history = cnn_model.fit(trainGenerator,
                                    steps_per_epoch=100,
                                    epochs=NUM_EPOCH, 
                                    validation_data = (validation_input / 255.0, validation_label))
            
            base_train_result.append(cnn_model.predict(train_input      / 255.0))
            base_valid_result.append(cnn_model.predict(validation_input / 255.0))
        
        # Combine outputs from base models
        train_features = np.hstack(base_train_result)
        valid_features = np.hstack(base_valid_result)
        train_label_int = np.argmax(train_label, axis=1)
        valid_label_int = np.argmax(validation_label, axis=1)
        
        # Meta-classifier
        log_reg = LogisticRegression(multi_class='multinomial', solver='lbfgs', C=NUM_CLASSES, max_iter=1000)
        log_reg.fit(train_features, train_label_int)
        predictions = log_reg.predict(valid_features)
        ground_truth = valid_label_int
        print("Accuracy:", accuracy_score(valid_label_int, predictions))
        
        misclassify_instance = [i for i in range(len(predictions)) if predictions[i] != valid_label_int[i]]
        correct_instance = [i for i in range(len(predictions)) if predictions[i] == valid_label_int[i]]
        plt.figure(figsize = (15, 15))
        ground_truth = valid_label_int
        cf = confusion_matrix(ground_truth, predictions)
        df_cm = pd.DataFrame(cf, index=classes, columns=classes)
        plt.figure(figsize = (20,20))
        sns.heatmap(df_cm, annot=True)
        plt.title("Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.savefig(f"confusion_matrix_ensemble.png")  
        plt.show()
        plt.figure(figsize=(12, 12))
        wrong_img = [] 
        for i in range(min(25, len(correct_instance))): 
            index = correct_instance[i]
            plt.subplot(5, 5, i + 1)
            plt.imshow(validation_image[index])
            plt.axis('off')
            plt.title(f"Pred: {predictions[index]}, Real: {ground_truth[index]}")
            wrong_img.append([validation_image[index], ground_truth[index]])
        plt.tight_layout()
        plt.show()
        # Show misclassified images (up to 25)
        plt.figure(figsize=(12, 12))
        wrong_img = [] 
        for i in range(min(25, len(misclassify_instance))): 
            index = misclassify_instance[i]
            plt.subplot(5, 5, i + 1)
            plt.imshow(validation_image[index])
            plt.axis('off')
            plt.title(f"Pred: {predictions[index]}, Real: {ground_truth[index]}")
            wrong_img.append([validation_image[index], ground_truth[index]])
        plt.tight_layout()
        plt.show()
        
        



In [None]:
ensemble_method_evaluate(createAlexNet1)

## 6. Convolutional Neural Networks Visualization

In [None]:
from sklearn.model_selection import train_test_split

def train_visualize(creator):
    train_image,  validation_image, train_label, validation_label = train_test_split(train_images, train_labels, test_size = 0.2,random_state = 42)
    cnn_model = creator(NUM_CLASSES)
    cnn_model.compile(optimizer='adam', 
                      loss='categorical_crossentropy', 
                      metrics=['accuracy'])

    trainGenerator = aug.flow(train_image, train_label, batch_size=BATCH_SIZE) 
    history = cnn_model.fit(trainGenerator, 
                            steps_per_epoch=100,
                            epochs=NUM_EPOCH,      
                            validation_data=(validation_image / 255.0, validation_label))

    cnn_model.save('model.h5')


In [None]:
train_visualize(createAlexNet1)

In [None]:
from tensorflow.keras.models import load_model
from keras import models
import matplotlib.pyplot as plt
import numpy as np

model = load_model('model.h5')

img_tensor = train_images[2] / 255.0
img_tensor = np.expand_dims(img_tensor, axis=0)
plt.imshow(img_tensor[0])



layer_outputs = [layer.output for layer in model.layers[:8]]
activation_model = models.Model(inputs = model.layers[0].input, outputs=layer_outputs)

activations = activation_model.predict(img_tensor)

In [None]:
first_layer_activation = activations[0]
plt.figure(figsize=(12, 12))
plt.suptitle("First Layer Activation", fontsize=16)
for i in range(10):
    plt.subplot(5, 5, i + 1)
    plt.imshow(first_layer_activation[0, :, :, i], cmap='viridis')
    plt.title(f"{i}th Channel Activation")
    plt.axis('off')

plt.tight_layout()
plt.savefig('CNN_OUTPUT_LAYER_1.png')
plt.show()

In [None]:
fourth_layer_activation = activations[3]
plt.figure(figsize=(12, 12))
plt.suptitle("Fourth Layer Activation", fontsize=16)
for i in range(10):
    plt.subplot(5, 5, i + 1)
    plt.imshow(fourth_layer_activation[0, :, :, i], cmap='viridis')
    plt.title(f"{i}th Channel Activation")
    plt.axis('off')

plt.tight_layout()
plt.savefig('CNN_OUTPUT_LAYER_4.png')
plt.show()