In [None]:
!pip install numpy==1.22.4 scipy==1.10.1 umap-learn scikit-learn tqdm matplotlib seaborn opencv-python-headless torch torchvision torchaudio --upgrade --quiet


In [None]:
!pip install umap-learn

### LeNet-5 Implementation for Amazigh Handwritten Character Recognition

In [None]:
# -*- coding: utf-8 -*-
"""LeNet-5 Implementation for AMHCD Classification

This script implements LeNet-5 architecture for classifying Amazigh handwritten characters (33 classes).
It includes data preprocessing, model training, evaluation, and visualization.

Requirements:
- Python 3.7+
- PyTorch 1.9+
- torchvision
- scikit-learn
- matplotlib
- seaborn
- numpy
- pandas
- umap-learn
- opencv-python

Example:
    $ python lenet5_amhcd.py

"""

# Section 1: Imports & Environment Setup
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms, models
from PIL import Image
from sklearn.metrics import confusion_matrix, classification_report, f1_score
from sklearn.model_selection import train_test_split
import umap

# Set random seeds for reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Create directories if they don't exist
os.makedirs('models', exist_ok=True)
os.makedirs('figures', exist_ok=True)
os.makedirs('results', exist_ok=True)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Section 2: Dataset Loading & Preprocessing
class AMHCDDataset(Dataset):
    """AMHCD Dataset class for loading and preprocessing Amazigh handwritten characters."""
    
    def __init__(self, root_dir, transform=None, split='train', mean=None, std=None):
        self.root_dir = root_dir
        self.transform = transform
        self.split = split
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        self.images = []
        self.labels = []
        
        # Load images and labels
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                for img_name in os.listdir(class_dir):
                    if img_name.endswith('.png'):
                        self.images.append(os.path.join(class_dir, img_name))
                        self.labels.append(self.class_to_idx[class_name])
        
        # Calculate mean and std from training set if not provided
        if mean is None or std is None:
            self.mean, self.std = self.calculate_mean_std()
        else:
            self.mean, self.std = mean, std
            
        # Update transform with normalization
        if self.transform is None:
            self.transform = self.get_default_transform()
        else:
            # Add normalization to the provided transform
            self.transform.transforms.append(transforms.Normalize(self.mean, self.std))
    
    def calculate_mean_std(self):
        """Calculate mean and standard deviation of the training set."""
        print("Calculating mean and std from training data...")
        mean = np.zeros(3)
        std = np.zeros(3)
        num_pixels = 0
        
        for img_path in self.images:
            img = Image.open(img_path).convert('RGB')
            img = np.array(img) / 255.0
            mean += img.mean(axis=(0, 1))
            std += img.std(axis=(0, 1))
            num_pixels += 1
        
        mean /= num_pixels
        std /= num_pixels
        
        return mean, std
    
    def get_default_transform(self):
        """Get default transform based on split (train/val/test)."""
        if self.split == 'train':
            return transforms.Compose([
                transforms.Resize((32, 32)),
                transforms.RandomRotation(10),
                transforms.RandomAffine(0, translate=(0.1, 0.1)),
                transforms.ColorJitter(brightness=0.1, contrast=0.1),
                transforms.ToTensor(),
                transforms.Normalize(self.mean, self.std)
            ])
        else:
            return transforms.Compose([
                transforms.Resize((32, 32)),
                transforms.ToTensor(),
                transforms.Normalize(self.mean, self.std)
            ])
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_path = self.images[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Load dataset and create splits
dataset_path = "data/amhcd"
full_dataset = AMHCDDataset(dataset_path, split='train')

# Get mean and std from full dataset for consistent normalization
mean, std = full_dataset.mean, full_dataset.std

# Split dataset into train, val, test (70/15/15)
train_idx, temp_idx = train_test_split(
    range(len(full_dataset)), 
    test_size=0.3, 
    random_state=seed, 
    stratify=full_dataset.labels
)
val_idx, test_idx = train_test_split(
    temp_idx, 
    test_size=0.5, 
    random_state=seed, 
    stratify=[full_dataset.labels[i] for i in temp_idx]
)

# Create subset datasets
train_dataset = torch.utils.data.Subset(full_dataset, train_idx)
val_dataset = torch.utils.data.Subset(full_dataset, val_idx)
test_dataset = torch.utils.data.Subset(full_dataset, test_idx)

# Set the split type for each subset
train_dataset.dataset.split = 'train'
val_dataset.dataset.split = 'val'
test_dataset.dataset.split = 'test'

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")

# Section 3: DataLoaders
batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Section 4: LeNet-5 Model Definition
class LeNet5(nn.Module):
    """LeNet-5 architecture adapted for 33-class AMHCD classification."""
    
    def __init__(self, num_classes=33, dropout_rate=0.2):
        super(LeNet5, self).__init__()
        
        # Feature extraction layers
        self.features = nn.Sequential(
            # Conv1: input 32x32x3, output 28x28x6
            nn.Conv2d(3, 6, kernel_size=5),
            nn.ReLU(),
            # Pool1: output 14x14x6
            nn.AvgPool2d(kernel_size=2, stride=2),
            
            # Conv2: output 10x10x16
            nn.Conv2d(6, 16, kernel_size=5),
            nn.ReLU(),
            # Pool2: output 5x5x16
            nn.AvgPool2d(kernel_size=2, stride=2),
            
            # Conv3: output 1x1x120
            nn.Conv2d(16, 120, kernel_size=5),
            nn.ReLU(),
        )
        
        # Classifier layers
        self.classifier = nn.Sequential(
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(84, num_classes),
        )
        
        # Initialize weights
        self._initialize_weights()
    
    def _initialize_weights(self):
        """Initialize weights using Kaiming He initialization for ReLU activations."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# Create model
model = LeNet5(num_classes=33, dropout_rate=0.2).to(device)
print(model)

# Section 5: Training Function + Validation
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

# Initialize metrics tracking
train_losses, val_losses = [], []
train_accs, val_accs = [], []
best_val_acc = 0.0

def train_epoch(model, loader, optimizer, criterion, device):
    """Train for one epoch."""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (data, target) in enumerate(loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = output.max(1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()
    
    epoch_loss = running_loss / len(loader)
    epoch_acc = 100. * correct / total
    
    return epoch_loss, epoch_acc

def validate_epoch(model, loader, criterion, device):
    """Validate for one epoch."""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            
            running_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
    
    epoch_loss = running_loss / len(loader)
    epoch_acc = 100. * correct / total
    
    return epoch_loss, epoch_acc

# Training loop
print("Starting training...")
for epoch in range(30):
    # Train
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
    
    # Validate
    val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)
    
    # Update scheduler
    scheduler.step()
    
    # Save metrics
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    val_losses.append(val_loss)
    val_accs.append(val_acc)
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'models/best.pt')
    
    print(f'Epoch {epoch+1:02d}: '
          f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | '
          f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

# Save final model
torch.save(model.state_dict(), 'models/final.pt')

# Save metrics to CSV
metrics_df = pd.DataFrame({
    'epoch': range(1, 31),
    'train_loss': train_losses,
    'val_loss': val_losses,
    'train_acc': train_accs,
    'val_acc': val_accs
})
metrics_df.to_csv('results/metrics.csv', index=False)

# Section 6: Evaluation on Test Set
print("Evaluating on test set...")
model.load_state_dict(torch.load('models/best.pt'))
test_loss, test_acc = validate_epoch(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')

# Get predictions for test set
all_preds = []
all_targets = []
all_probs = []

model.eval()
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        probs = torch.softmax(output, dim=1)
        
        _, preds = output.max(1)
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(target.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

# Calculate F1 scores
macro_f1 = f1_score(all_targets, all_preds, average='macro')
weighted_f1 = f1_score(all_targets, all_preds, average='weighted')

print(f'Test Accuracy: {test_acc:.2f}%')
print(f'Macro F1: {macro_f1:.3f}')
print(f'Weighted F1: {weighted_f1:.3f}')

# Generate classification report
class_report = classification_report(all_targets, all_preds, target_names=full_dataset.classes, output_dict=True)
class_report_df = pd.DataFrame(class_report).transpose()
class_report_df.to_csv('results/classification_report.csv')

# Section 7: Generate Figures

# 7.1 Loss & Accuracy curves
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_accs, label='Train Accuracy')
plt.plot(val_accs, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.title('Training and Validation Accuracy')
plt.legend()

plt.tight_layout()
plt.savefig('figures/loss_acc.png', dpi=300, bbox_inches='tight')
plt.show()

# 7.2 Confusion matrix
plt.figure(figsize=(15, 12))
cm = confusion_matrix(all_targets, all_preds)
sns.heatmap(cm, annot=False, fmt='d', cmap='Blues', 
            xticklabels=full_dataset.classes, 
            yticklabels=full_dataset.classes)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.xticks(rotation=90)
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig('figures/confusion_matrix.png', dpi=300, bbox_inches='tight')
plt.show()

# 7.3 Grid of sample predictions
def visualize_predictions(model, loader, class_names, device, num_samples=12):
    """Visualize sample predictions."""
    model.eval()
    images, labels = next(iter(loader))
    images, labels = images.to(device), labels.to(device)
    
    with torch.no_grad():
        outputs = model(images[:num_samples])
        probs = torch.softmax(outputs, dim=1)
        pred_probs, preds = probs.max(1)
    
    fig, axes = plt.subplots(3, 4, figsize=(15, 12))
    axes = axes.ravel()
    
    for i in range(num_samples):
        # Denormalize image
        img = images[i].cpu().numpy().transpose((1, 2, 0))
        img = img * std + mean  # Reverse normalization
        img = np.clip(img, 0, 1)
        
        axes[i].imshow(img)
        axes[i].set_title(f'True: {class_names[labels[i]]}\n'
                         f'Pred: {class_names[preds[i]]}\n'
                         f'Prob: {pred_probs[i]:.3f}')
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.savefig('figures/samples_grid.png', dpi=300, bbox_inches='tight')
    plt.show()

visualize_predictions(model, test_loader, full_dataset.classes, device)

# 7.4 t-SNE visualization of features
def extract_features(model, loader, device):
    """Extract features from the model."""
    model.eval()
    features = []
    targets = []
    
    with torch.no_grad():
        for data, target in loader:
            data = data.to(device)
            # Extract features from the layer before the classifier
            feature = model.features(data)
            feature = torch.flatten(feature, 1)
            features.append(feature.cpu().numpy())
            targets.append(target.numpy())
    
    return np.vstack(features), np.concatenate(targets)

print("Extracting features for t-SNE visualization...")
features, targets = extract_features(model, test_loader, device)

# Apply UMAP (better for visualization than t-SNE in many cases)
reducer = umap.UMAP(random_state=seed)
embedding = reducer.fit_transform(features)

plt.figure(figsize=(12, 10))
scatter = plt.scatter(embedding[:, 0], embedding[:, 1], c=targets, cmap='tab20', s=10)
plt.colorbar(scatter, ticks=range(33))
plt.title('UMAP Visualization of Feature Space')
plt.savefig('figures/umap.png', dpi=300, bbox_inches='tight')
plt.show()

# 7.5 Grad-CAM visualization
class GradCAM:
    """Grad-CAM implementation for visualization."""
    
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        self.hooks = []
        
        # Register hooks
        self._register_hooks()
    
    def _register_hooks(self):
        def forward_hook(module, input, output):
            self.activations = output
        
        def backward_hook(module, grad_input, grad_output):
            self.gradients = grad_output[0]
        
        self.hooks.append(self.target_layer.register_forward_hook(forward_hook))
        self.hooks.append(self.target_layer.register_backward_hook(backward_hook))
    
    def generate(self, input_image, target_class=None):
        self.model.eval()
        
        # Forward pass
        output = self.model(input_image.unsqueeze(0))
        
        if target_class is None:
            target_class = output.argmax(dim=1).item()
        
        # Backward pass
        self.model.zero_grad()
        one_hot = torch.zeros_like(output)
        one_hot[0, target_class] = 1
        output.backward(gradient=one_hot)
        
        # Calculate weights
        gradients = self.gradients[0].cpu().numpy()
        activations = self.activations[0].cpu().numpy()
        weights = np.mean(gradients, axis=(1, 2))
        
        # Generate CAM
        cam = np.zeros(activations.shape[1:], dtype=np.float32)
        for i, w in enumerate(weights):
            cam += w * activations[i]
        
        cam = np.maximum(cam, 0)  # ReLU
        cam = cv2.resize(cam, (32, 32))
        cam = cam - np.min(cam)
        cam = cam / np.max(cam) if np.max(cam) > 0 else cam
        
        return cam, target_class
    
    def remove_hooks(self):
        for hook in self.hooks:
            hook.remove()

# Apply Grad-CAM to sample images
def apply_gradcam(model, loader, device, num_images=6):
    """Apply Grad-CAM to sample images."""
    # Get target layer (last convolutional layer)
    target_layer = model.features[-2]  # The last conv layer before ReLU
    
    # Initialize Grad-CAM
    gradcam = GradCAM(model, target_layer)
    
    # Get sample images (3 correct, 3 incorrect)
    model.eval()
    correct_samples = []
    incorrect_samples = []
    
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            preds = output.argmax(dim=1)
            
            for i in range(len(data)):
                if preds[i] == target[i] and len(correct_samples) < 3:
                    correct_samples.append((data[i], target[i], preds[i]))
                elif preds[i] != target[i] and len(incorrect_samples) < 3:
                    incorrect_samples.append((data[i], target[i], preds[i]))
                
                if len(correct_samples) >= 3 and len(incorrect_samples) >= 3:
                    break
            
            if len(correct_samples) >= 3 and len(incorrect_samples) >= 3:
                break
    
    # Generate Grad-CAM visualizations
    samples = correct_samples + incorrect_samples
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.ravel()
    
    for i, (img, true_label, pred_label) in enumerate(samples):
        # Generate CAM
        cam, _ = gradcam.generate(img, pred_label)
        
        # Convert to heatmap
        heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
        heatmap = np.float32(heatmap) / 255
        
        # Denormalize original image
        img_np = img.cpu().numpy().transpose((1, 2, 0))
        img_np = img_np * std + mean  # Reverse normalization
        img_np = np.clip(img_np, 0, 1)
        
        # Overlay heatmap on image
        overlay = heatmap + np.float32(img_np)
        overlay = overlay / np.max(overlay)
        
        # Plot
        axes[i].imshow(overlay)
        axes[i].set_title(f'True: {full_dataset.classes[true_label]}\n'
                         f'Pred: {full_dataset.classes[pred_label]}')
        axes[i].axis('off')
        
        # Save individual images
        plt.imsave(f'figures/gradcam_{i+1}.png', overlay)
    
    plt.tight_layout()
    plt.savefig('figures/gradcam_all.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # Remove hooks
    gradcam.remove_hooks()

print("Generating Grad-CAM visualizations...")
apply_gradcam(model, test_loader, device)

# Section 8: Save Models and Metrics
# Already done during training and evaluation

# Section 9: Display Summary of Results
print("\n" + "="*50)
print("SUMMARY OF RESULTS")
print("="*50)
print(f"Test Accuracy: {test_acc:.2f}%")
print(f"Macro F1: {macro_f1:.3f}")
print(f"Weighted F1: {weighted_f1:.3f}")
print("="*50)

# Create a summary dataframe
summary_df = pd.DataFrame({
    'Metric': ['Test Accuracy', 'Macro F1', 'Weighted F1'],
    'Value': [f"{test_acc:.2f}%", f"{macro_f1:.3f}", f"{weighted_f1:.3f}"]
})
summary_df.to_csv('results/summary.csv', index=False)

print("All results and figures have been saved successfully!")