In [None]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
from torchvision import transforms
import cv2
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
import torchvision
from torch.utils.data import random_split, DataLoader
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support

torch.manual_seed(12)

In [None]:
def create_optic(img1, img2):
    if img1 is None or img2 is None:
        print("no files")
        return None

    img1_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    img2_gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    if img1_gray.shape != img2_gray.shape:
        img2_gray = cv2.resize(img2_gray, (img1_gray.shape[1], img1_gray.shape[0]))


    flow = cv2.calcOpticalFlowFarneback(img1_gray, img2_gray, None, 0.5, 3, 19, 5, 7, 1.2, 0)
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
    hsv[..., 0] = angle * 180 / np.pi / 2
    hsv[..., 1] = 255 
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
    flow_bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    return flow_bgr


class SiameseDataset(Dataset):
    def __init__(self, csv_file, image_root_folder, transform=None):

        self.data = []
        self.image_root_folder = image_root_folder
        self.transform = transform
        self.emotion_to_group = {
            "happiness": "positive",
            "disgust": "negative",
            "fear": "negative",
            "surprise": "surprise",
            "repression": "negative",
            "sadness": "negative",
            "others": "other"
        }


        all_groups = set(self.emotion_to_group.values())
        self.group_to_index = {group: idx for idx, group in enumerate(sorted(all_groups))}

        with open(csv_file, 'r') as f:
            next(f)  
            for line in f.readlines():
                _, subject, folder, onset_frame, apex_frame, emotion = line.strip().split(',')
                self.data.append({
                    'subject': subject,
                    'folder': folder,
                    'onset_frame': int(onset_frame),
                    'apex_frame': int(apex_frame),
                    'emotion': emotion
                })

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        entry = self.data[idx]
        folder_path = os.path.join(self.image_root_folder, f"sub{str(entry['subject']).zfill(2)}", f"ROI_{entry['folder']}")

        image1_path = os.path.join(folder_path, f"img{entry['onset_frame']}.jpg")
        image2_path = os.path.join(folder_path, f"img{entry['apex_frame']}.jpg")

        image1 = Image.open(image1_path).convert("RGB") 
        image2 = Image.open(image2_path).convert("RGB") 

        img1_np = np.array(image1)
        img2_np = np.array(image2)


        optic_flow = create_optic(img1_np, img2_np)
        if self.transform:
            image1 = self.transform(image1)
            image2 = self.transform(image2)
            optic_flow = self.transform(Image.fromarray(optic_flow))


        emotion_label = entry['emotion']
        group_label = self.emotion_to_group[emotion_label]
        label_tensor = torch.tensor(self.group_to_index[group_label], dtype=torch.long)
        return optic_flow, label_tensor

In [None]:

class OpticalFlowResNetClassifier(nn.Module):
    def __init__(self, num_classes, dropout_rate=0.2):
        super(OpticalFlowResNetClassifier, self).__init__()
        
        # Backbone: ResNet (with pre-trained weights)
        resnet = models.resnet18(pretrained=True)

        # Optionally freeze ResNet parameters
        # for param in resnet.parameters():
        #     param.requires_grad = False

        self.backbone = nn.Sequential(*list(resnet.children())[:-1]) 
        self.embedding = nn.Sequential(
            nn.Linear(512, 256),  
            nn.ReLU(),
            nn.Dropout(dropout_rate),  
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate)
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(64, num_classes) 
        )

    def forward(self, x):

        x = self.backbone(x)
        x = x.view(x.size(0), -1)
        x = self.embedding(x)
        predictions = self.classifier(x)
        
        return predictions

In [None]:
num_epochs = 25
patience = 6
batch_size = 10
learning_rate = 0.0001

dataset = SiameseDataset(csv_file="../info.csv", image_root_folder="../output_copy", transform=transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    #transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])
    transforms.Normalize(mean = [0.5], std = [0.5])
]))


train_size = int(0.75 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = OpticalFlowResNetClassifier(num_classes=4,dropout_rate=0.5).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


best_val_loss = float('inf') 
early_stop_counter = 0  

for epoch in range(num_epochs):
    model.train()  
    running_loss = 0.0

    for batch in train_loader:
        optic, labels = batch
        optic, labels = optic.to(device),labels.to(device).long()

        optimizer.zero_grad()
        predictions= model(optic)
        loss = criterion(predictions, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)


    model.eval()  
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            input1, labels = batch
            input1, labels = input1.to(device), labels.to(device).long()

            predictions= model(input1)
            loss = criterion(predictions, labels)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")


    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        early_stop_counter = 0
        torch.save(model.state_dict(), "best_optic_model.pth")
    else:
        early_stop_counter += 1

    if early_stop_counter >= patience:
        print(f"Early stopping at epoch {epoch+1}")
        break

In [None]:
model.load_state_dict(torch.load("opticflow_model.pth"))

def evaluate_model(model, val_loader, device):
    model.eval()  

    all_labels = []
    all_predictions = []

    with torch.no_grad(): 
        for batch in val_loader:
            input1, labels = batch
            input1, labels = input1.to(device), labels.to(device)

            predictions = model(input1)
            probabilities = F.softmax(predictions, dim=1)
            predicted_classes = torch.argmax(probabilities, dim=1)

            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted_classes.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_predictions, average='weighted')
    conf_matrix = confusion_matrix(all_labels, all_predictions)

    results = {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": conf_matrix
    }

    return results


results = evaluate_model(model, val_loader, device)
print("Accuracy:", results['accuracy'])
print("Precision:", results['precision'])
print("Recall:", results['recall'])
print("F1 Score:", results['f1_score'])
print("Confusion Matrix:\n", results['confusion_matrix'])