In [None]:
import tensorflow as tf

# Report the installed TensorFlow version.
print(tf.__version__)

# Count the GPUs TensorFlow can see on this machine.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [30]:
# Core scientific stack
import numpy as np
import matplotlib.pyplot as plt

# Deep learning framework 
import keras
from keras import Sequential
from keras import layers
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.initializers import HeUniform
from keras.models import load_model
from keras import activations

# Evaluation tools
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import seaborn as sns


**Load The Preprocessed Data**

In [None]:
# Directory holding the preprocessed GTSRB arrays; change it here to relocate the data.
DATA_DIR = "/mnt/g/which one is it/data/GTSRB/processed"


def load_split(split, data_dir=DATA_DIR):
    """Load the image and label arrays of one dataset split.

    Args:
        split: Split name as used in the file names ("train", "val" or "test").
        data_dir: Directory containing ``X_<split>.npy`` and ``y_<split>.npy``.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays.

    Raises:
        ValueError: If the two arrays hold a different number of samples.
    """
    # NOTE(security): allow_pickle=True lets np.load run arbitrary code embedded
    # in the file. It is kept so existing files keep loading; if the arrays are
    # plain numeric arrays, switch to allow_pickle=False.
    X = np.load(f"{data_dir}/X_{split}.npy", allow_pickle=True)
    y = np.load(f"{data_dir}/y_{split}.npy", allow_pickle=True)
    if len(X) != len(y):
        raise ValueError(
            f"Split '{split}' is inconsistent: {len(X)} samples but {len(y)} labels."
        )
    return X, y


X_train, y_train = load_split("train")
X_val, y_val = load_split("val")
X_test, y_test = load_split("test")

print("Train:", X_train.shape, y_train.shape)
print("Val:", X_val.shape, y_val.shape)
print("Test:", X_test.shape, y_test.shape)

**Random Seed**

In [32]:
# Seed Python, NumPy and TensorFlow with one shared value so runs are repeatable.
import random

seed_value = 7331
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(seed_value)

**Model Path and Load**

In [None]:
# Location of the saved CNN that is restored below.
keras_model_path = '/mnt/g/which one is it/models/best_model.keras'
# Restore the saved model; it is later handed to build_cnn_vit_hybrid as `cnn_model`.
cnn_model = load_model(keras_model_path)
# Print the layer table, which lists the layer names usable as `feature_layer_name`.
cnn_model.summary()




**Feature Layer Name**

In [35]:
# Name of the cnn_model layer whose output build_cnn_vit_hybrid reshapes into
# transformer tokens; it is looked up with cnn_model.get_layer(...).
feature_layer_name = 'batch_normalization_4'  

**Custom positional embedding layer**

In [None]:
# Positional embedding that Keras can track
@keras.utils.register_keras_serializable(package="Custom", name="AddPositionEmbedding")
class AddPositionEmbedding(layers.Layer):
    """Add a trainable positional embedding to a sequence of tokens.

    The layer owns one weight of shape ``(1, num_patches, embed_dim)`` that is
    added to the incoming tokens; the leading axis of size 1 broadcasts over
    the batch.

    Args:
        num_patches: Number of token positions.
        embed_dim: Size of each token vector.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, num_patches, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        self.embed_dim = embed_dim
        # Created here (not in build) so the weight exists as soon as the
        # layer is constructed.
        self.pos = self.add_weight(
            shape=(1, num_patches, embed_dim),
            initializer="random_normal",
            trainable=True,
            name="pos_embedding",
        )

    def call(self, tokens):
        """Return ``tokens`` with the positional embedding added."""
        return tokens + self.pos

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            "num_patches": self.num_patches,
            "embed_dim": self.embed_dim,
        }

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from the dictionary produced by ``get_config``."""
        return cls(**config)

**SwiGLU Layer**

In [None]:
@keras.utils.register_keras_serializable(package="Custom", name="SwiGLU")
class SwiGLULayer(layers.Layer):
    """Gated linear unit with a SiLU gate: ``silu(dense_gate(x)) * dense_linear(x)``.

    Args:
        units: Output width of both internal Dense projections.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        # Two parallel projections of the same input: one is gated, one stays linear.
        self.dense_gate = layers.Dense(units, name="dense_gate")
        self.dense_linear = layers.Dense(units, name="dense_linear")

    def call(self, x):
        """Return the element-wise product of the SiLU-activated gate and the linear branch."""
        gate_branch = activations.silu(self.dense_gate(x))
        linear_branch = self.dense_linear(x)
        return gate_branch * linear_branch

    def get_config(self):
        """Return the base layer config extended with ``units``."""
        base_config = super().get_config()
        return {**base_config, "units": self.units}

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from the dictionary produced by ``get_config``."""
        return cls(**config)

**Transformer block implementation**

In [38]:
# One Transformer encoder block
@keras.utils.register_keras_serializable(package="Custom", name="TransformerBlock")
class TransformerBlock(layers.Layer):
    """Post-norm Transformer encoder block with a SwiGLU feed-forward network.

    Data flow: self-attention -> dropout -> residual + LayerNorm ->
    SwiGLU(ffn_dim) -> Dense(embed_dim) -> dropout -> residual + LayerNorm.

    ``ffn_dim`` used to be stored and serialized but never applied (the gated
    layer was built with ``embed_dim`` units), so the ``mlp_dim`` passed by the
    model builder had no effect. It now sets the hidden width of the
    feed-forward network, and ``ffn_out`` projects back to ``embed_dim`` so the
    residual addition stays shape-compatible. Checkpoints saved with the old
    layout do not contain the ``ffn_out`` weights and must be retrained.

    Args:
        embed_dim: Size of each input/output token vector.
        num_heads: Number of attention heads.
        ffn_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the feed-forward network.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, num_heads, ffn_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ffn_dim = ffn_dim
        self.rate = rate
        # NOTE(review): key_dim is the per-head projection size in Keras, so
        # every head projects to the full embed_dim here — confirm this is
        # intended rather than embed_dim // num_heads.
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = SwiGLULayer(units=ffn_dim)
        # Down-projection back to the token width, required for the residual sum.
        self.ffn_out = layers.Dense(embed_dim)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        """Apply self-attention and the feed-forward network to ``inputs``.

        Args:
            inputs: Token tensor whose last axis has size ``embed_dim``.
            training: Forwarded to the dropout layers.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn_out(self.ffn(out1))
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ffn_dim": self.ffn_dim,
            "rate": self.rate,
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the block from the dictionary produced by ``get_config``."""
        return cls(**config)
        


**Hybrid model builder function and QAT wrapper**

In [39]:

import tensorflow_model_optimization as tfmot

def build_cnn_vit_hybrid(
        cnn_model,
        feature_layer_name,
        num_transformer_layers=3,
        num_heads=8,
        mlp_dim=1024,
        num_classes=43,
        train_backbone=True):
    """Stack Transformer encoder blocks on top of an intermediate CNN feature map.

    The output of ``feature_layer_name`` is reshaped from ``(H, W, C)`` into
    ``H * W`` tokens of width ``C``, given a learned positional embedding, run
    through ``num_transformer_layers`` TransformerBlocks, average-pooled over
    the tokens and classified with a softmax Dense layer.

    Args:
        cnn_model: Keras model providing the convolutional backbone. Its
            ``trainable`` flag is modified in place.
        feature_layer_name: Name of the backbone layer to tap.
        num_transformer_layers: Number of TransformerBlocks to stack.
        num_heads: Attention heads per block.
        mlp_dim: Passed to every TransformerBlock as ``ffn_dim``.
        num_classes: Width of the softmax output.
        train_backbone: When True (default, the previous behaviour) the CNN is
            fine-tuned; when False its weights are frozen.

    Returns:
        A ``keras.Model`` named ``"CNN_ViT_hybrid"`` mapping the CNN input to
        class probabilities.

    Raises:
        ValueError: If the tapped feature map is not rank 4 or has an unknown
            height, width or channel count.
    """
    # 1. Freeze or fine-tune the CNN.
    cnn_model.trainable = train_backbone

    # 2. Feature extractor up to the chosen layer.
    features = cnn_model.get_layer(feature_layer_name).output
    feature_shape = tuple(features.shape)
    if len(feature_shape) != 4:
        raise ValueError(
            f"Layer '{feature_layer_name}' must output a (batch, H, W, C) feature map, "
            f"got shape {feature_shape}."
        )
    _, H, W, C = feature_shape
    if H is None or W is None or C is None:
        # Reshape and the positional embedding both need concrete sizes.
        raise ValueError(
            f"Layer '{feature_layer_name}' has a dynamic feature shape {feature_shape}; "
            "height, width and channels must all be known."
        )

    # 3. Flatten the spatial grid into tokens and add the positional encoding.
    x = layers.Reshape((H * W, C))(features)
    x = AddPositionEmbedding(H * W, C)(x)

    # 4. Stack the encoder blocks.
    for _ in range(num_transformer_layers):
        x = TransformerBlock(C, num_heads, mlp_dim)(x)

    # 5. Token pooling and classification head.
    x = layers.GlobalAveragePooling1D()(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)
    # The input tensor is taken from the first layer, exactly as before.
    return keras.Model(cnn_model.layers[0].input, outputs, name="CNN_ViT_hybrid")

def create_quant_aware_hybrid_model(cnn_model, feature_layer_name):
    """Build the hybrid model with its default hyper-parameters and wrap it for QAT.

    Args:
        cnn_model: Backbone model forwarded to ``build_cnn_vit_hybrid``.
        feature_layer_name: Name of the backbone layer to tap.

    Returns:
        The result of ``tfmot.quantization.keras.quantize_model`` applied to
        the hybrid model.
    """
    # NOTE(review): the hybrid contains custom layers; confirm that
    # quantize_model accepts them without extra quantize annotations.
    hybrid_model = build_cnn_vit_hybrid(cnn_model, feature_layer_name)
    return tfmot.quantization.keras.quantize_model(hybrid_model)





**Data Generator and Augmentation**

In [40]:
import albumentations as A
import numpy as np


# Geometric jitter: shift (up to 10%), zoom (±15%) and rotate (±15°),
# followed by an occasional perspective warp.
geometric_ops = [
    A.Affine(
        p=0.75,
        rotate=(-15, 15),
        scale=(0.85, 1.15),
        translate_percent=(-0.1, 0.1),
    ),
    A.Perspective(p=0.2, scale=(0.05, 0.1)),
]

# Image-quality degradation: either sensor-style noise or motion blur.
degradation_op = A.OneOf(
    [
        A.GaussNoise(p=0.4),
        A.MotionBlur(blur_limit=(3, 7)),
    ],
    p=0.4,
)

# Colour and lighting variation: exactly one of the three is applied when selected.
colour_op = A.OneOf(
    [
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2),
        A.RandomGamma(gamma_limit=(80, 120)),
        A.HueSaturationValue(hue_shift_limit=10, sat_shift_limit=20, val_shift_limit=10),
    ],
    p=0.5,
)

# Simulated real-world conditions: fog, rain or sun flare.
weather_op = A.OneOf(
    [
        A.RandomFog(fog_coef_range=(0.1, 0.3), p=1),
        A.RandomRain(drop_length=15, drop_width=1, blur_value=3, p=1),
        A.RandomSunFlare(p=1),
    ],
    p=0.2,
)

# Occlusion: blank out a few small rectangles.
occlusion_op = A.CoarseDropout(
    num_holes_range=(1, 5),
    hole_height_range=(0.01, 0.03),
    hole_width_range=(0.01, 0.03),
    fill=0,
    p=0.1,
)

# Full training-time augmentation pipeline, applied in the order listed.
transform = A.Compose([
    *geometric_ops,
    degradation_op,
    colour_op,
    weather_op,
    occlusion_op,
])


def augment_generator(X, y, batch_size=32, pipeline=None):
    """Yield shuffled, augmented ``(images, labels)`` batches forever.

    Each pass over the data uses a fresh ``np.random.permutation`` and visits
    every sample once; the last batch of a pass is smaller when ``len(X)`` is
    not a multiple of ``batch_size``.

    Args:
        X: Indexable collection of images.
        y: Indexable collection of labels, aligned with ``X``.
        batch_size: Maximum number of samples per yielded batch.
        pipeline: Callable invoked as ``pipeline(image=img)["image"]``.
            Defaults to the module-level ``transform``.

    Yields:
        Tuple ``(X_batch, y_batch)`` of NumPy arrays.

    Raises:
        ValueError: On the first ``next()`` call, if ``X`` is empty, ``X`` and
            ``y`` differ in length, or ``batch_size`` is not positive. Without
            these checks an empty dataset or a negative batch size would loop
            forever without yielding anything.
    """
    n = len(X)
    if n == 0:
        raise ValueError("augment_generator needs at least one sample; X is empty.")
    if len(y) != n:
        raise ValueError(f"X and y must have the same length, got {n} and {len(y)}.")
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}.")
    if pipeline is None:
        pipeline = transform

    while True:
        # Reshuffle the full dataset once per pass.
        indices = np.random.permutation(n)
        for start in range(0, n, batch_size):
            batch_idx = indices[start:start + batch_size]
            X_batch = np.array([pipeline(image=X[j])["image"] for j in batch_idx])
            y_batch = np.array([y[j] for j in batch_idx])
            yield X_batch, y_batch


# Endless augmented training stream. The batch size chosen here must agree with
# the value used to compute steps_per_epoch in the fit() call.
train_gen = augment_generator(X_train, y_train, batch_size=16)

**Architecture and compile**

In [41]:
# NOTE: despite its name, q_aware_model is the plain hybrid model. This cell calls
# build_cnn_vit_hybrid directly rather than create_quant_aware_hybrid_model, so no
# quantization-aware wrapping is applied.
q_aware_model = build_cnn_vit_hybrid(
        cnn_model,
        feature_layer_name=feature_layer_name,
        num_transformer_layers=3,
        num_heads=8,
        mlp_dim=1024,
        num_classes=43)

# categorical_crossentropy expects one-hot labels; later cells take argmax over
# axis 1 of the label arrays, which is consistent with that encoding.
q_aware_model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Callbacks (all three watch the validation loss)
# Stop after 6 epochs without improvement and roll back to the best weights seen.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=6,
    restore_best_weights=True
)

# Keep only the best model on disk, written to the current working directory.
checkpoint = ModelCheckpoint(
    filepath='CNN_ViT_hybrid.keras',
    monitor='val_loss',
    save_best_only=True
)

# Halve the learning rate after 3 stagnant epochs, never going below 1e-6.
reduce_lr = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6

)

**Model Shape Validation**

In [None]:
# Ensure end-to-end shapes line up before spending time on training.
# The dummy batch takes its per-sample shape from the real training data, so a
# mismatch between model and data is caught here instead of inside fit().
dummy = tf.random.normal([1, *X_train.shape[1:]])
pred  = q_aware_model(dummy, training=False)

# The classification head ends in a softmax, so these are class probabilities,
# not logits.
print("Output shape:", pred.shape)   # should be (1, num_classes)

# Labels are one-hot, so their last axis gives the expected number of classes.
expected_shape = (1, y_train.shape[-1])
assert tuple(pred.shape) == expected_shape, (
    f"Model output shape {tuple(pred.shape)} does not match the labels {expected_shape}"
)

**Training**

In [None]:
# Must equal the batch_size that was given to augment_generator for train_gen.
TRAIN_BATCH_SIZE = 16

# Ceiling division: the generator also yields the final partial batch of each
# pass, so rounding down would make Keras epochs drift out of step with it.
steps_per_epoch = -(-len(X_train) // TRAIN_BATCH_SIZE)

# `batch_size` is intentionally not passed: the generator already emits batches
# and Keras documents that it must not be given for generator inputs. The
# validation arrays are still evaluated in batches of the same size.
history = q_aware_model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=50,
    validation_data=(X_val, y_val),
    validation_batch_size=TRAIN_BATCH_SIZE,
    callbacks=[early_stop, checkpoint, reduce_lr],
    verbose=1
)



**Testing**

In [None]:
# Score the model on the held-out test split. evaluate() returns the loss followed
# by the compiled metrics, which here is accuracy only.
test_loss, test_acc = q_aware_model.evaluate(X_test, y_test, verbose=2)
print('\nTest accuracy:', test_acc)
print('Test loss:', test_loss)

**Plot Loss and Accuracy**

In [None]:
# One figure per metric (accuracy first, then loss), each comparing the
# training curve with the validation curve.
for metric_key, metric_label in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
    plt.figure(figsize=(10,5))
    plt.plot(history.history[metric_key], label=f'Train {metric_label}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_label}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_label)
    plt.title(f'Training vs Validation {metric_label}')
    plt.legend()
    plt.show()

**Classification-Report and Confusion Matrix**

In [None]:
# 3️⃣ Confusion Matrix
# Class probabilities for the test set; y_pred is reused by the ROC cell.
y_pred = q_aware_model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

# Fix the label set explicitly. Without `labels`, the matrix covers the union of
# true and predicted classes, which can differ from np.unique(y_true) and leave
# the tick labels misaligned with the rows and columns.
class_ids = np.arange(y_test.shape[1])
cm = confusion_matrix(y_true, y_pred_classes, labels=class_ids)
plt.figure(figsize=(15,15))
sns.heatmap(cm, annot=False, cmap='Blues', xticklabels=class_ids, yticklabels=class_ids)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# 4️⃣ Classification Report
report = classification_report(y_true, y_pred_classes, digits=3)
print("Classification Report:\n", report)

**ROC & AUC**

In [None]:
roc_auc_dict = {}
# 5️⃣ ROC Curves and AUC
# One-vs-rest curve per class: column i of the one-hot labels against the
# predicted probability of class i.
num_classes = y_test.shape[1]

# Draw on a dedicated figure so the curves do not land on a leftover figure.
plt.figure(figsize=(12, 10))
for i in range(num_classes):
    fpr, tpr, _ = roc_curve(y_test[:, i], y_pred[:, i])
    roc_auc = auc(fpr, tpr)
    roc_auc_dict[i] = roc_auc

    plt.plot(fpr, tpr, label=f'Class {i} (AUC = {roc_auc:.2f})')

# Diagonal reference line for a random classifier.
plt.plot([0, 1], [0, 1], 'k--', label='Chance')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('One-vs-Rest ROC Curves')
# The per-class labels are only visible once a legend is drawn.
plt.legend(loc='lower right', fontsize='x-small', ncol=2)
plt.show()

**Training Summary Table**

In [None]:
import pandas as pd

# Report the train/validation metrics of the epoch with the lowest validation
# loss — the quantity EarlyStopping and ModelCheckpoint monitor — instead of the
# last epoch. With restore_best_weights=True the evaluated model may have been
# rolled back from the last epoch, so last-epoch numbers would not describe the
# weights that produced the test score.
# NOTE(review): whether the weights are also restored when training runs all
# epochs without stopping early depends on the Keras version — confirm.
val_losses = history.history['val_loss']
best_epoch = min(range(len(val_losses)), key=val_losses.__getitem__)

summary = {
    "Dataset": ["Training", "Validation", "Test"],
    "Accuracy": [
        history.history['accuracy'][best_epoch],      # training acc at best epoch
        history.history['val_accuracy'][best_epoch],  # val acc at best epoch
        test_acc                                       # from model.evaluate()
    ],
    "Loss": [
        history.history['loss'][best_epoch],          # training loss at best epoch
        history.history['val_loss'][best_epoch],      # val loss at best epoch
        test_loss                                      # from model.evaluate()
    ]
}

df_summary = pd.DataFrame(summary)
print(f"Training/validation rows are taken from epoch {best_epoch + 1} (lowest val_loss).")
print(df_summary)


**Results Combined**

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

# --- Gather inputs ---
history_dict = history.history  # per-epoch metrics recorded by model.fit()
y_val_true = np.argmax(y_val, axis=1)
val_probabilities = q_aware_model.predict(X_val)
y_val_pred = np.argmax(val_probabilities, axis=1)

# Validation confusion matrix over the fixed set of 43 classes.
cm = confusion_matrix(y_val_true, y_val_pred, labels=range(43))

# Number of samples per class, pooled over all three splits.
y_train_labels = np.argmax(y_train, axis=1)
y_val_labels   = np.argmax(y_val, axis=1)
y_test_labels  = np.argmax(y_test, axis=1)
all_labels = np.concatenate([y_train_labels, y_val_labels, y_test_labels])
counts = np.bincount(all_labels, minlength=43)

# --- Plot: three panels side by side ---
fig, axes = plt.subplots(1, 3, figsize=(22,6))
curve_ax, matrix_ax, dist_ax = axes

# Panel 1: training curves
for history_key, legend_label in (("accuracy", "Train Acc"), ("val_accuracy", "Val Acc")):
    curve_ax.plot(history_dict[history_key], label=legend_label)
curve_ax.set_title("Training/Validation Accuracy")
curve_ax.set_xlabel("Epochs")
curve_ax.set_ylabel("Accuracy")
curve_ax.legend()

# Panel 2: confusion matrix heatmap
sns.heatmap(cm, ax=matrix_ax, cmap="Blues", cbar=False)
matrix_ax.set_title("Confusion Matrix (Validation)")
matrix_ax.set_xlabel("Predicted")
matrix_ax.set_ylabel("True")

# Panel 3: class distribution
dist_ax.bar(range(43), counts, color="steelblue")
dist_ax.set_title("Samples per Class (Train+Val+Test)")
dist_ax.set_xlabel("Class ID")
dist_ax.set_ylabel("Count")

plt.tight_layout()
plt.show()
