# Facial Expression Detection using Graph Neural Networks

## Steps Involved

1. **Data Acquisition and Load the Data**  
   Collect facial expression datasets containing landmark points and load them into your working environment.

2. **Data Preprocessing**  
   Clean the data by handling missing values, removing outliers, and ensuring the data is in the correct format for analysis.

3. **Normalization (Min-Max Normalization)**  
   Normalize the landmark points to a specific range (e.g., [0, 1]) to ensure that all features contribute equally to the model training.

4. **Graph Connection (Adjacency Matrix)**  
   Create an adjacency matrix to represent the connections between landmark points, defining the graph structure for the GNN.

5. **Graph Neural Network**  
   Design and implement a Graph Neural Network architecture suitable for processing the graph representation of facial landmarks.

6. **Training and Validation**  
   Split the dataset into training and validation sets. Train the GNN on the training set and validate its performance on the validation set.

7. **Confusion Matrix and Accuracy Metrics**  
   Evaluate the model's performance using a confusion matrix and other accuracy metrics (e.g., precision, recall, F1-score).

8. **Testing**  
   Test the trained model on a separate test dataset to assess its generalization capability.

9. **Performance Evaluation**  
   Analyze the performance of the model based on the test results, discussing strengths and weaknesses, and potential areas for improvement.

## Additional Considerations

- Consider data augmentation techniques to increase the diversity of the training dataset.
- Experiment with different GNN architectures and hyperparameters to optimize performance.
- Implement cross-validation to ensure the robustness of the model evaluation.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool, global_max_pool
from torch.utils.data import Dataset
# from torch_geometric.loader import DataLoader
from torch_geometric.data import DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch_geometric.data import Data
from scipy.spatial import Delaunay
import ast 

In [None]:
df = pd.read_csv('tif_df_train.csv')
df.head()

In [None]:
from sklearn.preprocessing import LabelEncoder

emotion_list = ["Angry", "Disgust", "Fear", "Happiness", "Neutral", "Sad", "Surprised"]

# Initialize the label encoder
encoder = LabelEncoder()

# Fit and transform the emotion list
df['encoded_label'] = encoder.fit_transform(df['Labels'])

print(df.head())

In [None]:
df.info()

In [None]:
# df['Landmarks'] = df['Landmarks'].apply(ast.literal_eval)

In [None]:
# import pandas as pd
# from sklearn.preprocessing import MinMaxScaler

# # Initialize MinMaxScaler
# scaler = MinMaxScaler()

# df['Norm_Landmarks'] = None 

# # Loop through each row in the 'Landmarks' column
# for i in range(len(df)):
#     x_value, y_value = [], []
#     normalized_data = []

#     # Extract x, y pairs from the current row's landmarks
#     for x, y in df['Landmarks'][i]:
#         # print(f"x = {x} and y = {y}")
#         x_value.append(x)
#         y_value.append(y)

#     # Normalize the x and y values
#     x_value = np.array(x_value).reshape(-1, 1)
#     y_value = np.array(y_value).reshape(-1, 1)

#     # Apply MinMaxScaler on x and y values
#     x_norm_value = scaler.fit_transform(x_value)
#     y_norm_value = scaler.fit_transform(y_value)

#     # Combine normalized x and y values
#     for x_norm, y_norm in zip(x_norm_value, y_norm_value):
#         normalized_data.append([x_norm[0], y_norm[0]])

#     # Store the normalized data back into the 'Norm_Landmarks' column
#     df.at[i, 'Norm_Landmarks'] = normalized_data

# # Display the DataFrame with normalized landmarks
# print(df['Norm_Landmarks'])

In [None]:
import numpy as np
from scipy.spatial import Delaunay

def delaunay_triangulation(landmarks):
    """
    Perform Delaunay triangulation and return triangles and edges.
    """
    tri = Delaunay(landmarks)
    triangles = tri.simplices
    edges = set()
    for triangle in triangles:
        for i in range(3):
            edge = tuple(sorted([triangle[i], triangle[(i + 1) % 3]]))
            edges.add(edge)
    return triangles, list(edges)

def construct_adjacency_matrix(landmarks):
    """
    Construct an adjacency matrix from the Delaunay triangulation of the landmarks.
    
    Args:
        landmarks (ndarray): Array of 2D points (n, 2) representing the landmarks.
    
    Returns:
        adjacency_matrix (ndarray): A binary adjacency matrix of shape (n, n).
    """
    _, edges = delaunay_triangulation(landmarks)
    num_landmarks = len(landmarks)
    adjacency_matrix = np.zeros((num_landmarks, num_landmarks), dtype=int)
    
    for edge in edges:
        i, j = edge
        adjacency_matrix[i, j] = 1
        adjacency_matrix[j, i] = 1  # Symmetric for undirected graph

    return adjacency_matrix

# Example usage
landmarks = np.array([[0, 0], [1, 0], [0, 1], [1, 1], [0.5, 0.5]])  # Sample landmarks
adjacency_matrix = construct_adjacency_matrix(landmarks)
print("Adjacency Matrix:")
print(adjacency_matrix)

In [None]:
# print(adj_mat.shape)

In [None]:
# landmark_1 = np.array(df['Landmarks'][1])
# adj_mat = construct_adjacency_matrix(landmark_1)
# print("Adjacency Matrix:")
# print(adj_mat)

In [None]:
def normalize_adjacency_matrix(A):
    A = A + np.eye(A.shape[0])  # Add self-connections
    D = np.diag(np.sum(A, axis=1))
    D_inv_sqrt = np.linalg.inv(np.sqrt(D))
    A_normalized = D_inv_sqrt @ A @ D_inv_sqrt
    return A_normalized


# adj_mat_norm = normalize_adjacency_matrix(adj_mat)
# print(adj_mat_norm)

In [None]:
# row_sum = [sum(row) for row in adj_mat]    

# for s in row_sum:
#     print(s, end=', ')

# print("\nNumber of Rows")
# print(len(row_sum))

In [None]:
from sklearn.model_selection import train_test_split

X = df['Landmarks']
y = df['encoded_label']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f" training shape = > {X_train.shape}, {y_train.shape}")
print(f" testing shape = > {X_test.shape}, {y_test.shape}")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool, global_max_pool

# Define the GCN-based model
class GCNModel(nn.Module):
    def __init__(self, num_nodes=51, num_features=2, num_classes=7):
        super(GCNModel, self).__init__()
        
        # Convolutional layers
        self.conv1 = GCNConv(num_features, 64)
        self.conv2 = GCNConv(64, 128)
        self.conv3 = GCNConv(128, 256)
        self.conv4 = GCNConv(256, 512)
        self.conv5 = GCNConv(512, 1024)
        self.conv6 = GCNConv(1024, 2048)
        
        # Pooling layers
        self.pool1 = global_mean_pool
        self.pool2 = global_max_pool
        
        # Fully connected layers after flattening
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        
    def forward(self, x, edge_index, batch):
        # Apply GCN layers with ReLU activations
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = F.relu(self.conv3(x, edge_index))
        x = F.relu(self.conv4(x, edge_index))
        x = F.relu(self.conv5(x, edge_index))
        # x = F.relu(self.conv6(x, edge_index))
        
        # Pooling layers
        x = self.pool1(x, batch)  # Global mean pooling
        # x = self.pool2(x, batch)  # Global max pooling

        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x

# Example of input:
# x (node features) shape = [num_nodes, num_features] = [68, 2]
# edge_index (adjacency matrix) shape = [2, num_edges] - the index of connected nodes
# batch: indicates which nodes belong to which graph (for multi-graph input)

In [None]:
class LandmarkModel(nn.Module):
    def __init__(self, input_size=136, hidden_size=256, num_classes=5):
        """
        Initializes the model. 
        - input_size: number of features (landmarks * 2, e.g., 68 landmarks with 2 coordinates per landmark)
        - hidden_size: size of hidden layers
        - num_classes: number of output classes (emotions)
        """
        super(LandmarkModel, self).__init__()

        # Input layer
        self.fc1 = nn.Linear(input_size, hidden_size)  # input_size: 136 for 68 landmarks * 2 (x, y)

        # Hidden layers
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)

        # Output layer (5 classes for emotion classification)
        self.fc4 = nn.Linear(hidden_size, num_classes)

        # Activation function
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)  # Dropout for regularization

    def forward(self, x):
        # Forward pass through the network
        x = self.relu(self.fc1(x))  # First hidden layer
        x = self.dropout(x)  # Apply dropout
        
        x = self.relu(self.fc2(x))  # Second hidden layer
        x = self.dropout(x)  # Apply dropout

        x = self.relu(self.fc3(x))  # Third hidden layer
        x = self.dropout(x)  # Apply dropout

        x = self.fc4(x)  # Output layer
        return x  # Raw scores for each class (for later softmax or cross-entropy)

# Step 2: Model Initialization
model = LandmarkModel(input_size=102, hidden_size=256, num_classes=7)

In [None]:
# # Create a list to store the data
# data_list = []

# for index, row in df.iterrows():
#     # Convert landmarks from string (assuming it's stored as a string of numbers)
#     landmarks = np.fromstring(row['Landmarks'], sep=' ').reshape(68, 2)  # Shape (68, 2)

#     # Convert landmarks to tensor
#     x = torch.tensor(landmarks, dtype=torch.float32)  # Shape [68, 2]

#     # Get the adjacency matrix for this sample
#     adjacency_matrix = generate_adjacency_matrix(row['Landmarks'])
    
#     # Convert adjacency matrix to edge_index (format expected by PyTorch Geometric)
#     edge_index = torch.tensor(np.array(np.nonzero(adjacency_matrix)), dtype=torch.long)  # Shape [2, num_edges]
    
#     # Get the label
#     y = torch.tensor([row['Label']], dtype=torch.long)  # Shape [1]
    
#     # Create Data object for PyG
#     data = Data(x=x, edge_index=edge_index, y=y)

## Import Landmarks and encoded labels 

In [None]:
# # Prepare data
# X = df['Landmarks']
# y = df['encoded_label']
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Function to process landmarks and adjacency matrix
def process_landmarks_and_adjacency(landmarks_str):
    # Parse the string to get the list of landmarks
    landmarks = np.array(ast.literal_eval(landmarks_str))  # Convert string to list using literal_eval
    adjacency_matrix = construct_adjacency_matrix(landmarks)  # Get adjacency matrix
    return landmarks, adjacency_matrix

# Your existing Dataset class definition remains the same:
class LandmarkDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        row = self.X.iloc[idx]
        
        # Process the landmarks and adjacency matrix
        landmarks, adjacency_matrix = process_landmarks_and_adjacency(row)  # Pass the string directly
        
        # Convert landmarks to tensor (Shape: 68, 2)
        x = torch.tensor(landmarks, dtype=torch.float)  
        
        # Convert adjacency matrix to edge_index (Shape: 2, num_edges)
        edge_index = torch.tensor(np.array(np.nonzero(adjacency_matrix)), dtype=torch.long) 
        
        # Get the label for this sample
        y = torch.tensor(self.y.iloc[idx], dtype=torch.long)  # Label
        
        return Data(x=x, edge_index=edge_index, y=y)

In [None]:
class LandmarkDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        row = self.X.iloc[idx]
        
        # Process the landmarks and adjacency matrix
        landmarks, adjacency_matrix = process_landmarks_and_adjacency(row)  # Pass the string directly
        
        # Convert landmarks to tensor (Shape: 68, 2)
        x = torch.tensor(landmarks, dtype=torch.float)  

        
        # Get the label for this sample
        y = torch.tensor(self.y.iloc[idx], dtype=torch.long)  # Label
        return x, y

In [None]:
# import torch
# import numpy as np
# import ast
# from torch.utils.data import Dataset, DataLoader
# from torch_geometric.data import Data

# # Function to process landmarks and adjacency matrix
# def process_landmarks_and_adjacency(landmarks_str):
#     # Parse the string to get the list of landmarks
#     print(f"landmarks_str: {landmarks_str}")  # Print input landmarks string
#     landmarks = np.array(ast.literal_eval(landmarks_str))  # Convert string to list using literal_eval
#     print(f"landmarks (after eval): {landmarks}")  # Print landmarks after parsing
    
#     adjacency_matrix = construct_adjacency_matrix(landmarks)  # Get adjacency matrix
#     print(f"adjacency_matrix: {adjacency_matrix}")  # Print adjacency matrix
    
#     return landmarks, adjacency_matrix

# # Your existing Dataset class definition remains the same:
# class LandmarkDataset(Dataset):
#     def __init__(self, X, y):
#         self.X = X
#         self.y = y

#     def __len__(self):
#         return len(self.X)

#     def __getitem__(self, idx):
#         row = self.X.iloc[idx]
#         print(f"Row {idx}: {row}")  # Print the row from X for debugging
        
#         # Process the landmarks and adjacency matrix
#         landmarks, adjacency_matrix = process_landmarks_and_adjacency(row)  # Pass the string directly
        
#         # Convert landmarks to tensor (Shape: 68, 2)
#         x = torch.tensor(landmarks, dtype=torch.float)  
#         print(f"x (landmarks as tensor): {x.shape} {x}")  # Print tensor of landmarks
        
#         # Convert adjacency matrix to edge_index (Shape: 2, num_edges)
#         edge_index = torch.tensor(np.array(np.nonzero(adjacency_matrix)), dtype=torch.long) 
#         print(f"edge_index (nonzero positions): {edge_index.shape} {edge_index}")  # Print edge_index
        
#         # Get the label for this sample
#         y = torch.tensor(self.y.iloc[idx], dtype=torch.long)  # Label
#         print(f"y (label): {y}")  # Print label
        
#         return Data(x=x, edge_index=edge_index, y=y)

# # Create DataLoader from PyTorch Geometric
# train_dataset = LandmarkDataset(X_train, y_train)
# test_dataset = LandmarkDataset(X_test, y_test)

# # Use the PyTorch Geometric DataLoader
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# # Debugging the DataLoader and the first batch
# for batch in train_loader:
#     print(f"Batch x: {batch.x.shape}, Batch edge_index: {batch.edge_index.shape}, Batch y: {batch.y.shape}")
#     break  # Print only the first batch

In [None]:
# Create DataLoader from PyTorch Geometric
train_dataset = LandmarkDataset(X_train, y_train)
test_dataset = LandmarkDataset(X_test, y_test)


# Use the PyTorch Geometric DataLoader
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Debugging the DataLoader and the first batch
# for batch in train_loader:
#     print(f"Batch x: {batch.x.shape}, Batch edge_index: {batch.edge_index.shape}, Batch y: {batch.y.shape}")
#     break  # Print only the first batch

In [None]:
# # Debugging the full train_loader
# for batch_idx, batch in enumerate(train_loader):
#     print(f"\nBatch {batch_idx + 1}:")
#     print(f"x shape: {batch.x.shape}, x: {batch.x}")
#     # print(f"edge_index shape: {batch.edge_index.shape}, edge_index: {batch.edge_index}")
#     # print(f"y shape: {batch.y.shape}, y: {batch.y}")
#     break

In [None]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)  # Probabilities of the correct class
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Use Focal Loss instead of CrossEntropyLoss
criterion = FocalLoss(alpha=1, gamma=2, reduction='mean')

In [None]:
# Initialize model
# model = GCNModel(num_nodes=51, num_features=2, num_classes=7).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# criterion = nn.CrossEntropyLoss()

In [None]:
# Training loop
def train(model, loader, optimizer, criterion, device):
    model.train()
    train_loss = 0.0
    all_labels = []
    all_preds = []
    for batch in loader:
        batch = batch.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        out = model(batch.x, batch.edge_index, batch.batch)
        
        # Compute loss
        loss = criterion(out, batch.y)
                
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        # total_loss += loss.item()
        train_loss += loss.item() * batch.num_graphs  # Loss for the batch

        # Collect predictions and labels for metrics
        _, preds = torch.max(out, dim=1)
        all_labels.append(batch.y.cpu().numpy())
        all_preds.append(preds.cpu().numpy())

    train_loss /= len(train_loader.dataset)  # Average loss over the dataset

    # Flatten lists
    all_labels = np.concatenate(all_labels)
    all_preds = np.concatenate(all_preds)

    # Calculate accuracy, precision, recall, and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Train Loss: {train_loss:.4f}") #, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

    return train_loss, accuracy, precision, recall, f1

In [None]:
if __name__ == '__main__':
    # Initialize model, optimizer, loss function, etc.
    # model = GCNModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
#     criterion = nn.CrossEntropyLoss()

    # Put the model on device (GPU if available)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Initialize the DataLoader and training loop
    train_losses = []
    test_losses = []
    accuracies = []
    precisions = []
    recalls = []
    f1_scores = []

    for epoch in range(1, 101):  # 100 epochs
            # print(f"Epoch {epoch}")
    
            # Train
            train_loss, accuracy, precision, recall, f1 = train(model, train_loader, optimizer, criterion, device)
            train_losses.append(train_loss)
            accuracies.append(accuracy)
            precisions.append(precision)
            recalls.append(recall)
            f1_scores.append(f1)

    print("train_loss")
    print (train_loss)
    print("accuracies")
    print(accuracies)
    print("precison")
    print(precision)
    print("recalls")
    print(recalls)
    print("f1_Score")
    print(f1_scores)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool, global_max_pool
from torch.utils.data import Dataset
from torch_geometric.data import DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch_geometric.data import Data
from scipy.spatial import Delaunay
import ast 

import warnings

# Suppress specific warning by category (e.g., deprecation warnings)
warnings.filterwarnings("ignore", category=UserWarning, message=".*deprecated.*")

# Load the dataframe
df = pd.read_csv('tif_df_train.csv')

# Initialize the label encoder
emotion_list = ["Angry", "Disgust", "Fear", "Happiness", "Neutral", "Sad", "Surprised"]
encoder = LabelEncoder()
df['encoded_label'] = encoder.fit_transform(df['Labels'])

# Function to perform Delaunay triangulation
def delaunay_triangulation(landmarks):
    tri = Delaunay(landmarks)
    triangles = tri.simplices
    edges = set()
    for triangle in triangles:
        for i in range(3):
            edge = tuple(sorted([triangle[i], triangle[(i + 1) % 3]]))
            edges.add(edge)
    return triangles, list(edges)

# Function to construct adjacency matrix from Delaunay triangulation
def construct_adjacency_matrix(landmarks):
    _, edges = delaunay_triangulation(landmarks)
    num_landmarks = len(landmarks)
    adjacency_matrix = np.zeros((num_landmarks, num_landmarks), dtype=int)
    
    for edge in edges:
        i, j = edge
        adjacency_matrix[i, j] = 1
        adjacency_matrix[j, i] = 1  # Symmetric for undirected graph

    return adjacency_matrix

# # Convert the landmarks into numpy arrays and adjacency matrices
# def process_landmarks_and_adjacency(row):
#     landmarks = np.fromstring(row['Landmarks'], sep=' ').reshape(68, 2)  # Shape (68, 2)
#     adjacency_matrix = construct_adjacency_matrix(landmarks)  # Get adjacency matrix
#     return landmarks, adjacency_matrix

# Prepare data
X = df['Landmarks']
y = df['encoded_label']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Function to process landmarks and adjacency matrix
def process_landmarks_and_adjacency(landmarks_str):
    # Parse the string to get the list of landmarks
    landmarks = np.array(ast.literal_eval(landmarks_str))  # Convert string to list using literal_eval
    adjacency_matrix = construct_adjacency_matrix(landmarks)  # Get adjacency matrix
    return landmarks, adjacency_matrix

# Your existing Dataset class definition remains the same:
class LandmarkDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        row = self.X.iloc[idx]
        
        # Process the landmarks and adjacency matrix
        landmarks, adjacency_matrix = process_landmarks_and_adjacency(row)  # Pass the string directly
        
        # Convert landmarks to tensor (Shape: 68, 2)
        x = torch.tensor(landmarks, dtype=torch.float)  
        
        # Convert adjacency matrix to edge_index (Shape: 2, num_edges)
        edge_index = torch.tensor(np.array(np.nonzero(adjacency_matrix)), dtype=torch.long) 
        
        # Get the label for this sample
        y = torch.tensor(self.y.iloc[idx], dtype=torch.long)  # Label
        
        return Data(x=x, edge_index=edge_index, y=y)
        
# Create DataLoader from PyTorch Geometric
train_dataset = LandmarkDataset(X_train, y_train)
test_dataset = LandmarkDataset(X_test, y_test)

# Use the PyTorch Geometric DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the GCN-based model
class GCNModel(nn.Module):
    def __init__(self, num_nodes=68, num_features=2, num_classes=7):
        super(GCNModel, self).__init__()
        
        # Convolutional layers
        self.conv1 = GCNConv(num_features, 64)
        self.conv2 = GCNConv(64, 128)
        self.conv3 = GCNConv(128, 256)
        self.conv4 = GCNConv(256, 512)
        self.conv5 = GCNConv(512, 1024)
        self.conv6 = GCNConv(1024, 2048)
        
        # Fully connected layers after flattening
        self.fc1 = nn.Linear(2048, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        
    def forward(self, x, edge_index, batch):
        # Apply GCN layers with ReLU activations
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = F.relu(self.conv3(x, edge_index))
        x = F.relu(self.conv4(x, edge_index))
        x = F.relu(self.conv5(x, edge_index))
        
        # Pooling layers - applying both mean and max pooling
        x_mean = global_mean_pool(x, batch)  # Global mean pooling
        x_max = global_max_pool(x, batch)    # Global max pooling
        
        # Concatenate the pooled results
        x = torch.cat([x_mean, x_max], dim=1)
        
        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x
        
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Initialize model
model = GCNModel(num_nodes=51, num_features=2, num_classes=7).to(device)

# Optimizer and Loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# criterion = nn.CrossEntropyLoss()

# Training loop
def train(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    for batch in loader:
        batch = batch.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        out = model(batch.x, batch.edge_index, batch.batch)
        
        # Compute loss
        loss = criterion(out, batch.y)
        total_loss += loss.item()
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
    
    return total_loss / len(loader)

# Testing loop
def test(model, loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            out = model(batch.x, batch.edge_index, batch.batch)
            _, predicted = torch.max(out, dim=1)
            correct += (predicted == batch.y).sum().item()
            total += batch.y.size(0)
    
    return correct / total

# Training and evaluation
epochs = 50
for epoch in range(epochs):
    train_loss = train(model, train_loader, optimizer, criterion)
    test_acc = test(model, test_loader)
    
    print(f'Epoch {epoch+1}/{epochs}, Loss: {train_loss:.4f}, Test Accuracy: {test_acc*100:.2f}%')

In [None]:
def plot_confusion_matrix(y_true, y_pred, labels):
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')
    plt.show()

# Training and evaluation
epochs = 50
for epoch in range(epochs):
    train_loss = train(model, train_loader, optimizer, criterion)
    y_true, y_pred = test(model, test_loader)
    test_acc = (np.array(y_true) == np.array(y_pred)).mean() * 100
    
    print(f'Epoch {epoch+1}/{epochs}, Loss: {train_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

    if epoch % 10 == 0:  # Every 10 epochs, plot confusion matrix
        plot_confusion_matrix(y_true, y_pred, emotion_list)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool, global_max_pool
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
# from torch_geometric.data import DataLoader
from torch_geometric.loader import DataLoader

# Define the GCN-based model
class GCNModel(nn.Module):
    def __init__(self, num_nodes=68, num_features=2, num_classes=7):
        super(GCNModel, self).__init__()

        # Convolutional layers
        self.conv1 = GCNConv(num_features, 64)
        self.conv2 = GCNConv(64, 128)
        self.conv3 = GCNConv(128, 256)
        self.conv4 = GCNConv(256, 512)
        self.conv5 = GCNConv(512, 1024)

        # Fully connected layers after flattening
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)

    def forward(self, x, edge_index, batch):
        # Apply GCN layers with ReLU activations
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = F.relu(self.conv3(x, edge_index))
        x = F.relu(self.conv4(x, edge_index))
        x = F.relu(self.conv5(x, edge_index))

        # Pooling layers
        x = global_mean_pool(x, batch)  # Global mean pooling
        # x = global_max_pool(x, batch)  # Global max pooling

        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        return x

# Training function with loss curve
def train(model, train_loader, optimizer, criterion, device):
    model.train()
    train_loss = 0.0
    all_labels = []
    all_preds = []

    for batch in train_loader:
        optimizer.zero_grad()

        # Move to device
        batch = batch.to(device)

        # Forward pass
        out = model(batch.x, batch.edge_index, batch.batch)
        loss = criterion(out, batch.y)

        # Backward pass
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * batch.num_graphs  # Loss for the batch

        # Collect predictions and labels for metrics
        _, preds = torch.max(out, dim=1)
        all_labels.append(batch.y.cpu().numpy())
        all_preds.append(preds.cpu().numpy())

    train_loss /= len(train_loader.dataset)  # Average loss over the dataset

    # Flatten lists
    all_labels = np.concatenate(all_labels)
    all_preds = np.concatenate(all_preds)

    # Calculate accuracy, precision, recall, and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Train Loss: {train_loss:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

    return train_loss, accuracy, precision, recall, f1


# Evaluation function with confusion matrix
def evaluate(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0.0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)

            # Forward pass
            out = model(batch.x, batch.edge_index, batch.batch)
            loss = criterion(out, batch.y)
            test_loss += loss.item() * batch.num_graphs

            # Collect predictions and labels for metrics
            _, preds = torch.max(out, dim=1)
            all_labels.append(batch.y.cpu().numpy())
            all_preds.append(preds.cpu().numpy())

    test_loss /= len(test_loader.dataset)  # Average loss over the dataset

    # Flatten lists
    all_labels = np.concatenate(all_labels)
    all_preds = np.concatenate(all_preds)

    # Calculate accuracy, precision, recall, and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

    # Generate confusion matrix
    cm = confusion_matrix(all_labels, all_preds)

    return test_loss, accuracy, precision, recall, f1, cm


# Main code to train and evaluate the model
if __name__ == "__main__":
    # Initialize model, optimizer, loss function, etc.
    model = GCNModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    # Put the model on device (GPU if available)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Initialize the DataLoader and training loop
    train_losses = []
    test_losses = []
    accuracies = []
    precisions = []
    recalls = []
    f1_scores = []

    # Assuming you have `train_loader` and `test_loader` already defined
    for epoch in range(1, 101):  # 100 epochs
        print(f"Epoch {epoch}")

        # Train
        train_loss, accuracy, precision, recall, f1 = train(model, train_loader, optimizer, criterion, device)
        train_losses.append(train_loss)
        accuracies.append(accuracy)
        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1)

        # Evaluate
        test_loss, accuracy, precision, recall, f1, cm = evaluate(model, test_loader, criterion, device)
        test_losses.append(test_loss)

        # # Plot loss curves
        # plt.plot(train_losses, label='Train Loss')
        # plt.plot(test_losses, label='Test Loss')
        # plt.xlabel('Epochs')
        # plt.ylabel('Loss')
        # plt.legend()
        # plt.title('Loss Curve')
        # plt.show()

        # # Confusion matrix
        # plt.figure(figsize=(10, 7))
        # sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=emotion_list, yticklabels=emotion_list)
        # plt.xlabel('Predicted')
        # plt.ylabel('True')
        # plt.title('Confusion Matrix')
        # plt.show()

    # Plotting performance metrics over epochs
    plt.figure(figsize=(12, 8))
    plt.subplot(2, 2, 1)
    plt.plot(accuracies, label='Accuracy')
    plt.title('Accuracy Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.subplot(2, 2, 2)
    plt.plot(precisions, label='Precision')
    plt.title('Precision Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Precision')
    plt.legend()

    plt.subplot(2, 2, 3)
    plt.plot(recalls, label='Recall')
    plt.title('Recall Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Recall')
    plt.legend()

    plt.subplot(2, 2, 4)
    plt.plot(f1_scores, label='F1 Score')
    plt.title('F1 Score Curve')
    plt.xlabel('Epochs')
    plt.ylabel('F1 Score')
    plt.legend()

    plt.tight_layout()
    plt.show()


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, global_mean_pool, global_max_pool
from torch.utils.data import Dataset
from torch_geometric.data import DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch_geometric.data import Data
from scipy.spatial import Delaunay
import ast
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

# Suppress specific warning by category (e.g., deprecation warnings)
warnings.filterwarnings("ignore", category=UserWarning, message=".*deprecated.*")

# Load the dataframe
df = pd.read_csv('tif_df_train.csv')

# Initialize the label encoder
emotion_list = ["Angry", "Disgust", "Fear", "Happiness", "Neutral", "Sad", "Surprised"]
encoder = LabelEncoder()
df['encoded_label'] = encoder.fit_transform(df['Labels'])

# Function to perform Delaunay triangulation
def delaunay_triangulation(landmarks):
    tri = Delaunay(landmarks)
    triangles = tri.simplices
    edges = set()
    for triangle in triangles:
        for i in range(3):
            edge = tuple(sorted([triangle[i], triangle[(i + 1) % 3]]))
            edges.add(edge)
    return triangles, list(edges)

# Function to construct adjacency matrix from Delaunay triangulation
def construct_adjacency_matrix(landmarks):
    _, edges = delaunay_triangulation(landmarks)
    num_landmarks = len(landmarks)
    adjacency_matrix = np.zeros((num_landmarks, num_landmarks), dtype=int)
    
    for edge in edges:
        i, j = edge
        adjacency_matrix[i, j] = 1
        adjacency_matrix[j, i] = 1  # Symmetric for undirected graph

    return adjacency_matrix

# Function to process landmarks and adjacency matrix
def process_landmarks_and_adjacency(landmarks_str):
    # Parse the string to get the list of landmarks
    landmarks = np.array(ast.literal_eval(landmarks_str))  # Convert string to list using literal_eval
    adjacency_matrix = construct_adjacency_matrix(landmarks)  # Get adjacency matrix
    return landmarks, adjacency_matrix

# Prepare data
X = df['Landmarks']
y = df['encoded_label']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Dataset class for PyTorch
class LandmarkDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        row = self.X.iloc[idx]
        
        # Process the landmarks and adjacency matrix
        landmarks, adjacency_matrix = process_landmarks_and_adjacency(row)  # Pass the string directly
        
        # Convert landmarks to tensor (Shape: 68, 2)
        x = torch.tensor(landmarks, dtype=torch.float)  
        
        # Convert adjacency matrix to edge_index (Shape: 2, num_edges)
        edge_index = torch.tensor(np.array(np.nonzero(adjacency_matrix)), dtype=torch.long) 
        
        # Get the label for this sample
        y = torch.tensor(self.y.iloc[idx], dtype=torch.long)  # Label
        
        return Data(x=x, edge_index=edge_index, y=y)

# Create DataLoader from PyTorch Geometric
train_dataset = LandmarkDataset(X_train, y_train)
test_dataset = LandmarkDataset(X_test, y_test)

# Use the PyTorch Geometric DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the GCN-based model for multi-label classification
# Define the GCN-based model
class GCNModel(nn.Module):
    def __init__(self, num_nodes=68, num_features=2, num_classes=7):
        super(GCNModel, self).__init__()
        
        # Convolutional layers
        self.conv1 = GCNConv(num_features, 64)
        self.conv2 = GCNConv(64, 128)
        self.conv3 = GCNConv(128, 256)
        self.conv4 = GCNConv(256, 512)
        self.conv5 = GCNConv(512, 1024)
        self.conv6 = GCNConv(1024, 2048)
        
        # Fully connected layers after flattening
        self.fc1 = nn.Linear(2048, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        
    def forward(self, x, edge_index, batch):
        # Apply GCN layers with ReLU activations
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = F.relu(self.conv3(x, edge_index))
        x = F.relu(self.conv4(x, edge_index))
        x = F.relu(self.conv5(x, edge_index))
        
        # Pooling layers - applying both mean and max pooling
        x_mean = global_mean_pool(x, batch)  # Global mean pooling
        x_max = global_max_pool(x, batch)    # Global max pooling
        
        # Concatenate the pooled results
        x = torch.cat([x_mean, x_max], dim=1)
        
        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x

# Training function with loss curve
def train(model, train_loader, optimizer, criterion, device):
    model.train()
    train_loss = 0.0
    all_labels = []
    all_preds = []
    
    for batch in train_loader:
        optimizer.zero_grad()
        
        # Move to device
        batch = batch.to(device)
        
        # Forward pass
        out = model(batch.x, batch.edge_index, batch.batch)
        loss = criterion(out, batch.y)
        
        # Backward pass
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item() * batch.num_graphs  # Loss for the batch
        
        # Collect predictions and labels for metrics
        preds = torch.sigmoid(out)  # Sigmoid for multi-label classification
        all_labels.append(batch.y.cpu().numpy())
        all_preds.append(preds.cpu().numpy())
    
    train_loss /= len(train_loader.dataset)  # Average loss over the dataset
    
    # Flatten lists
    all_labels = np.concatenate(all_labels)
    all_preds = np.concatenate(all_preds)
    
    # Threshold predictions at 0.5 for multi-label classification
    all_preds = (all_preds > 0.5).astype(int)
    
    # Calculate accuracy, precision, recall, and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    
    print(f"Train Loss: {train_loss:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")
    
    return train_loss, accuracy, precision, recall, f1

# Evaluation function with confusion matrix
def evaluate(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0.0
    all_labels = []
    all_preds = []
    
    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)
            
            # Forward pass
            out = model(batch.x, batch.edge_index, batch.batch)
            loss = criterion(out, batch.y)
            test_loss += loss.item() * batch.num_graphs
            
            # Collect predictions and labels for metrics
            preds = torch.sigmoid(out)  # Sigmoid for multi-label classification
            all_labels.append(batch.y.cpu().numpy())
            all_preds.append(preds.cpu().numpy())
    
    test_loss /= len(test_loader.dataset)  # Average loss over the dataset
    
    # Flatten lists
    all_labels = np.concatenate(all_labels)
    all_preds = np.concatenate(all_preds)
    
    # Threshold predictions at 0.5 for multi-label classification
    all_preds = (all_preds > 0.5).astype(int)
    
    # Calculate accuracy, precision, recall, and F1 score
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')
    
    print(f"Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")
    
    # Generate confusion matrix
    cm = confusion_matrix(all_labels.argmax(axis=1), all_preds.argmax(axis=1))
    
    return test_loss, accuracy, precision, recall, f1, cm

# Main code to train and evaluate the model
if __name__ == "__main__":
    # Initialize model, optimizer, loss function, etc.
    model = GCNModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    # For multi-label classification, use BCEWithLogitsLoss
    criterion = nn.BCEWithLogitsLoss()
    
    # Put the model on device (GPU if available)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    # Initialize the DataLoader and training loop
    train_losses = []
    test_losses = []
    accuracies = []
    precisions = []
    recalls = []
    f1_scores = []
    
    # Training and evaluation for 100 epochs
    for epoch in range(1, 101):  # 100 epochs
        print(f"Epoch {epoch}")
        
        # Train
        train_loss, accuracy, precision, recall, f1 = train(model, train_loader, optimizer, criterion, device)
        train_losses.append(train_loss)
        accuracies.append(accuracy)
        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1)
        
        # Evaluate
        test_loss, accuracy, precision, recall, f1, cm = evaluate(model, test_loader, criterion, device)
        test_losses.append(test_loss)
        
        # Plot loss curves
        plt.plot(train_losses, label='Train Loss')
        plt.plot(test_losses, label='Test Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.title('Loss Curve')
        plt.show()
        
        # Confusion matrix
        plt.figure(figsize=(10, 7))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=emotion_list, yticklabels=emotion_list)
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title('Confusion Matrix')
        plt.show()
        
    # Plotting performance metrics over epochs
    plt.figure(figsize=(12, 8))
    plt.subplot(2, 2, 1)
    plt.plot(accuracies, label='Accuracy')
    plt.title('Accuracy Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    
    plt.subplot(2, 2, 2)
    plt.plot(precisions, label='Precision')
    plt.title('Precision Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Precision')
    plt.legend()
    
    plt.subplot(2, 2, 3)
    plt.plot(recalls, label='Recall')
    plt.title('Recall Curve')
    plt.xlabel('Epochs')
    plt.ylabel('Recall')
    plt.legend()
    
    plt.subplot(2, 2, 4)
    plt.plot(f1_scores, label='F1 Score')
    plt.title('F1 Score Curve')
    plt.xlabel('Epochs')
    plt.ylabel('F1 Score')
    plt.legend()
    
    plt.tight_layout()
    plt.show()