In [None]:
from google.colab import drive
# Mount Google Drive into the Colab runtime at /content/drive.
# NOTE(review): later cells use the relative path ./drive/MyDrive, which
# presumably relies on the working directory being /content — confirm.
drive.mount('/content/drive')

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, models
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
# from tensorflow.keras.callbacks import EarlyStopping
import seaborn as sns
from sklearn.utils.class_weight import compute_class_weight
from IPython.display import clear_output
import warnings
warnings.filterwarnings('ignore')

In [None]:
# --- Experiment configuration ---------------------------------------------
SEED = 12            # single seed for every random number generator (applied below)
BATCH_SIZE = 64
EPOCHS = 1
LR = 0.0001          # Adam learning rate
NUM_CLASSES = 14
# NOTE: despite the "_dir" suffix these point at .npz archives, not directories.
train_dir = "./drive/MyDrive/train_dataset2.npz"
test_dir = "./drive/MyDrive/test_dataset2.npz"
IMG_HEIGHT = 64
IMG_WIDTH = 64

# Seed the Python, NumPy and TensorFlow RNGs in one call so that random
# augmentation and weight initialisation are repeatable across a
# "Restart & Run All".
tf.keras.utils.set_random_seed(SEED)

In [None]:
# Load the train / test splits from their .npz archives.
# np.load returns a lazy NpzFile that keeps the underlying file open; the
# context managers close the handles as soon as the arrays are in memory.
with np.load(train_dir) as data_train:
    X_train = data_train['X']
    y_train = data_train['y']
with np.load(test_dir) as data_test:
    X_test = data_test['X']
    y_test = data_test['y']

# Every image must have exactly one label.
assert len(X_train) == len(y_train), "train images / labels length mismatch"
assert len(X_test) == len(y_test), "test images / labels length mismatch"

# Human-readable class names; the integer label is used as the index into this list.
CLASS_LABELS = ['Abuse','Arrest','Arson','Assault','Burglary','Explosion','Fighting',"Normal",'RoadAccidents','Robbery','Shooting','Shoplifting','Stealing','Vandalism']

print("Loaded dataset:")
print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")


Loaded dataset:
X_train shape: (354626, 64, 64, 3)
X_test shape: (41397, 64, 64, 3)


In [None]:
from collections import Counter

# Count how many training samples carry each label
class_counts = Counter(y_train)
print("Class Distribution:", class_counts)

# Group the training images by label: {label: array of that label's images}
class_samples = dict(
    (label, X_train[y_train == label]) for label in np.unique(y_train)
)


Class Distribution: Counter({7: 150000, 12: 30000, 9: 28000, 4: 18000, 6: 18000, 11: 18000, 8: 15000, 13: 13626, 1: 13000, 2: 13000, 5: 12000, 3: 10000, 0: 9000, 10: 7000})


In [None]:
# DenseNet preprocessing handle. It is intentionally NOT applied to the
# evaluation generator below (the training generator only rescales pixels by
# 1/255); the name is kept so any cell that still references it keeps working.
preprocess_fun = tf.keras.applications.densenet.preprocess_input

# Strong augmentation, intended for heavily under-represented classes.
minority_augmenter = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    brightness_range=(0.8, 1.2)
)

# Lighter augmentation, intended for moderately under-represented classes.
moderate_augmenter = ImageDataGenerator(
    rotation_range=10,
    width_shift_range=0.1,
    horizontal_flip=True
)

# No augmentation for the majority class
no_augmenter = ImageDataGenerator()

# Evaluation pipeline: no augmentation, and exactly the same pixel scaling as
# the training generator (rescale=1/255). Stacking DenseNet preprocessing on
# top of the rescale would feed the model inputs on a different scale from
# the one it was trained on.
test_datagen = ImageDataGenerator(rescale=1./255)

In [None]:
# Function to augment a class
def augment_class(augmenter, class_samples, target_size):
    """Generate augmented images so that a class reaches ``target_size`` samples.

    Args:
        augmenter: Object exposing ``flow(x, batch_size=...)`` that returns an
            iterator of image batches (e.g. a Keras ``ImageDataGenerator``).
        class_samples: Array of the class' original images, stacked on axis 0.
        target_size: Desired total number of samples for the class
            (originals + augmented).

    Returns:
        np.ndarray holding exactly ``max(target_size - len(class_samples), 0)``
        augmented images. When no image is needed (or there is nothing to
        augment from) an empty array of shape ``(0, *image_shape)`` is
        returned so the result can always be concatenated with the originals.
    """
    class_samples = np.asarray(class_samples)
    n_available = len(class_samples)
    n_missing = target_size - n_available

    if n_missing <= 0 or n_available == 0:
        # Keep the image dimensions and dtype so np.concatenate still works
        # and does not change the dtype of the original samples.
        return np.empty((0,) + class_samples.shape[1:], dtype=class_samples.dtype)

    augmented_samples = []
    for i in range(n_missing):
        # Cycle through the originals so the new images are spread evenly over
        # them; the total produced is n_missing, not n_missing per image.
        img = np.expand_dims(class_samples[i % n_available], axis=0)  # add batch dimension
        augmented_img = next(augmenter.flow(img, batch_size=1))[0]
        augmented_samples.append(augmented_img)
    return np.array(augmented_samples)

# Rebalance the training set one class at a time.
# NOTE(review): the printed class distribution shows every class with
# thousands of samples, so with the 50 / 200 thresholds below each class takes
# the "no augmentation" branch — confirm the thresholds are intended.
balanced_data = []
balanced_labels = []

for cls, samples in class_samples.items():
    n_in_class = class_counts[cls]

    if n_in_class < 50:
        # Minority class: strong augmentation, target of 100 samples
        extra = augment_class(minority_augmenter, samples, target_size=100)
        class_data = np.concatenate([samples, extra])
    elif n_in_class < 200:
        # Moderate class: light augmentation, target of 200 samples
        extra = augment_class(moderate_augmenter, samples, target_size=200)
        class_data = np.concatenate([samples, extra])
    else:
        # Majority class: used as-is
        class_data = samples

    balanced_data.append(class_data)
    # One label per (original or augmented) image of this class
    balanced_labels.append(np.full((len(class_data),), cls))


In [None]:
# Stack the per-class arrays back into one training set (images and labels
# are concatenated in the same class order, so they stay aligned).
X_train_balanced = np.concatenate(balanced_data)
y_train_balanced = np.concatenate(balanced_labels)

print("New Balanced Class Distribution:", Counter(y_train_balanced))


In [None]:
from sklearn.utils import shuffle

# Shuffle images and labels together so the class-ordered arrays get mixed.
X_train_balanced, y_train_balanced = shuffle(
    X_train_balanced,
    y_train_balanced,
    random_state=42,
)


In [None]:
# Training pipeline: light on-the-fly augmentation + rescaling to [0, 1].
train_datagen = ImageDataGenerator(
    rotation_range=10,
    horizontal_flip=True,
    rescale=1./255
)

train_generator = train_datagen.flow(
    X_train_balanced,
    y_train_balanced,
    batch_size=32,
    seed=SEED  # repeatable shuffling / augmentation
)

# The test split is an in-memory array loaded from an .npz archive, not a
# directory tree of image files, so it must go through .flow() rather than
# .flow_from_directory(). Labels stay as integer class ids, which is what the
# 'sparse_categorical_crossentropy' loss expects.
# NOTE(review): test_datagen must apply the same pixel scaling as train_datagen.
test_generator = test_datagen.flow(
    X_test,
    y_test,
    batch_size=BATCH_SIZE,
    shuffle=False,  # keep order aligned with y_test
    seed=SEED
)

In [None]:
# Per-class sample counts of the training labels.
# Iterators returned by ImageDataGenerator.flow() expose no `.classes`
# attribute, so the counts are taken from the label array itself; matplotlib
# (already imported) is used for the bar chart.
train_class_ids, train_class_counts = np.unique(y_train_balanced, return_counts=True)

fig, ax = plt.subplots(figsize=(12, 5))
# Integer labels index CLASS_LABELS to get the bar names.
ax.bar([CLASS_LABELS[int(i)] for i in train_class_ids], train_class_counts)
ax.set_xlabel("Classes")
ax.set_ylabel("Number of Images")
ax.set_title('Train Data Distribution ')
ax.tick_params(axis="x", rotation=45)
plt.show()

In [None]:
# Per-class sample counts of the test labels.
# Iterators returned by ImageDataGenerator.flow() expose no `.classes`
# attribute, so the counts are taken from the label array itself; matplotlib
# (already imported) is used for the bar chart.
test_class_ids, test_class_counts = np.unique(y_test, return_counts=True)

fig, ax = plt.subplots(figsize=(12, 5))
# Integer labels index CLASS_LABELS to get the bar names.
ax.bar([CLASS_LABELS[int(i)] for i in test_class_ids], test_class_counts)
ax.set_xlabel("Classes")
ax.set_ylabel("Number of Images")
ax.set_title('Test Data Distribution ')
ax.tick_params(axis="x", rotation=45)
plt.show()

In [None]:
def plot_sample(X, y, index):
    """Display image ``X[index]`` with its class name as the x-axis label.

    Args:
        X: Indexable collection of images accepted by ``plt.imshow``.
        y: Integer class labels aligned with ``X``.
        index: Position of the sample to display.
    """
    plt.figure(figsize=(3,2))
    plt.imshow(X[index])
    # The class name is looked up through the sample's label, not through the
    # sample's position in the dataset.
    plt.xlabel(CLASS_LABELS[int(y[index])])

plot_sample(X_train,y_train,1)

In [None]:
def create_hybrid_vit_model(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), num_classes=NUM_CLASSES):
    """Build the (uncompiled) "Hybrid_CNN_ViT" Keras classifier.

    Pipeline: two Conv2D/MaxPooling2D stages -> Flatten -> Dense(128) ->
    Dense projection reshaped into a (num_patches, 64) token sequence ->
    additive position tensor -> 8 attention + MLP blocks ->
    GlobalAveragePooling1D -> Dense(2048) -> Dense(1024) -> softmax.

    Args:
        input_shape: Shape of one input image. Only ``input_shape[0]`` is used
            to derive the number of tokens.
        num_classes: Number of units in the final softmax layer.

    Returns:
        A ``Model`` mapping the image input to the softmax output.
    """
    # Token geometry. The tokens are produced by a Dense layer on the CNN
    # features (not by slicing the image), so patch_size only sets how many
    # tokens there are.
    patch_size = 6
    projection_dim = 64
    num_patches = (input_shape[0] // patch_size) ** 2

    # 1. CNN feature extractor
    cnn_input = layers.Input(shape=input_shape)
    features = layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu')(cnn_input)
    features = layers.MaxPooling2D(pool_size=(2, 2))(features)
    features = layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu')(features)
    features = layers.MaxPooling2D(pool_size=(2, 2))(features)
    features = layers.Flatten()(features)
    features = layers.Dense(128, activation='relu')(features)

    # 2. Expand the feature vector to num_patches * projection_dim values and
    #    fold it into a token sequence.
    features = layers.Dense(num_patches * projection_dim, activation='relu')(features)
    tokens = layers.Reshape((num_patches, projection_dim))(features)

    # 3. Positional information.
    # NOTE(review): this is a random tensor wrapped in tf.constant when the
    # model is built, i.e. not a trainable embedding — confirm this is intended.
    position_tensor = tf.constant(tf.random.uniform((1, num_patches, projection_dim)))
    tokens = tokens + position_tensor

    # 4. Eight blocks of self-attention + MLP, each followed by a residual
    #    connection and layer normalisation.
    num_blocks = 8
    for _ in range(num_blocks):
        attended = layers.MultiHeadAttention(num_heads=4, key_dim=projection_dim)(tokens, tokens)
        attended = layers.Add()([attended, tokens])
        attended = layers.LayerNormalization()(attended)

        hidden = layers.Dense(projection_dim * 2, activation='relu')(attended)
        hidden = layers.Dense(projection_dim)(hidden)
        tokens = layers.Add()([hidden, attended])
        tokens = layers.LayerNormalization()(tokens)

    # 5. Classification head on the mean-pooled tokens
    pooled = layers.GlobalAveragePooling1D()(tokens)
    head = layers.Dense(2048, activation='relu')(pooled)
    head = layers.Dense(1024, activation='relu')(head)
    class_probs = layers.Dense(num_classes, activation='softmax')(head)

    return Model(inputs=cnn_input, outputs=class_probs, name="Hybrid_CNN_ViT")


In [None]:
# Instantiate and compile the model. The sparse loss expects integer class
# ids as labels, which is what the generators yield.
hybrid_vit = create_hybrid_vit_model(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), num_classes=NUM_CLASSES)
hybrid_vit.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LR),
                   loss='sparse_categorical_crossentropy',
                   metrics=['accuracy'])

# Summarize the model
hybrid_vit.summary()

# Per-class loss weights that compensate for class imbalance.
train_classes = np.unique(y_train_balanced)
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=train_classes,
    y=y_train_balanced
)
# Key the weights by the actual class label rather than by position, so the
# mapping stays correct even if a label is missing from the training set.
class_weights_dict = {int(cls): float(weight) for cls, weight in zip(train_classes, class_weights)}

# Train the model
history = hybrid_vit.fit(
    x=train_generator,
    validation_data=test_generator,
    epochs=EPOCHS,
    class_weight=class_weights_dict
)



In [None]:
# Training vs. validation accuracy per epoch
fig_acc, ax_acc = plt.subplots(figsize=(10, 6))
ax_acc.plot(history.history['accuracy'], label='Training Accuracy')
ax_acc.plot(history.history['val_accuracy'], label='Validation Accuracy')
ax_acc.set_title('Model Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.legend()

# Training vs. validation loss per epoch
fig_loss, ax_loss = plt.subplots(figsize=(10, 6))
ax_loss.plot(history.history['loss'], label='Training Loss')
ax_loss.plot(history.history['val_loss'], label='Validation Loss')
ax_loss.set_title('Model Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend()


In [None]:
# Generate predictions and convert them to class indices.
# The model was trained on pixels rescaled by 1/255 (see train_datagen), so
# the raw test images get the same scaling before prediction.
X_test_scaled = X_test.astype("float32") / 255.0
y_pred = hybrid_vit.predict(X_test_scaled)
y_pred_classes = np.argmax(y_pred, axis=1)

# Pin the label set so the matrix always has one row/column per entry of
# CLASS_LABELS, even if a class never occurs in y_test or in the predictions.
all_class_ids = np.arange(len(CLASS_LABELS))

# Plot confusion matrix
cm = confusion_matrix(y_test, y_pred_classes, labels=all_class_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=CLASS_LABELS, yticklabels=CLASS_LABELS)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

# Print classification report (rows named by class instead of by integer id)
print("classification_report\n",
      classification_report(y_test, y_pred_classes, labels=all_class_ids, target_names=CLASS_LABELS))


In [None]:
# Save the trained model to Google Drive; the .h5 suffix selects Keras'
# HDF5 file format.
# NOTE(review): assumes Drive is mounted and ./drive/MyDrive/FYP already
# exists — confirm, otherwise create the folder first.
hybrid_vit.save('./drive/MyDrive/FYP/hybrid_vit_model.h5')


In [None]:
def plot_sample_with_prediction(X, y_true, y_pred, index):
    """Display image ``X[index]`` titled with its true and predicted class names.

    Args:
        X: Indexable collection of images accepted by ``plt.imshow``.
        y_true: Integer ground-truth labels aligned with ``X``.
        y_pred: Integer predicted labels aligned with ``X``.
        index: Position of the sample to display.
    """
    # Class names come from CLASS_LABELS, the list defined with the dataset;
    # no `class_names` variable exists in this notebook.
    true_name = CLASS_LABELS[int(y_true[index])]
    pred_name = CLASS_LABELS[int(y_pred[index])]

    plt.figure(figsize=(3, 2))
    plt.imshow(X[index])
    plt.title(f"True: {true_name}, Pred: {pred_name}")
    plt.show()

plot_sample_with_prediction(X_test, y_test, y_pred_classes, 661)
