In [1]:
!pip install torch torchvision opencv-python pandas scikit-learn seaborn

Collecting torchvision
  Downloading torchvision-0.21.0-cp312-cp312-win_amd64.whl.metadata (6.3 kB)
Collecting opencv-python
  Downloading opencv_python-4.11.0.86-cp37-abi3-win_amd64.whl.metadata (20 kB)
Collecting torch
  Downloading torch-2.6.0-cp312-cp312-win_amd64.whl.metadata (28 kB)
Downloading torchvision-0.21.0-cp312-cp312-win_amd64.whl (1.6 MB)
   ---------------------------------------- 0.0/1.6 MB ? eta -:--:--
   ---------------------------------------- 1.6/1.6 MB 20.6 MB/s eta 0:00:00
Downloading torch-2.6.0-cp312-cp312-win_amd64.whl (204.1 MB)
   ---------------------------------------- 0.0/204.1 MB ? eta -:--:--
   - -------------------------------------- 6.6/204.1 MB 40.3 MB/s eta 0:00:05
   -- ------------------------------------- 11.5/204.1 MB 26.7 MB/s eta 0:00:08
   --- ------------------------------------ 19.4/204.1 MB 30.6 MB/s eta 0:00:07
   ----- ---------------------------------- 26.5/204.1 MB 30.5 MB/s eta 0:00:06
   ------ --------------------------------- 33.

In [2]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import pandas as pd
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

# Per-frame preprocessing shared by every clip: resize the PIL frame to
# 112x112 and convert it to a tensor.
transform = transforms.Compose(
    [transforms.Resize((112, 112)), transforms.ToTensor()]
)

# Dataset class
class VideoDataset(Dataset):
    """Dataset of fixed-length video clips with four integer labels per clip.

    Each item is a ``(video, labels)`` pair. ``video`` is the stack of
    transformed frames with its first two axes swapped via
    ``permute(1, 0, 2, 3)`` (presumably ``(channels, frames, height, width)``
    with the default ``ToTensor`` pipeline -- confirm for custom transforms).
    ``labels`` is a ``torch.long`` tensor with the row's ``Boredom``,
    ``Engagement``, ``Confusion`` and ``Frustration`` values, in that order.

    Args:
        data: DataFrame with a ``ClipID`` column (joined onto ``video_dir``)
            and the four label columns. Column names are whitespace-stripped
            on the re-indexed copy kept by the dataset.
        video_dir: Directory containing the clips.
        num_frames: Number of frames returned per clip.
        frame_transform: Optional callable applied to every PIL frame. When
            omitted, the module-level ``transform`` is used, so existing call
            sites behave as before.
    """

    LABEL_COLUMNS = ("Boredom", "Engagement", "Confusion", "Frustration")

    def __init__(self, data, video_dir, num_frames=16, frame_transform=None):
        self.data = data.reset_index(drop=True)
        self.data.columns = self.data.columns.str.strip()
        self.video_dir = video_dir
        self.num_frames = num_frames
        # The global is only looked up when no transform was supplied.
        self.transform = transform if frame_transform is None else frame_transform

    def load_video(self, path):
        """Read the first ``num_frames`` frames of ``path`` as a stacked tensor.

        Clips shorter than ``num_frames`` are padded by repeating their last
        frame.

        Raises:
            RuntimeError: If not a single frame could be read (missing or
                corrupt file), instead of failing later with an opaque
                ``IndexError`` while padding.
        """
        cap = cv2.VideoCapture(path)
        frames = []
        try:
            while len(frames) < self.num_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                # OpenCV decodes to BGR; PIL expects RGB.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(self.transform(Image.fromarray(frame)))
        finally:
            # Release the capture handle even if the transform raises.
            cap.release()

        if not frames:
            raise RuntimeError(f"Could not read any frames from video: {path}")

        # Pad short clips by repeating the final frame.
        frames.extend([frames[-1]] * (self.num_frames - len(frames)))
        return torch.stack(frames)

    def __getitem__(self, idx):
        """Return ``(video, labels)`` for the row at position ``idx``."""
        row = self.data.iloc[idx]
        video_path = os.path.join(self.video_dir, row["ClipID"])
        video = self.load_video(video_path).permute(1, 0, 2, 3)
        labels = torch.tensor([row[col] for col in self.LABEL_COLUMNS], dtype=torch.long)
        return video, labels

    def __len__(self):
        """Number of clips (rows) in the dataset."""
        return len(self.data)

# Coral Layer and Loss
class CoralLayer(nn.Module):
    """Output head producing ``num_classes - 1`` threshold logits.

    The input is projected by a linear layer to ``num_classes - 1`` values
    and a separate learnable bias vector (initialised to zeros) is added.

    NOTE(review): ``nn.Linear`` is created with its default bias, so the two
    bias terms are simply additive.

    Args:
        input_dim: Size of the incoming feature vector.
        num_classes: Number of ordinal classes.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        num_thresholds = num_classes - 1
        self.fc = nn.Linear(input_dim, num_thresholds)
        self.bias = nn.Parameter(torch.zeros(num_thresholds))

    def forward(self, x):
        """Return the threshold logits for ``x``."""
        projected = self.fc(x)
        return projected + self.bias

def coral_loss(logits, levels):
    """Sum of per-threshold binary cross-entropies for ordinal targets.

    For threshold ``k`` (column ``k`` of ``logits``) the binary target is
    ``levels > k``. The cross-entropy of each threshold is averaged over the
    batch and the per-threshold means are summed.

    The loss is computed directly from the logits rather than from
    ``sigmoid(logits)``: in float32 the sigmoid saturates to exactly 0 or 1
    for large-magnitude logits, which makes ``BCELoss`` clamp the loss and
    yields a zero gradient for confidently wrong predictions.

    Args:
        logits: Float tensor of shape ``(batch, num_thresholds)`` with raw,
            pre-sigmoid scores.
        levels: Tensor of shape ``(batch,)`` with the ordinal label of each
            sample.

    Returns:
        Scalar tensor with the summed loss.
    """
    thresholds = torch.arange(logits.shape[1], device=logits.device)
    # targets[b, k] is 1 when sample b's level lies above threshold k.
    targets = (levels.unsqueeze(1) > thresholds).to(logits.dtype)
    per_element = nn.functional.binary_cross_entropy_with_logits(
        logits, targets, reduction="none"
    )
    # Mean over the batch for each threshold, then sum across thresholds.
    return per_element.mean(dim=0).sum()

# Model
class EmotionOrdinalModel(nn.Module):
    """Video backbone followed by one ``CoralLayer`` head per output.

    The backbone's ``stem`` and ``layer1`` .. ``layer4`` are applied in
    sequence, the result is globally average-pooled to a flat feature vector,
    and every head is evaluated on that vector. Heads are built for
    512-dimensional features.

    Args:
        base_model: Module exposing ``stem``, ``layer1``, ``layer2``,
            ``layer3`` and ``layer4`` sub-modules.
        num_outputs: Number of heads.
        num_classes: Number of ordinal classes per head.
    """

    def __init__(self, base_model, num_outputs=4, num_classes=5):
        super().__init__()
        self.backbone = base_model
        self.pool = nn.AdaptiveAvgPool3d((1, 1, 1))
        ordinal_heads = [CoralLayer(512, num_classes) for _ in range(num_outputs)]
        self.heads = nn.ModuleList(ordinal_heads)

    def forward(self, x):
        """Return a list with one logits tensor per head."""
        features = x
        for stage_name in ("stem", "layer1", "layer2", "layer3", "layer4"):
            features = getattr(self.backbone, stage_name)(features)
        pooled = self.pool(features).flatten(1)
        return [head(pooled) for head in self.heads]

def coral_to_label(logits):
    """Decode threshold logits into integer levels.

    The level of each row is the number of thresholds whose sigmoid
    probability is strictly greater than 0.5.

    Args:
        logits: Tensor of shape ``(batch, num_thresholds)``.

    Returns:
        Integer tensor of shape ``(batch,)``.
    """
    exceeded = torch.sigmoid(logits) > 0.5
    return exceeded.sum(dim=1)

def evaluate(model, dataloader, device):
    """Compute per-head accuracy and collect predictions over a dataloader.

    The model is switched to eval mode (and left there) and run without
    gradient tracking. Exactly four heads / label columns are evaluated.

    Args:
        model: Callable module returning a list of logits tensors, one per
            head.
        dataloader: Iterable of ``(videos, labels)`` batches; column ``i`` of
            ``labels`` is the target for head ``i``.
        device: Device the batches are moved to.

    Returns:
        ``(acc, all_preds, all_labels)``: ``acc`` holds the accuracy of each
        head in percent; ``all_preds`` and ``all_labels`` hold, per head, the
        flat list of predicted and true levels.
    """
    num_heads = 4
    model.eval()
    hits = [0 for _ in range(num_heads)]
    seen = 0
    pred_store = [[] for _ in range(num_heads)]
    label_store = [[] for _ in range(num_heads)]

    with torch.no_grad():
        for batch_videos, batch_labels in dataloader:
            batch_videos = batch_videos.to(device)
            batch_labels = batch_labels.to(device)
            decoded = [coral_to_label(head_logits) for head_logits in model(batch_videos)]
            for head in range(num_heads):
                head_pred = decoded[head]
                head_true = batch_labels[:, head]
                hits[head] += (head_pred == head_true).sum().item()
                pred_store[head].extend(head_pred.cpu().numpy())
                label_store[head].extend(head_true.cpu().numpy())
            seen += batch_labels.size(0)

    accuracies = [100 * h / seen for h in hits]
    return accuracies, pred_store, label_store

def save_confusion_matrix(y_true, y_pred, emotion, out_dir):
    """Save a confusion-matrix heatmap as ``<emotion>_confusion_matrix.png``.

    The axes are labelled with the actual class values. Without explicit
    tick labels the heatmap numbers rows/columns 0..n-1, which is misleading
    whenever a class is absent from both ``y_true`` and ``y_pred``.

    Args:
        y_true: Sequence of true levels.
        y_pred: Sequence of predicted levels.
        emotion: Name used in the plot title and the output file name.
        out_dir: Existing directory the PNG is written to.
    """
    # The same class set confusion_matrix infers on its own (sorted union of
    # the observed values), made explicit so the ticks can be labelled.
    classes = np.unique(np.concatenate([np.ravel(y_true), np.ravel(y_pred)]))
    tick_labels = classes.tolist()
    cm = confusion_matrix(y_true, y_pred, labels=classes)

    fig, ax = plt.subplots(figsize=(6, 5))
    try:
        sns.heatmap(
            cm,
            annot=True,
            fmt="d",
            cmap="Blues",
            xticklabels=tick_labels,
            yticklabels=tick_labels,
            ax=ax,
        )
        ax.set_title(f'Confusion Matrix - {emotion}')
        ax.set_xlabel('Predicted')
        ax.set_ylabel('Actual')
        fig.savefig(os.path.join(out_dir, f'{emotion}_confusion_matrix.png'))
    finally:
        # Always free the figure, even if plotting or saving fails.
        plt.close(fig)

def train(model, train_loader, val_loader, optimizer, device, out_dir='results', epochs=10):
    """Train ``model``, validate after every epoch and write artefacts.

    Files written to ``out_dir`` (created if missing):
      * ``best_model.pt``  - weights of the epoch with the highest mean
        validation accuracy.
      * ``last_model.pt``  - weights after the most recent epoch.
      * ``training_log.csv`` - one row per epoch.
      * ``<emotion>_confusion_matrix.png`` and ``<emotion>_report.csv`` for
        each emotion, computed from the predictions of the LAST epoch (not
        of the best checkpoint).

    Args:
        model: Module returning a list of four logits tensors.
        train_loader: Iterable of ``(videos, labels)`` training batches.
        val_loader: Iterable of validation batches passed to ``evaluate``.
        optimizer: Optimizer over the model parameters.
        device: Device the batches are moved to.
        out_dir: Output directory.
        epochs: Number of epochs to run.

    Returns:
        DataFrame with the per-epoch training log.
    """
    os.makedirs(out_dir, exist_ok=True)
    # Start below any reachable accuracy so the first epoch always writes
    # best_model.pt, even when validation accuracy is exactly 0.
    best_avg_acc = float('-inf')
    log = []
    # Stay None when no epoch runs, so the reporting step can be skipped.
    all_preds = all_labels = None

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for videos, labels in train_loader:
            videos, labels = videos.to(device), labels.to(device)
            optimizer.zero_grad()
            logits_list = model(videos)
            # One ordinal loss per head, each against its own label column.
            loss = sum(coral_loss(logits, labels[:, i]) for i, logits in enumerate(logits_list))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        val_acc, all_preds, all_labels = evaluate(model, val_loader, device)
        avg_acc = np.mean(val_acc)

        if avg_acc > best_avg_acc:
            best_avg_acc = avg_acc
            torch.save(model.state_dict(), os.path.join(out_dir, 'best_model.pt'))

        log.append({
            'epoch': epoch + 1,
            'train_loss': avg_loss,
            'val_acc_boredom': val_acc[0],
            'val_acc_engagement': val_acc[1],
            'val_acc_confusion': val_acc[2],
            'val_acc_frustration': val_acc[3],
            'avg_val_acc': avg_acc
        })

        print(f"Epoch {epoch+1} | Loss: {avg_loss:.4f} | Val Acc: {val_acc}")
        torch.save(model.state_dict(), os.path.join(out_dir, 'last_model.pt'))

    log_df = pd.DataFrame(log)
    log_df.to_csv(os.path.join(out_dir, "training_log.csv"), index=False)

    if all_preds is not None:
        # No stray whitespace: these names end up in titles and file names.
        emotions = ["Boredom", "Engagement", "Confusion", "Frustration"]
        for i, emotion in enumerate(emotions):
            save_confusion_matrix(all_labels[i], all_preds[i], emotion, out_dir)
            # zero_division=0 gives the same numbers as the default but
            # without a warning for every class that is never predicted.
            report = classification_report(
                all_labels[i], all_preds[i], digits=3, output_dict=True, zero_division=0
            )
            pd.DataFrame(report).transpose().to_csv(os.path.join(out_dir, f"{emotion}_report.csv"))

    return log_df

# Main
if __name__ == '__main__':
    # Load the label table and normalise header whitespace.
    df = pd.read_csv("data/labels.csv")
    df.columns = df.columns.str.strip()

    # Fixed-seed 80/20 split into training and validation clips.
    train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
    train_dataset = VideoDataset(train_df, "data/videos")
    val_dataset = VideoDataset(val_df, "data/videos")
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

    from torchvision.models.video import r3d_18, R3D_18_Weights
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # `pretrained=True` is deprecated in torchvision >= 0.13; the weights enum
    # selects the same default pretrained checkpoint without the warning.
    base_model = r3d_18(weights=R3D_18_Weights.DEFAULT)
    model = EmotionOrdinalModel(base_model).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    train(model, train_loader, val_loader, optimizer, device)

Downloading: "https://download.pytorch.org/models/r3d_18-b3b3357e.pth" to C:\Users\Mohit Jain/.cache\torch\hub\checkpoints\r3d_18-b3b3357e.pth
100%|███████████████████████████████████████████████████████████████████████████████| 127M/127M [00:04<00:00, 27.8MB/s]


Epoch 1 | Loss: 5.1667 | Val Acc: [42.5, 58.5, 68.0, 77.0]
Epoch 2 | Loss: 3.7789 | Val Acc: [40.5, 60.5, 62.0, 78.0]
Epoch 3 | Loss: 3.5397 | Val Acc: [40.0, 59.5, 62.0, 78.0]
Epoch 4 | Loss: 3.3143 | Val Acc: [44.0, 60.0, 65.5, 74.0]
Epoch 5 | Loss: 3.0096 | Val Acc: [43.5, 55.5, 64.0, 73.0]
Epoch 6 | Loss: 2.7486 | Val Acc: [40.0, 56.5, 62.5, 74.5]
Epoch 7 | Loss: 2.4461 | Val Acc: [42.5, 50.0, 61.0, 75.0]
Epoch 8 | Loss: 2.1545 | Val Acc: [35.5, 54.0, 61.0, 74.0]
Epoch 9 | Loss: 1.8241 | Val Acc: [36.5, 50.0, 61.0, 65.5]
Epoch 10 | Loss: 1.6013 | Val Acc: [37.5, 51.0, 67.5, 73.0]


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [7]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision.models.video import r3d_18
from PIL import Image
import cv2
import numpy as np

# Coral Layer for ordinal regression
class CoralLayer(nn.Module):
    """Ordinal output head emitting ``num_classes - 1`` threshold logits.

    A linear layer maps the features to ``num_classes - 1`` values and a
    separate zero-initialised learnable bias vector is added on top. The
    attribute names (``fc``, ``bias``) determine the state-dict keys this
    layer saves and loads.

    Args:
        input_dim: Size of the incoming feature vector.
        num_classes: Number of ordinal classes.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        n_cuts = num_classes - 1
        self.fc = nn.Linear(input_dim, n_cuts)
        self.bias = nn.Parameter(torch.zeros(n_cuts))

    def forward(self, x):
        """Return the threshold logits for ``x``."""
        scores = self.fc(x)
        scores = scores + self.bias
        return scores

# Model with CORAL heads
class EmotionOrdinalModel(nn.Module):
    """Backbone feature extractor with one ``CoralLayer`` head per output.

    ``forward`` runs the backbone's ``stem`` and ``layer1`` .. ``layer4`` in
    order, average-pools the result to a flat vector and applies every head
    to it. Heads are built for 512-dimensional features.

    Args:
        base_model: Module exposing ``stem`` and ``layer1`` .. ``layer4``.
        num_outputs: Number of heads.
        num_classes: Number of ordinal classes per head.
    """

    def __init__(self, base_model, num_outputs=4, num_classes=5):
        super().__init__()
        self.backbone = base_model
        self.pool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.heads = nn.ModuleList(
            [CoralLayer(512, num_classes) for _ in range(num_outputs)]
        )

    def forward(self, x):
        """Return a list with one logits tensor per head."""
        net = self.backbone
        for stage in (net.stem, net.layer1, net.layer2, net.layer3, net.layer4):
            x = stage(x)
        embedding = self.pool(x).flatten(1)
        return [head(embedding) for head in self.heads]

# Decode ordinal predictions
def coral_to_label(logits):
    """Decode the threshold logits of a single sample into a Python int.

    The level is the number of thresholds whose sigmoid probability is
    strictly greater than 0.5. Because the result is unwrapped with
    ``.item()``, ``logits`` must hold exactly one row.
    """
    passed = torch.sigmoid(logits) > 0.5
    return passed.sum(dim=1).item()

# Video preprocessing
def preprocess_video(video_path, transform, num_frames=16):
    """Load the first ``num_frames`` frames of a video as a model-ready batch.

    Every frame is converted from BGR to RGB, wrapped in a PIL image and
    passed through ``transform``. Clips shorter than ``num_frames`` are padded
    by repeating their last frame. The stacked frames have their first two
    axes swapped and a leading batch axis of size 1 is added.

    Args:
        video_path: Path of the video file.
        transform: Callable applied to every PIL frame; must return a tensor.
        num_frames: Number of frames in the returned clip.

    Returns:
        Tensor with a leading batch dimension of 1.

    Raises:
        RuntimeError: If not a single frame could be read (missing or corrupt
            file), instead of failing with an opaque ``IndexError`` while
            padding.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while len(frames) < num_frames:
            ret, frame = cap.read()
            if not ret:
                break
            # OpenCV decodes to BGR; PIL expects RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(transform(Image.fromarray(frame)))
    finally:
        # Release the capture handle even if the transform raises.
        cap.release()

    if not frames:
        raise RuntimeError(f"Could not read any frames from video: {video_path}")

    # Pad short clips by repeating the final frame.
    frames.extend([frames[-1]] * (num_frames - len(frames)))
    video_tensor = torch.stack(frames).permute(1, 0, 2, 3)
    return video_tensor.unsqueeze(0)

# Run inference on a single video
def infer(video_path, model_path='results/best_model.pt'):
    """Predict the four emotion levels for a single video.

    Args:
        video_path: Path of the video to classify.
        model_path: Path of a state dict saved from ``EmotionOrdinalModel``.

    Returns:
        Dict mapping ``"Boredom"``, ``"Engagement"``, ``"Confusion"`` and
        ``"Frustration"`` to the level decoded by ``coral_to_label``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    transform = transforms.Compose([
        transforms.Resize((112, 112)),
        transforms.ToTensor()
    ])

    # Build the backbone without pretrained weights: every parameter is
    # overwritten by load_state_dict below, so fetching the pretrained
    # checkpoint first is wasted work (and needs network access on a fresh
    # machine).
    base_model = r3d_18()
    model = EmotionOrdinalModel(base_model).to(device)
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # a trusted source.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    emotions = ["Boredom", "Engagement", "Confusion", "Frustration"]
    video_tensor = preprocess_video(video_path, transform).to(device)
    with torch.no_grad():
        logits_list = model(video_tensor)
        predictions = {emotions[i]: coral_to_label(logits_list[i]) for i in range(4)}

    return predictions

# Example usage
if __name__ == '__main__':
    # Replace with your test video.
    # NOTE(review): absolute, machine-specific path; it must be edited before
    # this cell can run anywhere else.
    video_file = '/home/mjain107/dm/data/test_videos/1100021003.avi'
    preds = infer(video_file)

    print("Predicted Emotion Levels:")
    for emotion_name, level in preds.items():
        print(f"{emotion_name}: {level}")



Predicted Emotion Levels:
Boredom: 1
Engagement: 3
Confusion: 0
Frustration: 0
