## TODO
1 - increase the number of epochs
2 - uncomment the dense layer (128)

## notes : model2 problem is not in the model architecture (verified this by trying the bachelor model architecture and getting similar results), so the problem is most likely in the data, or somewhere else in the code

# **Milestone 2**

### Packages installation

In [11]:
!pip install kagglehub
!pip install imagehash
!pip install tensorflow
!pip install pandas
!pip install scikit-learn
!pip install matplotlib
!pip install tqdm
!pip install cupy

print("done installing packages")

done installing packages


### imports

In [12]:
import os
import kagglehub
import random
from PIL import Image
import imagehash
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
from tqdm import tqdm 
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
import cupy as cp

print("done importing packages")


done importing packages


## **Data preparation**

### Dataset path

In [13]:
# Resolve DATASET_PATH from the environment variable that marks the host runtime.
# The markers are tested in a fixed order: COLAB_GPU, then KAGGLE_URL_BASE, then VSCODE_PID.
known_markers = ('COLAB_GPU', 'KAGGLE_URL_BASE', 'VSCODE_PID')
if not any(marker in os.environ for marker in known_markers):
    # None of the supported runtimes was detected, so there is no known dataset location.
    raise Exception('Unknown environment')

if 'COLAB_GPU' in os.environ:
    # Download the dataset through kagglehub and point at its 'Dataset' sub-folder.
    dataset_folder = kagglehub.dataset_download('hussainghoraba/emotions-dataset')
    DATASET_PATH = os.path.join(dataset_folder, 'Dataset')
elif 'KAGGLE_URL_BASE' in os.environ:
    # Fixed absolute path used when the KAGGLE_URL_BASE marker is present.
    DATASET_PATH = '/kaggle/input/emotions-dataset/Dataset'
else:
    # Only VSCODE_PID is left: use a copy of the dataset next to the notebook.
    DATASET_PATH = './Dataset'

print("done setting up dataset path")

done setting up dataset path


### Set random seed & some global variables

In [14]:
# Notebook-wide constants.
RANDOM_SEED = 42
# Width/height every image is resized to (the misspelled name is referenced throughout the notebook).
TAREGT_SIZE = (512, 512)

# Apply the same seed to Python's `random`, NumPy and TensorFlow, and export it as PYTHONHASHSEED.
os.environ['PYTHONHASHSEED'] = str(RANDOM_SEED)
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(RANDOM_SEED)
print("done setting up random seed")

done setting up random seed


### Load dataset into memory without dups, and with correct size, and equalize the number of images in each class


In [None]:
# Load a class-balanced, de-duplicated sample of the dataset into `df`.
#
# For every category folder the same number of files is sampled, each image is converted
# to RGB and resized to TAREGT_SIZE, and perceptual-hash duplicates inside a category are dropped.
# NOTE: duplicates are removed *after* sampling, so a category that contained duplicates
# ends up with slightly fewer rows than the others.

total_images_count = sum(len(files) for _, _, files in os.walk(DATASET_PATH))
print(f"Total images in dataset: {total_images_count}")
data = []
dups_pairs = set()

# load only a small percentage of the dataset, for faster testing while developing
DATASET_PERC_TO_USE = 1

# Only sub-directories are categories (stray files in the dataset root are ignored).
# Sorting makes the traversal independent of the filesystem's listing order, which is
# required for the seeded `random.sample` below to pick the same files on every machine.
category_names = sorted(
    entry for entry in os.listdir(DATASET_PATH)
    if os.path.isdir(os.path.join(DATASET_PATH, entry))
)
if not category_names:
    raise ValueError(f"No category folders found in {DATASET_PATH}")

category_files = {
    name: sorted(os.listdir(os.path.join(DATASET_PATH, name)))
    for name in category_names
}

num_images_in_smallest_category = min(len(files) for files in category_files.values())
num_of_images_to_use_in_each_category = int(num_images_in_smallest_category * DATASET_PERC_TO_USE)

print(f"Number of images in the smallest category: {num_images_in_smallest_category}")

num_of_categories = len(category_names)

# Derived from the per-category count so the progress bar total always matches the work done.
total_images_to_load = num_of_images_to_use_in_each_category * num_of_categories

with tqdm(total=total_images_to_load, desc="Loading images into memory...") as pbar:
    for subfolder in category_names:
        subfolder_path = os.path.join(DATASET_PATH, subfolder)
        # key : hash, value : img_path of the first image seen with that hash
        subfolder_hashes = {}

        # we must use the same number of images from each category to avoid bias
        images_to_load = random.sample(category_files[subfolder], num_of_images_to_use_in_each_category)

        for img_file in images_to_load:
            img_path = os.path.join(subfolder_path, img_file)
            with Image.open(img_path) as raw_img:
                img = raw_img.convert("RGB").resize(TAREGT_SIZE)
                img_arr = np.array(img)
                img_hash = imagehash.phash(img)
            if img_hash not in subfolder_hashes:
                data.append({"img_path": img_path, "label": subfolder, "img_arr": img_arr})
                subfolder_hashes[img_hash] = img_path
            else:
                # Same perceptual hash as an earlier image of this category: record the pair, skip the row.
                existing_duplicate = subfolder_hashes[img_hash]
                dups_pairs.add((img_path, existing_duplicate))
            pbar.update(1)

df = pd.DataFrame(data)

# display dups (sorted so the output order is stable between runs)
for dup_pair in sorted(dups_pairs):
    print(f"Duplicate images found: {dup_pair[0]} and {dup_pair[1]}")
    # The context managers close both files once the figure has been rendered.
    with Image.open(dup_pair[0]) as img1, Image.open(dup_pair[1]) as img2:
        for position, (path, image) in enumerate(zip(dup_pair, (img1, img2)), start=1):
            plt.subplot(1, 2, position)
            plt.imshow(image)
            plt.title(os.path.basename(path))
            plt.axis('off')
        plt.show()
print("done loading images")

Total images in dataset: 2125
Number of images in the smallest category: 297


Loading images into memory...: 100%|██████████| 356/356 [00:04<00:00, 77.59it/s]

done loading images





### Train/Test/Val split

In [16]:
# Target proportions of the three splits.
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1

# Step 1: separate the training rows from everything else, keeping label proportions.
holdout_fraction = 1 - train_ratio
train_df, temp_df = train_test_split(
    df,
    test_size=holdout_fraction,
    stratify=df['label'],
    random_state=RANDOM_SEED,
)

# Step 2: divide the held-out rows into validation and test according to their relative ratios.
test_share_of_holdout = test_ratio / (test_ratio + val_ratio)
val_df, test_df = train_test_split(
    temp_df,
    test_size=test_share_of_holdout,
    stratify=temp_df['label'],
    random_state=RANDOM_SEED,
)

# Print the sizes of each split
print(f"Training set size: {len(train_df)}")
print(f"Validation set size: {len(val_df)}")
print(f"Test set size: {len(test_df)}")

# Per-label row counts of every split.
for heading, split in (("training set", train_df), ("validation set", val_df), ("test set", test_df)):
    print(f"\n{heading}:")
    print(split['label'].value_counts())


print("done splitting dataset into train, val, test")

Training set size: 249
Validation set size: 71
Test set size: 36

training set:
label
Happy      63
Neutral    62
Sad        62
Angry      62
Name: count, dtype: int64

validation set:
label
Neutral    18
Sad        18
Angry      18
Happy      17
Name: count, dtype: int64

test set:
label
Neutral    9
Sad        9
Angry      9
Happy      9
Name: count, dtype: int64
done splitting dataset into train, val, test


## **Model 1**

In [None]:
class ConvLayer:
    """Stride-1, no-padding 2D convolution (cross-correlation) over an H x W x C image.

    Filters are stored as a single array of shape
    ``(num_filters, filter_height, filter_width, channels)``.
    """

    def __init__(self, filter_size=3, num_filters=5, filter_weights=None):
        """Create the layer.

        Args:
            filter_size: Side length of the square filters (used for random initialisation).
            num_filters: Number of filters (used for random initialisation).
            filter_weights: Optional predefined filters, either one 4-D array or a
                list/tuple of 3-D ``(height, width, channels)`` arrays. When given, the
                weights are authoritative: ``num_filters`` and ``filter_size`` are
                taken from their shape so the attributes can never disagree with them.

        Raises:
            ValueError: If ``filter_weights`` does not form a 4-D array.
        """
        self.filter_size = filter_size
        self.num_filters = num_filters

        if filter_weights is not None:
            filters = self._as_filter_array(filter_weights)
            if filters.ndim != 4:
                raise ValueError(
                    "filter_weights must have shape (num_filters, height, width, channels), "
                    f"got {filters.shape}"
                )
            self.filters = filters
            self.num_filters = filters.shape[0]
            self.filter_size = filters.shape[1]
        else:
            # random weights (3 input channels)
            self.filters = cp.random.randn(num_filters, filter_size, filter_size, 3) * 0.1

    @staticmethod
    def _as_filter_array(weights):
        """Return ``weights`` as one array, stacking a list/tuple of per-filter arrays."""
        if isinstance(weights, (list, tuple)):
            return cp.stack([cp.asarray(w) for w in weights])
        return cp.asarray(weights)

    def forward(self, input_image):
        """Apply every filter to ``input_image``.

        Args:
            input_image: Array-like of shape ``(height, width, channels)``.

        Returns:
            Array of shape ``(height - filter_height + 1, width - filter_width + 1, num_filters)``
            where channel ``f`` holds the response of filter ``f``.

        Raises:
            ValueError: If the channel counts differ or the image is smaller than the filter.
        """
        input_image = cp.asarray(input_image)
        filters = self._as_filter_array(self.filters)

        input_height, input_width, input_channels = input_image.shape
        num_filters, filter_height, filter_width, filter_channels = filters.shape
        if input_channels != filter_channels:
            raise ValueError(
                f"input has {input_channels} channels but the filters expect {filter_channels}"
            )

        output_height = input_height - filter_height + 1
        output_width = input_width - filter_width + 1
        if output_height < 1 or output_width < 1:
            raise ValueError("input image is smaller than the filter")

        # One output channel per *filter* (not per input channel).
        output = cp.zeros((output_height, output_width, num_filters))

        # Accumulate the contribution of each kernel offset for all pixels and all filters
        # at once. This computes the same sums as sliding a window pixel by pixel, but
        # needs filter_height * filter_width array operations instead of one per output element.
        for dh in range(filter_height):
            for dw in range(filter_width):
                shifted = input_image[dh:dh + output_height, dw:dw + output_width, :]
                # (H_out, W_out, C) . (F, C) -> (H_out, W_out, F)
                output += cp.tensordot(shifted, filters[:, dh, dw, :], axes=([2], [1]))
        return output
    

class PoolingLayer:
    """Non-overlapping max or average pooling over an H x W x C image."""

    def __init__(self, pool_size=2, pool_type='MAX'):
        """Create the layer.

        Args:
            pool_size: Side length of the square pooling window (also the stride).
            pool_type: ``'MAX'`` or ``'AVERAGE'`` (case-insensitive).

        Raises:
            ValueError: If ``pool_type`` is not supported or ``pool_size`` is less than 1.
        """
        pool_type = pool_type.upper() # convert to uppercase for easier usage
        if pool_type not in ['MAX', 'AVERAGE']:
            raise ValueError("pool_type must be either 'MAX' or 'AVERAGE'")
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        self.pool_size = pool_size
        self.pool_type = pool_type

    def forward(self, input_image):
        """Pool ``input_image`` channel by channel.

        Args:
            input_image: Array-like of shape ``(height, width, channels)``.

        Returns:
            Float array of shape ``(height // pool_size, width // pool_size, channels)``.
            Rows/columns that do not fill a complete window are dropped.
        """
        input_image = cp.asarray(input_image)
        input_height, input_width, input_channels = input_image.shape
        size = self.pool_size
        output_height = input_height // size
        output_width = input_width // size

        # Drop the incomplete border, then expose every window as its own pair of axes:
        # (H_out, size, W_out, size, C). Reducing axes 1 and 3 pools each window at once.
        cropped = input_image[:output_height * size, :output_width * size, :]
        windows = cropped.reshape(output_height, size, output_width, size, input_channels)

        output = cp.zeros((output_height, output_width, input_channels))
        if self.pool_type == 'MAX':
            output[...] = cp.max(windows, axis=(1, 3))
        elif self.pool_type == 'AVERAGE':
            output[...] = cp.mean(windows, axis=(1, 3))
        return output

def Relu(cp_arr):
    """Rectified linear unit: element-wise maximum of 0 and ``cp_arr``."""
    floor = 0
    rectified = cp.maximum(floor, cp_arr)
    return rectified


### testing

In [None]:
def create_filter_3d(filter_2d):
    """Turn a 2-D kernel into a 3-channel one by repeating it along a new last axis."""
    channel_copies = [filter_2d for _ in range(3)]
    return cp.stack(channel_copies, axis=-1)

# Hand-written 3x3 kernels used to exercise ConvLayer with known weights.
filters_2d = [
    # all ones (box filter)
    cp.array([[1, 1, 1],
              [1, 1, 1],
              [1, 1, 1]]),
    # single centre tap (identity)
    cp.array([[0, 0, 0],
              [0, 1, 0],
              [0, 0, 0]]),
    # horizontal gradient (Sobel X)
    cp.array([[-1, 0, 1],
              [-2, 0, 2],
              [-1, 0, 1]]),
    # vertical gradient (Sobel Y)
    cp.array([[-1, -2, -1],
              [0, 0, 0],
              [1, 2, 1]]),
    # sharpening
    cp.array([[0, -1, 0],
              [-1, 5, -1],
              [0, -1, 0]]),
]

# Replicate every 2-D kernel across the three colour channels.
filters_3d = list(map(create_filter_3d, filters_2d))

# # Function to load and preprocess an image
# def load_image(url=None, path=None, target_size=(512, 512)):
#     """Load an image from URL or local path and preprocess it."""
#     if url:
#         response = requests.get(url)
#         img = Image.open(BytesIO(response.content))
#     elif path:
#         img = Image.open(path)
#     else:
#         # Create a sample image if no source is provided
#         img = Image.new('RGB', target_size, color=(73, 109, 137))

#     # Resize the image
#     img = img.resize(target_size)

#     # Convert to numpy array
#     img_array = np.array(img) / 255.0  # Normalize to [0, 1]

#     return img_array

# # Test with a sample image (you can replace this URL with your own image)
# # Using a sample image URL - replace with your own image or add code to load a local file
# sample_image_url = "https://images.wallpapersden.com/image/download/cat-green-eyed-muzzle_aWhnZ5SZmpqtpaSklGpmZ61qZmc.jpg"
# try:
#     img = load_image(url=sample_image_url)
# except:
#     # If the URL doesn't work, create a simple test image
#     img = np.zeros((512, 512, 3))
#     # Add some shapes for testing
#     img[100:400, 100:400, 0] = 1.0  # Red square
#     img[200:300, 200:300, 1] = 1.0  # Green square inside red

# Build the layer under test from the hand-written kernels rather than random weights.
conv_layer = ConvLayer(
    filter_weights=filters_3d,
    num_filters=len(filters_3d),
    filter_size=3,
)

# # Apply convolution
# feature_maps = conv_layer.forward(img)

# # Visualize the original image and the feature maps
# plt.figure(figsize=(15, 8))

# # Original image
# plt.subplot(2, 3, 1)
# plt.imshow(img)
# plt.title('Original Image')
# plt.axis('off')

# # Feature maps
# filter_names = ['Box Filter (a)', 'Identity Filter (b)', 'Sobel X (c)', 'Sobel Y (d)', 'Sharpening (e)']

# for i in range(5):
#     plt.subplot(2, 3, i+2)
#     # Normalize the feature map for better visualization
#     feature_map = feature_maps[:, :, i]
#     feature_map = (feature_map - feature_map.min()) / (feature_map.max() - feature_map.min() + 1e-8)
#     plt.imshow(feature_map, cmap='viridis')
#     plt.title(f'Filter {i+1}: {filter_names[i]}')
#     plt.axis('off')

# plt.tight_layout()
# plt.show()

# # Test individual filters
# print("\nTesting individual filters:")
# for i, (filter_name, filter_3d) in enumerate(zip(filter_names, [filter_a_3d, filter_b_3d, filter_c_3d, filter_d_3d, filter_e_3d])):
#     # Create a ConvLayer with a single filter
#     single_filter = np.expand_dims(filter_3d, axis=0)
#     conv_layer_single = ConvLayer(filter_size=3, num_filters=1, filter_weights=single_filter)

#     # Apply convolution
#     feature_map = conv_layer_single.forward(img)

#     print(f"Filter {i+1} ({filter_name}) applied successfully.")

print("done model 1")

## **Model 2**

In [18]:
# At the start of page 5 in the project description, point 1 says: build a CNN with 3 convolutional layers,
# but point 2 says we should use 5 filters. How? We only have 3 conv layers.
# So we decided to ignore point 1 and use 3 convolutional "blocks" of 5 conv layers each.
num_classes = len(df['label'].unique())

def create_model2(input_shape=None, n_classes=None, learning_rate=0.001):
    """Build and compile the Model 2 CNN.

    The network is three identical convolutional blocks (five unpadded ReLU
    convolutions followed by 2x2 max pooling), then Flatten, a 128-unit sigmoid
    dense layer and a softmax output layer.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.
            Defaults to ``(TAREGT_SIZE[0], TAREGT_SIZE[1], 3)``.
        n_classes: Number of output classes. Defaults to the notebook-level ``num_classes``.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        A compiled ``Sequential`` model (categorical cross-entropy loss, accuracy metric).
    """
    if input_shape is None:
        input_shape = (TAREGT_SIZE[0], TAREGT_SIZE[1], 3)
    if n_classes is None:
        n_classes = num_classes

    # (number of filters, kernel side) of the five convolutions in one block.
    # With 'valid' padding a block removes 16 pixels per side length before pooling
    # halves it, so a 512x512 input reaches Flatten as 50x50x16.
    block_spec = [(32, 3), (64, 3), (64, 3), (32, 5), (16, 7)]
    num_blocks = 3

    # An explicit Input layer replaces `input_shape=` on the first Conv2D, which
    # newer Keras versions warn about for Sequential models.
    stack = [layers.Input(shape=input_shape)]
    for _ in range(num_blocks):
        for filters, kernel in block_spec:
            stack.append(layers.Conv2D(filters, (kernel, kernel), activation='relu', padding='valid'))
        stack.append(layers.MaxPooling2D((2, 2)))

    stack.append(layers.Flatten())
    # 128 is OUR CHOICE for the number of neurons in the hidden layer (not specified in the project description)
    stack.append(layers.Dense(128, activation='sigmoid'))
    stack.append(layers.Dense(n_classes, activation='softmax'))

    model = models.Sequential(stack)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

model2 = create_model2()


print("done building model 2")

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
E0000 00:00:1745517226.804323   45992 cuda_executor.cc:1228] INTERNAL: CUDA Runtime error: Failed call to cudaGetRuntimeVersion: Error loading CUDA libraries. GPU will not be used.: Error loading CUDA libraries. GPU will not be used.
W0000 00:00:1745517226.805601   45992 gpu_device.cc:2341] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


done building model 2


### Callbacks (to prevent overfitting)

In [19]:
# Halve the learning rate (down to a floor of 1e-5) after 2 epochs without val_accuracy improvement.
learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    patience=2,
    min_lr=1e-5,
    verbose=1,
)

# Stop after 3 epochs without val_loss improvement and restore the best weights seen.
early_stoping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
    verbose=0,
)

### Training

In [20]:
# The three generators only rescale pixel values from [0, 255] to [0, 1].
train_datagen = ImageDataGenerator(rescale=1./255)
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)
batch_size = 16
epochs = 10

# Pin the label -> index mapping so every generator one-hot encodes labels identically,
# even if some split happened to miss a class.
class_names = sorted(df['label'].unique())


def make_image_flow(datagen, frame, shuffle):
    """Create a batch iterator that reads the images listed in ``frame`` from disk.

    Args:
        datagen: The ``ImageDataGenerator`` to draw batches from.
        frame: DataFrame with an ``img_path`` and a ``label`` column.
        shuffle: Whether the rows are shuffled between epochs.

    Returns:
        The iterator returned by ``datagen.flow_from_dataframe``.
    """
    return datagen.flow_from_dataframe(
        dataframe=frame,
        target_size=TAREGT_SIZE,
        x_col='img_path',
        y_col='label',
        classes=class_names,
        batch_size=batch_size,
        class_mode='categorical',
        shuffle=shuffle,
    )


train_gen = make_image_flow(train_datagen, train_df, shuffle=True)
# Evaluation data keeps its order so predictions can be aligned with `.classes`.
val_gen = make_image_flow(val_datagen, val_df, shuffle=False)
print(f"images sizes passed to image data generator: {train_gen.image_shape}")

test_gen = make_image_flow(test_datagen, test_df, shuffle=False)


model2_history = model2.fit(
    train_gen,
    validation_data=val_gen,
    callbacks=[early_stoping, learning_rate_reduction],
    epochs=epochs,
)

# Measure both scores with the weights the model actually ends up with. The last
# entry of the training history can belong to a later, worse epoch than the one
# whose weights EarlyStopping(restore_best_weights=True) restored.
val_loss, val_acc = model2.evaluate(val_gen)
test_loss, test_acc = model2.evaluate(test_gen)
print(f"Val accuracy: {val_acc:.2f}")
print(f"Test accuracy: {test_acc:.2f}")




print("done training model 2")


Found 249 validated image filenames belonging to 4 classes.
Found 71 validated image filenames belonging to 4 classes.
images sizes passed to image data generator: (512, 512, 3)
Found 36 validated image filenames belonging to 4 classes.


  self._warn_if_super_not_called()


Epoch 1/10


2025-04-24 19:53:50.500252: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 532684800 exceeds 10% of free system memory.
2025-04-24 19:53:50.630866: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 1057030144 exceeds 10% of free system memory.
2025-04-24 19:53:51.074721: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 1048723456 exceeds 10% of free system memory.
2025-04-24 19:53:51.923996: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 516104192 exceeds 10% of free system memory.
2025-04-24 19:53:53.061490: W external/local_xla/xla/tsl/framework/cpu_allocator_impl.cc:83] Allocation of 251920384 exceeds 10% of free system memory.


[1m12/16[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m2:05[0m 31s/step - accuracy: 0.2060 - loss: 1.5715

KeyboardInterrupt: 

In [None]:
# # Get class indices and reverse mapping
# class_indices = test_gen.class_indices
# class_labels = list(class_indices.keys())

# # Get predictions for the test set
# preds = model.predict(test_gen)
# predicted_classes = np.argmax(preds, axis=1)
# true_classes = test_gen.classes
# filenames = test_gen.filenames

# # Display 10 random images with predictions
# num_images = 10
# indices = np.random.choice(len(filenames), num_images, replace=False)

# plt.figure(figsize=(20, 10))

# for i, idx in enumerate(indices):
#     img_path = os.path.join(test_dir, filenames[idx])
#     img = tf.keras.preprocessing.image.load_img(img_path, target_size=img_size)
#     img_array = tf.keras.preprocessing.image.img_to_array(img) / 255.0

#     plt.subplot(2, 5, i + 1)
#     plt.imshow(img_array)
#     plt.axis('off')
#     true_label = class_labels[true_classes[idx]]
#     predicted_label = class_labels[predicted_classes[idx]]
#     title_color = "green" if true_label == predicted_label else "red"
#     plt.title(f"True: {true_label}\nPred: {predicted_label}", color=title_color)

# plt.tight_layout()
# plt.show()

print("done displaying predictions")


# **Milestone 3**

## **model 1**


### Accuracy vs iterations


In [None]:
# to be implemented

### K-fold cross-validation

In [None]:
# to be implemented

###  Confusion matrix

In [None]:
# to be implemented

## **model 2**


### Accuracy vs iterations


In [None]:
# Draw the per-epoch accuracy curves stored in `model2_history`, one line per history key.
for history_key, curve_label in (('accuracy', 'train accuracy'), ('val_accuracy', 'val accuracy')):
    plt.plot(model2_history.history[history_key], label=curve_label)
plt.title('Training and Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

### K-fold cross-validation

In [None]:

# K-fold cross-validation of Model 2 over the whole dataframe `df`.
# NOTE: every iteration rebinds `model2`, `model2_history`, `train_gen` and `val_gen`,
# so later cells see the model and validation generator of the LAST fold.
k = 4
kf = KFold(n_splits=k, shuffle=True, random_state=RANDOM_SEED)

# Pin the label -> index mapping: without it each generator infers the classes from
# its own rows, and a fold that missed a class would yield one-hot vectors of the wrong width.
fold_class_names = sorted(df['label'].unique())

fold_accuracies = []

for fold, (train_idx, val_idx) in enumerate(kf.split(df)):
    print(f"\nStarting fold {fold + 1}/{k}...")

    train_data = df.iloc[train_idx]
    val_data = df.iloc[val_idx]

    train_gen = train_datagen.flow_from_dataframe(
        dataframe=train_data,
        x_col='img_path',
        y_col='label',
        classes=fold_class_names,
        target_size=TAREGT_SIZE,
        batch_size=batch_size,
        class_mode='categorical',
        shuffle=True
    )

    val_gen = val_datagen.flow_from_dataframe(
        dataframe=val_data,
        x_col='img_path',
        y_col='label',
        classes=fold_class_names,
        target_size=TAREGT_SIZE,
        batch_size=batch_size,
        class_mode='categorical',
        shuffle=False
    )

    # A fresh, untrained model per fold so no weights carry over between folds.
    model2 = create_model2()

    model2_history = model2.fit(
        train_gen,
        validation_data=val_gen,
        callbacks=[early_stoping, learning_rate_reduction],
        epochs=epochs,
    )

    # evaluate() returns [loss, accuracy]; keep the accuracy of this fold.
    val_accuracy = model2.evaluate(val_gen)[1]
    print(f"\nFold {fold + 1} - Validation Accuracy: {val_accuracy:.4f}")
    fold_accuracies.append(val_accuracy)


average_accuracy = sum(fold_accuracies) / k
# Same precision as the per-fold lines; one decimal would hide most of the signal in a [0, 1] metric.
print(f"Average Accuracy: {average_accuracy:.4f}")





###  Confusion matrix

In [None]:
def display_confusion_matrix(model, val_gen, plt_title):
    """Predict on ``val_gen`` and plot the resulting confusion matrix.

    Args:
        model: Model whose ``predict`` returns one score per class for every sample.
        val_gen: Generator exposing ``classes`` (true class indices) and
            ``class_indices`` (label -> index mapping). It must not shuffle,
            otherwise ``classes`` is not aligned with the prediction order.
        plt_title: Title of the plot.

    Raises:
        ValueError: If ``val_gen`` reports ``shuffle=True``.
    """
    if getattr(val_gen, 'shuffle', False):
        raise ValueError("display_confusion_matrix needs a generator created with shuffle=False")

    # Order the label names by their class index so they line up with the matrix rows/columns.
    class_names = [
        name for name, _ in sorted(val_gen.class_indices.items(), key=lambda item: item[1])
    ]

    y_true = val_gen.classes
    y_pred = model.predict(val_gen)
    y_pred_classes = y_pred.argmax(axis=1)

    # `labels` forces one row/column per known class, even when a class is absent
    # from both the true and the predicted labels; otherwise the matrix would be
    # smaller than the list of display labels.
    cm = confusion_matrix(y_true, y_pred_classes, labels=list(range(len(class_names))))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap='viridis')
    plt.title(plt_title)
    plt.show()

display_confusion_matrix(model2, val_gen, "Confusion Matrix for model 2")