# ItalianSignNet study

### Importing Required Libraries

In [None]:
!pip cache purge
!pip install -q tensorflow-model-optimization
!pip install tf-keras

In [None]:
import os
import tempfile

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_model_optimization as tfmot
#import tf_keras as keras
import tensorflow.keras as keras
from PIL import Image
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator

np.random.seed(42)
%load_ext tensorboard

### Boilerplate

In [None]:
# Dataset locations (Kaggle input mount): one sub-directory per split.
data_dir = '/kaggle/input/dataset-2025-04-23-eps-100-changed/dataset_20250423_200322_eps_100_changed/'
train_path = f'{data_dir}/train'
test_path = f'{data_dir}/test'
val_set = f'{data_dir}/validation'

# Every image is resized to 30x30 pixels with 3 colour channels
IMG_HEIGHT = 30
IMG_WIDTH = 30
channels = 3

# Optimizer learning rate and default number of training epochs
lr = 0.001
epochs = 30

# Class id -> human-readable sign name
classes = {
    0: 'Speed limit (20km/h)',
    1: 'Speed limit (30km/h)',
    2: 'Speed limit (50km/h)',
    3: 'Speed limit (60km/h)',
    4: 'Speed limit (70km/h)',
    5: 'Speed limit (80km/h)',
    6: 'End of speed limit (80km/h)',
    7: 'Speed limit (100km/h)',
    8: 'Speed limit (120km/h)',
    9: 'No passing',
    10: 'No passing veh over 3.5 tons',
    11: 'Right-of-way at intersection',
    12: 'Priority road',
    13: 'Yield',
    14: 'Stop',
    15: 'No vehicles',
    16: 'Veh > 3.5 tons prohibited',
    17: 'No entry',
    18: 'General caution',
    19: 'Dangerous curve left',
    20: 'Dangerous curve right',
    21: 'Double curve',
    22: 'Bumpy road',
    23: 'Slippery road',
    24: 'Road narrows on the right',
    25: 'Road work',
    26: 'Traffic signals',
    27: 'Pedestrians',
    28: 'Children crossing',
    29: 'Bicycles crossing',
    30: 'Beware of ice/snow',
    31: 'Wild animals crossing',
    32: 'End speed + passing limits',
    33: 'Turn right ahead',
    34: 'Turn left ahead',
    35: 'Ahead only',
    36: 'Go straight or right',
    37: 'Go straight or left',
    38: 'Keep right',
    39: 'Keep left',
    40: 'Roundabout mandatory',
    41: 'End of no passing',
    42: 'End no passing veh > 3.5 tons',
}

# Number of output classes, taken from the label map instead of counting the
# entries of the training directory. The networks in this notebook end in a
# 43-unit softmax and the loaders skip class folders that do not exist, so a
# directory count would produce one-hot labels of the wrong width as soon as
# a class folder is missing or an extra entry is present.
NUM_CATEGORIES = len(classes)

### Visualizing The Dataset

In [None]:
folders = os.listdir(train_path)

# Number of training images and display name for every class folder
train_number = [len(os.listdir(train_path + '/' + folder)) for folder in folders]
class_num = [classes[int(folder)] for folder in folders]

# Order both lists by image count (ties fall back to the class name)
sorted_pairs = sorted(zip(train_number, class_num))
train_number, class_num = map(list, zip(*sorted_pairs))

# Bar chart of the number of images per class
plt.figure(figsize=(10, 5))
plt.bar(x=class_num, height=train_number)
plt.xticks(class_num, rotation='vertical')
plt.show()

## Collecting the Training Data

In [None]:
image_data = []
image_labels = []

for i in range(NUM_CATEGORIES):
    # Class folders are addressed by their zero-padded two-digit id
    label = f"{i:02d}"
    path = data_dir + '/train/' + label

    # Skip class ids that have no folder in this dataset
    if not os.path.isdir(path):
        continue

    for img in os.listdir(path):
        try:
            image = cv2.imread(path + '/' + img)
            image_fromarray = Image.fromarray(image, 'RGB')
            resize_image = image_fromarray.resize((IMG_HEIGHT, IMG_WIDTH))
            image_data.append(np.array(resize_image))
            image_labels.append(label)
        except Exception:
            # Best-effort loading: report the file and carry on. Only
            # Exception is caught so that interrupting the kernel
            # (KeyboardInterrupt) still stops the loop.
            print("Error in " + img)

# Convert the collected lists to numpy arrays (labels stay as strings here)
image_data = np.array(image_data)
image_labels = np.array(image_labels)

print(image_data.shape, image_labels.shape)

### Shuffling the training data

In [None]:
# Draw one random ordering and apply it to images and labels alike so the
# two arrays stay aligned.
shuffle_indexes = np.arange(len(image_data))
np.random.shuffle(shuffle_indexes)
image_data, image_labels = image_data[shuffle_indexes], image_labels[shuffle_indexes]

## Collecting validation set

In [None]:
# Scale training pixels to [0, 1]
X_train = image_data.astype('float32') / 255.

# Walk validation/<label>/<image> and collect resized images with integer labels
val_data, val_labels = [], []
for label_name in os.listdir(val_set):
    label_dir = os.path.join(val_set, label_name)
    if os.path.isdir(label_dir):
        for img_file in os.listdir(label_dir):
            raw = cv2.imread(os.path.join(label_dir, img_file))
            resized = Image.fromarray(raw, 'RGB').resize((IMG_HEIGHT, IMG_WIDTH))
            val_data.append(np.array(resized))
            val_labels.append(int(label_name))

# Same [0, 1] scaling as the training images
X_val = np.array(val_data, dtype='float32') / 255.

print("X_train.shape", X_train.shape)
print("X_val.shape",   X_val.shape)

## One hot encoding the labels

In [None]:
# Validation labels were collected as a Python list; make them an array first
val_labels = np.array(val_labels)

# One-hot encode both label sets with NUM_CATEGORIES columns
y_train = keras.utils.to_categorical(image_labels, num_classes=NUM_CATEGORIES)
y_val = keras.utils.to_categorical(val_labels, num_classes=NUM_CATEGORIES)

for tag, encoded in (("y_train.shape", y_train), ("y_val.shape", y_val)):
    print(tag, encoded.shape)

## Making the model

In [None]:
# Baseline CNN: two conv/conv/pool/batch-norm stages followed by a dense head.
model = keras.models.Sequential()

# Stage 1
model.add(keras.layers.Conv2D(16, (3, 3), activation='relu',
                              input_shape=(IMG_HEIGHT, IMG_WIDTH, channels)))
model.add(keras.layers.Conv2D(32, (3, 3), activation='relu'))
model.add(keras.layers.MaxPool2D(pool_size=(2, 2)))
model.add(keras.layers.BatchNormalization(axis=-1))

# Stage 2
model.add(keras.layers.Conv2D(64, (3, 3), activation='relu'))
model.add(keras.layers.Conv2D(128, (3, 3), activation='relu'))
model.add(keras.layers.MaxPool2D(pool_size=(2, 2)))
model.add(keras.layers.BatchNormalization(axis=-1))

# Classifier head with a 43-way softmax output
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(512, activation='relu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Dropout(rate=0.5))
model.add(keras.layers.Dense(43, activation='softmax'))

model.summary()

In [None]:
def improved_model(input_shape=(IMG_HEIGHT,IMG_WIDTH,channels), num_classes=43):
    """Build the residual separable-convolution classifier.

    Args:
        input_shape: shape of one input image, passed to ``keras.Input``.
        num_classes: number of units in the final softmax layer.

    Returns:
        An uncompiled ``keras.Model`` named ``"improved_cnn"``.
    """
    L = keras.layers

    def sep_conv_bn(tensor):
        # 3x3 separable convolution (32 filters, ReLU, same padding) + batch norm
        tensor = L.SeparableConv2D(32, (3, 3), activation='relu', padding='same')(tensor)
        return L.BatchNormalization()(tensor)

    inputs = keras.Input(shape=input_shape)
    stem = sep_conv_bn(inputs)

    # Residual block 1: one conv unit, skip connection from the stem, then pooling
    branch = sep_conv_bn(stem)
    merged = L.Add()([branch, stem])
    pooled = L.MaxPool2D()(merged)

    # Residual block 2: two conv units, skip connection from the block input, then pooling
    branch = sep_conv_bn(pooled)
    branch = sep_conv_bn(branch)
    merged = L.Add()([branch, pooled])
    features = L.MaxPool2D()(merged)

    # Classification head
    features = L.GlobalAveragePooling2D()(features)
    features = L.Dense(128, activation='relu')(features)
    features = L.Dropout(0.5)(features)
    outputs = L.Dense(num_classes, activation='softmax')(features)

    return keras.Model(inputs, outputs, name="improved_cnn")

model_imp = improved_model()
model_imp.summary()

In [None]:
# Minimal CNN: three conv + pool stages, global average pooling, softmax output.
model_slim = keras.models.Sequential(name="ultra_light_cnn")

model_slim.add(keras.layers.Conv2D(16, (3, 3), activation='relu', padding='same',
                                   input_shape=(IMG_HEIGHT, IMG_WIDTH, channels)))
model_slim.add(keras.layers.MaxPool2D())
model_slim.add(keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same'))
model_slim.add(keras.layers.MaxPool2D())
model_slim.add(keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'))
model_slim.add(keras.layers.MaxPool2D())
model_slim.add(keras.layers.GlobalAveragePooling2D())
model_slim.add(keras.layers.Dense(43, activation='softmax'))

model_slim.summary()

### MobileNetV2

In [None]:
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras import layers, models

IMG_HEIGHT_MOBILENETV2 = 32
IMG_WIDTH_MOBILENETV2 = 32

def get_X_train_MobileNetV2():
    """Load the training images resized for the MobileNetV2 input.

    Reads every image under ``<data_dir>/train/<two-digit class id>`` for
    class ids below ``NUM_CATEGORIES``, skipping ids that have no folder.

    Returns:
        A tuple ``(images, labels)``: ``images`` is a float32 array scaled
        to [0, 1]; ``labels`` is an array of the two-digit class-id strings,
        in the same order as ``images``.
    """
    image_data = []
    image_labels = []

    for i in range(NUM_CATEGORIES):
        label = f"{i:02d}"
        path = data_dir + '/train/' + label

        # Skip class ids that have no folder in this dataset
        if not os.path.isdir(path):
            continue

        for img in os.listdir(path):
            try:
                image = cv2.imread(path + '/' + img)
                image_fromarray = Image.fromarray(image, 'RGB')
                resize_image = image_fromarray.resize((IMG_HEIGHT_MOBILENETV2, IMG_WIDTH_MOBILENETV2))
                image_data.append(np.array(resize_image))
                image_labels.append(label)
            except Exception:
                # Best-effort loading: report the file and continue. Catching
                # Exception (not everything) keeps KeyboardInterrupt working.
                print("Error in " + img)

    # Convert the collected lists to numpy arrays
    image_data = np.array(image_data)
    image_labels = np.array(image_labels)
    return image_data.astype('float32') / 255. , image_labels

def get_X_val_MobileNetV2():
    """Load the validation images resized for the MobileNetV2 input.

    Returns:
        A tuple ``(X_val, val_labels)``: ``X_val`` is a float32 array scaled
        to [0, 1]; ``val_labels`` is a list of integer class ids taken from
        the sub-directory names of ``val_set``.
    """
    target_size = (IMG_HEIGHT_MOBILENETV2, IMG_WIDTH_MOBILENETV2)
    val_data, val_labels = [], []

    for label_name in os.listdir(val_set):
        label_dir = os.path.join(val_set, label_name)
        if os.path.isdir(label_dir):
            for img_file in os.listdir(label_dir):
                raw = cv2.imread(os.path.join(label_dir, img_file))
                resized = Image.fromarray(raw, 'RGB').resize(target_size)
                val_data.append(np.array(resized))
                val_labels.append(int(label_name))

    return np.array(val_data, dtype='float32') / 255., val_labels

# Load the 32x32 copies of the training and validation images
X_train_MobileNetV2, image_labels_MobileNetV2 = get_X_train_MobileNetV2()
X_val_MobileNetV2, val_labels_MobileNetV2 = get_X_val_MobileNetV2()

# One-hot encode the labels returned by the loaders above. The validation
# labels must be the ones that came back together with X_val_MobileNetV2,
# not the `val_labels` left over from the 30x30 validation cell.
y_train_MobileNetV2 = keras.utils.to_categorical(image_labels_MobileNetV2, NUM_CATEGORIES)
val_labels_MobileNetV2 = np.array(val_labels_MobileNetV2)
y_val_MobileNetV2 = keras.utils.to_categorical(val_labels_MobileNetV2, NUM_CATEGORIES)

# MobileNetV2 backbone without its classifier and without pretrained weights,
# topped with global average pooling and a 43-way softmax.
base = MobileNetV2(input_shape=(IMG_HEIGHT_MOBILENETV2,IMG_WIDTH_MOBILENETV2,channels), include_top=False, weights=None)
x = base.output
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(43, activation='softmax')(x)
model_mnv2 = models.Model(inputs=base.input, outputs=outputs, name="mobilenet_v2")

model_mnv2.summary()

## Augmenting the data and training the model

In [None]:
# All callbacks come from tensorflow.keras, the same package the models are
# built with; importing some of them from the standalone `keras` package can
# pick up a different Keras implementation.
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler, ModelCheckpoint

# Data augmentation: small rotations, zooms, shifts and shears; no flips
aug = ImageDataGenerator(
    rotation_range=10,
    zoom_range=0.15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.15,
    horizontal_flip=False,
    vertical_flip=False,
    fill_mode="nearest"
)
augmented_train = aug.flow(X_train, y_train, batch_size=32)

# Stop when validation accuracy has not improved for 10 epochs and roll the
# model back to its best weights.
early_stop = EarlyStopping(
    monitor='val_accuracy',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

# Keep the weights of the epoch with the lowest validation loss
checkpoint = ModelCheckpoint('model.weights.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min')

def lr_decay(epoch):
    """Return the learning rate for `epoch`: `lr` halved once per `epochs * 0.5` epochs."""
    return lr * (0.5 ** (epoch // (epochs * 0.5)))

# Reuse previously trained weights when they are available; otherwise train
# the baseline model from scratch.
if os.path.isfile('/kaggle/input/28-04-25-weights/model.weights.h5'):
    model.load_weights('/kaggle/input/28-04-25-weights/model.weights.h5')
else:
    opt = Adam(learning_rate=lr)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=opt,
        metrics=['accuracy']
    )

    # Training with early stopping
    history = model.fit(
        aug.flow(X_train, y_train, batch_size=32),
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=[early_stop, LearningRateScheduler(lr_decay), checkpoint]
    )

In [None]:
# Compile the baseline model with a fresh Adam optimizer
opt = Adam(learning_rate=lr)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

# Training with early stopping, the step-wise learning-rate schedule and
# best-weights checkpointing
training_callbacks = [early_stop, LearningRateScheduler(lr_decay), checkpoint]
history_model = model.fit(
    augmented_train,
    validation_data=(X_val, y_val),
    epochs=epochs,
    callbacks=training_callbacks,
)

# Persist the final weights
model.save_weights('model.weights.h5')

In [None]:
opt = Adam(learning_rate=lr)

model_imp.compile(
    loss='categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy']
)

# The shared `checkpoint` callback writes to 'model.weights.h5', the baseline
# model's file. Give this model its own best-weights checkpoint so training it
# cannot overwrite the baseline's weights.
checkpoint_imp = ModelCheckpoint('model_imp.best.weights.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min')

# Training with the learning-rate schedule (no early stopping in this run)
history_model_imp = model_imp.fit(
    augmented_train,
    epochs=50,
    validation_data=(X_val, y_val),
    callbacks=[LearningRateScheduler(lr_decay), checkpoint_imp]
)
model_imp.save_weights('model_imp.weights.h5')

In [None]:
opt = Adam(learning_rate=lr)

model_slim.compile(
    loss='categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy']
)

# The shared `checkpoint` callback writes to 'model.weights.h5', the baseline
# model's file. Give this model its own best-weights checkpoint so training it
# cannot overwrite the baseline's weights.
checkpoint_slim = ModelCheckpoint('model_slim.best.weights.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min')

# Training with the learning-rate schedule (no early stopping in this run)
history_model_slim = model_slim.fit(
    augmented_train,
    epochs=50,
    validation_data=(X_val, y_val),
    callbacks=[LearningRateScheduler(lr_decay), checkpoint_slim]
)
model_slim.save_weights('model_slim.weights.h5')

In [None]:
opt = Adam(learning_rate=lr)

model_mnv2.compile(
    loss='categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy']
)

# The shared `checkpoint` callback writes to 'model.weights.h5', the baseline
# model's file. Give this model its own best-weights checkpoint so training it
# cannot overwrite the baseline's weights.
checkpoint_mnv2 = ModelCheckpoint('model_mnv2.best.weights.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min')

# Training on the 32x32 data with the learning-rate schedule (no early
# stopping in this run)
history_model_mnv2 = model_mnv2.fit(
    aug.flow(X_train_MobileNetV2, y_train_MobileNetV2, batch_size=32),
    epochs=30,
    validation_data=(X_val_MobileNetV2, y_val_MobileNetV2),
    callbacks=[LearningRateScheduler(lr_decay), checkpoint_mnv2]
)
model_mnv2.save_weights('model_mnv2.weights.h5')

In [None]:
# One panel per trained network: training curves plus parameter count
fig, axs = plt.subplots(1, 4, figsize=(24, 5))

histories = [history_model, history_model_imp, history_model_slim, history_model_mnv2]
# NOTE: this list reuses the name of the keras `models` module imported in the
# MobileNetV2 cell; the name is kept so existing references keep working.
models = [model, model_imp, model_slim, model_mnv2]
titles = ['Training original model', 'Training improved model', 'Training slim model', 'MobileNetV2']

# The loop variable must not be called `model`: that would leave the global
# `model` bound to the last entry (MobileNetV2) for every later cell.
for axis, hist, plotted_model, title in zip(axs, histories, models, titles):
    pd.DataFrame(hist.history).plot(ax=axis)
    axis.set_title(f"{title}\nParams: {plotted_model.count_params():,}")
    axis.grid(True)
    axis.set_ylim(0, 1)

plt.tight_layout()
plt.show()

## Evaluating the model

In [None]:
# `history` only exists when the first training cell actually trained the
# model; when pretrained weights were loaded instead, fall back to the history
# of the dedicated baseline training run.
eval_history = globals().get('history', history_model)

pd.DataFrame(eval_history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

# Quantization Optimization

In [None]:
# Convert the Keras model to TFLite with the default optimization set
# (noted in the original as defaulting to 8-bit).
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quant_model = converter.convert()

# mkstemp returns an already-open descriptor; write through it so it is
# closed when the block exits instead of being leaked.
fd, quant_model_file = tempfile.mkstemp('.tflite')
with os.fdopen(fd, 'wb') as f:
    f.write(quant_model)

print('Saved quant TFLite model to:', quant_model_file)

In [None]:
def get_gzipped_model_size(file):
  """Return the size in bytes of `file` after DEFLATE compression.

  The file is written into a temporary zip archive, the archive is measured
  and then deleted again.

  Args:
    file: path of the file to compress.

  Returns:
    Size of the compressed archive, in bytes.
  """
  import os
  import tempfile
  import zipfile

  fd, zipped_file = tempfile.mkstemp('.zip')
  # mkstemp hands back an open descriptor; ZipFile reopens the path itself,
  # so close the descriptor right away instead of leaking it.
  os.close(fd)
  try:
    with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
      f.write(file)
    return os.path.getsize(zipped_file)
  finally:
    # The archive is only needed for the measurement
    os.remove(zipped_file)

In [None]:
# Save the baseline Keras model to a temporary HDF5 file so that its
# compressed size can be compared with the quantized TFLite file.
fd, model_file = tempfile.mkstemp('.h5')
os.close(fd)  # the path is reopened by model.save
model.save(model_file)

print("Size of gzipped baseline Keras model:\t%.2f bytes" % (get_gzipped_model_size(model_file)))
print("Size of gzipped quant Keras model:\t%.2f bytes" % (get_gzipped_model_size(quant_model_file)))

## Loading the test data and running the predictions

In [None]:
import glob

imgs = []
labels = []

# test/<label>/<image>: collect every file path together with its folder name
for label_name in os.listdir(test_path):
    label_dir = os.path.join(test_path, label_name)
    if not os.path.isdir(label_dir):
        continue
    # All entries inside the label folder
    found = glob.glob(os.path.join(label_dir, '*'))
    imgs.extend(found)
    labels.extend([label_name] * len(found))

# Folder names are the numeric class ids
labels = np.array(labels).astype(int)

In [None]:
def get_predictions(model):
    """Run `model` on every test image listed in `imgs`.

    Each image is read with OpenCV, resized to IMG_HEIGHT x IMG_WIDTH and
    divided by 255 before prediction.

    Returns:
        A tuple ``(pred, X_test)``: the arg-max class index per image and the
        preprocessed image array that was fed to the model.
    """
    def _load(path):
        raw = cv2.imread(path)
        resized = Image.fromarray(raw, 'RGB').resize((IMG_HEIGHT, IMG_WIDTH))
        return np.array(resized)

    X_test = np.array([_load(path) for path in imgs])
    X_test = X_test/255

    class_scores = model.predict(X_test)
    return np.argmax(class_scores, axis=-1), X_test

In [None]:
# Accuracy on the test data, one line per trained model
print('Test Data accuracy original model: ',accuracy_score(labels, get_predictions(model)[0])*100)
print('Test Data accuracy improved model: ',accuracy_score(labels, get_predictions(model_imp)[0])*100)
# This line reports model_slim, so label it as such
print('Test Data accuracy slim model: ',accuracy_score(labels, get_predictions(model_slim)[0])*100)

# Weight Clustering

In [None]:
# Short aliases for the tfmot clustering API
CentroidInitialization = tfmot.clustering.keras.CentroidInitialization
cluster_weights = tfmot.clustering.keras.cluster_weights

# 16 clusters with linearly initialised centroids
clustering_params = dict(
    number_of_clusters=16,
    cluster_centroids_init=CentroidInitialization.LINEAR,
)

# Wrap the whole improved model for weight clustering
clustered_model = cluster_weights(model_imp, **clustering_params)

# Fine-tuning the clustered model uses a smaller learning rate
opt = keras.optimizers.Adam(learning_rate=1e-5)
clustered_model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

clustered_model.summary()


In [None]:
# Fine-tune the clustered model for a single epoch on augmented batches of 500
clustering_batches = aug.flow(X_train, y_train, batch_size=500)
clustered_model.fit(clustering_batches, validation_data=(X_val, y_val), epochs=1)

## Visualizing the confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix

# get_predictions returns (predicted classes, input images); only the
# predictions belong in the confusion matrix.
pred_imp, _ = get_predictions(model_imp)

# Pin the matrix to every class id in `classes` so its shape does not depend
# on which classes happen to occur in the test set.
cf = confusion_matrix(labels, pred_imp, labels=sorted(classes))

In [None]:
import seaborn as sns
# Rows and columns of the confusion matrix are indexed by the keys of `classes`
plt.figure(figsize=(20, 20))
df_cm = pd.DataFrame(data=cf, index=classes, columns=classes)
sns.heatmap(data=df_cm, annot=True)

## Classification report

In [None]:
from sklearn.metrics import classification_report
# Per-class precision/recall report for the baseline, improved and slim models
for evaluated_model in (model, model_imp, model_slim):
    print(classification_report(labels, get_predictions(evaluated_model)[0]))

## Predictions on Test Data

In [None]:
plt.figure(figsize = (25, 25))

pred, X_test = get_predictions(model_imp)

# Show up to 25 test images in a 5x5 grid, starting at start_index
start_index = 0
n_shown = max(0, min(25, len(pred) - start_index))  # tolerate small test sets
for i in range(n_shown):
    plt.subplot(5, 5, i + 1)
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    prediction = pred[start_index + i]
    actual = labels[start_index + i]
    # Green caption for a correct prediction, red for a wrong one
    col = 'g' if prediction == actual else 'r'
    plt.xlabel('Actual={} || Pred={}'.format(actual, prediction), color = col)
    # The images were read with cv2.imread, which yields BGR channel order;
    # reverse the channel axis so imshow displays the true colours.
    plt.imshow(X_test[start_index + i][..., ::-1])
plt.show()