In [None]:
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from torch import optim
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

from utils import *

# Use the GPU when torch reports one as available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Enable IPython's autoreload extension in mode 2 so edits to imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
# Load both image sets from disk as NumPy arrays.
real_data_path = "../data/data.npy"
real_data = np.load(real_data_path)

ai_data_path = "../data/data_ai.npy"
ai_data = np.load(ai_data_path)

# Stack the two sources row-wise into a single array.
combined_data = np.concatenate([real_data, ai_data], axis = 0)

# Every column except the last is used as features; the last column is the label.
features, labels = combined_data[:, :-1], combined_data[:, -1]

# Stratified 80/20 split. The seed is fixed so that "Restart & Run All" always
# produces the same train/test partition (and therefore comparable metrics).
SPLIT_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    features,
    labels,
    test_size=0.2,
    stratify=labels,
    random_state=SPLIT_SEED,
)

In [None]:
def _image_loader_kwargs():
    """Return a fresh copy of the keyword arguments shared by every loader in this cell."""
    return {"shape": [128, 128], "device": DEVICE}


# shuffle / batch_size are only requested for the training loader.
train_data = DataLoader([X_train, y_train], shuffle=True, batch_size=8, **_image_loader_kwargs())
# The held-out split relies on the loader's own defaults for batching.
test_data = DataLoader([X_test, y_test], **_image_loader_kwargs())

# Per-source loaders built directly from the .npy files on disk.
real_data = DataLoader("../data/data.npy", **_image_loader_kwargs())
ai_data = DataLoader("../data/data_ai.npy", **_image_loader_kwargs())

In [None]:
# Display both loaders using whatever repr the loader class provides.
train_data, test_data

In [None]:
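# Disabled experiment, kept for reference: load FER2013, keep emotions 0/3/4/5
# (remapped to 0-3), randomly drop 75% of the rows and split the pixel strings.
# import pandas as pd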

# huge_data = pd.read_csv("../data/fer2013.csv")

# huge_data = huge_data[huge_data.emotion.isin([0, 3, 4, 5])].drop(["Usage"], axis = 1)
# map_labels = {0:0, 3:1, 4:2, 5:3}
# huge_data.emotion = huge_data.emotion.map(map_labels)

# num_rows_to_drop = int(0.75 * len(huge_data))
# rows_to_drop = np.random.choice(huge_data.index, num_rows_to_drop, replace=False)
# huge_data = huge_data.drop(index=rows_to_drop)


# huge_data.pixels = huge_data.pixels.apply(lambda x: np.array(x.split(" ")))



# huge_data.info()

In [None]:
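# Disabled experiment, kept for reference: build a float matrix of pixels scaled
# by 1/255 with the label appended as the last column, then split it and wrap
# the splits in 48x48 loaders.
# huge = np.concatenate([np.stack(huge_data.to_numpy()[:,1]).astype("float") / 255, huge_data.to_numpy()[:,0].reshape(-1,1)], axis = 1).astype("float")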


# X_train, X_test, y_train, y_test = train_test_split(huge[:,:-1], huge[:,-1], test_size=0.2, stratify=huge[:,-1])

# train_data = DataLoader([X_train, y_train], shuffle=True, batch_size=64, shape=[48,48], device=DEVICE)
# test_data = DataLoader([X_test, y_test], shape=[48,48], device=DEVICE)


In [None]:
# The network is defined in a separate module so that the live-test folder can import it too.
from model import *

model = CNN128()

# Total number of scalar entries across all parameter tensors of the model.
n_params = sum(weight.numel() for weight in model.parameters())
n_params

In [None]:
# Unpack the dataset dimensions; only N is used below, as the divisor that turns
# the summed training loss into a per-sample value.
# NOTE(review): presumably N is the number of training samples — confirm against
# the loader's `shape` attribute.
N, C, W, H = train_data.shape
N_test = test_data.size

n_epoch = 100

model = CNN128()
optimizer = optim.Adam(model.parameters(), lr = 5e-3)
# Summed (not averaged) loss, so totals can be divided by the sample count.
criterion = nn.CrossEntropyLoss(reduction="sum")

model.to(DEVICE)

losses = {"train": list(), "test": list()}

best_test_loss = float('inf')
# Start from a copy of the initial weights so `best_model` is always defined,
# even if the test loss never compares below infinity (e.g. NaN losses).
best_model = {name: weights.detach().clone() for name, weights in model.state_dict().items()}

for epoch in range(n_epoch):
    model.train()
    running_loss = .0
    for X_batch, y_batch in train_data:

        y_pred = model(X_batch.float())

        loss = criterion(y_pred, y_batch.long())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate over every batch; a plain assignment would keep only the
        # last batch's loss and make the epoch average meaningless.
        running_loss += loss.item()

    running_loss /= N

    # Evaluate on the full test split. Gradients are not needed here, so skip
    # building the autograd graph to save memory.
    model.eval()
    with torch.no_grad():
        predictions = model(test_data.X.float())
        pred_class = torch.argmax(predictions, axis = 1)
        test_acc = (pred_class == test_data.y).float().mean()
        test_loss = criterion(predictions, test_data.y.long()).item() / N_test

    print(f"Epoch: {epoch+1:>2}, Train loss: \033[92m{running_loss:.4f}\033[0m, Test loss: \033[94m{test_loss:.4f}\033[0m, Test accuracy: {colorize_accuracy(test_acc)}{test_acc:.4f}\033[0m.")

    # Keep the weights from the epoch with the lowest test loss.
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        # state_dict() returns references to the live parameters, so clone them;
        # otherwise the "snapshot" would keep changing as training continues.
        best_model = {name: weights.detach().clone() for name, weights in model.state_dict().items()}

    losses["train"].append(running_loss)
    losses["test"].append(test_loss)


# Restore the best weights seen during training.
model.load_state_dict(best_model)


In [None]:
# Class index -> human-readable emotion name.
emotion_map = {0.:"angry", 1.:"happy", 2.:"sad", 3.:"shocked"}

# Predicted class per test image: index of the largest model output along axis 1.
logits = model(test_data.X.float())
predictions = logits.argmax(axis = 1)

# 5x5 grid, flattened so the panels can be walked in order.
fig, axs = plt.subplots(5, 5, figsize=(20, 20))
axs = axs.flatten()

# Pair each test index with a panel; stops at whichever runs out first
# (25 panels or the number of test images).
for idx, panel in zip(range(test_data.size), axs):
    image = test_data.X[idx].cpu().squeeze()
    panel.imshow(image, cmap='gray')  # rendered with a grayscale colormap
    panel.axis('off')  # hide ticks and frame
    panel.set_title(f"predicted class {emotion_map[int(predictions[idx])]}")

plt.show()

In [None]:
# Run the project's test_model helper on the test loader with the training criterion.
# NOTE(review): test_model is not defined in this notebook — presumably it comes
# from `utils`; confirm what it reports.
test_model(model, test_data, criterion)

In [None]:
# Distinct label values and how often each occurs, for the train split then the test split.
tuple(split.y.unique(return_counts=True) for split in (train_data, test_data))

In [None]:
class CustomTensorDataset(TensorDataset):
    """A TensorDataset variant that can apply a transform to each sample.

    Args:
        tensors: Sequence of tensors sharing the same first-dimension size.
            Element 0 holds the samples and element 1 the targets.
        transform: Optional callable. When truthy, it is called with the
            sample converted to a NumPy array and its result replaces the sample.
    """

    def __init__(self, tensors, transform=None):
        # Every tensor must have as many rows as the first one.
        assert all(t.size(0) == tensors[0].size(0) for t in tensors)
        self.transform = transform
        self.tensors = tensors

    def __getitem__(self, index):
        """Return the ``(sample, target)`` pair at ``index``."""
        samples, targets = self.tensors[0], self.tensors[1]
        sample = samples[index]
        if self.transform:
            # The transform receives a NumPy array, not a tensor.
            sample = self.transform(sample.numpy())
        return sample, targets[index]

    def __len__(self):
        """Number of rows in the first tensor."""
        first = self.tensors[0]
        return first.size(0)


In [None]:
from torchvision import transforms

# Augmentation pipeline: convert to a PIL image, apply a random horizontal flip,
# a random rotation (degrees argument 15) and a small brightness/contrast/
# saturation jitter, then convert back to a tensor.
data_transforms = transforms.Compose([
    transforms.ToPILImage(), 
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    transforms.ToTensor(),  
])


# NOTE(review): X_train_tensor, y_train_tensor, X_test_tensor and y_test_tensor are
# not defined in any cell visible in this notebook — confirm where they come from,
# otherwise this cell fails on a fresh kernel.
# Only the training dataset gets the augmentation transform.
train_dataset_test = CustomTensorDataset(tensors=(X_train_tensor, y_train_tensor), transform=data_transforms)
test_dataset_test = TensorDataset(X_test_tensor, y_test_tensor)

# NOTE(review): earlier cells call DataLoader with shape=/device= keywords, so this
# name may be bound to a project-specific loader rather than torch's — verify that
# it accepts a dataset object as used here.
train_loader_test = DataLoader(train_dataset_test, batch_size=8, shuffle=True)
test_loader_test = DataLoader(test_dataset_test, batch_size=8, shuffle=False)




In [None]:
import pandas as pd

df = pd.read_csv("../data/fer2013.csv")

# Each `pixels` entry is a space-separated string: split every row into an
# array of tokens, then stack the rows into a single array.
pixel_rows = [np.array(row.split(' ')) for row in df.pixels]
img_array = np.stack(pixel_rows, axis=0)
img_array.shape



## Save the best model

In [None]:
# Persist only the weights (state_dict) of the current `model` to disk.
torch.save(model.state_dict(), "../model/model_1")