In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Input
from tensorflow.keras.applications import VGG19, ResNet50, MobileNetV2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, LambdaCallback

In [None]:
# Load data
train = pd.read_csv('data/train.csv')
test = pd.read_csv('data/test.csv')
val = pd.read_csv('data/val.csv')

# Define emotion dictionary and labels
emotion_dict = {0: 'Angry', 1: 'Disgust', 2: 'Fear', 3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'}
emotion_labels = list(emotion_dict.values())

In [None]:
# Preprocess data
def preprocess_data(df):
    df['pixels'] = df['pixels'].apply(lambda x: np.array(x.split(), dtype='float32').reshape(48, 48, 1) / 255.0)
    return np.stack(df['pixels'].values)

X_train = preprocess_data(train)
X_test = preprocess_data(test)
X_val = preprocess_data(val)

y_train = tf.keras.utils.to_categorical(train['emotion'], num_classes=7)
y_test = tf.keras.utils.to_categorical(test['emotion'], num_classes=7)
y_val = tf.keras.utils.to_categorical(val['emotion'], num_classes=7)

In [None]:
# Data augmentation with resizing in the pipeline
datagen_cnn = ImageDataGenerator(rotation_range=20, width_shift_range=0.2, 
                                 height_shift_range=0.2, shear_range=0.2,
                                 zoom_range=0.2, horizontal_flip=True)

datagen_vgg_resnet = ImageDataGenerator(rotation_range=20, width_shift_range=0.2, 
                                        height_shift_range=0.2, shear_range=0.2,
                                        zoom_range=0.2, horizontal_flip=True, 
                                        preprocessing_function=lambda x: tf.image.resize(x, (224, 224)))

datagen_mobilenet = ImageDataGenerator(rotation_range=20, width_shift_range=0.2, 
                                       height_shift_range=0.2, shear_range=0.2,
                                       zoom_range=0.2, horizontal_flip=True, 
                                       preprocessing_function=lambda x: tf.image.resize(x, (160, 160)))

In [None]:
# Model creation functions
def create_vgg19_model():
    base_model = VGG19(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(7, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    base_model.trainable = False
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_resnet50_model():
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(7, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    base_model.trainable = False
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_mobilenetv2_model():
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(160, 160, 3))
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(7, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    base_model.trainable = False
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def create_custom_cnn_model():
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=(48, 48, 1)),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu'),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(7, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
# Callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=6, min_lr=0.0001)
print_callback = LambdaCallback(on_epoch_end=lambda epoch, 
                                logs: print(f"Epoch {epoch+1}: Loss={logs['loss']:.4f}, Accuracy={logs['accuracy']:.4f}"))

In [None]:
# Function to resize images according to model requirements
def resize_for_model(X, model_name):
    if model_name == 'VGG19' or model_name == 'ResNet50':
        return tf.image.resize(np.repeat(X, 3, axis=-1), (224, 224)).numpy()
    elif model_name == 'MobileNetV2':
        return tf.image.resize(np.repeat(X, 3, axis=-1), (160, 160)).numpy()
    else:  # CustomCNN
        return X  # No resizing  (48x48x1) 

In [None]:
# Train models
models = {
    'CustomCNN': create_custom_cnn_model(),
    'VGG19': create_vgg19_model(),
    'ResNet50': create_resnet50_model(),
    'MobileNetV2': create_mobilenetv2_model()
}

histories = {}

# Train models
for name, model in models.items():
    print(f"Training {name}...")

    # Resizing the training and validation data
    X_train_resized = resize_for_model(X_train, name)
    X_val_resized = resize_for_model(X_val, name)
    
    if name == 'VGG19' or name == 'ResNet50':
        history = model.fit(
            datagen_vgg_resnet.flow(X_train_resized, y_train, batch_size=64),
            epochs=30,
            validation_data=(X_val_resized, y_val),
            callbacks=[print_callback, early_stopping, reduce_lr]
        )
    elif name == 'MobileNetV2':
        history = model.fit(
            datagen_mobilenet.flow(X_train_resized, y_train, batch_size=64),
            epochs=30,
            validation_data=(X_val_resized, y_val),
            callbacks=[print_callback, early_stopping, reduce_lr]
        )
    else:
        history = model.fit(
            datagen_cnn.flow(X_train, y_train, batch_size=64),
            epochs=30,
            validation_data=(X_val, y_val),
            callbacks=[print_callback, early_stopping, reduce_lr]
        )

    histories[name] = history

In [None]:
# Evaluate models
# Evaluate models
for name, model in models.items():
    print(f"Evaluating {name}...")
    X_test_resized = resize_for_model(X_test, name)
    
    loss, accuracy = model.evaluate(X_test_resized, y_test)
    print(f"{name} accuracy: {accuracy:.2f}")
    
    # Save model
    model.save(f'{name}_model.h5')

In [None]:
# Plot confusion matrix
for name, model in models.items():
    X_test_resized = resize_for_model(X_test, name)
    y_pred = model.predict(X_test_resized)
    
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true_classes = np.argmax(y_test, axis=1)
    
    conf_matrix = confusion_matrix(y_true_classes, y_pred_classes)
    
    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', 
                xticklabels=list(emotion_dict.values()), 
                yticklabels=list(emotion_dict.values()))
    plt.title(f'Confusion Matrix for {name}')
    plt.show()
