### **Imports**

In [None]:
import os
import cv2
import numpy as np
import random
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets
from PIL import Image, ImageOps
from tqdm import tqdm
import matplotlib.pyplot as plt
import warnings
import pandas as pd

# Silence every Python warning for the remainder of the session.
warnings.filterwarnings("ignore")

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using {device} device")

## **Data Loading**

In [None]:
# Root folder of the image data (the path suggests 224-pixel face crops).
data_root = r'D:\ExpW_dataset\images_cropped_224'
# Root folder of the landmark data.
landmark_root = r'D:\ExpW_dataset\landmarks'
# Names of the dataset partitions.
splits = ['train', 'val', 'test']
# File extensions treated as images.
image_extensions = ('.jpg', '.jpeg', '.png')

In [None]:
class HybridFERDataset(Dataset):
    """Dataset of (image, landmark vector, label) triples.

    Both ``image_folder`` and ``landmark_folder`` are expected to contain one
    sub-directory per class.  An image ``<class>/<stem>.<ext>`` is paired with
    the landmark file ``<class>/<stem>.npy``; images without a landmark file
    are reported and skipped.

    Args:
        image_folder: Directory holding one sub-directory of images per class.
        landmark_folder: Directory with the same class sub-directories,
            holding ``.npy`` landmark arrays.
        transform: Optional callable applied to the loaded RGB PIL image.

    Attributes:
        samples: List of ``(image_path, landmark_path, label_index)`` tuples.
        class_to_idx: Mapping from class directory name to integer label,
            assigned in sorted directory-name order.
    """

    # Lower-case extensions recognised as images (matched case-insensitively).
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, image_folder, landmark_folder, transform=None):
        self.image_folder = image_folder
        self.landmark_folder = landmark_folder
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}

        self._prepare_dataset()

    def _prepare_dataset(self):
        """Collect all image paths and their corresponding landmark paths."""
        for label_name in sorted(os.listdir(self.image_folder)):
            class_dir = os.path.join(self.image_folder, label_name)
            if not os.path.isdir(class_dir):
                continue

            # Assign the next free numeric label to a class seen for the first time.
            label = self.class_to_idx.setdefault(label_name, len(self.class_to_idx))

            # Sorted so the sample order does not depend on the file system.
            for filename in sorted(os.listdir(class_dir)):
                if not filename.lower().endswith(self.IMAGE_EXTENSIONS):
                    continue

                # Swap the real extension (whatever its case) for ".npy"; a
                # plain str.replace would miss upper-case names like "X.PNG".
                stem, _ = os.path.splitext(filename)
                img_path = os.path.join(class_dir, filename)
                landmark_path = os.path.join(
                    self.landmark_folder, label_name, stem + ".npy"
                )
                if os.path.exists(landmark_path):
                    self.samples.append((img_path, landmark_path, label))
                else:
                    print(f"Missing landmark: {landmark_path}")

        print(f"Loaded {len(self.samples)} total samples from '{self.image_folder}'.")

    def __len__(self):
        """Return the number of image/landmark pairs found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, landmarks, label)`` for sample ``idx``.

        ``image`` is the RGB image after ``transform`` (if one was given),
        ``landmarks`` is the stored array as a flattened float32 tensor and
        ``label`` is the integer class index.
        """
        img_path, lm_path, label = self.samples[idx]

        # Load image; the context manager closes the file as soon as the
        # pixels have been converted.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Load landmarks
        landmarks = torch.from_numpy(np.load(lm_path)).float().flatten()

        return image, landmarks, label


## **FTCS Hybrid ResEmoteNet Model**

In [None]:
# Updated SE Block
class SEBlock(nn.Module):
    """Squeeze-and-Excitation gate that rescales each channel of a feature map.

    Args:
        in_channels: Number of channels of the incoming feature map.
        reduction: Divisor applied to ``in_channels`` to get the width of the
            bottleneck between the two bias-free linear layers.
    """

    def __init__(self, in_channels, reduction=16):
        super().__init__()
        bottleneck = in_channels // reduction
        self.pool = nn.AdaptiveAvgPool2d(1)
        excitation = [
            nn.Linear(in_channels, bottleneck, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, in_channels, bias=False),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*excitation)

    def forward(self, x):
        """Return ``x`` with every channel multiplied by its gate in (0, 1)."""
        batch, channels, _, _ = x.shape
        # Squeeze: one average value per (sample, channel).
        squeezed = self.pool(x).reshape(batch, channels)
        # Excite: per-channel weights, broadcast over the spatial dimensions.
        gates = self.fc(squeezed).reshape(batch, channels, 1, 1)
        return x * gates

# Updated Residual Block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection.

    Args:
        in_ch: Channels of the input feature map.
        out_ch: Channels produced by the block.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed).
    """

    def __init__(self, in_ch, out_ch, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(out_ch, out_ch, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch)

        # The skip path needs a 1x1 projection whenever the main path changes
        # the spatial size or the channel count; otherwise it is the identity.
        shape_changes = stride != 1 or in_ch != out_ch
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=stride, padding=0),
                nn.BatchNorm2d(out_ch),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        skip = self.shortcut(x)
        out = self.conv1(x)
        out = F.relu(self.bn1(out))
        out = self.conv2(out)
        out = self.bn2(out)
        out += skip
        return F.relu(out)

# Visual Backbone
class ResEmoteNet(nn.Module):
    """Convolutional image backbone producing a 1024-dimensional feature vector.

    Three conv/batch-norm/ReLU/max-pool stages are followed by an SE block,
    two strided residual blocks and global average pooling.  The shape
    comments below hold for a 224x224 RGB input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = self._conv_stage(3, 64, kernel_size=7, stride=2, padding=3)     # (B, 64, 56, 56)
        self.conv2 = self._conv_stage(64, 128, kernel_size=3, stride=1, padding=1)   # (B, 128, 28, 28)
        self.conv3 = self._conv_stage(128, 256, kernel_size=3, stride=1, padding=1)  # (B, 256, 14, 14)
        self.se = SEBlock(256)
        self.res1 = ResidualBlock(256, 512, 2)   # (B, 512, 7, 7)
        self.res2 = ResidualBlock(512, 1024, 2)  # (B, 1024, 4, 4)
        self.pool = nn.AdaptiveAvgPool2d(1)
        # Width of the vector returned by forward(); read by the fusion head.
        self.out_features = 1024

    @staticmethod
    def _conv_stage(in_ch, out_ch, kernel_size, stride, padding):
        """Build one conv -> batch-norm -> ReLU -> 2x2 max-pool stage."""
        return nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

    def forward(self, x):
        """Map an image batch to a ``(B, 1024)`` feature matrix."""
        stages = (self.conv1, self.conv2, self.conv3, self.se, self.res1, self.res2)
        for stage in stages:
            x = stage(x)
        pooled = self.pool(x)
        return pooled.view(pooled.size(0), -1)

# Landmark Branch with optional reconstruction head
class LandmarkBranch(nn.Module):
    """MLP encoder for flattened landmark vectors with an optional decoder.

    Args:
        output_reconstruction: When True, a decoder is built and ``forward``
            also returns a reconstruction of the input vector.
        input_dim: Length of the flattened landmark vector.  Defaults to 1404
            (presumably 468 landmarks x 3 coordinates -- confirm against the
            landmark extraction step).

    Attributes:
        out_features: Width of the encoded representation (256).
    """

    def __init__(self, output_reconstruction=True, input_dim=1404):
        super().__init__()
        self.input_dim = input_dim
        self.out_features = 256
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, self.out_features),
            nn.ReLU(),
            nn.Dropout(0.3)
        )
        self.output_reconstruction = output_reconstruction
        if output_reconstruction:
            self.decoder = nn.Sequential(
                nn.Linear(self.out_features, 512),
                nn.ReLU(),
                nn.Linear(512, input_dim)  # Match input dimension for reconstruction
            )

    def forward(self, x):
        """Encode ``x`` of shape ``(B, input_dim)``.

        Returns:
            ``(encoded, decoded)`` where ``encoded`` is ``(B, 256)`` and
            ``decoded`` is ``(B, input_dim)``, or ``None`` when the branch was
            built with ``output_reconstruction=False``.
        """
        encoded = self.encoder(x)
        if not self.output_reconstruction:
            return encoded, None
        return encoded, self.decoder(encoded)


# Cross-Modality Fusion (optional)
class FusionModule(nn.Module):
    """Fuse image and landmark features by concatenation followed by an MLP.

    Args:
        img_dim: Width of the image feature vector.
        lm_dim: Width of the landmark feature vector.
    """

    def __init__(self, img_dim, lm_dim):
        super().__init__()
        joint_dim = img_dim + lm_dim
        head = [
            nn.Linear(joint_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, img_feat, lm_feat):
        """Return the ``(B, 128)`` fused representation of both inputs."""
        joint = torch.cat((img_feat, lm_feat), dim=1)
        return self.fc(joint)


# Hybrid Model with projection heads for contrastive loss
class HybridEmotionModel(nn.Module):
    """Two-branch emotion classifier combining an image CNN and a landmark MLP.

    The image and landmark features are fused for classification.  Each
    branch's features are additionally projected into a shared 128-d space
    (used by the cosine-embedding loss in training), and the landmark branch
    returns a reconstruction of its input.

    Args:
        num_classes: Number of emotion classes predicted by the classifier.
    """

    def __init__(self, num_classes=7):
        super().__init__()
        lm_dim = 256     # width of the LandmarkBranch encoding
        fused_dim = 128  # width of the FusionModule output
        proj_dim = 128   # width of the shared projection space

        self.visual = ResEmoteNet()
        self.landmarks = LandmarkBranch(output_reconstruction=True)
        self.fusion = FusionModule(self.visual.out_features, lm_dim)
        self.classifier = nn.Linear(fused_dim, num_classes)

        # Projection heads for the contrastive loss; the image head takes its
        # input width from the backbone instead of repeating the constant.
        self.projection_img = nn.Linear(self.visual.out_features, proj_dim)
        self.projection_lm = nn.Linear(lm_dim, proj_dim)

    def forward(self, img, lm):
        """Run both branches.

        Returns:
            ``(logits, img_proj, lm_proj, lm_recon)``: class logits, the two
            128-d projections, and the reconstructed landmark vector.
        """
        img_feat = self.visual(img)                    # (B, 1024)
        lm_feat, lm_recon = self.landmarks(lm)         # (B, 256), (B, 1404)
        fused = self.fusion(img_feat, lm_feat)         # (B, 128)
        logits = self.classifier(fused)                # (B, num_classes)

        # Project to same space for cosine contrastive loss
        img_proj = self.projection_img(img_feat)       # (B, 128)
        lm_proj = self.projection_lm(lm_feat)          # (B, 128)

        return logits, img_proj, lm_proj, lm_recon

## **Initializing Parameters**

In [None]:
# Pick the compute device: Apple MPS first, then CUDA, then CPU.
# NOTE(review): the initialization cell further down re-assigns `device` with
# a CUDA/CPU-only check, so an MPS choice made here is not the device the
# model is moved to -- confirm which selection is intended.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using {device} device")

from torchvision import transforms

# Same deterministic preprocessing for every split: resize, tensor conversion,
# then per-channel normalisation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Match updated model input
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


def _make_dataset(split):
    """Build the dataset for one split from the configured root folders."""
    return HybridFERDataset(
        image_folder=os.path.join(data_root, split),
        landmark_folder=os.path.join(landmark_root, split),
        transform=transform,
    )


# Split directories come from data_root / landmark_root, so the dataset
# location is configured in a single place.
train_dataset = _make_dataset('train')
val_dataset = _make_dataset('val')
test_dataset = _make_dataset('test')

batch_size = 32

# Only the training data is shuffled.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Set up the device, model, losses, optimizer and bookkeeping state.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using {device} device")

model = HybridEmotionModel().to(device)

# Three loss terms: classification, image/landmark embedding agreement,
# and landmark reconstruction.
criterion_ce = nn.CrossEntropyLoss()
criterion_contrastive = nn.CosineEmbeddingLoss()
criterion_recon = nn.L1Loss()

optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)

# Training configuration
num_epochs = 50
patience = 10          # epochs without validation improvement before stopping
start_epoch = 0
best_val_acc = 0
patience_counter = 0

# Per-epoch history, one entry appended per finished epoch.
train_losses = []
val_losses = []
test_losses = []
train_accuracies = []
val_accuracies = []
test_accuracies = []

## **Checkpoints for Pausing and Resuming Training**

In [None]:
# Resume from a previous run when a checkpoint file is present.
checkpoint_path = 'checkpoint.pth'
if os.path.exists(checkpoint_path):
    # NOTE(review): torch.load unpickles the file; only load checkpoints that
    # come from a trusted source.
    checkpoint = torch.load(checkpoint_path, map_location=device)

    # Restore weights and optimizer state.
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Continue with the epoch after the last one that was saved.
    start_epoch = checkpoint['epoch'] + 1
    best_val_acc, patience_counter = (
        checkpoint['best_val_acc'],
        checkpoint['patience_counter'],
    )

    # Restore the metric history recorded so far.
    train_losses, val_losses, test_losses = (
        checkpoint[key] for key in ('train_losses', 'val_losses', 'test_losses')
    )
    train_accuracies, val_accuracies, test_accuracies = (
        checkpoint[key]
        for key in ('train_accuracies', 'val_accuracies', 'test_accuracies')
    )
    print(f"Resumed training from epoch {start_epoch}")

In [None]:
# Evaluation function
def evaluate(loader):
    """Compute mean cross-entropy loss and accuracy of ``model`` on ``loader``.

    Uses the module-level ``model``, ``device`` and ``criterion_ce``; the model
    is switched to eval mode and left there.  The loss is averaged over
    samples rather than batches, so a smaller final batch is not over-weighted.

    Args:
        loader: Iterable yielding ``(images, landmarks, labels)`` batches.

    Returns:
        ``(mean_loss, accuracy)`` as Python floats.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()
    total, correct, loss_sum = 0, 0, 0.0
    with torch.no_grad():
        for images, landmarks, labels in loader:
            images, landmarks, labels = images.to(device), landmarks.to(device), labels.to(device)
            logits, _, _, _ = model(images, landmarks)
            batch_size = labels.size(0)
            # criterion_ce returns the batch mean; scale it back to a sum so
            # the final division by `total` gives a per-sample average.
            loss_sum += criterion_ce(logits, labels).item() * batch_size
            _, predicted = torch.max(logits, 1)
            total += batch_size
            correct += (predicted == labels).sum().item()
    if total == 0:
        raise ValueError("evaluate() received a loader that yielded no samples")
    return loss_sum / total, correct / total

## **Training Loop**

In [None]:
# Training loop
# Destination of the weights with the best validation accuracy seen so far.
best_model_path = r'D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\models\expw_best_model.pth'

for epoch in range(start_epoch, num_epochs):
    model.train()  # evaluate() leaves the model in eval mode
    running_loss = 0.0
    correct = 0
    total = 0

    for images, landmarks, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        images, landmarks, labels = images.to(device), landmarks.to(device), labels.to(device)
        optimizer.zero_grad()

        logits, img_feat, lm_feat, lm_recon = model(images, landmarks)

        loss_ce = criterion_ce(logits, labels)
        # Target +1 for every pair: the image and landmark projections of the
        # same sample are pulled towards each other.
        sim_target = torch.ones(img_feat.size(0), device=device)
        loss_contrastive = criterion_contrastive(img_feat, lm_feat, sim_target)
        loss_recon = criterion_recon(lm_recon, landmarks)

        # Classification dominates; the auxiliary terms are down-weighted.
        total_loss = loss_ce + 0.1 * loss_contrastive + 0.05 * loss_recon

        total_loss.backward()
        optimizer.step()

        running_loss += total_loss.item()
        _, predicted = torch.max(logits, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_loss = running_loss / len(train_loader)
    train_acc = correct / total
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)

    val_loss, val_acc = evaluate(val_loader)
    test_loss, test_acc = evaluate(test_loader)

    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)

    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f} | Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    # Save best model (selected on validation accuracy only)
    stop_early = False
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        patience_counter += 1
        print(f"No improvement in val accuracy for {patience_counter} epochs.")
        stop_early = patience_counter >= patience

    # Save checkpoint after every epoch -- including the one that triggers
    # early stopping, so its weights and history are not lost.
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_val_acc': best_val_acc,
        'patience_counter': patience_counter,
        'train_losses': train_losses,
        'val_losses': val_losses,
        'test_losses': test_losses,
        'train_accuracies': train_accuracies,
        'val_accuracies': val_accuracies,
        'test_accuracies': test_accuracies
    }

    torch.save(checkpoint, checkpoint_path)

    if stop_early:
        print("Early stopping triggered.")
        break

## **Plotting Accuracy and Loss Curves**

In [None]:
# Sanity check: every history list should hold one entry per finished epoch.
# The epoch count is taken from the history rather than the loop variable,
# which is undefined when a resumed run had no epochs left to train.
print("Epochs:", len(train_losses))
print("Train Losses:", len(train_losses))
print("Validation Losses:", len(val_losses))
print("Train Accuracies:", len(train_accuracies))
print("Validation Accuracies:", len(val_accuracies))

In [None]:
# Write the per-epoch training/validation history to a CSV file.
epoch_numbers = list(range(1, len(train_losses) + 1))  # 1-based epoch labels
history = {
    'Epoch': epoch_numbers,
    'Train Loss': train_losses,
    'Validation Loss': val_losses,
    'Train Accuracy': train_accuracies,
    'Validation Accuracy': val_accuracies,
}
df = pd.DataFrame(history)
df.to_csv("expw_training_results.csv", index=False)

In [None]:
# Draw one figure per metric (accuracy first, then loss), each comparing the
# training curve with the validation curve.
for metric, (train_curve, val_curve) in (
    ('Accuracy', (train_accuracies, val_accuracies)),
    ('Loss', (train_losses, val_losses)),
):
    plt.figure(figsize=(10, 5))
    plt.plot(train_curve, label=f'Train {metric}')
    plt.plot(val_curve, label=f'Validation {metric}')
    plt.xlabel('Epoch')
    plt.ylabel(metric)
    plt.title(f'Training vs Validation {metric}')
    plt.legend()
    plt.grid(True)
    plt.show()

## **Testing Model Performance**

In [None]:
# Rebuild the network and load the best weights saved during training.
model = HybridEmotionModel().to(device)
model.load_state_dict(torch.load(r'D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\models\expw_best_model.pth', map_location=device))
model.eval()

# The same three loss terms that were used for training.
criterion_ce = torch.nn.CrossEntropyLoss()
criterion_contrastive = torch.nn.CosineEmbeddingLoss()
criterion_recon = torch.nn.L1Loss()

# Running totals over the test set.
total = 0
correct = 0
loss_ce_sum = 0.0
loss_contrastive_sum = 0.0
loss_recon_sum = 0.0

with torch.no_grad():
    for images, landmarks, labels in test_loader:
        images = images.to(device)
        landmarks = landmarks.to(device)
        labels = labels.to(device)

        logits, img_feat, lm_feat, lm_recon = model(images, landmarks)

        # Individual loss terms for this batch.
        loss_ce = criterion_ce(logits, labels)
        sim_target = torch.ones(img_feat.size(0)).to(device)
        loss_contrastive = criterion_contrastive(img_feat, lm_feat, sim_target)
        loss_recon = criterion_recon(lm_recon, landmarks)

        loss_ce_sum += loss_ce.item()
        loss_contrastive_sum += loss_contrastive.item()
        loss_recon_sum += loss_recon.item()

        # Count correct top-1 predictions.
        predicted = torch.max(logits, 1)[1]
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Each loss is averaged over the number of batches.
num_batches = len(test_loader)
avg_loss_ce = loss_ce_sum / num_batches
avg_loss_contrastive = loss_contrastive_sum / num_batches
avg_loss_recon = loss_recon_sum / num_batches
test_accuracy = correct / total

print(f"\n Full Test Results:")
print(f"Test CrossEntropy Loss: {avg_loss_ce:.4f}")
print(f"Test CosineEmbedding (Contrastive) Loss: {avg_loss_contrastive:.4f}")
print(f"Test Landmark Reconstruction (L1) Loss: {avg_loss_recon:.4f}")
print(f"Test Accuracy: {test_accuracy*100:.2f}%")


In [1]:
from pathlib import Path

def print_directory_structure(root_dir):
    """Print the path of every file and directory found beneath ``root_dir``.

    The walk is recursive; ``root_dir`` itself is not printed.
    """
    entries = Path(root_dir).rglob('*')
    for entry in entries:
        print(entry)

# Example usage: list everything under the project's `src` folder.
root_directory = r'D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src'
print_directory_structure(root_directory)


D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\components
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\inference_pipeline
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\logging
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\models
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\utils
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\__init__.py
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\components\face_detector.py
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\components\hybrid_model.py
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\components\landmark_extraction.py
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\components\predictor.py
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\components\__init__.py
D:\IIT BBS\Intern works\Flasho tech\expression_identifier\src\inference_pipeline\main.py
D:\IIT BBS\Intern works\Flasho