Mohammed elidrissi laoukili
* subjet  : video analysis

In [None]:
# # This Python 3 environment comes with many helpful analytics libraries installed
# # It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# # For example, here's several helpful packages to load

# import numpy as np # linear algebra
# import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# # Input data files are available in the read-only "../input/" directory
# # For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# # You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# # You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
"""
TimeSformer for Facial Expression Recognition on Kaggle
Adapted for image-based dataset with YOLO format
"""
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import cv2
from pathlib import Path
from transformers import TimesformerModel, TimesformerConfig
import albumentations as A
from albumentations.pytorch import ToTensorV2

2026-01-02 16:46:01.411562: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1767372361.659518      55 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1767372361.727087      55 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1767372362.346701      55 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1767372362.346744      55 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1767372362.346747      55 computation_placer.cc:177] computation placer alr

In [3]:
# ========================
# 1. DATASET CLASS
# ========================

class FacialExpressionDataset(Dataset):
    """Dataset for facial expressions with temporal augmentation"""
    def __init__(self, base_dir, split='train', num_frames=8, frame_size=224, transform=None):
        self.base_dir = Path(base_dir)
        self.split = split
        self.num_frames = num_frames
        self.frame_size = frame_size
        self.transform = transform
        
        # Emotion mapping
        self.emotion_map = {
            0: 'angry', 1: 'contempt', 2: 'disgust', 3: 'fear', 
            4: 'happy', 5: 'natural', 6: 'sad', 7: 'sleepy', 8: 'surprised'
        }
        self.idx_to_emotion = {v: k for k, v in self.emotion_map.items()}
        
        # Load image paths and labels
        self.samples = self._load_samples()
        
        print(f"Loaded {len(self.samples)} images from {split} split")
        self._print_distribution()
    
    def _load_samples(self):
        """Load all image paths and extract labels from YOLO format"""
        samples = []
        
        images_dir = self.base_dir / self.split / 'images'
        labels_dir = self.base_dir / self.split / 'labels'
        
        if not images_dir.exists():
            raise FileNotFoundError(f"Images directory not found: {images_dir}")
        
        for img_path in sorted(images_dir.glob('*.[jp][pn]g')):
            # Get corresponding label file
            label_path = labels_dir / f"{img_path.stem}.txt"
            
            if label_path.exists():
                # Read YOLO label (first number is class)
                with open(label_path, 'r') as f:
                    line = f.readline().strip()
                    if line:
                        class_id = int(line.split()[0])
                        samples.append((str(img_path), class_id))
            else:
                # If no label, try to infer from filename or skip
                print(f"Warning: No label found for {img_path.name}")
        
        return samples
    
    def _print_distribution(self):
        """Print class distribution"""
        label_counts = {}
        for _, label in self.samples:
            emotion = self.emotion_map.get(label, 'unknown')
            label_counts[emotion] = label_counts.get(emotion, 0) + 1
        
        print(f"\nClass distribution ({self.split}):")
        for emotion, count in sorted(label_counts.items()):
            print(f"  {emotion}: {count}")
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        
        try:
            # Load image
            image = cv2.imread(img_path)
            if image is None:
                raise ValueError(f"Failed to load image: {img_path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            
            # Create temporal sequence by augmenting the same image
            frames = self._create_temporal_sequence(image)
        except Exception as e:
            print(f"Error loading {img_path}: {e}")
            # Return dummy data if loading fails
            frames = torch.zeros(self.num_frames, 3, self.frame_size, self.frame_size)
        
        return frames, label
    
    def _create_temporal_sequence(self, image):
        """
        Create a temporal sequence from a single image
        Strategy: Apply different augmentations to create pseudo-temporal data
        """
        # Resize base image once
        image = cv2.resize(image, (self.frame_size, self.frame_size))
        
        frames = []
        for i in range(self.num_frames):
            if self.transform:
                # Apply augmentation (includes normalization)
                augmented = self.transform(image=image.copy())
                frame = augmented['image']
            else:
                # Just convert to tensor and normalize
                frame = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
                mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
                std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
                frame = (frame - mean) / std
            
            frames.append(frame)
        
        # Stack: [num_frames, 3, H, W]
        frames = torch.stack(frames)
        
        return frames

In [4]:
# ========================
# 2. AUGMENTATION
# ========================

def get_transforms(mode='train', img_size=224):
    """Get augmentation pipeline - returns numpy arrays for efficiency"""
    if mode == 'train':
        return A.Compose([
            A.HorizontalFlip(p=0.5),
            A.Rotate(limit=15, p=0.5),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
            A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),
            A.GaussianBlur(blur_limit=(3, 5), p=0.3),
            A.CoarseDropout(max_holes=1, max_height=32, max_width=32, p=0.3),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ])
    else:
        return A.Compose([
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ])

In [5]:
# ========================
# 3. MODEL
# ========================

class TimeSformerForFacialExpression(nn.Module):
    """TimeSformer model for facial expression recognition"""
    def __init__(self, num_classes=9, num_frames=8, pretrained=True, freeze_backbone=False):
        super().__init__()
        
        print("\nüèóÔ∏è  Building TimeSformer model...")
        
        if pretrained:
            print("   Loading pretrained TimeSformer...")
            self.timesformer = TimesformerModel.from_pretrained(
                "facebook/timesformer-base-finetuned-k400",
                ignore_mismatched_sizes=True
            )
            print("   ‚úì Pretrained weights loaded")
        else:
            config = TimesformerConfig(
                image_size=224,
                patch_size=16,
                num_channels=3,
                num_frames=num_frames,
                hidden_size=768,
                num_hidden_layers=12,
                num_attention_heads=12,
                intermediate_size=3072,
                attention_type="divided_space_time"
            )
            self.timesformer = TimesformerModel(config)
        
        # Optionally freeze backbone
        if freeze_backbone:
            print("   Freezing TimeSformer backbone...")
            for param in self.timesformer.parameters():
                param.requires_grad = False
        
        hidden_size = self.timesformer.config.hidden_size
        
        # Enhanced classification head
        self.classifier = nn.Sequential(
            nn.LayerNorm(hidden_size),
            nn.Dropout(0.5),
            nn.Linear(hidden_size, 512),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )
        
        print(f"   Output Classes: {num_classes}")
        print(f"   Backbone Frozen: {freeze_backbone}")
    
    def forward(self, pixel_values):
        """Forward pass"""
        # pixel_values: [batch_size, num_frames, channels, height, width]
        outputs = self.timesformer(pixel_values)
        cls_token = outputs.last_hidden_state[:, 0]
        logits = self.classifier(cls_token)
        return logits

In [6]:
# ========================
# 4. TRAINING
# ========================

def train_epoch(model, loader, criterion, optimizer, device, scaler=None, accumulation_steps=1):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    optimizer.zero_grad()
    
    pbar = tqdm(loader, desc='Training')
    for batch_idx, (frames, labels) in enumerate(pbar):
        frames = frames.to(device)
        labels = labels.to(device)
        
        # Mixed precision training with gradient accumulation
        if scaler:
            with torch.cuda.amp.autocast():
                outputs = model(frames)
                loss = criterion(outputs, labels)
                loss = loss / accumulation_steps  # Scale loss
            
            scaler.scale(loss).backward()
            
            if (batch_idx + 1) % accumulation_steps == 0:
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
        else:
            outputs = model(frames)
            loss = criterion(outputs, labels)
            loss = loss / accumulation_steps
            loss.backward()
            
            if (batch_idx + 1) % accumulation_steps == 0:
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                optimizer.zero_grad()
        
        total_loss += loss.item() * accumulation_steps
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        
        pbar.set_postfix({
            'loss': f'{loss.item() * accumulation_steps:.4f}',
            'acc': f'{100.*correct/total:.2f}%'
        })
    
    return total_loss / len(loader), correct / total


def validate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for frames, labels in tqdm(loader, desc='Validating'):
            frames = frames.to(device)
            labels = labels.to(device)
            
            outputs = model(frames)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            _, predicted = outputs.max(1)
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)
    
    return total_loss / len(loader), acc, f1, all_preds, all_labels

In [7]:
def plot_confusion_matrix(y_true, y_pred, emotion_map, save_path='confusion_matrix.png'):
    """Plot and save confusion matrix"""
    cm = confusion_matrix(y_true, y_pred)
    
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=emotion_map.values(),
                yticklabels=emotion_map.values())
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.xticks(rotation=45)
    plt.yticks(rotation=45)
    plt.tight_layout()
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"Confusion matrix saved to {save_path}")


def plot_training_history(history, save_path='training_history.png'):
    """Plot and save training history"""
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # Loss plot
    axes[0].plot(history['train_loss'], label='Train Loss', marker='o')
    axes[0].plot(history['val_loss'], label='Val Loss', marker='s')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('Training and Validation Loss')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Accuracy plot
    axes[1].plot([a*100 for a in history['train_acc']], label='Train Acc', marker='o')
    axes[1].plot([a*100 for a in history['val_acc']], label='Val Acc', marker='s')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy (%)')
    axes[1].set_title('Training and Validation Accuracy')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"Training history saved to {save_path}")

In [None]:
# ========================
# 5. MAIN
# ========================

def main():
    print("="*80)
    print("TIMESFORMER FACIAL EXPRESSION RECOGNITION")
    print("="*80)
    
    # Configuration
    config = {
        'base_dir': '/kaggle/input/8-facial-expressions-for-yolo/9 Facial Expressions you need',
        'num_frames': 8,
        'frame_size': 224,
        'batch_size': 4,  # Reduced for memory efficiency with TimeSformer
        'num_epochs': 3,
        'learning_rate': 5e-5,  # Lower initial LR for stability
        'weight_decay': 0.01,
        'device': 'cuda' if torch.cuda.is_available() else 'cpu',
        'use_pretrained': True,
        'freeze_backbone': True,  # Freeze for first epochs
        'num_workers': 2,
        'mixed_precision': True,
        'accumulation_steps': 4,  # Gradient accumulation for effective batch size
    }
    
    print(f"\nConfiguration:")
    for k, v in config.items():
        print(f"  {k}: {v}")
    
    # Set seeds
    torch.manual_seed(42)
    np.random.seed(42)
    
    # Load datasets
    print("\nüìÅ Loading datasets...")
    train_dataset = FacialExpressionDataset(
        config['base_dir'], 
        split='train',
        num_frames=config['num_frames'],
        frame_size=config['frame_size'],
        transform=get_transforms('train', config['frame_size'])
    )
    
    val_dataset = FacialExpressionDataset(
        config['base_dir'],
        split='valid',
        num_frames=config['num_frames'],
        frame_size=config['frame_size'],
        transform=get_transforms('val', config['frame_size'])
    )
    
    test_dataset = FacialExpressionDataset(
        config['base_dir'],
        split='test',
        num_frames=config['num_frames'],
        frame_size=config['frame_size'],
        transform=get_transforms('val', config['frame_size'])
    )
    
    # DataLoaders
    train_loader = DataLoader(
        train_dataset, 
        batch_size=config['batch_size'],
        shuffle=True, 
        num_workers=config['num_workers'],
        pin_memory=True,
        drop_last=True
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=config['batch_size'],
        shuffle=False,
        num_workers=config['num_workers'],
        pin_memory=True
    )
    
    test_loader = DataLoader(
        test_dataset,
        batch_size=config['batch_size'],
        shuffle=False,
        num_workers=config['num_workers'],
        pin_memory=True
    )
    
    # Model
    model = TimeSformerForFacialExpression(
        num_classes=9,
        num_frames=config['num_frames'],
        pretrained=config['use_pretrained'],
        freeze_backbone=config['freeze_backbone']
    ).to(config['device'])
    
    total_params = sum(p.numel() for p in model.parameters()) / 1e6
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6
    print(f"\nüìà Parameters: {total_params:.2f}M total, {trainable_params:.2f}M trainable")
    
    # Training setup
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    optimizer = optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=config['learning_rate'],
        weight_decay=config['weight_decay']
    )
    scheduler = optim.lr_scheduler.CosineAnnealingLR(
        optimizer, 
        T_max=config['num_epochs']
    )
    
    scaler = torch.cuda.amp.GradScaler() if config['mixed_precision'] and config['device'] == 'cuda' else None
    
    # Training loop
    best_val_acc = 0
    best_val_f1 = 0
    patience = 7
    patience_counter = 0
    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': [], 'val_f1': []}
    
    print("\n" + "="*80)
    print("üöÄ STARTING TRAINING")
    print("="*80)
    
    for epoch in range(config['num_epochs']):
        print(f"\nüìÖ Epoch {epoch+1}/{config['num_epochs']}")
        
        # Unfreeze backbone after some epochs
        if config['freeze_backbone'] and epoch == 5:
            print("   üîì Unfreezing backbone...")
            for param in model.timesformer.parameters():
                param.requires_grad = True
            # Rebuild optimizer with all parameters and lower LR
            optimizer = optim.AdamW(
                model.parameters(),
                lr=config['learning_rate'] / 10,
                weight_decay=config['weight_decay']
            )
            # Reset scheduler for remaining epochs
            scheduler = optim.lr_scheduler.CosineAnnealingLR(
                optimizer, 
                T_max=config['num_epochs'] - epoch
            )
        
        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer, config['device'], 
            scaler, config['accumulation_steps']
        )
        
        val_loss, val_acc, val_f1, _, _ = validate(
            model, val_loader, criterion, config['device']
        )
        
        scheduler.step()
        
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)
        history['val_f1'].append(val_f1)
        
        print(f"Train: Loss={train_loss:.4f}, Acc={train_acc*100:.2f}%")
        print(f"Val:   Loss={val_loss:.4f}, Acc={val_acc*100:.2f}%, F1={val_f1:.4f}")
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_val_f1 = val_f1
            patience_counter = 0
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
                'val_f1': val_f1,
                'config': config,
            }, 'best_timesformer_facial.pth')
            print(f"‚úÖ New best model saved! Val Acc: {val_acc*100:.2f}%")
        else:
            patience_counter += 1
            print(f"No improvement. Patience: {patience_counter}/{patience}")
            
            if patience_counter >= patience:
                print(f"\n‚ö†Ô∏è  Early stopping triggered after {epoch+1} epochs")
                break
    
    # Test evaluation
    print("\n" + "="*80)
    print("üéØ FINAL TEST EVALUATION")
    print("="*80)
    
    checkpoint = torch.load('best_timesformer_facial.pth')
    model.load_state_dict(checkpoint['model_state_dict'])
    
    test_loss, test_acc, test_f1, test_preds, test_labels = validate(
        model, test_loader, criterion, config['device']
    )
    
    print(f"\nüìä Test Results:")
    print(f"  Accuracy: {test_acc*100:.2f}%")
    print(f"  F1-Score: {test_f1:.4f}")
    
    # Plot confusion matrix
    plot_confusion_matrix(test_labels, test_preds, train_dataset.emotion_map)
    
    # Plot training history
    plot_training_history(history)
    
    # Print per-class metrics
    from sklearn.metrics import classification_report
    print("\nüìã Per-Class Metrics:")
    print(classification_report(test_labels, test_preds, 
                                target_names=list(train_dataset.emotion_map.values()),
                                digits=4))
    
    print("\n‚úÖ Training complete!")
    print(f"Best validation accuracy: {best_val_acc*100:.2f}%")
    print(f"Best validation F1-score: {best_val_f1:.4f}")


if __name__ == '__main__':
    main()

TIMESFORMER FACIAL EXPRESSION RECOGNITION

Configuration:
  base_dir: /kaggle/input/8-facial-expressions-for-yolo/9 Facial Expressions you need
  num_frames: 8
  frame_size: 224
  batch_size: 4
  num_epochs: 3
  learning_rate: 5e-05
  weight_decay: 0.01
  device: cuda
  use_pretrained: True
  freeze_backbone: True
  num_workers: 2
  mixed_precision: True
  accumulation_steps: 4

üìÅ Loading datasets...


  A.GaussNoise(var_limit=(10.0, 50.0), p=0.3),
  A.CoarseDropout(max_holes=1, max_height=32, max_width=32, p=0.3),
