# =============================================================================
# BASELINE DENSENET121 PRETRAINING (GENUINE VS FORGED)
# =============================================================================
Pretrain a DenseNet121 baseline model on the 150 background users for binary classification (genuine vs. forged).
Output: baseline_pretrain.pth weights, to be used later for cross-validation on the 110 evaluation users.

# =============================================================================
# STEP 1: SETUP, IMPORTS, AND REPRODUCIBILITY
# =============================================================================

In [None]:
import os
import sys
import json
import random
import shutil
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from PIL import Image

# Treat the parent of the current working directory as the repository root
# and make it importable so the project's own packages can be resolved.
current_dir = os.path.abspath(os.getcwd())
REPO_ROOT = os.path.abspath(os.path.join(current_dir, os.pardir))
if sys.path.count(REPO_ROOT) == 0:
    sys.path.append(REPO_ROOT)

# Import Custom Modules
from models.feature_extractor import DenseNetFeatureExtractor

# Deterministic Seeding for Reproducible Research
def seed_everything(seed=42):
    """Seed every random number generator used here and pin cuDNN behaviour.

    Seeds Python's `random`, NumPy and PyTorch (CPU and CUDA), exports the
    seed as PYTHONHASHSEED, and switches cuDNN to deterministic mode with
    benchmarking disabled.

    Args:
        seed: Integer seed applied to all generators (default 42).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Apply the same seed to each library's generator in turn.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Deterministic kernels on, autotuner off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    print(f" > [System] Seed set to: {seed}")

# Fix all seeds up front, before the device is selected.
seed_everything(seed=42)

# Device Configuration: use CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print(f" > [System] Computation Device: {DEVICE}")
print(f" > [System] CUDA Available: {torch.cuda.is_available()}")

# =============================================================================
# STEP 2: HYPERPARAMETER CONFIGURATION
# =============================================================================

In [None]:
# --- Input Configuration ---
IMG_SIZE = 224                       # Side length of the square input, in pixels
INPUT_SHAPE = (IMG_SIZE, IMG_SIZE)

# --- Training Hyperparameters ---
BATCH_SIZE = 30
LEARNING_RATE = 1e-3                 # Phase 1 (head-only) learning rate
FINETUNE_LR = 1e-5                   # Phase 2 (backbone fine-tuning) learning rate
EPOCHS = 30                          # Total epochs across both phases
FREEZE_EPOCHS = 10                   # Phase 1 length; Phase 2 covers the remaining epochs

# --- Data Configuration ---
# JSON file listing the 150 background users.
BACKGROUND_USERS_PATH = os.path.join(
    REPO_ROOT, 'data', 'splits', 'bhsig_background_users.json'
)

# --- Checkpoint Configuration ---
CHECKPOINT_DIR = os.path.join(REPO_ROOT, 'checkpoints', 'baseline_pretraining')
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# Echo the effective configuration, one "[Config] label: value" line each.
config_report = (
    ("Image Size", INPUT_SHAPE),
    ("Batch Size", BATCH_SIZE),
    ("Learning Rate (head)", LEARNING_RATE),
    ("Learning Rate (fine-tune)", FINETUNE_LR),
    ("Total Epochs", EPOCHS),
    ("Frozen Epochs (Phase 1)", FREEZE_EPOCHS),
    ("Fine-tune Epochs (Phase 2)", EPOCHS - FREEZE_EPOCHS),
    ("Background Users File", BACKGROUND_USERS_PATH),
    ("Checkpoints", CHECKPOINT_DIR),
)
for config_label, config_value in config_report:
    print(f"[Config] {config_label}: {config_value}")

# =============================================================================
# STEP 3: DATA CONSOLIDATION AND BACKGROUND USER SPLIT
# =============================================================================

In [None]:
import json
import os
import shutil
import subprocess
import sys


def copy_dir_contents(src_dir, dst_dir):
    """Copy every non-hidden entry of src_dir into dst_dir.

    Portable replacement for the shell call `cp -r "src/"* "dst/"`. As with
    the shell glob, dot-prefixed top-level entries are skipped. Entries that
    already exist in dst_dir are overwritten (files) or merged (directories).
    A failed copy is reported and skipped so consolidation stays best-effort
    instead of failing silently.

    Args:
        src_dir: Directory whose contents are copied. A missing directory
            copies nothing.
        dst_dir: Destination directory; expected to exist already.

    Returns:
        Number of top-level entries that were copied successfully.
    """
    if not os.path.isdir(src_dir):
        return 0

    copied = 0
    for entry in sorted(os.listdir(src_dir)):
        if entry.startswith('.'):
            continue  # the shell glob `*` never matched hidden entries
        src_path = os.path.join(src_dir, entry)
        dst_path = os.path.join(dst_dir, entry)
        try:
            if os.path.isdir(src_path):
                shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
            else:
                shutil.copy(src_path, dst_path)
        except OSError as exc:  # shutil.Error is an OSError subclass
            print(f" > Warning: could not copy {src_path}: {exc}")
            continue
        copied += 1
    return copied


# 1. Point REPO_ROOT at the project folder rather than its 'notebooks'
#    sub-folder. Only the last path component is checked, so an unrelated
#    directory that merely contains the word "notebooks" is left alone.
current_dir = os.path.abspath(os.getcwd())
if os.path.basename(current_dir) == 'notebooks':
    REPO_ROOT = os.path.dirname(current_dir)
else:
    REPO_ROOT = current_dir

print(f"Project Root: {REPO_ROOT}")

# 2. Define paths relative to the project root
DATA_ROOT = os.path.join(REPO_ROOT, 'data', 'bhsig260-hindi-bengali')

# Fallback: if the Kaggle folder name is different, allow direct repo/data
if not os.path.isdir(DATA_ROOT):
    alt_root = os.path.join(REPO_ROOT, 'data')
    if os.path.isdir(alt_root):
        DATA_ROOT = alt_root

working_dir = os.path.join(REPO_ROOT, 'data')
genuine_dir = os.path.join(working_dir, 'all_genuine')
forged_dir = os.path.join(working_dir, 'all_forged')
splits_dir = os.path.join(working_dir, 'splits')

# Start from empty output directories: anything left by a previous run is removed.
for d in [genuine_dir, forged_dir, splits_dir]:
    if os.path.exists(d):
        shutil.rmtree(d)
    os.makedirs(d, exist_ok=True)

print("Status: Consolidating BHSig dataset (Hindi + Bengali) into a unified structure...")

# Verify source data exists
if os.path.isdir(DATA_ROOT):
    # Genuine signatures from both scripts go into one folder
    hindi_gen = os.path.join(DATA_ROOT, 'BHSig160_Hindi', 'Genuine')
    bengali_gen = os.path.join(DATA_ROOT, 'BHSig100_Bengali', 'Genuine')
    for source_dir in (hindi_gen, bengali_gen):
        copy_dir_contents(source_dir, genuine_dir)

    # Forged signatures from both scripts go into one folder
    hindi_forg = os.path.join(DATA_ROOT, 'BHSig160_Hindi', 'Forged')
    bengali_forg = os.path.join(DATA_ROOT, 'BHSig100_Bengali', 'Forged')
    for source_dir in (hindi_forg, bengali_forg):
        copy_dir_contents(source_dir, forged_dir)

    print(f" > Genuine files: {len(os.listdir(genuine_dir))}")
    print(f" > Forged files: {len(os.listdir(forged_dir))}")
else:
    print(f"ERROR: Data source not found at {DATA_ROOT}")

print(" > Generating dataset splits...")

# Absolute path to the split-generation script
script_path = os.path.join(REPO_ROOT, 'scripts', 'restructure_bhsig.py')

# Verify the script exists before running
if not os.path.exists(script_path):
    print(f"CRITICAL ERROR: Script not found at: {script_path}")
else:
    # Run with the interpreter that executes this notebook and pass the
    # arguments as a list, so no shell quoting is involved and the reported
    # value is the script's real exit code.
    script_cmd = [
        sys.executable or 'python',
        script_path,
        '--base_dir', DATA_ROOT,
        '--output_dir', splits_dir,
        '--pretrain_users', '150',
    ]
    exit_code = subprocess.run(script_cmd, check=False).returncode

    if exit_code != 0:
        print(f"Error: Script failed with exit code {exit_code}")

# Load the identified background users
background_users_path = os.path.join(splits_dir, 'bhsig_background_users.json')
if os.path.exists(background_users_path):
    with open(background_users_path, 'r', encoding='utf-8') as f:
        background_users_dict = json.load(f)
    background_users = list(background_users_dict.keys())
    print(f"Success: Loaded {len(background_users)} users for the Pre-training phase.")
else:
    # Keep both names defined so later cells fail on an empty dataset rather
    # than on a NameError.
    background_users_dict = {}
    background_users = []
    print("Error: Background users file not found.")

In [None]:
# =============================================================================
# DATASET CLASS FOR GENUINE VS FORGED CLASSIFICATION
# =============================================================================

class BHSigDataset(Dataset):
    """
    Binary signature dataset: each item is an (image, label) pair.
    Label 0 marks a genuine signature, label 1 a forged one.
    """
    def __init__(self, user_dict, user_list, transform=None):
        """
        Build the flat (path, label) sample list.

        Args:
            user_dict: Mapping of user ID to a dict holding 'genuine' and
                'forged' lists of image paths
            user_list: User IDs to include; IDs missing from user_dict are skipped
            transform: Optional callable applied to each loaded image
        """
        self.samples = []

        for user_id in user_list:
            if user_id not in user_dict:
                continue
            entry = user_dict[user_id]
            # Genuine paths first (label 0), then forged paths (label 1).
            self.samples.extend((path, 0) for path in entry['genuine'])
            self.samples.extend((path, 1) for path in entry['forged'])

        self.transform = transform

        # Labels are 0/1, so their sum is the forged count.
        num_forged = sum(label for _, label in self.samples)
        num_genuine = len(self.samples) - num_forged

        print(f"   Dataset initialized: {len(self.samples)} total samples")
        print(f"   - Genuine: {num_genuine} samples (label=0)")
        print(f"   - Forged: {num_forged} samples (label=1)")
        print(f"   - From {len(user_list)} users")

    def __len__(self):
        """Return the number of (path, label) samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Load sample idx as an RGB image, apply the transform, return (img, label).

        Any failure while loading or transforming is printed and replaced by
        an all-zero tensor of shape (3, IMG_SIZE, IMG_SIZE); the label is kept.
        """
        path, target = self.samples[idx]

        try:
            image = Image.open(path).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except Exception as e:
            print(f"Error loading image {path}: {e}")
            # Best-effort fallback so one bad file does not abort the epoch.
            image = torch.zeros(3, IMG_SIZE, IMG_SIZE)

        return image, target

# =============================================================================
# DATA TRANSFORMATIONS
# =============================================================================

# Training pipeline: resize, random geometric augmentation, tensor conversion,
# then per-channel normalisation.
geometric_augmentations = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), fill=0),
]
# NOTE(review): these mean/std values look like the usual ImageNet statistics - confirm.
channel_normalization = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)
train_transform = transforms.Compose([
    transforms.Resize(INPUT_SHAPE),
    *geometric_augmentations,
    transforms.ToTensor(),
    channel_normalization,
])

print(" > [Info] Dataset class and transformations defined for binary classification")

# =============================================================================
# STEP 4: CREATE BINARY CLASSIFICATION DATASET (GENUINE VS FORGED)
# =============================================================================

In [None]:
# Build the pretraining dataset and loader from the background users.
num_classes = 2
user_data_dict = background_users_dict

print(" > Creating pretraining dataset for binary classification (Genuine vs Forged)...")
print(f" > Number of classes: {num_classes} (0=Genuine, 1=Forged)")
print(f" > Using {len(background_users)} background users")

pretrain_dataset = BHSigDataset(
    user_dict=user_data_dict,
    user_list=background_users,
    transform=train_transform,
)

# Shuffle every epoch and drop the final incomplete batch.
loader_options = dict(
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
    drop_last=True,
)
pretrain_loader = DataLoader(pretrain_dataset, **loader_options)

print(" > Pretraining dataloader ready")

# =============================================================================
# STEP 5: MODEL INITIALIZATION
# =============================================================================

In [None]:
# Build the DenseNet121 baseline with a num_classes-way output and move it to DEVICE.
model = DenseNetFeatureExtractor(
    backbone_name='densenet121',
    output_dim=num_classes,
    pretrained=True,
    baseline=True
)
model = model.to(DEVICE)

# Tally total and trainable parameter counts in a single pass.
total_params = 0
trainable_params = 0
for model_param in model.parameters():
    param_count = model_param.numel()
    total_params += param_count
    if model_param.requires_grad:
        trainable_params += param_count

print(" > Model: DenseNet121 Baseline")
print(f" > Total Parameters: {total_params:,}")
print(f" > Trainable Parameters: {trainable_params:,}")
print(f" > Output Classes: {num_classes}")

# =============================================================================
# STEP 6: PRETRAINING EXECUTION
# =============================================================================

In [None]:
# --- Phase 1: Head-only training (backbone frozen) ---
# NOTE(review): this cell never sets requires_grad=False itself; the optimizer
# below only skips parameters that are already frozen. Presumably
# DenseNetFeatureExtractor(baseline=True) returns a frozen backbone - confirm.

# Fail early with a clear message instead of a ZeroDivisionError when the
# per-epoch averages would be computed over zero batches.
if len(pretrain_loader) == 0:
    raise RuntimeError(
        "pretrain_loader yields no batches; check that the background-user "
        "split was generated and that the dataset is not empty."
    )

optimizer = optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=LEARNING_RATE, weight_decay=1e-4
)
criterion = nn.CrossEntropyLoss()
# `verbose` is deprecated since PyTorch 2.2, so LR reductions are reported
# manually after each scheduler step instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)

# Training history: one entry per epoch
history = {'loss': [], 'accuracy': [], 'phase': []}

best_loss = float('inf')

print(f"\n{'='*60}")
print("PHASE 1: HEAD-ONLY TRAINING (Backbone Frozen)")
print(f"  Epochs 1-{FREEZE_EPOCHS} | LR: {LEARNING_RATE}")
print(f"{'='*60}\n")

for epoch in range(EPOCHS):
    # === Phase transition: unfreeze backbone at FREEZE_EPOCHS ===
    if epoch == FREEZE_EPOCHS:
        print(f"\n{'='*60}")
        print("PHASE 2: FINE-TUNING (Backbone Unfrozen)")
        print(f"  Epochs {FREEZE_EPOCHS+1}-{EPOCHS} | LR: {FINETUNE_LR}")
        print(f"{'='*60}\n")

        # Unfreeze all backbone parameters
        for param in model.backbone.parameters():
            param.requires_grad = True

        # Fresh optimizer over backbone and head, both at the lower LR.
        # The optimizer state accumulated during Phase 1 is discarded.
        optimizer = optim.AdamW([
            {'params': model.backbone.parameters(), 'lr': FINETUNE_LR},
            {'params': model.custom_head.parameters(), 'lr': FINETUNE_LR}
        ], weight_decay=1e-4)

        # The scheduler must track the new optimizer; Phase 2 uses a shorter patience.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=3
        )

        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print(f"  Trainable Parameters after unfreeze: {trainable_params:,}\n")

    current_phase = 1 if epoch < FREEZE_EPOCHS else 2

    # Training
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    pbar = tqdm(pretrain_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Phase {current_phase}]", leave=False)

    for images, labels in pbar:
        images, labels = images.to(DEVICE), labels.to(DEVICE)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Running accuracy: predicted class is the index of the largest output
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        pbar.set_postfix({'loss': loss.item(), 'acc': f"{100*correct/total:.2f}%"})

    # Epoch metrics: mean of the per-batch losses and overall sample accuracy
    avg_loss = running_loss / len(pretrain_loader)
    avg_acc = correct / total

    history['loss'].append(avg_loss)
    history['accuracy'].append(avg_acc)
    history['phase'].append(current_phase)

    print(f"Epoch {epoch+1:03d} [P{current_phase}] | Loss: {avg_loss:.4f} | Acc: {avg_acc:.2%} | LR: {optimizer.param_groups[0]['lr']:.2e}")

    # Step the plateau scheduler on the training loss and report any reduction
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(avg_loss)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after < lr_before:
        print(f"   >>> Learning rate reduced: {lr_before:.2e} -> {lr_after:.2e}")

    # Save best model (lowest average training loss so far, across both phases)
    if avg_loss < best_loss:
        best_loss = avg_loss
        ckpt_path = os.path.join(CHECKPOINT_DIR, "best_pretrain_model.pth")
        torch.save({
            'epoch': epoch,  # zero-based epoch index
            'model_state_dict': model.state_dict(),
            'num_classes': num_classes,
            'loss': avg_loss,
            'accuracy': avg_acc
        }, ckpt_path)
        print(f"   >>> Best Model Saved! (Loss: {avg_loss:.4f})")

    # Save checkpoint every 5 epochs (bare state_dict, no metadata)
    if (epoch + 1) % 5 == 0:
        ckpt_path = os.path.join(CHECKPOINT_DIR, f"pretrain_epoch_{epoch+1}.pth")
        torch.save(model.state_dict(), ckpt_path)

print(f"\n{'='*60}")
print("Pretraining Complete!")
print(f"{'='*60}")

# =============================================================================
# STEP 7: SAVE FINAL PRETRAINED WEIGHTS
# =============================================================================

In [None]:
# Persist the weights as they stand after the last epoch, together with the
# final metrics and the hyperparameters of BOTH training phases, so the file
# describes the run that produced it. The lowest-loss weights are stored
# separately as best_pretrain_model.pth in CHECKPOINT_DIR.
final_weights_path = os.path.join(REPO_ROOT, "baseline_pretrain.pth")
torch.save({
    'model_state_dict': model.state_dict(),
    'num_classes': num_classes,
    'final_loss': history['loss'][-1],
    'final_accuracy': history['accuracy'][-1],
    'config': {
        'img_size': INPUT_SHAPE,
        'batch_size': BATCH_SIZE,
        'learning_rate': LEARNING_RATE,   # Phase 1 (head-only) LR
        'finetune_lr': FINETUNE_LR,       # Phase 2 (fine-tuning) LR
        'epochs': EPOCHS,
        'freeze_epochs': FREEZE_EPOCHS    # Number of Phase 1 epochs
    }
}, final_weights_path)

print(f"\n > Final pretrained weights saved to: {final_weights_path}")

# =============================================================================
# STEP 8: TRAINING VISUALIZATION
# =============================================================================

In [None]:
# Draw loss and accuracy side by side, marking where the backbone is unfrozen.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# One row per panel: (axis, series, line colour, y-axis label, title)
curve_panels = (
    (axes[0], history['loss'], 'steelblue', 'Loss', 'Pretraining Loss'),
    (axes[1], [a*100 for a in history['accuracy']], 'green', 'Accuracy (%)', 'Pretraining Accuracy'),
)
for axis, series, line_color, y_label, panel_title in curve_panels:
    axis.plot(series, color=line_color, linewidth=2)
    axis.axvline(x=FREEZE_EPOCHS, color='red', linestyle='--', alpha=0.7, label='Unfreeze Backbone')
    axis.set_xlabel('Epoch', fontsize=12)
    axis.set_ylabel(y_label, fontsize=12)
    axis.set_title(panel_title, fontsize=13, fontweight='bold')
    axis.legend()
    axis.grid(alpha=0.3)

plt.tight_layout()
plot_path = os.path.join(CHECKPOINT_DIR, 'pretraining_curves.png')
plt.savefig(plot_path, dpi=150, bbox_inches='tight')
plt.show()

print(f" > Training curves saved to: {plot_path}")

# =============================================================================
# PRETRAINING SUMMARY
# =============================================================================

In [None]:
# Assemble the whole report first, then emit it in a single print call.
summary_rule = "=" * 60
summary_lines = [
    "\n" + summary_rule,
    "PRETRAINING SUMMARY",
    summary_rule,
    "\nModel: Baseline DenseNet121 (Genuine vs Forged Binary Classification)",
    "\nConfiguration:",
    f"   - Image Size: {INPUT_SHAPE}",
    f"   - Batch Size: {BATCH_SIZE}",
    f"   - Total Epochs: {EPOCHS}",
    f"   - Phase 1 (Frozen): {FREEZE_EPOCHS} epochs @ LR={LEARNING_RATE}",
    f"   - Phase 2 (Fine-tune): {EPOCHS - FREEZE_EPOCHS} epochs @ LR={FINETUNE_LR}",
    f"   - Number of Classes: {num_classes} (0=Genuine, 1=Forged)",
    "\nDataset:",
    f"   - Training Users: {len(background_users)} (150 background users)",
    f"   - Training Samples: {len(pretrain_dataset)}",
    "\nResults:",
    f"   - Final Loss: {history['loss'][-1]:.4f}",
    f"   - Final Accuracy: {history['accuracy'][-1]:.2%}",
    f"   - Best Loss: {best_loss:.4f}",
    "\nSaved Artifacts:",
    f"   - Pretrained Weights: {final_weights_path}",
    f"   - Checkpoints: {CHECKPOINT_DIR}",
    f"   - Training Curves: {plot_path}",
    summary_rule,
    "\nPretraining Complete!",
    "Next: Run baseline_kfold_validation.ipynb for cross-validation",
    summary_rule,
]
print("\n".join(summary_lines))