# Coin Grade Classifier - Dual-Image ResNet Fine-Tuning

This notebook fine-tunes a **pretrained ResNet-50** model to classify coin grades using **both obverse and reverse** images at full **1000x1000 resolution**.

## Key Features:
- ü™ô **Dual-Image Input**: Uses both sides of each coin
- üîç **Full Resolution**: 1000x1000 pixels (preserves fine details)
- üéØ **Transfer Learning**: ResNet-50 pretrained on ImageNet
- üîÑ **Feature Fusion**: Combines information from both sides
- üßä **Progressive Unfreezing**: Frozen ‚Üí Fine-tuned backbone

## Architecture:
```
Obverse (1000x1000)     Reverse (1000x1000)
        ‚Üì                       ‚Üì
   ResNet-50               ResNet-50
  (Pretrained)            (Pretrained)
    (2048 dim)              (2048 dim)
        ‚Üì                       ‚Üì
        ‚îî‚îÄ‚îÄ‚îÄ‚îÄ Concatenate ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
                 ‚Üì
           Fusion Layer
             (2048)
                 ‚Üì
       Classification Head
                 ‚Üì
            Coin Grade
```

## Training Strategy:
1. **Phase 1** (Epochs 1-15): Freeze ResNet backbone, train only classifier
2. **Phase 2** (Epochs 16-50): Unfreeze all layers, fine-tune end-to-end

## 1. Setup and Imports

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torchvision.transforms as transforms
from torchvision.models import resnet50, ResNet50_Weights

import os
import json
from pathlib import Path
from PIL import Image
import numpy as np
from datetime import datetime
from tqdm import tqdm
import matplotlib.pyplot as plt

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    print("MPS (Apple Silicon) available: True")

PyTorch version: 2.6.0
CUDA available: False
MPS (Apple Silicon) available: True


## 2. Configuration

Key settings:
- **IMAGE_SIZE**: 1000 (full resolution)
- **FREEZE_BACKBONE**: True (freeze ResNet initially)
- **UNFREEZE_EPOCH**: 15 (when to start fine-tuning backbone)
- **DATA_DIR**: Choose 'Proof' or 'Circulation' subfolder

In [11]:
# Paths
DATA_DIR = 'davidlawrence_dataset/Circulation'  # or 'davidlawrence_dataset/Circulation'
OUTPUT_DIR = 'models'
LOG_DIR = 'runs/resnet_dual_' + datetime.now().strftime('%Y%m%d_%H%M%S')

# Model hyperparameters
IMAGE_SIZE = 256  # Full resolution
BATCH_SIZE = 4     # Small batch due to large images
NUM_EPOCHS = 50
LEARNING_RATE = 1e-4
FREEZE_BACKBONE = True  # Freeze early ResNet layers initially
UNFREEZE_EPOCH = 15     # Unfreeze all layers after this epoch

# Device selection
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
    print("Using CUDA (NVIDIA GPU)")
elif torch.backends.mps.is_available():
    DEVICE = torch.device('mps')
    print("Using MPS (Apple Silicon GPU)")
else:
    DEVICE = torch.device('cpu')
    print("Using CPU")

NUM_WORKERS = 0
PIN_MEMORY = True if torch.cuda.is_available() else False

# Create directories
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

print(f"\nDevice: {DEVICE}")
print(f"Image size: {IMAGE_SIZE}x{IMAGE_SIZE}")
print(f"Batch size: {BATCH_SIZE}")
print(f"Epochs: {NUM_EPOCHS}")
print(f"Data directory: {DATA_DIR}")

Using MPS (Apple Silicon GPU)

Device: mps
Image size: 256x256
Batch size: 4
Epochs: 50
Data directory: davidlawrence_dataset/Circulation


## 3. Dataset Class - Dual Image Loader

Loads both obverse and reverse images for each coin.

In [12]:
class DualCoinDataset(Dataset):
    """Dataset that loads both obverse and reverse images for each coin."""
    
    def __init__(self, data_dir, split='train', transform=None):
        self.data_dir = Path(data_dir)
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}
        self.idx_to_class = {}
        
        # Scan for grade folders
        grade_folders = sorted([d for d in self.data_dir.iterdir() if d.is_dir()])
        
        for idx, grade_folder in enumerate(grade_folders):
            grade_name = grade_folder.name
            self.class_to_idx[grade_name] = idx
            self.idx_to_class[idx] = grade_name
            
            obverse_dir = grade_folder / 'obverse'
            reverse_dir = grade_folder / 'reverse'
            
            if not obverse_dir.exists() or not reverse_dir.exists():
                print(f"Warning: Missing obverse or reverse folder for {grade_name}")
                continue
            
            obverse_images = sorted([f for f in obverse_dir.glob('*.jpg') if f.is_file()])
            
            for obverse_img in obverse_images:
                reverse_img = reverse_dir / obverse_img.name
                
                if reverse_img.exists():
                    self.samples.append({
                        'obverse': obverse_img,
                        'reverse': reverse_img,
                        'label': idx,
                        'grade': grade_name
                    })
        
        # Split data (70% train, 20% test, 10% val)
        np.random.seed(42)
        indices = np.random.permutation(len(self.samples))
        
        n_train = int(0.7 * len(self.samples))
        n_test = int(0.2 * len(self.samples))
        
        if split == 'train':
            indices = indices[:n_train]
        elif split == 'test':
            indices = indices[n_train:n_train + n_test]
        else:  # val
            indices = indices[n_train + n_test:]
        
        self.samples = [self.samples[i] for i in indices]
        
        print(f"{split.upper()} Dataset:")
        print(f"  Total samples: {len(self.samples)}")
        print(f"  Classes: {len(self.class_to_idx)}")
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample = self.samples[idx]
        obverse = Image.open(sample['obverse']).convert('RGB')
        reverse = Image.open(sample['reverse']).convert('RGB')
        
        if self.transform:
            obverse = self.transform(obverse)
            reverse = self.transform(reverse)
        
        return obverse, reverse, sample['label']

## 4. Data Transforms

ImageNet normalization for pretrained ResNet.

In [13]:
# Training transforms
train_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation/test transforms
val_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

print("‚úì Transforms configured")

‚úì Transforms configured


## 5. Create Datasets and DataLoaders

In [14]:
# Create datasets
train_dataset = DualCoinDataset(DATA_DIR, split='train', transform=train_transform)
test_dataset = DualCoinDataset(DATA_DIR, split='test', transform=val_transform)
val_dataset = DualCoinDataset(DATA_DIR, split='val', transform=val_transform)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, 
                         num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False,
                        num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,
                       num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY)

print(f"\nDataloaders created:")
print(f"  Train batches: {len(train_loader)}")
print(f"  Test batches: {len(test_loader)}")
print(f"  Val batches: {len(val_loader)}")

TRAIN Dataset:
  Total samples: 2585
  Classes: 24
TEST Dataset:
  Total samples: 738
  Classes: 24
VAL Dataset:
  Total samples: 371
  Classes: 24

Dataloaders created:
  Train batches: 647
  Test batches: 185
  Val batches: 93


## 6. Dual ResNet Model

Two ResNet-50 encoders with feature fusion.

In [15]:
class DualResNetClassifier(nn.Module):
    """ResNet-50 for dual-image classification."""
    
    def __init__(self, num_classes, freeze_backbone=True):
        super(DualResNetClassifier, self).__init__()
        
        # Load pretrained ResNet-50
        weights = ResNet50_Weights.IMAGENET1K_V2
        obverse_resnet = resnet50(weights=weights)
        reverse_resnet = resnet50(weights=weights)
        
        # Remove final FC layer (keep 2048-dim features)
        self.obverse_encoder = nn.Sequential(*list(obverse_resnet.children())[:-1])
        self.reverse_encoder = nn.Sequential(*list(reverse_resnet.children())[:-1])
        
        # Freeze backbone if requested
        if freeze_backbone:
            for param in self.obverse_encoder.parameters():
                param.requires_grad = False
            for param in self.reverse_encoder.parameters():
                param.requires_grad = False
            print("‚úì Backbone layers frozen")
        
        self.feature_dim = 2048
        
        # Fusion layer
        self.fusion = nn.Sequential(
            nn.Linear(self.feature_dim * 2, 2048),
            nn.BatchNorm1d(2048),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5)
        )
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )
    
    def unfreeze_backbone(self):
        """Unfreeze all backbone layers."""
        for param in self.obverse_encoder.parameters():
            param.requires_grad = True
        for param in self.reverse_encoder.parameters():
            param.requires_grad = True
        print("‚úì Backbone layers unfrozen")
    
    def forward(self, obverse, reverse):
        obverse_feat = self.obverse_encoder(obverse).view(obverse.size(0), -1)
        reverse_feat = self.reverse_encoder(reverse).view(reverse.size(0), -1)
        combined = torch.cat([obverse_feat, reverse_feat], dim=1)
        fused = self.fusion(combined)
        output = self.classifier(fused)
        return output


# Create model
num_classes = len(train_dataset.class_to_idx)
model = DualResNetClassifier(num_classes=num_classes, freeze_backbone=FREEZE_BACKBONE)
model = model.to(DEVICE)

print(f"\nModel created:")
print(f"  Number of classes: {num_classes}")
print(f"  Total params: {sum(p.numel() for p in model.parameters()):,}")
print(f"  Trainable params: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

‚úì Backbone layers frozen

Model created:
  Number of classes: 24
  Total params: 58,049,176
  Trainable params: 11,033,112


## 7. Training Setup

In [16]:
# Loss function
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = optim.AdamW(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=LEARNING_RATE,
    weight_decay=0.01
)

# Learning rate scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='max', factor=0.5, patience=5, verbose=True
)

# TensorBoard writer
writer = SummaryWriter(LOG_DIR)

print("Training setup complete")
print(f"  Optimizer: AdamW")
print(f"  Learning rate: {LEARNING_RATE}")
print(f"  TensorBoard logs: {LOG_DIR}")

Training setup complete
  Optimizer: AdamW
  Learning rate: 0.0001
  TensorBoard logs: runs/resnet_dual_20251130_233517




## 8. Training and Validation Functions

In [17]:
def train_epoch(model, loader, criterion, optimizer, device, epoch):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    pbar = tqdm(loader, desc=f'Epoch {epoch+1}/{NUM_EPOCHS} [Train]')
    
    for obverse, reverse, labels in pbar:
        obverse, reverse, labels = obverse.to(device), reverse.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(obverse, reverse)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * obverse.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        
        pbar.set_postfix({'loss': f'{loss.item():.4f}', 'acc': f'{100*correct/total:.2f}%'})
    
    return running_loss / total, 100 * correct / total


def validate(model, loader, criterion, device, split='Val'):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        pbar = tqdm(loader, desc=f'{split}')
        for obverse, reverse, labels in pbar:
            obverse, reverse, labels = obverse.to(device), reverse.to(device), labels.to(device)
            outputs = model(obverse, reverse)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * obverse.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            pbar.set_postfix({'loss': f'{loss.item():.4f}', 'acc': f'{100*correct/total:.2f}%'})
    
    return running_loss / total, 100 * correct / total

print("‚úì Training functions defined")

‚úì Training functions defined


## 9. Main Training Loop

**Run this cell to start training!**

In [18]:
# Training history
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
best_val_acc = 0.0
best_model_path = os.path.join(OUTPUT_DIR, 'coin_resnet_dual_best.pth')

print("="*60)
print("STARTING TRAINING")
print("="*60)

for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch {epoch+1}/{NUM_EPOCHS}")
    print("-" * 60)
    
    # Unfreeze backbone after specified epoch
    if FREEZE_BACKBONE and epoch == UNFREEZE_EPOCH:
        print(f"\nüîì Unfreezing backbone at epoch {epoch+1}")
        model.unfreeze_backbone()
        optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE/10, weight_decay=0.01)
    
    # Train and validate
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE, epoch)
    val_loss, val_acc = validate(model, val_loader, criterion, DEVICE)
    
    # Update LR
    scheduler.step(val_acc)
    current_lr = optimizer.param_groups[0]['lr']
    
    # Save history
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    
    # Log to TensorBoard
    writer.add_scalar('Loss/train', train_loss, epoch)
    writer.add_scalar('Loss/val', val_loss, epoch)
    writer.add_scalar('Accuracy/train', train_acc, epoch)
    writer.add_scalar('Accuracy/val', val_acc, epoch)
    
    print(f"\nEpoch {epoch+1} Summary:")
    print(f"  Train: Loss={train_loss:.4f}, Acc={train_acc:.2f}%")
    print(f"  Val:   Loss={val_loss:.4f}, Acc={val_acc:.2f}%")
    print(f"  LR: {current_lr:.6f}")
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': val_acc,
            'class_to_idx': train_dataset.class_to_idx
        }, best_model_path)
        print(f"  ‚úì New best model saved! (Val Acc: {val_acc:.2f}%)")

print(f"\nBest validation accuracy: {best_val_acc:.2f}%")
writer.close()

STARTING TRAINING

Epoch 1/50
------------------------------------------------------------


Epoch 1/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñâ| 646/647 [02:18<00:00,  4.65it/s, loss=2.2849, acc=23.72%]


ValueError: Expected more than 1 value per channel when training, got input size torch.Size([1, 2048])

## 10. Plot Training History

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

ax1.plot(history['train_loss'], label='Train Loss', marker='o')
ax1.plot(history['val_loss'], label='Val Loss', marker='s')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training and Validation Loss')
if FREEZE_BACKBONE:
    ax1.axvline(x=UNFREEZE_EPOCH, color='red', linestyle='--', alpha=0.5, label='Unfreeze')
ax1.legend()
ax1.grid(True)

ax2.plot(history['train_acc'], label='Train Acc', marker='o')
ax2.plot(history['val_acc'], label='Val Acc', marker='s')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.set_title('Training and Validation Accuracy')
if FREEZE_BACKBONE:
    ax2.axvline(x=UNFREEZE_EPOCH, color='red', linestyle='--', alpha=0.5, label='Unfreeze')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, 'training_history_resnet.png'), dpi=300)
plt.show()

## 11. Test Set Evaluation

In [None]:
# Load best model
checkpoint = torch.load(best_model_path)
model.load_state_dict(checkpoint['model_state_dict'])
print(f"Loaded best model from epoch {checkpoint['epoch']+1}")

# Evaluate
test_loss, test_acc = validate(model, test_loader, criterion, DEVICE, split='Test')

print(f"\n{'='*60}")
print("FINAL TEST RESULTS")
print("="*60)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.2f}%")
print("="*60)

## 12. View TensorBoard

```bash
tensorboard --logdir=runs
```

Then open http://localhost:6006

## 13. Save Results

In [None]:
# Save history and config
with open(os.path.join(OUTPUT_DIR, 'history_resnet.json'), 'w') as f:
    json.dump(history, f, indent=2)

config = {
    'architecture': 'DualResNetClassifier (ResNet-50)',
    'image_size': IMAGE_SIZE,
    'best_val_acc': best_val_acc,
    'test_acc': test_acc
}

with open(os.path.join(OUTPUT_DIR, 'config_resnet.json'), 'w') as f:
    json.dump(config, f, indent=2)

print("‚úì All results saved!")
print(f"  Best Val Acc: {best_val_acc:.2f}%")
print(f"  Test Acc: {test_acc:.2f}%")