### IMPORTING NECESSARY LIBRARIES

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler, random_split, TensorDataset
from torchvision.transforms import ToTensor, transforms
from sklearn.metrics import classification_report
import torchvision.models as models
import torch.optim.lr_scheduler as lr_scheduler
from tqdm.auto import tqdm
import os
import numpy as np
from PIL import Image
import warnings
import matplotlib.pyplot as plt
from pathlib import Path
warnings.filterwarnings("ignore")

In [None]:
# Root folder of the training images; CustomDataset searches it recursively
# and uses each image's parent folder name as its class label.
train_dir = "/kaggle/input/facial-expression-recognition-2013-csv-files/Images/train"

In [None]:
# Root folder of the held-out test images (same folder-per-class convention
# that CustomDataset relies on).
test_dir = "/kaggle/input/facial-expression-recognition-2013-csv-files/Images/test"

### CREATING A CUSTOM DATASET CLASS

In [None]:
class CustomDataset(Dataset):
    """Folder-per-class image dataset that yields grayscale images.

    Every path under ``root_dir`` matching ``*jpg`` is treated as a sample and
    the name of its parent directory is used as the class name. Class names
    are sorted and mapped to consecutive integer ids starting at 0.

    Args:
        root_dir: Directory that is searched recursively for images.
        transform: Optional callable applied to the PIL image before it is
            returned.
        weights: Accepted for backward compatibility; currently unused.

    Attributes:
        image_paths: Sorted list of image paths (as strings).
        classes: Sorted list of distinct class names.
        class2idx: Mapping from class name to integer label.
        labels: Integer label for each entry of ``image_paths``.
    """

    def __init__(self, root_dir, transform=None, weights=None):
        self.root_dir = root_dir
        self.transform = transform

        # rglob yields entries in filesystem order, which is not guaranteed to
        # be stable; sorting keeps the index -> image mapping (and therefore
        # any seeded random split of this dataset) reproducible.
        self.image_paths = sorted(str(p) for p in Path(self.root_dir).rglob("*jpg"))

        class_names = [Path(p).parent.name for p in self.image_paths]
        self.classes = sorted(set(class_names))
        self.class2idx = {name: idx for idx, name in enumerate(self.classes)}
        self.labels = [self.class2idx[name] for name in class_names]

    def __len__(self):
        """Return the number of images found under ``root_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``."""
        image_path = self.image_paths[idx]
        # convert() loads the pixel data into a new image, so the underlying
        # file can be closed right away instead of leaking a handle per sample.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("L")
        label = self.labels[idx]

        if self.transform is not None:
            image = self.transform(image)

        return image, label

#### DEFINING TRANSFORMS

In [None]:
# Preprocessing + augmentation pipeline: resize to 48x48, random horizontal and
# vertical flips, random Gaussian blur, then conversion to a tensor.
# NOTE(review): every dataset that is given this same object (including
# evaluation data) receives the random flips/blur — confirm that is intended.
augmentation_steps = [
    transforms.Resize((48, 48)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)),
    transforms.ToTensor(),
]
transform = transforms.Compose(augmentation_steps)

#### INITIALIZATIONS

In [None]:
# Global run configuration: mini-batch size, RNG seed and compute device.
BATCH_SIZE = 128
RANDOM_SEED = 46

# Prefer the GPU when one is visible to PyTorch, otherwise fall back to CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Seed both PyTorch and NumPy so runs are repeatable.
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

#### SPLITTING THE TRAIN DATASET USING RANDOM SPLIT TO GET VALIDATION SET

In [None]:
# Full training set (before the train/validation split), with the augmentation
# pipeline attached.
train_dataset = CustomDataset(train_dir, transform=transform)

In [None]:
# 90 % of the images go to training; whatever remains is the validation share.
total_images = len(train_dataset)
train_size = int(0.9 * total_images)
val_size = total_images - train_size

In [None]:
import copy

# Draw the train/validation indices once; the training subset keeps using the
# augmenting transform of the dataset it was split from.
train_dataset, val_split = random_split(train_dataset, [train_size, val_size])

# Validation loss drives early stopping, so it must be measured on
# un-augmented images: random flips/blur would make it noisy and biased.
val_transform = transforms.Compose([
    transforms.Resize((48, 48)),
    transforms.ToTensor(),
])

# Shallow copy: shares the path/label lists (so the split indices stay valid)
# while letting the validation view carry its own deterministic transform.
val_source = copy.copy(val_split.dataset)
val_source.transform = val_transform
val_dataset = torch.utils.data.Subset(val_source, val_split.indices)

In [None]:
# Test images must be evaluated exactly as they are: the training pipeline's
# random flips and blur would make the reported metrics non-deterministic, so
# only the resize and tensor conversion are kept here.
test_transform = transforms.Compose([
    transforms.Resize((48, 48)),
    transforms.ToTensor(),
])
test_dataset = CustomDataset(root_dir=test_dir, transform=test_transform)

#### LOADING THE DATA WITH THE DATALOADER

In [None]:
# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=BATCH_SIZE)

In [None]:
# Validation batches are served in a fixed order (no shuffling).
val_dataloader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
# Test batches are served in a fixed order (no shuffling).
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=BATCH_SIZE)

In [None]:
# Pull a single batch from the training loader so its shapes can be inspected.
train_batches = iter(train_dataloader)
first_batch = next(train_batches)

In [None]:
# Sanity check: report the tensor shapes of the first training batch only.
for batch_idx, batch in enumerate(train_dataloader):
    inputs, labels = batch
    print(f"Batch {batch_idx + 1}:")
    print(f"Input shape: {inputs.shape}, Labels shape: {labels.shape}")
    break  # one batch is enough to see the shapes

In [None]:
# Split the inspected batch into its image tensor and its label tensor.
images, labels = first_batch

In [None]:
# Notebook display: shape of the image batch.
images.shape

In [None]:
# Notebook display: shape of the label batch.
labels.shape

#### HYPERPARAMETERS

In [None]:
# Training schedule.
NUM_EPOCHS = 50        # upper bound on the number of training epochs
learning_rate = 1e-4   # step size passed to the optimizer

# Data / model dimensions.
num_labels = 7         # number of output classes
in_channels = 1        # images are loaded as single-channel ("L") pictures

## RESNET MODEL (PRETRAINED)

In [None]:
class ResNet(nn.Module):
    """Pretrained ResNet-50 adapted to single-channel images.

    The backbone is loaded with pretrained weights and left fully trainable.
    Its first convolution is replaced by a freshly initialised 1-channel
    version, and its final fully connected layer by a new classification head.

    Args:
        num_classes: Number of output logits produced by the head.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.resnet = models.resnet50(pretrained=True)

        # Fine-tune the whole network rather than only the new layers.
        for param in self.resnet.parameters():
            param.requires_grad = True

        # Same geometry as the original first convolution, but with one input
        # channel; this layer starts from random weights.
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

        # Size the head from the constructor argument; previously the argument
        # was ignored and a module-level global was read instead.
        in_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        return self.resnet(x)

#### INSTANTIATE THE RESNET MODEL

In [None]:
# Build the ResNet classifier with one output per label.
model = ResNet(num_classes=num_labels)

In [None]:
# Move the model's parameters to the selected device; as the last expression
# of the cell, the returned module is also displayed by the notebook.
model.to(device)

#### LOSS FUNCTION AND OPTIMIZER

In [None]:
# Cross-entropy loss over the class logits.
criterion = nn.CrossEntropyLoss()

# Adam over every parameter of the current model.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

# Multiply the learning rate by 0.1 after every 7 scheduler steps.
scheduler = lr_scheduler.StepLR(optimizer=optimizer, step_size=7, gamma=0.1)

#### TRAINING

In [None]:
# Per-batch training loss, collected across all epochs (used for plotting).
train_losses = []

# EARLY STOPPING PARAMETERS
patience = 5                    # epochs without improvement tolerated
best_val_loss = float("inf")    # lowest mean validation loss seen so far
current_patience = 0            # consecutive epochs without improvement

for epoch in range(NUM_EPOCHS):
    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}]")

    # ---- training phase ----
    # Validation below switches the model to eval mode, so training mode must
    # be restored at the start of every epoch; otherwise every epoch after the
    # first would train with BatchNorm in inference mode.
    model.train()
    running_loss = 0.0
    running_correct = 0
    running_total = 0
    for inputs, labels in tqdm(train_dataloader, total=len(train_dataloader)):
        # get data to cuda if possible
        inputs = inputs.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # clear gradients left over from the previous step, then forward pass
        optimizer.zero_grad()
        running_outputs = model(inputs)
        loss = criterion(running_outputs, labels)

        running_loss += loss.item()
        train_losses.append(loss.item())

        # the predicted class is the index of the largest logit
        _, running_predicted = torch.max(running_outputs, dim=1)
        running_total += labels.size(0)
        running_correct += (running_predicted == labels).sum().item()

        # back propagation and parameter update
        loss.backward()
        optimizer.step()

    # update learning rate scheduler (once per epoch)
    scheduler.step()

    epoch_loss = running_loss / len(train_dataloader)
    running_accuracy = 100 * (running_correct / running_total)

    # ---- validation phase ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(val_dataloader, total=len(val_dataloader)):
            inputs = inputs.to(device, dtype=torch.float32)
            labels = labels.to(device, dtype=torch.long)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # Get predictions
            _, predicted = torch.max(outputs, dim=1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    # Mean of the per-batch validation losses
    val_loss /= len(val_dataloader)

    # Check improvement
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        current_patience = 0
    else:
        current_patience += 1

    val_accuracy = 100 * (val_correct / val_total)
    print(f"Epoch {epoch + 1}, Training Accuracy: {running_accuracy:.2f}%, Validation Accuracy: {val_accuracy:.2f}%, Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Stop once validation loss has not improved for `patience` epochs in a row
    if current_patience >= patience:
        print(f"Early stopping after {epoch + 1} epochs.")
        break

#### TESTING

In [None]:
# Evaluate the current model on the test set: mean per-sample loss, accuracy,
# and the full prediction/label lists for the metric cells that follow.
test_loss = 0.0        # running sum of per-sample losses, averaged at the end
correct_pred = 0       # number of correctly classified samples
total_samples = 0      # number of samples seen
test_losses = []       # mean loss of each test batch (used for plotting)
all_predictions = []
all_labels = []

model.eval()
with torch.no_grad():
    for inputs, labels in test_dataloader:
        inputs = inputs.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # The criterion returns the batch mean, so weight it by the batch size
        # before accumulating; previously the value was overwritten on every
        # batch and only the last batch contributed to the reported loss.
        batch_loss = loss.item()
        batch_size = labels.size(0)
        test_losses.append(batch_loss)
        test_loss += batch_loss * batch_size

        # get prediction
        _, predicted = torch.max(outputs, dim=1)
        total_samples += batch_size

        # count hits as a plain int so `accuracy` ends up a Python float
        correct_pred += (predicted == labels).sum().item()

        # get predicted and actual labels
        all_predictions.extend(predicted.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())

test_loss /= total_samples
accuracy = correct_pred / total_samples

# view loss and accuracy
print("Test Loss: {:.3f} | Accuracy: {:.3f}".format(test_loss, accuracy))

### CHECKING OTHER EVALUATION METRICS

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
# Overall accuracy, plus precision / recall / F1 computed with the "weighted"
# averaging mode, over every test prediction collected above.
accuracy = accuracy_score(all_labels, all_predictions)
precision, recall, f1 = (
    metric(all_labels, all_predictions, average="weighted")
    for metric in (precision_score, recall_score, f1_score)
)

#### RESULTS OF RESNET

In [None]:
# One-line summary of the metrics computed in the previous cell.
print(f"Accuracy: {accuracy:.3f}, precision: {precision:.3f}, recall: {recall:.3f}, F1: {f1:.3f}")

#### SUMMARY OF RESNET

In [None]:
# Build the per-class report from the predictions of the WHOLE test set.
# `labels` / `predicted` only hold the final batch (as tensors that may live
# on the GPU), so the accumulated Python lists are used instead.
report = classification_report(all_labels, all_predictions)
print(report)

#### RESULTS VISUALIZATION

In [None]:
# plotting the epoch-to-train and test trend
plt.figure(figsize = (10, 5))
plt.plot(train_losses, label = "Training loss")
plt.plot(test_losses, label = "Test loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Training and Testing Loss trends")
plt.legend()
plt.grid(True)
plt.show()

## CUSTOM CONVOLUTIONAL NEURAL NETWORK

In [None]:
class ConvolutionNeuralNetwork(nn.Module):
    """Small four-convolution CNN classifier for square images.

    Architecture: conv(32)+BN -> ReLU -> conv(32) -> 2x2 max-pool ->
    conv(64)+BN -> ReLU -> conv(128) -> ReLU -> 7x7 average-pool -> linear.
    All convolutions are 3x3 with padding 1, so only the two pooling layers
    change the spatial size.

    Args:
        in_channels: Number of channels of the input images. ``None`` (the
            default) means 1, i.e. grayscale input.
        out_channels: Number of output logits. ``None`` (the default) means 7.
        input_size: Height/width of the square input images; needed to size
            the linear layer. Defaults to 48.

    Raises:
        ValueError: If ``input_size`` is too small to survive both pooling
            stages (smaller than 14).
    """

    def __init__(self, in_channels=None, out_channels=None, input_size=48):
        super().__init__()
        # Honour the constructor arguments: the first convolution used to be
        # hard-coded to 3 input channels, which cannot process 1-channel input.
        in_channels = 1 if in_channels is None else in_channels
        out_channels = 7 if out_channels is None else out_channels

        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=32, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(32))

        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=1, bias=False))

        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.layer3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64))

        self.relu = nn.ReLU()

        self.layer4 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU())

        self.avgpool = nn.AvgPool2d(7)

        # Spatial size after the 2x2 max-pool and the 7x7 average-pool (both
        # floor-divide). For 48x48 input: 48 -> 24 -> 3, i.e. 128 * 3 * 3
        # features, not the 128 * 21 * 21 that was previously hard-coded.
        pooled_size = (input_size // 2) // 7
        if pooled_size < 1:
            raise ValueError(
                f"input_size={input_size} is too small; it must be at least 14"
            )
        self.fc1 = nn.Linear(128 * pooled_size * pooled_size, out_channels)

    def forward(self, x):
        """Return class logits for a batch ``x`` of shape (N, C, H, W)."""
        x = F.relu(self.layer1(x))
        x = self.pool1(self.layer2(x))
        x = self.relu(self.layer3(x))
        x = self.layer4(x)
        x = self.avgpool(x)
        # flatten everything except the batch dimension for the linear layer
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        return x

#### INSTANTIATE THE CNN MODEL

In [None]:
# Pass the declared hyperparameters explicitly instead of relying on whatever
# the class assumes when it is called without arguments.
model = ConvolutionNeuralNetwork(in_channels=in_channels, out_channels=num_labels)

In [None]:
# Move the CNN's parameters to the selected device; as the last expression of
# the cell, the returned module is also displayed by the notebook.
model.to(device)

#### TRAINING

In [None]:
# Per-batch training loss, collected across all epochs (used for plotting).
train_losses = []

# `model` now refers to the CNN, but the optimizer and scheduler created
# earlier in the notebook still hold the previous model's parameters. Rebuild
# both (same settings) so that optimizer.step() actually updates this model
# and the learning-rate schedule restarts from `learning_rate`.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# EARLY STOPPING PARAMETERS
patience = 5                    # epochs without improvement tolerated
best_val_loss = float("inf")    # lowest mean validation loss seen so far
current_patience = 0            # consecutive epochs without improvement

for epoch in range(NUM_EPOCHS):
    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}]")

    # ---- training phase ----
    # Validation below switches the model to eval mode, so training mode must
    # be restored at the start of every epoch; otherwise every epoch after the
    # first would train with BatchNorm in inference mode.
    model.train()
    running_loss = 0.0
    running_correct = 0
    running_total = 0
    for inputs, labels in tqdm(train_dataloader, total=len(train_dataloader)):
        # get data to cuda if possible
        inputs = inputs.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # clear gradients left over from the previous step, then forward pass
        optimizer.zero_grad()
        running_outputs = model(inputs)
        loss = criterion(running_outputs, labels)

        running_loss += loss.item()
        train_losses.append(loss.item())

        # the predicted class is the index of the largest logit
        _, running_predicted = torch.max(running_outputs, dim=1)
        running_total += labels.size(0)
        running_correct += (running_predicted == labels).sum().item()

        # back propagation and parameter update
        loss.backward()
        optimizer.step()

    # update learning rate scheduler (once per epoch)
    scheduler.step()

    epoch_loss = running_loss / len(train_dataloader)
    running_accuracy = 100 * (running_correct / running_total)

    # ---- validation phase ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(val_dataloader, total=len(val_dataloader)):
            inputs = inputs.to(device, dtype=torch.float32)
            labels = labels.to(device, dtype=torch.long)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # Get predictions
            _, predicted = torch.max(outputs, dim=1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    # Mean of the per-batch validation losses
    val_loss /= len(val_dataloader)

    # Check improvement
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        current_patience = 0
    else:
        current_patience += 1

    # The extra per-epoch "Accuracy" line that used to follow this block was
    # dropped: it printed `accuracy`, a value left over from an earlier
    # evaluation cell rather than anything measured in this loop.
    val_accuracy = 100 * (val_correct / val_total)
    print(f"Epoch {epoch + 1}, Training Accuracy: {running_accuracy:.2f}%, Validation Accuracy: {val_accuracy:.2f}%, Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Stop once validation loss has not improved for `patience` epochs in a row
    if current_patience >= patience:
        print(f"Early stopping after {epoch + 1} epochs.")
        break

In [None]:
# Evaluate the current model on the test set: mean per-sample loss, accuracy,
# and the full prediction/label lists for the metric cells that follow.
test_loss = 0.0        # running sum of per-sample losses, averaged at the end
correct_pred = 0       # number of correctly classified samples
total_samples = 0      # number of samples seen
test_losses = []       # mean loss of each test batch (used for plotting)
all_predictions = []
all_labels = []

model.eval()
with torch.no_grad():
    for inputs, labels in test_dataloader:
        inputs = inputs.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # The criterion returns the batch mean, so weight it by the batch size
        # before accumulating; previously the value was overwritten on every
        # batch and only the last batch contributed to the reported loss.
        batch_loss = loss.item()
        batch_size = labels.size(0)
        test_losses.append(batch_loss)
        test_loss += batch_loss * batch_size

        # get prediction
        _, predicted = torch.max(outputs, dim=1)
        total_samples += batch_size

        # count hits as a plain int so `accuracy` ends up a Python float
        correct_pred += (predicted == labels).sum().item()

        # get predicted and actual labels
        all_predictions.extend(predicted.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())

test_loss /= total_samples
accuracy = correct_pred / total_samples

# view loss and accuracy
print("Test Loss: {:.3f} | Accuracy: {:.3f}".format(test_loss, accuracy))

### CHECKING OTHER EVALUATION METRICS

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
# Overall accuracy, plus precision / recall / F1 computed with the "weighted"
# averaging mode, over every test prediction collected above.
accuracy = accuracy_score(all_labels, all_predictions)
precision, recall, f1 = (
    metric(all_labels, all_predictions, average="weighted")
    for metric in (precision_score, recall_score, f1_score)
)

#### RESULTS OF THE CNN MODEL

In [None]:
# One-line summary of the metrics computed in the previous cell.
print(f"Accuracy: {accuracy:.3f}, precision: {precision:.3f}, recall: {recall:.3f}, F1: {f1:.3f}")

#### SUMMARY OF CNN

In [None]:
# Build the per-class report from the predictions of the WHOLE test set.
# `labels` / `predicted` only hold the final batch (as tensors that may live
# on the GPU), so the accumulated Python lists are used instead.
report = classification_report(all_labels, all_predictions)
print(report)

#### RESULTS VISUALIZATION

In [None]:
# plotting the epoch-to-train and test trend
plt.figure(figsize = (10, 5))
plt.plot(train_losses, label = "Training loss")
plt.plot(test_losses, label = "Test loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Training and Testing Loss trends")
plt.legend()
plt.grid(True)
plt.show()