In [None]:
import zipfile
import os
from google.colab import drive

drive.mount('/content/drive')

zip_path = '/content/drive/MyDrive/Facial Analysis/archive.zip'
local_dest = '/content/'

print(f"Copying {zip_path} to {local_dest}...")
!cp "{zip_path}" "{local_dest}"

print("Extracting archive...")
with zipfile.ZipFile(os.path.join(local_dest, 'archive.zip'), 'r') as zip_ref:
    zip_ref.extractall(local_dest)

TRAIN_PATH = '/content/train'
TEST_PATH = '/content/test'

print(f"\nDataset extracted!")
print(f"Train path: {TRAIN_PATH}")
print(f"Test path: {TEST_PATH}")

import os
if os.path.exists(TRAIN_PATH):
    train_classes = os.listdir(TRAIN_PATH)
    print(f"\nTrain classes found: {train_classes}")
    for cls in train_classes:
        cls_path = os.path.join(TRAIN_PATH, cls)
        if os.path.isdir(cls_path):
            print(f"  {cls}: {len(os.listdir(cls_path))} images")

if os.path.exists(TEST_PATH):
    test_classes = os.listdir(TEST_PATH)
    print(f"\nTest classes found: {test_classes}")
    for cls in test_classes:
        cls_path = os.path.join(TEST_PATH, cls)
        if os.path.isdir(cls_path):
            print(f"  {cls}: {len(os.listdir(cls_path))} images")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import cv2
import math
from sklearn.metrics import classification_report, confusion_matrix
from tqdm import tqdm
import warnings
# Silence every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
_cuda_ready = torch.cuda.is_available()
device = torch.device('cuda') if _cuda_ready else torch.device('cpu')
print(f"Using device: {device}")

if _cuda_ready:
    # Report the name and total memory of GPU index 0.
    _gpu_props = torch.cuda.get_device_properties(0)
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory Available: {_gpu_props.total_memory / 1e9:.2f} GB")


In [None]:
class Conv2dSame(torch.nn.Conv2d):
    """Conv2d that pads its input at call time based on the input size.

    The padding is computed per forward pass from the input's spatial
    dimensions, kernel size, stride and dilation, and is split so that any
    odd remainder goes to the right/bottom edge.
    """

    def calc_same_pad(self, i: int, k: int, s: int, d: int) -> int:
        """Return the total padding required along one spatial axis.

        Args:
            i: Input size along the axis.
            k: Kernel size along the axis.
            s: Stride along the axis.
            d: Dilation along the axis.

        Returns:
            Non-negative number of padding elements to add (both sides combined).
        """
        target_out = math.ceil(i / s)
        covered = (target_out - 1) * s + (k - 1) * d + 1
        return max(covered - i, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Pad the last two dimensions of ``x`` as needed, then convolve."""
        height, width = x.shape[-2], x.shape[-1]
        total_h = self.calc_same_pad(
            i=height, k=self.kernel_size[0], s=self.stride[0], d=self.dilation[0]
        )
        total_w = self.calc_same_pad(
            i=width, k=self.kernel_size[1], s=self.stride[1], d=self.dilation[1]
        )

        if total_h > 0 or total_w > 0:
            # F.pad order for the last two dims: (left, right, top, bottom).
            left, top = total_w // 2, total_h // 2
            x = F.pad(x, [left, total_w - left, top, total_h - top])

        # The module's own `padding` attribute is still forwarded to conv2d.
        return F.conv2d(
            x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups
        )

print("Conv2dSame class defined - Matches realtime_facial_analysis.py")


In [None]:
class Bottleneck(nn.Module):
    """Three-convolution residual block (1x1 -> 3x3 -> 1x1) with a skip connection.

    The block maps ``in_channels`` to ``out_channels * expansion`` channels.
    Any spatial stride is applied by the first 1x1 convolution.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Width of the two inner convolutions.
        i_downsample: Optional module applied to the skip path so its shape
            matches the main path; ``None`` leaves the input untouched.
        stride: Stride of the first 1x1 convolution.
    """

    expansion = 4

    # PyTorch updates running statistics as
    #   running = (1 - momentum) * running + momentum * batch_stat,
    # i.e. `momentum` weights the NEW batch. The previous value 0.99 is the
    # Keras-convention number; in PyTorch it makes the running stats track
    # almost only the last batch, which gives noisy eval-mode normalisation.
    # 0.01 is the PyTorch equivalent (1 - 0.99). Momentum is not stored in the
    # state_dict, so saved checkpoints stay interchangeable.
    BN_EPS = 0.001
    BN_MOMENTUM = 0.01

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Bottleneck, self).__init__()
        expanded = out_channels * self.expansion

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels, eps=self.BN_EPS, momentum=self.BN_MOMENTUM)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding='same', bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels, eps=self.BN_EPS, momentum=self.BN_MOMENTUM)

        self.conv3 = nn.Conv2d(out_channels, expanded, kernel_size=1, stride=1, padding=0, bias=False)
        self.batch_norm3 = nn.BatchNorm2d(expanded, eps=self.BN_EPS, momentum=self.BN_MOMENTUM)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Run the block and return ``relu(main_path(x) + skip(x))``."""
        # No clone needed: nothing below modifies `x` in place.
        identity = x

        out = self.relu(self.batch_norm1(self.conv1(x)))
        out = self.relu(self.batch_norm2(self.conv2(out)))
        out = self.batch_norm3(self.conv3(out))

        if self.i_downsample is not None:
            identity = self.i_downsample(identity)

        out += identity
        return self.relu(out)

print(
    f"Bottleneck class defined - expansion={Bottleneck.expansion}, "
    f"BatchNorm(eps={Bottleneck.BN_EPS}, momentum={Bottleneck.BN_MOMENTUM})"
)


In [None]:
class ResNet(nn.Module):
    """ResNet backbone followed by a two-layer classification head.

    Pipeline: Conv2dSame 7x7/2 -> BatchNorm -> ReLU -> MaxPool 3x3/2 ->
    four residual stages -> global average pool -> fc1 (512) -> ReLU -> fc2.

    Args:
        ResBlock: Residual block class; must expose an ``expansion`` attribute
            and accept ``(in_channels, planes, i_downsample=None, stride=1)``.
        layer_list: Number of blocks in each of the four stages.
        num_classes: Size of the final logits vector.
        num_channels: Channels of the input image.
    """

    # PyTorch's BatchNorm `momentum` weights the NEW batch statistic
    # (running = (1 - m) * running + m * batch). The previous value 0.99 is the
    # Keras-convention number and made the running stats follow almost only the
    # last batch; 0.01 is the PyTorch equivalent. Momentum is not part of the
    # state_dict, so checkpoint compatibility is unaffected.
    BN_EPS = 0.001
    BN_MOMENTUM = 0.01

    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super(ResNet, self).__init__()
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channels = 64

        self.conv_layer_s2_same = Conv2dSame(num_channels, 64, 7, stride=2, groups=1, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64, eps=self.BN_EPS, momentum=self.BN_MOMENTUM)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2)

        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64, stride=1)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(512 * ResBlock.expansion, 512)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(512, num_classes)

    def extract_features(self, x):
        """Return the ``fc1`` output (before its ReLU) for a batch of images."""
        x = self.relu(self.batch_norm1(self.conv_layer_s2_same(x)))
        x = self.max_pool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)  # flatten to (batch, features)
        x = self.fc1(x)
        return x

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        x = self.extract_features(x)
        x = self.relu1(x)
        x = self.fc2(x)
        return x

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and, when the skip path would
        not match the main path's shape, a 1x1 conv + BatchNorm projection.
        Updates ``self.in_channels`` to the stage's output width.
        """
        out_channels = planes * ResBlock.expansion
        ii_downsample = None

        if stride != 1 or self.in_channels != out_channels:
            ii_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False, padding=0),
                nn.BatchNorm2d(out_channels, eps=self.BN_EPS, momentum=self.BN_MOMENTUM)
            )

        layers = [ResBlock(self.in_channels, planes, i_downsample=ii_downsample, stride=stride)]
        self.in_channels = out_channels

        for _ in range(blocks - 1):
            layers.append(ResBlock(self.in_channels, planes))

        return nn.Sequential(*layers)

def ResNet50(num_classes, channels=3):
    """Build a ResNet with Bottleneck blocks in a [3, 4, 6, 3] stage layout."""
    return ResNet(Bottleneck, [3,4,6,3], num_classes, channels)

print("ResNet class defined with exact architecture:")
print("  - Conv2dSame 7x7 stride=2 -> BatchNorm -> ReLU -> MaxPool 3x3 stride=2")
print("  - Layer blocks: [3,4,6,3] with planes [64,128,256,512]")
print("  - FC layers: 2048 -> 512 -> 7 classes")
print(f"  - BatchNorm (stem + downsample): eps={ResNet.BN_EPS}, momentum={ResNet.BN_MOMENTUM}")


In [None]:
class FER2013Dataset(Dataset):
    """Image-folder dataset that yields mean-subtracted 3x224x224 float tensors.

    ``root_dir`` must contain one sub-folder per emotion, named as in
    ``emotion_map``; folders with other names are ignored.

    Args:
        root_dir: Directory holding the per-emotion sub-folders.
        transform: Optional callable applied to the resized uint8 HxWx3 array
            before normalisation; it must return an array of the same kind.
        is_training: Stored on the instance; not otherwise used by this class.
    """

    def __init__(self, root_dir, transform=None, is_training=False):
        self.root_dir = root_dir
        self.transform = transform
        self.is_training = is_training

        # Folder name -> integer class id.
        self.emotion_map = {
            'angry': 6, 'disgusted': 5, 'fearful': 4,
            'happy': 1, 'neutral': 0, 'sad': 2, 'surprised': 3
        }

        # os.listdir order is filesystem-dependent; sort both levels so the
        # sample order (and anything indexed by it) is reproducible.
        self.samples = []
        for emotion_folder in sorted(os.listdir(root_dir)):
            emotion_path = os.path.join(root_dir, emotion_folder)
            if os.path.isdir(emotion_path) and emotion_folder in self.emotion_map:
                label = self.emotion_map[emotion_folder]
                for img_name in sorted(os.listdir(emotion_path)):
                    self.samples.append((os.path.join(emotion_path, img_name), label))

        print(f"Loaded {len(self.samples)} images from {root_dir}")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label)`` for sample ``idx``."""
        img_path, label = self.samples[idx]

        image = cv2.imread(img_path)
        if image is None:
            # Keep the best-effort fallback (a black frame) so one bad file
            # does not abort training, but report it instead of hiding it.
            print(f"WARNING: could not read image, using a blank frame: {img_path}")
            image = np.zeros((48, 48, 3), dtype=np.uint8)

        # Defensive: promote a single-channel array to three channels.
        if len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        elif image.shape[2] == 1:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

        image = cv2.resize(image, (224, 224), interpolation=cv2.INTER_NEAREST)

        if self.transform:
            image = self.transform(image)

        image = image.astype(np.float32)
        # Reverse the channel axis (OpenCV loads BGR) and copy to drop the
        # negative stride, which torch.from_numpy cannot handle.
        image = image[..., ::-1].copy()
        # NOTE(review): the means are applied after the channel reversal;
        # confirm this channel order matches the preprocessing used by
        # realtime_facial_analysis.py.
        image[..., 0] -= 91.4953
        image[..., 1] -= 103.8827
        image[..., 2] -= 131.0912

        # HWC -> CHW for PyTorch.
        image = torch.from_numpy(image).permute(2, 0, 1)

        return image, label

print("FER2013Dataset class defined:")
print("  - Emotion mapping: angry→6, disgusted→5, fearful→4, happy→1, neutral→0, sad→2, surprised→3")
print("  - Resize: 224x224 with INTER_NEAREST")
print("  - RGB mean normalization: R-91.4953, G-103.8827, B-131.0912")


In [None]:
class TrainAugmentation:
    """Random flip / rotation / brightness / contrast jitter for HxWxC arrays.

    Each of the four steps is applied independently with probability ``p``,
    always in the order flip, rotate, brightness, contrast.
    """

    def __init__(self, p=0.5):
        self.p = p

    def _triggered(self):
        """Draw one uniform sample and report whether a step should run."""
        return np.random.rand() < self.p

    @staticmethod
    def _rotate(image):
        """Rotate about the image centre by a random angle in [-15, 15) degrees."""
        angle = np.random.uniform(-15, 15)
        height, width = image.shape[:2]
        rotation = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1.0)
        return cv2.warpAffine(image, rotation, (width, height), borderMode=cv2.BORDER_REPLICATE)

    @staticmethod
    def _jitter_brightness(image):
        """Scale all pixels by a random gain in [0.8, 1.2), clipped to uint8."""
        gain = np.random.uniform(0.8, 1.2)
        return np.clip(image * gain, 0, 255).astype(np.uint8)

    @staticmethod
    def _jitter_contrast(image):
        """Stretch pixels around the global mean by a random factor in [0.8, 1.2)."""
        factor = np.random.uniform(0.8, 1.2)
        pivot = image.mean()
        return np.clip((image - pivot) * factor + pivot, 0, 255).astype(np.uint8)

    def __call__(self, image):
        """Return the (possibly) augmented image."""
        if self._triggered():
            image = cv2.flip(image, 1)  # flip code 1 = horizontal
        if self._triggered():
            image = self._rotate(image)
        if self._triggered():
            image = self._jitter_brightness(image)
        if self._triggered():
            image = self._jitter_contrast(image)
        return image

train_augmentation = TrainAugmentation(p=0.5)

for _line in (
    "Data augmentation pipeline created:",
    "  - Horizontal flip (p=0.5)",
    "  - Random rotation ±15° (p=0.5)",
    "  - Brightness adjustment 0.8-1.2x (p=0.5)",
    "  - Contrast adjustment 0.8-1.2x (p=0.5)",
):
    print(_line)


In [None]:
BATCH_SIZE = 64
NUM_WORKERS = 4

# Training data is augmented; test data goes through normalisation only.
train_dataset = FER2013Dataset(TRAIN_PATH, transform=train_augmentation, is_training=True)
test_dataset = FER2013Dataset(TEST_PATH, transform=None, is_training=False)

# Settings shared by both loaders; only shuffling differs between them.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)

for _line in (
    f"\nDataLoaders created:",
    f"  - Train batches: {len(train_loader)} (batch_size={BATCH_SIZE})",
    f"  - Test batches: {len(test_loader)} (batch_size={BATCH_SIZE})",
    f"  - Total train samples: {len(train_dataset)}",
    f"  - Total test samples: {len(test_dataset)}",
):
    print(_line)


In [None]:
# Build the 7-class network and move it to the selected device.
model = ResNet50(num_classes=7, channels=3).to(device)

criterion = nn.CrossEntropyLoss()

# Count all parameters and, separately, those that receive gradients.
_param_sizes = [(p.numel(), p.requires_grad) for p in model.parameters()]
total_params = sum(size for size, _ in _param_sizes)
trainable_params = sum(size for size, needs_grad in _param_sizes if needs_grad)

for _line in (
    f"Model initialized: ResNet50",
    f"  - Input: 224x224x3 RGB images",
    f"  - Output: 7 emotion classes",
    f"  - Total parameters: {total_params:,}",
    f"  - Trainable parameters: {trainable_params:,}",
    f"  - Device: {device}",
    f"  - Loss function: CrossEntropyLoss",
):
    print(_line)


In [None]:
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.0001
NUM_EPOCHS = 50

optimizer = optim.Adam(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
)
# Halve the learning rate after 3 epochs without improvement in the monitored
# (minimised) metric, never going below 1e-7.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=3,
    min_lr=1e-7,
)

for _line in (
    f"Optimizer configured:",
    f"  - Type: Adam",
    f"  - Learning rate: {LEARNING_RATE}",
    f"  - Weight decay: {WEIGHT_DECAY}",
    f"  - Scheduler: ReduceLROnPlateau",
    f"    * Factor: 0.5 (halves LR)",
    f"    * Patience: 3 epochs",
    f"    * Min LR: 1e-7",
    f"  - Training epochs: {NUM_EPOCHS}",
):
    print(_line)


In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network to optimise; switched to train mode here.
        train_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss taking ``(outputs, labels)`` and returning a scalar tensor.
        optimizer: Optimizer stepping the model's parameters.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: sample-weighted mean loss and accuracy in percent.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    batches = tqdm(train_loader, desc='Training', leave=False)
    for images, labels in batches:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        logits = model(images)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch-mean loss by batch size so the epoch average is per sample.
        batch_loss_value = batch_loss.item()
        loss_sum += batch_loss_value * images.size(0)

        predicted = logits.max(1)[1]
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()

        batches.set_postfix({
            'loss': f'{batch_loss_value:.4f}',
            'acc': f'{100.*n_correct/n_seen:.2f}%',
        })

    return loss_sum / n_seen, 100. * n_correct / n_seen

def validate_epoch(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without computing gradients.

    Args:
        model: Network to evaluate; switched to eval mode here.
        val_loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss taking ``(outputs, labels)`` and returning a scalar tensor.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: sample-weighted mean loss and accuracy in percent.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        batches = tqdm(val_loader, desc='Validation', leave=False)
        for images, labels in batches:
            images = images.to(device)
            labels = labels.to(device)

            logits = model(images)
            batch_loss_value = criterion(logits, labels).item()

            # Weight the batch-mean loss by batch size so the epoch average is per sample.
            loss_sum += batch_loss_value * images.size(0)

            predicted = logits.max(1)[1]
            n_seen += labels.size(0)
            n_correct += predicted.eq(labels).sum().item()

            batches.set_postfix({
                'loss': f'{batch_loss_value:.4f}',
                'acc': f'{100.*n_correct/n_seen:.2f}%',
            })

    return loss_sum / n_seen, 100. * n_correct / n_seen

# Summary of the helpers defined in this cell.
for _line in (
    "Training and validation functions defined",
    "  - train_epoch(): trains model for one epoch with progress tracking",
    "  - validate_epoch(): evaluates model on validation set",
    "  - Both return loss and accuracy metrics",
):
    print(_line)


In [None]:
# Per-epoch metrics, appended to once per epoch.
history = {
    'train_loss': [], 'train_acc': [],
    'val_loss': [], 'val_acc': []
}

best_val_acc = 0.0
best_model_path = '/content/drive/MyDrive/Facial Analysis/models/FER_static_ResNet50_AffectNet.pt'

# Create the checkpoint folder up front: otherwise torch.save fails on a
# missing directory only after the first full epoch has already been spent.
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)

print("=" * 70)
print(f"Starting Training - {NUM_EPOCHS} Epochs")
print("=" * 70)

for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch [{epoch+1}/{NUM_EPOCHS}]")
    print("-" * 70)

    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    # NOTE(review): validation runs on test_loader, so the checkpoint is
    # selected on the same data the later evaluation cell reports on. A
    # held-out validation split would give an unbiased test score.
    val_loss, val_acc = validate_epoch(model, test_loader, criterion, device)

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%")
    print(f"Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.2f}%")

    # Keep only the weights with the best validation accuracy seen so far.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        print(f"✓ Best model saved! Val Acc: {val_acc:.2f}%")

    # The plateau scheduler monitors validation loss (mode='min').
    scheduler.step(val_loss)
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Learning Rate: {current_lr:.2e}")

print("\n" + "=" * 70)
print("Training Complete!")
print(f"Best Validation Accuracy: {best_val_acc:.2f}%")
print(f"Model saved to: {best_model_path}")
print("=" * 70)


In [None]:
# Size the x-axis from what was actually recorded rather than NUM_EPOCHS, so
# the plot still works after an interrupted run or a changed NUM_EPOCHS.
num_recorded = len(history['train_loss'])
if num_recorded == 0:
    raise RuntimeError("No training history recorded - run the training cell first.")
epochs_range = range(1, num_recorded + 1)

fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# (history key suffix, legend name, y-axis label, panel title)
panels = [
    ('loss', 'Loss', 'Loss', 'Training and Validation Loss'),
    ('acc', 'Accuracy', 'Accuracy (%)', 'Training and Validation Accuracy'),
]
for ax, (key, legend_name, y_label, title) in zip(axes, panels):
    ax.plot(epochs_range, history[f'train_{key}'], 'b-', label=f'Train {legend_name}', linewidth=2)
    ax.plot(epochs_range, history[f'val_{key}'], 'r-', label=f'Val {legend_name}', linewidth=2)
    ax.set_xlabel('Epoch', fontsize=12)
    ax.set_ylabel(y_label, fontsize=12)
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('/content/drive/MyDrive/Facial Analysis/models/training_curves.png', dpi=150, bbox_inches='tight')
plt.show()

print(f"Training curves saved to Google Drive")
print(f"Final Train Accuracy: {history['train_acc'][-1]:.2f}%")
print(f"Best Validation Accuracy: {max(history['val_acc']):.2f}%")


In [None]:
# Restore the best checkpoint written by the training loop.
model.load_state_dict(torch.load(best_model_path))
model.eval()

all_preds = []
all_labels = []

print("Evaluating best model on test set...")
with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Evaluating'):
        images = images.to(device)
        outputs = model(images)
        _, predicted = outputs.max(1)

        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.numpy())

# Index i is the display name of class id i (same ids as FER2013Dataset.emotion_map).
emotion_names = ['Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger']
# Pin the class ids explicitly. Without `labels=`, scikit-learn infers them
# from the data, so a class missing from both labels and predictions would
# shrink the report/matrix and misalign it with emotion_names.
class_ids = list(range(len(emotion_names)))

print("\n" + "=" * 70)
print("CLASSIFICATION REPORT")
print("=" * 70)
print(classification_report(
    all_labels, all_preds,
    labels=class_ids, target_names=emotion_names, digits=4, zero_division=0,
))

cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
support = cm.sum(axis=1)
# Per-class recall in percent; NaN marks classes with no test samples.
accuracy_per_class = np.array([
    100.0 * hits / count if count else float('nan')
    for hits, count in zip(cm.diagonal(), support)
])

print("\n" + "=" * 70)
print("PER-CLASS ACCURACY")
print("=" * 70)
for i, emotion in enumerate(emotion_names):
    if support[i] == 0:
        print(f"{emotion:12s}:    n/a (no samples)")
    else:
        print(f"{emotion:12s}: {accuracy_per_class[i]:6.2f}%")
print("=" * 70)

overall_acc = 100. * cm.trace() / cm.sum()
print(f"\nOverall Test Accuracy: {overall_acc:.2f}%")
print(f"Best Validation Accuracy: {best_val_acc:.2f}%")
print("=" * 70)


In [None]:
# Rebuild the network from scratch and load the saved weights into it. The
# strict state_dict load fails if any parameter name or shape differs.
test_model = ResNet50(num_classes=7, channels=3)
# The fresh model lives on the CPU at this point, so load the checkpoint there
# too; this also works when the weights were saved from a GPU session.
test_model.load_state_dict(torch.load(best_model_path, map_location='cpu'))
test_model.to(device)
test_model.eval()

print("=" * 70)
print("MODEL COMPATIBILITY VERIFICATION")
print("=" * 70)

test_input = torch.randn(1, 3, 224, 224).to(device)
with torch.no_grad():
    output = test_model(test_input)
    features = test_model.extract_features(test_input)

# Check the shapes the messages below claim, instead of printing them unverified.
assert tuple(output.shape) == (1, 7), f"unexpected output shape: {tuple(output.shape)}"
assert tuple(features.shape) == (1, 512), f"unexpected feature shape: {tuple(features.shape)}"

print(f"✓ Model loaded successfully")
print(f"✓ Input shape: {test_input.shape}")
print(f"✓ Output shape: {output.shape} (batch, 7 classes)")
print(f"✓ Feature shape: {features.shape} (batch, 512 features)")
# NOTE(review): this cell checks only the checkpoint load and the tensor
# shapes; it does not compare against realtime_facial_analysis.py itself.
print(f"✓ Model architecture matches realtime_facial_analysis.py")

print("\n" + "=" * 70)
print("USAGE INSTRUCTIONS")
print("=" * 70)
print("1. Download the trained model from Google Drive:")
# Reuse best_model_path so the printed location cannot drift from the real one.
print(f"   Location: {best_model_path}")
print()
print("2. Place it in your project models/ directory:")
print(f"   your-project/models/FER_static_ResNet50_AffectNet.pt")
print()
print("3. Run realtime facial analysis:")
print("   python realtime_facial_analysis.py")
print()
print("4. Model specifications:")
print(f"   - Architecture: ResNet50 with custom Conv2dSame")
print(f"   - Input: 224x224 RGB images")
print(f"   - Preprocessing: BGR to RGB + mean normalization")
print(f"   - Output: 7 emotions (Neutral, Happiness, Sadness, Surprise, Fear, Disgust, Anger)")
print(f"   - Test Accuracy: {overall_acc:.2f}%")
print("=" * 70)
