In [33]:
# %%bash
# # Recursively delete all files and folders in /content
# find /content -mindepth 1 -exec rm -rf {} +

In [None]:
# Import all libraries
# ML
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torch.utils.mobile_optimizer import optimize_for_mobile
import torch.quantization as quantization

import tensorflow as tf

# Utilities
from PIL import Image
import os
import numpy as np
from tqdm import tqdm

# Runtime
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

In [None]:
class SilverLineDataset(Dataset):
    """Binary image dataset read from two sub-folders of ``root_directory``.

    Images in ``silver_line/`` are labelled 1 and images in ``no_silver_line/``
    are labelled 0. Only files ending in ``.png``, ``.jpg`` or ``.jpeg``
    (case-insensitive) are indexed. A sub-folder that does not exist simply
    contributes no samples.

    Attributes (set in ``__init__``):
        root_directory: the folder passed by the caller.
        transform: optional callable applied to each loaded PIL image.
        samples: list of ``(image_path, label)`` tuples, silver-line images
            first, each class sorted by file name.
    """

    # (sub-folder name, label) pairs, in the order the samples are indexed.
    CLASS_FOLDERS = (("silver_line", 1), ("no_silver_line", 0))
    # Lower-case file extensions that count as images.
    IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg")

    def __init__(self, root_directory, transform=None):
        """Index every image file found under the two class sub-folders.

        Args:
            root_directory: folder expected to contain ``silver_line`` and/or
                ``no_silver_line`` sub-folders.
            transform: optional callable applied to the RGB PIL image in
                ``__getitem__``.
        """
        self.root_directory = root_directory
        self.transform = transform
        self.samples = []

        for folder_name, label in self.CLASS_FOLDERS:
            class_directory = os.path.join(root_directory, folder_name)
            if not os.path.isdir(class_directory):
                continue

            # os.listdir returns entries in arbitrary order; sorting keeps the
            # index -> sample mapping identical from run to run.
            for image_name in sorted(os.listdir(class_directory)):
                if image_name.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.samples.append((os.path.join(class_directory, image_name), label))

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.samples)

    def __getitem__(self, index: int):
        """Load sample ``index`` and return ``(image, label)``.

        The image is opened with PIL, converted to RGB and passed through
        ``self.transform`` when one was supplied.
        """
        image_path, label = self.samples[index]

        # convert() produces a fully loaded copy, so the underlying file can be
        # closed as soon as the context manager exits.
        with Image.open(image_path) as source_image:
            image = source_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, label
    
class CustomSilverDetector(nn.Module):
    """Small four-stage CNN producing ``class_count`` logits per RGB image.

    Feature extractor (``self.features``):
        Four stages of Conv2d(3x3, stride 2, padding 1, no bias) -> BatchNorm2d
        -> ReLU. Channels grow 3 -> 16 -> 32 -> 48 -> 64 while every stage
        halves the spatial resolution (64 -> 32 -> 16 -> 8 -> 4 for a 64x64
        input), so features are grouped into a hierarchical structure.

    Classifier head (``self.classifier``):
        Global average pooling to 1x1, flatten, Dropout(0.3), Linear(64, 32),
        ReLU, Dropout(0.2), Linear(32, class_count). The dropout layers zero
        activations at random during training to encourage generalisation.
    """

    def __init__(self, input_size=64, class_count=2):
        """Build the layers.

        Args:
            input_size: accepted for callers' convenience; no layer below reads it.
            class_count: number of output logits.
        """
        super().__init__()

        # Consecutive pairs of this tuple are the (in, out) channels of each stage.
        stage_channels = (3, 16, 32, 48, 64)
        stage_layers = []
        for in_channels, out_channels in zip(stage_channels[:-1], stage_channels[1:]):
            stage_layers.append(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1, bias=False)
            )
            stage_layers.append(nn.BatchNorm2d(out_channels))
            stage_layers.append(nn.ReLU(inplace=True))
        self.features = nn.Sequential(*stage_layers)

        head_layers = [
            nn.AdaptiveAvgPool2d(1),        # collapse each feature map to one value
            nn.Flatten(),                   # (N, 64, 1, 1) -> (N, 64)
            nn.Dropout(0.3),
            nn.Linear(stage_channels[-1], 32),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(32, class_count),     # output logits
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Run the feature extractor followed by the classifier head."""
        return self.classifier(self.features(x))
    
def train_model(model, train_loader, validation_loader, epoch_count, device="cuda"):
    """Train ``model`` and return it restored to its best validation-accuracy weights.

    Training setup:
    - ``CrossEntropyLoss`` as the classification loss.
    - Adam with learning rate 1e-4 and weight decay 1e-3 (picked by the original
      author for a small dataset / low parameter count and to limit overfitting).
    - ``StepLR``: every 25 epochs the learning rate is multiplied by 0.5, which
      allows finer updates as training progresses.

    Args:
        model: network to train. It is not moved here, so it must already be on
            ``device``.
        train_loader: iterable of ``(images, labels)`` batches used for training.
        validation_loader: iterable of ``(images, labels)`` batches evaluated
            after every epoch.
        epoch_count: number of epochs to run.
        device: device the batches are moved to.

    Returns:
        ``(model, train_losses, validation_losses, train_accuracies,
        validation_accuracies)``. The four lists hold one value per epoch;
        losses are the mean of the per-batch losses and accuracies are
        percentages. ``model`` carries the weights of the epoch with the highest
        validation accuracy (the earliest such epoch on ties).
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-3)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=25, gamma=0.5)

    train_losses = []
    validation_losses = []
    train_accuracies = []
    validation_accuracies = []

    # Start below any reachable accuracy so the first epoch always records a
    # snapshot, even when validation accuracy is 0 %.
    best_validation_accuracy = float("-inf")
    best_model_state = None

    for epoch in range(epoch_count):
        # ---- Training pass ----
        model.train()

        train_loss = 0.0
        train_correct = 0
        train_total = 0

        train_bar = tqdm(train_loader, desc=f"[TRAINING] Epoch {epoch + 1}/{epoch_count}")

        for images, labels in train_bar:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()              # clear gradients from the previous step
            outputs = model(images)            # forward pass
            loss = criterion(outputs, labels)  # batch loss
            loss.backward()                    # backward pass
            optimizer.step()                   # update the weights

            train_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

            train_bar.set_postfix({
                "Loss": f"{loss.item():.4f}",
                "Accuracy": f"{100 * train_correct / train_total:.2f}"
            })

        # ---- Validation pass ----
        model.eval()

        validation_loss = 0.0
        validation_total = 0
        validation_correct = 0

        val_preds_all = []
        val_labels_all = []

        # No gradient history is needed while evaluating.
        with torch.no_grad():
            for images, labels in validation_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)

                validation_loss += loss.item()
                predicted = outputs.argmax(dim=1)
                validation_total += labels.size(0)
                validation_correct += (predicted == labels).sum().item()

                # Keep every prediction/label for the debugging output below.
                val_preds_all.extend(predicted.cpu().numpy())
                val_labels_all.extend(labels.cpu().numpy())

        # Debugging information
        print(f"  All validation predictions: {np.array(val_preds_all)}")
        print(f"  All validation labels: {np.array(val_labels_all)}")
        print(f"  Unique predictions: {np.unique(val_preds_all)}")
        print(f"  Actual validation accuracy: {100 * np.mean(np.array(val_preds_all) == np.array(val_labels_all)):.2f}%")

        # Per-epoch metrics. max(..., 1) keeps an empty loader from raising
        # ZeroDivisionError; the metric is then reported as 0.
        train_accuracy = 100 * train_correct / max(train_total, 1)
        train_losses.append(train_loss / max(len(train_loader), 1))
        train_accuracies.append(train_accuracy)

        validation_accuracy = 100 * validation_correct / max(validation_total, 1)
        validation_losses.append(validation_loss / max(len(validation_loader), 1))
        validation_accuracies.append(validation_accuracy)

        # Snapshot the best weights. state_dict() hands back tensors that share
        # storage with the live parameters, so each one is cloned; a shallow
        # dict copy would keep changing as training continues.
        if validation_accuracy > best_validation_accuracy:
            best_validation_accuracy = validation_accuracy
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

        scheduler.step()

        print(f"[RESULT] Epoch {epoch+1}: Train Acc: {train_accuracy:.2f}%, Val Acc: {validation_accuracy:.2f}%")
        print()

    # Restore the best epoch. No snapshot exists when epoch_count is 0, in which
    # case the model is returned unchanged.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, train_losses, validation_losses, train_accuracies, validation_accuracies

def evaluate_model(model, test_loader, device="cuda"):
    """Evaluate ``model`` on ``test_loader`` and report the results.

    Prints the accuracy and a per-class classification report, then shows a
    confusion-matrix heatmap.

    Args:
        model: trained network, already on ``device``.
        test_loader: iterable of ``(images, labels)`` batches.
        device: device the batches are moved to.

    Returns:
        Accuracy over all samples, as a percentage.

    Raises:
        ValueError: if ``test_loader`` yields no samples.
    """
    # Label ids and their display names, in matching order. Passing the ids
    # explicitly keeps the report and the matrix at two classes even when one
    # class is absent from both the labels and the predictions.
    class_ids = [0, 1]
    class_names = ["No Silver", "Silver"]

    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            predicted = model(images).argmax(dim=1)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if not all_labels:
        raise ValueError("test_loader produced no samples; nothing to evaluate")

    # Calculate metrics
    accuracy = 100.0 * float(np.mean(np.array(all_predictions) == np.array(all_labels)))

    print(f"Test Accuracy: {accuracy:.2f}%")
    print("\nClassification Report:")
    print(classification_report(
        all_labels,
        all_predictions,
        labels=class_ids,
        target_names=class_names,
        zero_division=0,  # report 0 instead of warning when a class has no predictions
    ))

    # Confusion Matrix
    cm = confusion_matrix(all_labels, all_predictions, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

    return accuracy

In [None]:
# Select the compute device: the GPU when CUDA is available (e.g. a Google
# Colab GPU runtime), otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [None]:
%%bash
# Setup image folders for split
# Create one extraction folder per class (-p: no error if it already exists).
mkdir -p /content/images/silver_line
mkdir -p /content/images/no_silver_line

# Extract each class archive; the zip files are looked up relative to the
# cell's working directory.
#   -q  quiet output
#   -j  junk the paths stored in the archive, so every file lands directly in
#       the target folder
#   -d  directory to extract into
unzip -q -j silver_line.zip    -d /content/images/silver_line
unzip -q -j no_silver_line.zip -d /content/images/no_silver_line

In [None]:
import pathlib, random, shutil

# Seed Python's RNG so the shuffle done while splitting starts from a fixed state.
random.seed(0)

# Folder holding the extracted images of each class, keyed by class name.
source_directories = dict(
    silver_line='/content/images/silver_line',
    no_silver_line='/content/images/no_silver_line',
)
# Root of the train/val/test tree that the split fills in.
destination_directory = pathlib.Path('/content/silver_cls')

# Pre-create <root>/<split>/<class> for every split/class combination.
for split in ('train', 'val', 'test'):
    for cls in source_directories:
        destination_directory.joinpath(split, cls).mkdir(parents=True, exist_ok=True)

def getImageFiles(sourceDir: str) -> list:
    """Return the image files directly inside ``sourceDir``, sorted by path.

    Only regular files whose suffix is ``.png``, ``.jpg`` or ``.jpeg``
    (case-insensitive) are returned; sub-directories are skipped even if their
    name ends in an image suffix. The result is sorted because glob order is
    arbitrary, and a seeded shuffle is only repeatable when its input order is.

    Args:
        sourceDir: directory to scan (not recursive).

    Returns:
        A list of ``pathlib.Path`` objects.
    """
    image_suffixes = {'.png', '.jpg', '.jpeg'}
    return sorted(
        entry
        for entry in pathlib.Path(sourceDir).glob('*')
        if entry.is_file() and entry.suffix.lower() in image_suffixes
    )

def balancedSplit(source_directories: dict, train_ratio=0.8, val_ratio=0.1):
    """Move an equal number of images per class into train/val/test folders.

    Every class is cut down to the size of the smallest class, shuffled with the
    global ``random`` module, and split into train/val/test by the given ratios
    (test receives the remainder). Files are *moved* out of their source folder
    into ``destination_directory / <split> / <class>``; files beyond the
    balanced amount stay where they are.

    Args:
        source_directories: mapping of class name -> folder containing that
            class's images.
        train_ratio: fraction of the balanced class size used for training.
        val_ratio: fraction of the balanced class size used for validation.

    Raises:
        ValueError: if a ratio is negative or the two ratios sum to more than 1.
    """
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError("train_ratio and val_ratio must be non-negative and sum to at most 1")

    # Get all image files for each class. They are sorted first so the seeded
    # shuffle below always starts from the same order, regardless of how the
    # filesystem happens to list the directory.
    class_files = {}
    for cls, dirPath in source_directories.items():
        class_files[cls] = sorted(getImageFiles(dirPath))
        print(f"{cls}: {len(class_files[cls])} image files")

    # Find the minimum class size to balance against (0 when no classes are given)
    min_class_size = min((len(files) for files in class_files.values()), default=0)
    print(f"\nBalancing to minimum class size: {min_class_size}")

    # Calculate split sizes; test takes whatever train and val leave over
    train_size = int(train_ratio * min_class_size)
    val_size = int(val_ratio * min_class_size)
    test_size = min_class_size - train_size - val_size

    print(f"Per-class split: {train_size} train, {val_size} val, {test_size} test")

    # Split each class
    for cls, files in class_files.items():
        random.shuffle(files)

        # Take only the balanced amount
        balanced_files = files[:min_class_size]

        files_per_split = {
            'train': balanced_files[:train_size],
            'val': balanced_files[train_size:train_size + val_size],
            'test': balanced_files[train_size + val_size:],
        }

        # Move files, creating the target folder when it is not there yet
        for split, split_files in files_per_split.items():
            target_directory = destination_directory / split / cls
            target_directory.mkdir(parents=True, exist_ok=True)
            for f in split_files:
                shutil.move(str(f), str(target_directory / f.name))

        print(f"{cls}: moved {len(files_per_split['train'])} train, {len(files_per_split['val'])} val, {len(files_per_split['test'])} test")

# Report how many images each class has before anything is moved.
print("Source data distribution:")
for cls, dirPath in source_directories.items():
    image_files = getImageFiles(dirPath)
    print(f"{cls}: {len(image_files)} image files")

# Perform the split with the default ratios.
print("\nCreating balanced splits...")
balancedSplit(source_directories)

# Count what ended up in every <split>/<class> folder.
print("\nFinal split verification:")
for split in ('train', 'val', 'test'):
    for cls in source_directories:
        split_path = destination_directory / split / cls
        if not split_path.exists():
            print(f"{split}/{cls}: directory doesn't exist")
            continue
        count = sum(1 for _ in split_path.glob('*'))
        print(f"{split}/{cls}: {count} files")

In [None]:
"""
Use an input size of 64 by 64, reducing complexity and decreasing inference time
Add horizontal flips
Add vertical flips
Add random rotations between -15 < theta < 15
Add colour jitter
Converts from PIL image to PyTorch tensor
Centers and scales each channel (normalization) so gradients stay well-behaved during training
"""

# Training pipeline: aggressive augmentation to compensate for the small dataset.
train_transform = transforms.Compose(
    [
        transforms.Resize((64, 64)),                # fixed 64x64 network input
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.RandomRotation(15),              # random angle within +/-15 degrees
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
        transforms.ToTensor(),                      # PIL image -> tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# Validation/test pipeline: the same resize and normalization, no augmentation.
val_transform = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

# One dataset per split folder; only the training split is augmented.
train_dataset = SilverLineDataset("/content/silver_cls/train", transform=train_transform)
validation_dataset = SilverLineDataset("/content/silver_cls/val", transform=val_transform)
test_dataset = SilverLineDataset("/content/silver_cls/test", transform=val_transform)

# Batch the datasets; only the training loader shuffles its samples.
train_loader = DataLoader(
    train_dataset, batch_size=16, shuffle=True, num_workers=2
)
validation_loader = DataLoader(
    validation_dataset, batch_size=16, shuffle=False, num_workers=2
)
test_loader = DataLoader(
    test_dataset, batch_size=16, shuffle=False, num_workers=2
)

In [None]:
# Sanity check: count how many validation samples carry each label.
print("Validation dataset class distribution:")
val_labels = []
for batch in validation_loader:
    labels = batch[1]
    val_labels += list(labels.numpy())
print(f"Class 0: {val_labels.count(0)}, Class 1: {val_labels.count(1)}")

In [None]:
# Build the network and place it on the selected device.
model = CustomSilverDetector(input_size=64, class_count=2)
model = model.to(device)

# Report the model size (number of elements across all parameters).
total_params = sum(map(torch.Tensor.numel, model.parameters()))
print(f"Total parameters: {total_params:,}")

# Train for 80 epochs; `model` is rebound to the instance returned by train_model.
model, train_losses, val_losses, train_accs, val_accs = train_model(
    model=model,
    train_loader=train_loader,
    validation_loader=validation_loader,
    epoch_count=80,
    device=device,
)

In [None]:
# Training curves: loss on the left panel, accuracy on the right.
fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(12, 4))

loss_ax.plot(train_losses, label="Train Loss")
loss_ax.plot(val_losses, label="Val Loss")
loss_ax.set_title("Training and Validation Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.legend()

accuracy_ax.plot(train_accs, label="Train Accuracy")
accuracy_ax.plot(val_accs, label="Val Accuracy")
accuracy_ax.set_title("Training and Validation Accuracy")
accuracy_ax.set_xlabel("Epoch")
accuracy_ax.set_ylabel("Accuracy (%)")
accuracy_ax.legend()

fig.tight_layout()
plt.show()

# Final check on the held-out test split.
test_accuracy = evaluate_model(model, test_loader, device)

# Persist the weights (state_dict only) for the deployment/conversion step.
torch.save(model.state_dict(), "/content/custom_silver_detector.pth")

In [55]:
def convert_to_pi4_optimized(model_path: str, output_path: str = "silver_detector_pi4.pt"):
    """Export a trained ``CustomSilverDetector`` checkpoint as two TorchScript files.

    The checkpoint is loaded on the CPU, its dropout layers are replaced by
    ``nn.Identity``, and the model is traced with a 1x3x64x64 example input.
    Two files are written:

    1. ``output_path`` - the traced float model after ``optimize_for_mobile``.
    2. ``<output_path stem>_quantized<extension>`` - the same model after int8
       dynamic quantization, traced and optimized the same way.

    Args:
        model_path: path to a ``state_dict`` saved with ``torch.save``.
        output_path: destination of the float TorchScript file.

    Returns:
        ``(output_path, quantized_output)`` - the two file paths that were written.
    """
    device = torch.device("cpu")

    # Load the trained model
    # NOTE(review): torch.load unpickles the file, so only load checkpoints you
    # trust (or pass weights_only=True where the installed PyTorch supports it).
    model = CustomSilverDetector(input_size=64, class_count=2)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # Remove dropout layers for inference. Matching on the layer type instead of
    # fixed positions keeps this correct if the classifier layout ever changes.
    for index, layer in list(enumerate(model.classifier)):
        if isinstance(layer, nn.Dropout):
            model.classifier[index] = nn.Identity()

    # Create example input for tracing
    example_input = torch.randn(1, 3, 64, 64)

    # TorchScript tracing
    traced_model = torch.jit.trace(model, example_input)
    traced_model = optimize_for_mobile(traced_model)

    # Save the optimized model
    traced_model.save(output_path)
    print(f"✓ Optimized model saved to {output_path}")

    # Derive the quantized file name from the final extension only. A plain
    # str.replace('.pt', ...) would also rewrite '.pt' inside directory names
    # and, for a path without '.pt', return the same path so that the quantized
    # file overwrote the float one.
    path_root, path_extension = os.path.splitext(output_path)
    quantized_output = f"{path_root}_quantized{path_extension or '.pt'}"

    # Dynamic quantization (returns a converted copy; `model` itself is untouched)
    # NOTE(review): dynamic quantization is documented for Linear/recurrent
    # layers; confirm whether listing nn.Conv2d here has any effect.
    quantized_model = torch.quantization.quantize_dynamic(
        model,
        {nn.Linear, nn.Conv2d},
        dtype=torch.qint8
    )

    # Trace and optimize quantized model
    traced_quantized = torch.jit.trace(quantized_model, example_input)
    traced_quantized = optimize_for_mobile(traced_quantized)
    traced_quantized.save(quantized_output)

    print(f"✓ Quantized model saved to {quantized_output}")

    return output_path, quantized_output

In [None]:
# Export the saved .pth checkpoint as TorchScript .pt files: a mobile-optimized
# float model and an int8 dynamically quantized variant. Both values returned
# by the converter are file paths, not model objects.
model_path = "/content/custom_silver_detector.pth"

normal_model, quantized_model = convert_to_pi4_optimized(model_path=model_path)