<a href="https://colab.research.google.com/github/chamindu002/Research/blob/main/casia_cnn_face_new_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Download the 'nhatdealin/casiawebface-dataset-crop' dataset through kagglehub and keep
# whatever dataset_download returns in a module-level variable.
# NOTE(review): presumably the return value is the local directory holding the files —
# confirm against the kagglehub documentation.
nhatdealin_casiawebface_dataset_crop_path = kagglehub.dataset_download('nhatdealin/casiawebface-dataset-crop')

print('Data source import complete.')


In [None]:
!pip install timm
# Install timm for models and huggingface_hub for cloud saving
!pip install timm huggingface_hub



In [None]:
# Cell 2: Imports and Device Setup
import os
import cv2
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from PIL import Image
import timm
from tqdm.notebook import tqdm
import warnings
from huggingface_hub import HfApi, login
from kaggle_secrets import UserSecretsClient

# Silence every Python warning for the remainder of the session.
warnings.filterwarnings('ignore')

# Use the GPU when PyTorch can see one, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"‚úÖ Device set to: {device}")

# When running on CUDA, report the name and total memory of GPU 0.
if device.type == "cuda":
    print(f"   GPU: {torch.cuda.get_device_name(0)}")
    print(f"   Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# --- SECURE HUGGING FACE LOGIN ---
# USE_HF_BACKUP gates every later Hugging Face call (repo creation, checkpoint upload).
# It starts disabled and is only switched on after a successful login.
USE_HF_BACKUP = False
try:
    user_secrets = UserSecretsClient()
    hf_token = user_secrets.get_secret("HF_TOKEN")
    login(token=hf_token)
    print("‚úÖ Logged in to Hugging Face Hub securely.")
    USE_HF_BACKUP = True
except Exception as e:
    # Best-effort: training must still run without cloud backup. Any failure lands here
    # (missing secret, rejected token, network error), so the real cause is printed
    # alongside the hint instead of being discarded.
    print("‚ö†Ô∏è Cloud Backup disabled. Could not find 'HF_TOKEN' in Kaggle Secrets.")
    print("   (To enable: Add-ons -> Secrets -> Add 'HF_TOKEN')")
    print(f"   Reason: {type(e).__name__}: {e}")



‚úÖ Device set to: cuda
   GPU: Tesla T4
   Memory: 15.83 GB
‚úÖ Logged in to Hugging Face Hub securely.


In [None]:
# Cell 3: Enhanced Configuration
class Config:
    """Paths and hyperparameters shared by every cell of the notebook.

    All values are class attributes; the notebook instantiates the class once as
    ``cfg`` and reads them as ``cfg.NAME``.
    """

    # Dataset Path
    # Prefer the directory returned by the kagglehub download cell when that variable
    # exists, so the notebook also finds the data outside Kaggle. If the cell was deleted
    # or not run, fall back to the Kaggle input mount used originally.
    # NOTE(review): assumes the kagglehub directory contains the same
    # 'CASIA-WebFace_crop' sub-folder as the Kaggle mount — confirm.
    _DATASET_ROOT = globals().get(
        'nhatdealin_casiawebface_dataset_crop_path',
        '/kaggle/input/casiawebface-dataset-crop',
    )
    DATA_DIR = os.path.join(_DATASET_ROOT, 'CASIA-WebFace_crop')

    # Model Architecture
    MODEL_NAME = 'efficientnet_b0'  # Can upgrade to 'efficientnet_b3' if GPU allows
    IMG_SIZE = 224
    EMBEDDING_SIZE = 512  # Width of the first layer of the classifier head

    # Training Hyperparameters
    BATCH_SIZE = 64
    EPOCHS = 50
    LEARNING_RATE = 1e-4  # Learning rate of the classifier head
    BACKBONE_LR = 5e-5  # Lower learning rate for the pretrained backbone
    NUM_WORKERS = 2

    # Regularization
    WEIGHT_DECAY = 1e-4
    DROPOUT = 0.3
    LABEL_SMOOTHING = 0.1
    MIN_SAMPLES = 3  # Identities with fewer images than this are dropped

    # Early Stopping
    PATIENCE = 10  # Stop if no improvement for 10 epochs
    MIN_DELTA = 0.001  # Minimum improvement to count

    # Augmentation
    MIXUP_ALPHA = 0.2  # Beta(alpha, alpha) parameter used to draw the mixup weight
    USE_MIXUP = True

    # Cloud Backup
    HF_REPO_ID = "chami002/casia-face-recognition"
    MODEL_FILENAME = "best_face_model.pth"

    # Logging
    SAVE_FREQ = 5  # Save checkpoint every N epochs

cfg = Config()
print("‚úÖ Enhanced Configuration loaded")
print(f"   Model: {cfg.MODEL_NAME}")
print(f"   Batch Size: {cfg.BATCH_SIZE}")
print(f"   Learning Rate: {cfg.LEARNING_RATE}")
print(f"   Epochs: {cfg.EPOCHS}")

‚úÖ Enhanced Configuration loaded
   Model: efficientnet_b0
   Batch Size: 64
   Learning Rate: 0.0001
   Epochs: 50


Cell 4: Data Preparation

In [None]:
# Cell 4: Data Preparation with Enhanced Analysis
def prepare_metadata(data_dir, min_samples=None):
    """Scan ``data_dir`` for face images and build a (filepath, label) table.

    Args:
        data_dir: Root directory searched recursively for ``*.jpg`` / ``*.png`` files.
        min_samples: Identities with fewer images than this are dropped.
            Defaults to ``cfg.MIN_SAMPLES`` when omitted.

    Returns:
        A DataFrame with columns ``filepath`` and ``label`` containing only the
        identities that have at least ``min_samples`` images.

    Raises:
        ValueError: If no image is found, or if no identity survives the filter.
    """
    if min_samples is None:
        min_samples = cfg.MIN_SAMPLES

    print(f"üìÇ Scanning files in {data_dir}...")

    # With recursive=True, "**" also matches zero directories, so this pattern already
    # covers files directly inside data_dir; the extra non-recursive globs listed those
    # files a second time. The set guards against any repeat, and sorting gives a
    # stable file order so the seeded train/val split is reproducible across runs.
    found = set()
    for ext in ("jpg", "png"):
        found.update(glob.glob(os.path.join(data_dir, f"**/*.{ext}"), recursive=True))
    files = sorted(found)

    if not files:
        raise ValueError("‚ùå No images found! Check your cfg.DATA_DIR path.")

    print(f"   Found {len(files)} total files")

    # Parse filenames: "00000045_001.jpg" -> ID: "00000045"
    data = []
    for f in tqdm(files, desc="Parsing files"):
        filename = os.path.basename(f)
        if '_' in filename:
            # Extract ID (everything before the first underscore)
            person_id = filename.split('_')[0]
        else:
            # No ID in the filename: use the containing folder name instead of
            # treating the whole filename as an identity.
            # NOTE(review): assumes a one-folder-per-identity layout — confirm.
            person_id = os.path.basename(os.path.dirname(f))
        data.append({'filepath': f, 'label': person_id})

    df = pd.DataFrame(data)

    # Analyze class distribution
    counts = df['label'].value_counts()
    print(f"\nüìä Dataset Statistics:")
    print(f"   Total Images: {len(df)}")
    print(f"   Unique Identities: {len(counts)}")
    print(f"   Avg images per person: {counts.mean():.1f}")
    print(f"   Min images per person: {counts.min()}")
    print(f"   Max images per person: {counts.max()}")

    # Filter out identities with too few images
    valid_labels = counts[counts >= min_samples].index
    df_clean = df[df['label'].isin(valid_labels)].copy()

    removed = len(df) - len(df_clean)
    print(f"\nüßπ Filtered out {removed} images from identities with < {min_samples} samples")
    print(f"   Final Dataset: {len(df_clean)} images")
    print(f"   Final Identities: {df_clean['label'].nunique()}")

    if df_clean.empty:
        # Fail here with a clear message rather than later inside the label encoder or split.
        raise ValueError(
            f"No identity has at least {min_samples} images; lower min_samples or check the dataset."
        )

    return df_clean

# Run preparation
df = prepare_metadata(cfg.DATA_DIR)

# Encode Labels (String ID -> 0, 1, 2, ...)
label_encoder = LabelEncoder()
df['label_idx'] = label_encoder.fit_transform(df['label'])
num_classes = len(label_encoder.classes_)

print(f"\n‚úÖ Data preparation complete")
print(f"   Number of classes: {num_classes}")

üìÇ Scanning files in /kaggle/input/casiawebface-dataset-crop/CASIA-WebFace_crop...


KeyboardInterrupt: 

Cell 5: Train-Val Split

In [None]:
# Cell 5: Stratified Train-Val Split
# Hold out 15% of the images for validation, stratified by encoded identity with a fixed
# seed, and give both parts a fresh 0..n-1 index.
train_df, val_df = (
    subset.reset_index(drop=True)
    for subset in train_test_split(
        df, test_size=0.15, stratify=df['label_idx'], random_state=42
    )
)

print(f"üìä Dataset Split:")
print(f"   Train Set: {len(train_df)} images ({len(train_df)/len(df)*100:.1f}%)")
print(f"   Val Set:   {len(val_df)} images ({len(val_df)/len(df)*100:.1f}%)")
print(f"   Train classes: {train_df['label_idx'].nunique()}")
print(f"   Val classes:   {val_df['label_idx'].nunique()}")

Cell 6: Custom Dataset with Mixup

In [None]:
# Cell 6: Enhanced Dataset Class
class CasiaDataset(Dataset):
    """Map-style dataset over a DataFrame with ``filepath`` and ``label_idx`` columns.

    Args:
        dataframe: Table with one row per image.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.df = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        If the image cannot be loaded, the following rows are tried in turn
        (wrapping around), and the first loadable sample is returned instead.

        Raises:
            IndexError: If ``idx`` is out of range (raised by ``iloc``).
            RuntimeError: If every image in the dataset fails to load.
        """
        total = len(self)
        position = idx
        # A bounded loop replaces the unbounded recursion: each row is tried at most
        # once, so a fully unreadable dataset ends in a clear error, not a RecursionError.
        for _ in range(max(total, 1)):
            row = self.df.iloc[position]
            img_path = row['filepath']
            try:
                # The context manager closes the file handle as soon as the pixels are
                # converted instead of leaving that to garbage collection.
                with Image.open(img_path) as raw:
                    image = raw.convert("RGB")
            except Exception:
                # Deliberately broad: a corrupt file surfaces as several exception types.
                print(f"‚ö†Ô∏è Corrupt image: {img_path}")
                position = (position + 1) % total
                continue

            if self.transform:
                image = self.transform(image)

            return image, torch.tensor(row['label_idx'], dtype=torch.long)

        raise RuntimeError(f"None of the {total} images in the dataset could be loaded.")

# Mixup Augmentation Function
def mixup_data(x, y, alpha=0.2):
    """Blend each sample of a batch with a randomly chosen partner from the same batch.

    Args:
        x: Batch tensor; samples are indexed along dimension 0.
        y: Targets indexed along dimension 0 in the same order as ``x``.
        alpha: Parameter of the Beta(alpha, alpha) draw for the mixing weight.
            A non-positive value disables mixing (weight fixed to 1).

    Returns:
        ``(mixed_x, y_a, y_b, lam)``: the blended batch, the original targets, the
        targets of the partner samples, and the weight given to the original sample.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    # One random permutation of the batch decides each sample's partner.
    partner = torch.randperm(x.size(0)).to(x.device)

    blended = lam * x + (1 - lam) * x[partner]
    return blended, y, y[partner], lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Combine the loss against both mixup targets, weighted by ``lam`` and ``1 - lam``.

    Args:
        criterion: Callable ``criterion(pred, target)`` returning a loss.
        pred: Model output for the mixed batch.
        y_a: Targets weighted by ``lam``.
        y_b: Targets weighted by ``1 - lam``.
        lam: Mixing weight.

    Returns:
        ``lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)``.
    """
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

print("‚úÖ Dataset class and Mixup functions defined")

Cell 7: Enhanced Augmentation

In [None]:
# Cell 7: Enhanced Augmentation Pipeline
train_transforms = transforms.Compose([
    # Geometric augmentation on the PIL image: resize 32 px larger than the target,
    # then take a random IMG_SIZE x IMG_SIZE crop.
    transforms.Resize((cfg.IMG_SIZE + 32, cfg.IMG_SIZE + 32)),  # Resize larger first
    transforms.RandomCrop(cfg.IMG_SIZE),  # Then random crop
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(15),
    # Photometric augmentation, still on the PIL image.
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.RandomGrayscale(p=0.1),  # Applied to 10% of samples
    transforms.RandomApply([transforms.GaussianBlur(kernel_size=3)], p=0.1),  # Blur 10% of samples
    transforms.ToTensor(),
    # NOTE(review): these look like the standard ImageNet mean/std — confirm they match
    # the preprocessing expected by the pretrained backbone.
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    # RandomErasing is placed after ToTensor/Normalize, so it acts on the tensor.
    transforms.RandomErasing(p=0.3, scale=(0.02, 0.2))  # Erase a patch in 30% of samples
])

# Validation uses a plain resize plus the same normalization, with no random augmentation.
val_transforms = transforms.Compose([
    transforms.Resize((cfg.IMG_SIZE, cfg.IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Create Datasets
train_ds = CasiaDataset(train_df, transform=train_transforms)
val_ds = CasiaDataset(val_df, transform=val_transforms)

# Create DataLoaders
# Training batches are shuffled and the final incomplete batch is dropped.
train_loader = DataLoader(
    train_ds,
    batch_size=cfg.BATCH_SIZE,
    shuffle=True,
    num_workers=cfg.NUM_WORKERS,
    pin_memory=True,
    drop_last=True  # Drop incomplete batches for stability
)

# Validation keeps dataset order and keeps every sample (no drop_last).
val_loader = DataLoader(
    val_ds,
    batch_size=cfg.BATCH_SIZE,
    shuffle=False,
    num_workers=cfg.NUM_WORKERS,
    pin_memory=True
)

print(f"‚úÖ Data loaders created")
print(f"   Train batches: {len(train_loader)}")
print(f"   Val batches: {len(val_loader)}")

Cell 8: Enhanced Model Architecture

In [None]:
# Cell 8: Enhanced Model Architecture
class ImprovedFaceRecognitionModel(nn.Module):
    """timm backbone followed by a three-layer classification head.

    Args:
        model_name: timm model identifier for the backbone (created with
            ``pretrained=True`` and ``num_classes=0``).
        num_classes: Number of output classes.
        embedding_size: Width of the first head layer; the second is half as wide.
        dropout: Dropout rate after the first head layer; half this rate is used
            after the second.
    """

    def __init__(self, model_name, num_classes, embedding_size=512, dropout=0.3):
        super().__init__()

        # Pretrained feature extractor
        self.backbone = timm.create_model(model_name, pretrained=True, num_classes=0)
        feature_dim = self.backbone.num_features
        half_size = embedding_size // 2

        # Head: two Linear -> BatchNorm -> ReLU -> Dropout stages, then the class logits
        head_layers = [
            nn.Linear(feature_dim, embedding_size),
            nn.BatchNorm1d(embedding_size),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(embedding_size, half_size),
            nn.BatchNorm1d(half_size),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout / 2),
            nn.Linear(half_size, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

        # When True, backbone features are L2-normalized before use. Off by default.
        self.use_l2_norm = False

    def _backbone_features(self, x):
        """Run the backbone and apply L2 normalization when ``use_l2_norm`` is set."""
        feats = self.backbone(x)
        return F.normalize(feats, p=2, dim=1) if self.use_l2_norm else feats

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.classifier(self._backbone_features(x))

    def get_embedding(self, x):
        """Return the (optionally normalized) backbone features, computed without gradients."""
        with torch.no_grad():
            return self._backbone_features(x)

# Build the network and move it to the selected device
print(f"üèóÔ∏è Building {cfg.MODEL_NAME} model...")
model = ImprovedFaceRecognitionModel(
    cfg.MODEL_NAME,
    num_classes,
    embedding_size=cfg.EMBEDDING_SIZE,
    dropout=cfg.DROPOUT,
).to(device)

# Parameter totals: all parameters, and only those that require gradients
total_params = sum(map(torch.numel, model.parameters()))
trainable_params = sum(
    map(torch.numel, filter(lambda p: p.requires_grad, model.parameters()))
)

print(f"‚úÖ Model created successfully")
print(f"   Total parameters: {total_params:,}")
print(f"   Trainable parameters: {trainable_params:,}")
# The size estimate counts 4 bytes per parameter.
print(f"   Model size: ~{total_params * 4 / 1e6:.1f} MB")

Cell 9: Optimizer and Scheduler

In [None]:
# Cell 9: Optimizer, Loss, and Learning Rate Scheduler
# Cross-entropy loss with label smoothing taken from the config
criterion = nn.CrossEntropyLoss(label_smoothing=cfg.LABEL_SMOOTHING)

# Differential Learning Rates (lower for backbone, higher for classifier).
# Group order matters: the training loop logs optimizer.param_groups[0]['lr'],
# which is the backbone group defined first here.
optimizer = optim.AdamW([
    {'params': model.backbone.parameters(), 'lr': cfg.BACKBONE_LR},
    {'params': model.classifier.parameters(), 'lr': cfg.LEARNING_RATE}
], weight_decay=cfg.WEIGHT_DECAY)

# Cosine Annealing with Warm Restarts.
# The training loop calls scheduler.step() once per epoch, so T_0 counts epochs.
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,  # Restart every 10 epochs
    T_mult=2,  # Double the period after each restart
    eta_min=1e-6
)

# Alternative: OneCycleLR (uncomment to use)
# scheduler = optim.lr_scheduler.OneCycleLR(
#     optimizer,
#     max_lr=[cfg.BACKBONE_LR * 10, cfg.LEARNING_RATE * 10],
#     epochs=cfg.EPOCHS,
#     steps_per_epoch=len(train_loader),
#     pct_start=0.3
# )

print("‚úÖ Optimizer and scheduler configured")
print(f"   Backbone LR: {cfg.BACKBONE_LR}")
print(f"   Classifier LR: {cfg.LEARNING_RATE}")
print(f"   Weight Decay: {cfg.WEIGHT_DECAY}")

Cell 10: Enhanced Training Function

In [None]:
# Cell 10: Enhanced Training Function with Mixup
def train_fn(model, loader, optimizer, criterion, use_mixup=True):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable of ``(images, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable ``criterion(outputs, labels)``.
        use_mixup: Mixup is applied only when this and ``cfg.USE_MIXUP`` are both true.

    Returns:
        ``(mean batch loss, accuracy in percent)``. With mixup, a prediction is scored
        against both targets, weighted by ``lam`` and ``1 - lam``.
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    apply_mixup = use_mixup and cfg.USE_MIXUP

    progress = tqdm(loader, desc="Training", leave=False)

    for imgs, labels in progress:
        imgs = imgs.to(device)
        labels = labels.to(device)

        if apply_mixup:
            imgs, labels_a, labels_b, lam = mixup_data(imgs, labels, cfg.MIXUP_ALPHA)

        optimizer.zero_grad()
        outputs = model(imgs)
        preds = outputs.max(1).indices

        if apply_mixup:
            loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
            hits_a = preds.eq(labels_a).sum().item()
            hits_b = preds.eq(labels_b).sum().item()
            n_correct += lam * hits_a + (1 - lam) * hits_b
        else:
            loss = criterion(outputs, labels)
            n_correct += preds.eq(labels).sum().item()

        loss.backward()

        # Clip the global gradient norm to 1.0 before the update
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        batch_loss = loss.item()
        loss_sum += batch_loss
        n_seen += labels.size(0)

        progress.set_postfix(loss=batch_loss, acc=100.*n_correct/n_seen)

    return loss_sum / len(loader), 100. * n_correct / n_seen

def val_fn(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable ``criterion(outputs, labels)``.

    Returns:
        ``(mean batch loss, top-1 accuracy %, top-5 accuracy %, all_preds, all_labels)``,
        where the last two are flat lists of predicted and true class indices.
        With fewer than 5 classes, "top-5" uses all available classes.
    """
    model.eval()
    running_loss = 0
    correct = 0
    total = 0
    top5_correct = 0

    all_preds = []
    all_labels = []

    with torch.no_grad():
        loop = tqdm(loader, desc="Validation", leave=False)
        for imgs, labels in loop:
            imgs, labels = imgs.to(device), labels.to(device)

            outputs = model(imgs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()

            # Top-1 accuracy
            _, preds = outputs.max(1)
            correct += preds.eq(labels).sum().item()

            # Top-5 accuracy, computed for the whole batch at once: a row is a hit when
            # its label appears among its top-k predictions. This replaces a per-sample
            # Python loop that synchronized with the device for every sample.
            # k is capped at the class count because topk raises when k exceeds it.
            k = min(5, outputs.size(1))
            _, top5_preds = outputs.topk(k, dim=1)
            top5_correct += top5_preds.eq(labels.unsqueeze(1)).any(dim=1).sum().item()

            total += labels.size(0)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            loop.set_postfix(loss=loss.item(), acc=100.*correct/total)

    top1_acc = 100. * correct / total
    top5_acc = 100. * top5_correct / total

    return running_loss / len(loader), top1_acc, top5_acc, all_preds, all_labels

print("‚úÖ Training and validation functions defined")

Cell 11: Early Stopping Class

In [None]:
# Cell 11: Early Stopping Implementation
class EarlyStopping:
    """Signal when a monitored score has stopped improving.

    Args:
        patience: Number of consecutive non-improving calls that triggers a stop.
        min_delta: Margin by which a score must beat the best one to count as improved.
        mode: ``'max'`` when higher scores are better; any other value is treated
            as lower-is-better.
    """

    def __init__(self, patience=10, min_delta=0.001, mode='max'):
        self.patience, self.min_delta, self.mode = patience, min_delta, mode
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, score):
        """Record ``score`` and return True once training should stop."""
        # The first score only sets the baseline.
        if self.best_score is None:
            self.best_score = score
            return False

        if self.mode == 'max':
            improved = score > self.best_score + self.min_delta
        else:
            improved = score < self.best_score - self.min_delta

        if improved:
            self.best_score = score
            self.counter = 0
        else:
            self.counter += 1

        if self.counter < self.patience:
            return False

        self.early_stop = True
        return True

# Initialize early stopping
early_stopping = EarlyStopping(patience=cfg.PATIENCE, min_delta=cfg.MIN_DELTA, mode='max')

print("‚úÖ Early stopping initialized")
print(f"   Patience: {cfg.PATIENCE} epochs")
print(f"   Min improvement: {cfg.MIN_DELTA}")

In [None]:
# Cell 12: Main Training Loop with All Enhancements
# Setup Cloud Repo
if USE_HF_BACKUP:
    api = HfApi()
    try:
        api.create_repo(repo_id=cfg.HF_REPO_ID, repo_type="model", exist_ok=True)
        print(f"‚úÖ Cloud Repository Ready: https://huggingface.co/{cfg.HF_REPO_ID}")
    except Exception as e:
        print(f"‚ö†Ô∏è Repo creation warning: {e}")


def _build_checkpoint(epoch):
    """Snapshot the current model, optimizer and scheduler state plus the best scores so far."""
    # NOTE(review): the checkpoint stores the fitted LabelEncoder object, so loading it
    # requires full unpickling (presumably torch.load(..., weights_only=False)) — confirm
    # for the torch version in use.
    return {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_acc': best_acc,
        'best_top5_acc': best_top5_acc,
        'num_classes': num_classes,
        'label_encoder': label_encoder
    }


# Training History
history = {
    'train_loss': [], 'val_loss': [],
    'train_acc': [], 'val_acc': [],
    'top5_acc': [], 'learning_rates': []
}

best_acc = 0.0
best_top5_acc = 0.0

print(f"\n{'='*60}")
print(f"üöÄ STARTING TRAINING")
print(f"{'='*60}")
print(f"Model: {cfg.MODEL_NAME}")
print(f"Classes: {num_classes}")
print(f"Training samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Epochs: {cfg.EPOCHS}")
print(f"Batch size: {cfg.BATCH_SIZE}")
print(f"{'='*60}\n")

for epoch in range(cfg.EPOCHS):
    print(f"\nüìÖ Epoch {epoch+1}/{cfg.EPOCHS}")
    print(f"{'‚îÄ'*60}")

    # Learning rate of the first param group, read before this epoch's scheduler step
    current_lr = optimizer.param_groups[0]['lr']

    # Training phase
    train_loss, train_acc = train_fn(
        model, train_loader, optimizer, criterion,
        use_mixup=(epoch < cfg.EPOCHS * 0.8)  # Disable mixup in last 20% of training
    )

    # Validation phase
    val_loss, val_acc, top5_acc, val_preds, val_labels = val_fn(
        model, val_loader, criterion
    )

    # Update learning rate
    scheduler.step()

    # Store history
    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['train_acc'].append(train_acc)
    history['val_acc'].append(val_acc)
    history['top5_acc'].append(top5_acc)
    history['learning_rates'].append(current_lr)

    # Print epoch summary
    print(f"üìä Results:")
    print(f"   Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%")
    print(f"   Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.2f}%")
    print(f"   Top-5 Acc:  {top5_acc:.2f}%")
    print(f"   Learning Rate: {current_lr:.6f}")

    # Calculate overfitting gap
    gap = train_acc - val_acc
    print(f"   Overfit Gap: {gap:.2f}%", end="")
    if gap > 20:
        print(" ‚ö†Ô∏è High overfitting!")
    elif gap > 10:
        print(" ‚ö†Ô∏è Moderate overfitting")
    else:
        print(" ‚úÖ Good generalization")

    # Save best model
    if val_acc > best_acc:
        improvement = val_acc - best_acc
        best_acc = val_acc
        best_top5_acc = top5_acc

        # Save locally
        checkpoint = _build_checkpoint(epoch)
        torch.save(checkpoint, cfg.MODEL_FILENAME)
        print(f"   üíæ Saved Best Model (‚Üë{improvement:.2f}%)")

        # Upload to cloud
        if USE_HF_BACKUP:
            try:
                print("   ‚òÅÔ∏è Uploading to Hugging Face...", end="")
                api.upload_file(
                    path_or_fileobj=cfg.MODEL_FILENAME,
                    path_in_repo=cfg.MODEL_FILENAME,
                    repo_id=cfg.HF_REPO_ID,
                    repo_type="model"
                )
                print(" Done! ‚úÖ")
            except Exception as e:
                print(f" Failed ‚ùå ({e})")

    # Periodic checkpoint: snapshot the CURRENT state. Previously this re-saved the
    # `checkpoint` dict left over from the last best-model save, which was stale and
    # undefined if no best model had been saved yet.
    if (epoch + 1) % cfg.SAVE_FREQ == 0:
        checkpoint_name = f"checkpoint_epoch_{epoch+1}.pth"
        torch.save(_build_checkpoint(epoch), checkpoint_name)
        print(f"   üíæ Checkpoint saved: {checkpoint_name}")

    # Early stopping check
    if early_stopping(val_acc):
        print(f"\n‚èπÔ∏è Early stopping triggered at epoch {epoch+1}")
        print(f"   No improvement for {cfg.PATIENCE} consecutive epochs")
        break

print(f"\n{'='*60}")
print(f"‚úÖ TRAINING COMPLETE")
print(f"{'='*60}")
print(f"Best Validation Accuracy: {best_acc:.2f}%")
print(f"Best Top-5 Accuracy: {best_top5_acc:.2f}%")
# The history is empty when no epoch ran (cfg.EPOCHS == 0), so the gap is skipped then.
if history['train_acc']:
    print(f"Final Overfitting Gap: {history['train_acc'][-1] - history['val_acc'][-1]:.2f}%")
print(f"{'='*60}\n")

Visualization

In [None]:
# Side-by-side training curves: accuracy on the left, loss on the right.
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(15, 5))

# Accuracy
acc_ax.plot(history['train_acc'], label='Train Accuracy', marker='o')
acc_ax.plot(history['val_acc'], label='Validation Accuracy', marker='o')
acc_ax.set_title('Accuracy Improvement')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.legend()
acc_ax.grid(True)

# Loss
loss_ax.plot(history['train_loss'], label='Train Loss', marker='o')
loss_ax.plot(history['val_loss'], label='Validation Loss', marker='o')
loss_ax.set_title('Loss Reduction')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.legend()
loss_ax.grid(True)

plt.show()