In [None]:
# Cell 1: Import libraries
import os
import numpy as np
from PIL import Image
import torch
import torchvision.transforms as T
import random
import re
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score

In [None]:
# Cell 2: Settings
INPUT_FOLDER = "path/to/your/images"  # CHANGE THIS
OUTPUT_FOLDER = "output_balanced"     # CHANGE THIS
IMAGE_SIZE = 224
TARGET_COUNT = 30  # Target images per class
SEED = 42

random.seed(SEED)
np.random.seed(SEED)

In [None]:
# Cell: Clear output folders
import shutil

# Define folders to clear
folders_to_clear = [
    f"{OUTPUT_FOLDER}/augmented",
    f"{OUTPUT_FOLDER}/resized", 
    f"{OUTPUT_FOLDER}/final"
]

# Clear each folder
print("Clearing output folders...")
for folder in folders_to_clear:
    if os.path.exists(folder):
        # Remove all contents
        shutil.rmtree(folder)
        # Recreate empty folder
        os.makedirs(folder)
        print(f"  ✓ Cleared: {folder}")
    else:
        # Create if doesn't exist
        os.makedirs(folder)
        print(f"  ✓ Created: {folder}")

print("\nAll output folders cleared and ready!")

In [None]:
# Cell 3: Create folders
os.makedirs(f"{OUTPUT_FOLDER}/augmented", exist_ok=True)
os.makedirs(f"{OUTPUT_FOLDER}/resized", exist_ok=True)
os.makedirs(f"{OUTPUT_FOLDER}/final", exist_ok=True)

In [None]:
# Cell 4: Basic functions
def get_class_label(filename):
    """Get class from filename (e.g., '0p', '5p', '100p')"""
    match = re.search(r'(\d+)[pP]', filename)
    if match:
        return f"{match.group(1)}p"
    return "unknown"

def get_numeric_label(filename):
    """Extract numeric value from filename for regression"""
    match = re.search(r'(\d+)[pP]', filename)
    if match:
        return float(match.group(1))
    return 0.0

def make_square(image, size):
    """Resize image and pad to square while keeping RGB"""
    # Convert to RGB if not already
    if image.mode != 'RGB':
        image = image.convert('RGB')
    
    # Calculate new size
    w, h = image.size
    scale = size / max(w, h)
    new_w = int(w * scale)
    new_h = int(h * scale)
    
    # Resize
    resized = image.resize((new_w, new_h), Image.BILINEAR)
    
    # Create square with white background
    square = Image.new('RGB', (size, size), (255, 255, 255))
    
    # Paste in center
    x = (size - new_w) // 2
    y = (size - new_h) // 2
    square.paste(resized, (x, y))
    
    return square

In [None]:
# Cell 5: Define augmentations
def aug1(img): return img.rotate(90, expand=True)     # 90° rotation
def aug2(img): return img.rotate(180, expand=True)    # 180° rotation  
def aug3(img): return img.rotate(270, expand=True)    # 270° rotation
def aug4(img): return img.transpose(Image.FLIP_LEFT_RIGHT)
def aug5(img): return img.transpose(Image.FLIP_TOP_BOTTOM)

augmentation_list = [aug1, aug2, aug3, aug4, aug5]

In [None]:
# Cell 6: Group images by class
image_files = [f for f in os.listdir(INPUT_FOLDER) 
               if f.lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp'))]

# Group by class
classes = {}
for filename in image_files:
    label = get_class_label(filename)
    if label not in classes:
        classes[label] = []
    classes[label].append(filename)

# Show counts
print("Images per class:")
for label, images in sorted(classes.items()):
    print(f"  {label}: {len(images)} images")

In [None]:
# Cell 7: Copy originals and create augmentations
print("\nProcessing images...")

for class_label, filenames in sorted(classes.items()):
    print(f"\nClass {class_label}:")
    
    # Copy all originals first
    for filename in filenames:
        img = Image.open(os.path.join(INPUT_FOLDER, filename))
        # Keep as RGB (no binary conversion)
        if img.mode != 'RGB':
            img = img.convert('RGB')
        
        base = os.path.splitext(filename)[0]
        img.save(f"{OUTPUT_FOLDER}/augmented/{base}_original.jpg")
    
    # Create augmentations if needed
    current_count = len(filenames)
    if current_count < TARGET_COUNT:
        needed = TARGET_COUNT - current_count
        print(f"  Creating {needed} augmentations...")
        
        for i in range(needed):
            # Pick random original
            source_file = random.choice(filenames)
            img = Image.open(os.path.join(INPUT_FOLDER, source_file))
            # Keep as RGB (no binary conversion)
            if img.mode != 'RGB':
                img = img.convert('RGB')
            
            # Apply 3-4 random augmentations
            num_augs = random.randint(3, 4)
            selected_augs = random.sample(augmentation_list, num_augs)
            
            augmented = img
            for aug_func in selected_augs:
                augmented = aug_func(augmented)
            
            # Save
            base = os.path.splitext(source_file)[0]
            augmented.save(f"{OUTPUT_FOLDER}/augmented/{base}_aug{i}.jpg")

In [None]:
# Cell 8: Resize all images
print("\nResizing all images...")
augmented_files = [f for f in os.listdir(f"{OUTPUT_FOLDER}/augmented") 
                   if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

for i, filename in enumerate(augmented_files):
    if i % 50 == 0:
        print(f"  {i}/{len(augmented_files)}")
    
    img = Image.open(f"{OUTPUT_FOLDER}/augmented/{filename}")
    resized = make_square(img, IMAGE_SIZE)
    
    # Save as JPG to maintain RGB
    base = os.path.splitext(filename)[0]
    resized.save(f"{OUTPUT_FOLDER}/resized/{base}.jpg")

In [None]:
# Cell 9: Normalize for ResNet (ImageNet normalization)
print("\nNormalizing images for ResNet...")
resized_files = [f for f in os.listdir(f"{OUTPUT_FOLDER}/resized") 
                 if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

# ImageNet normalization values for ResNet
normalize = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

for i, filename in enumerate(resized_files):
    if i % 50 == 0:
        print(f"  {i}/{len(resized_files)}")
    
    # Load RGB image
    img = Image.open(f"{OUTPUT_FOLDER}/resized/{filename}")
    
    # Apply ResNet normalization
    tensor = normalize(img)
    
    # Save tensor
    base = os.path.splitext(filename)[0]
    np.save(f"{OUTPUT_FOLDER}/final/{base}.npy", tensor.numpy())
    
    # Save image for viewing
    img.save(f"{OUTPUT_FOLDER}/final/{base}.jpg")

In [None]:
# Cell 10: Check final counts
print("\nFinal image count per class:")
final_counts = {}
for filename in os.listdir(f"{OUTPUT_FOLDER}/final"):
    if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
        label = get_class_label(filename)
        final_counts[label] = final_counts.get(label, 0) + 1

for label, count in sorted(final_counts.items()):
    print(f"  {label}: {count} images")

In [None]:
# Cell 11: ResNet Regression Model
import torch.nn as nn
import torchvision.models as models
from sklearn.metrics import mean_absolute_error, r2_score

class ResNetRegression(nn.Module):
    def __init__(self, pretrained=True):
        super(ResNetRegression, self).__init__()
        # Load pretrained ResNet18
        self.resnet = models.resnet18(pretrained=pretrained)
        
        # Replace the final classification layer with regression layer
        # ResNet18 has 512 features before the final layer
        self.resnet.fc = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.resnet.fc.in_features, 1)  # Single output for regression
        )
    
    def forward(self, x):
        return self.resnet(x)

# Create model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ResNetRegression(pretrained=True).to(device)

print(f"Model created and moved to {device}")
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

# Loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

print("ResNet regression model ready for training!")

In [None]:
# Cell 12: Data Loading and Dataset
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class CoinDataset(Dataset):
    def __init__(self, data_folder, transform=None):
        self.data_folder = data_folder
        self.transform = transform
        
        # Get all .npy files (normalized tensors)
        self.files = [f for f in os.listdir(data_folder) if f.endswith('.npy')]
        
        # Extract labels (numeric values for regression)
        self.labels = []
        for filename in self.files:
            label = get_numeric_label(filename)
            self.labels.append(label)
    
    def __len__(self):
        return len(self.files)
    
    def __getitem__(self, idx):
        # Load normalized tensor
        tensor_path = os.path.join(self.data_folder, self.files[idx])
        tensor = torch.from_numpy(np.load(tensor_path)).float()
        
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        
        return tensor, label

# Create dataset
final_folder = f"{OUTPUT_FOLDER}/final"
full_dataset = CoinDataset(final_folder)

print(f"Total samples: {len(full_dataset)}")
print(f"Sample tensor shape: {full_dataset[0][0].shape}")
print(f"Sample label: {full_dataset[0][1]}")

# Split into train/validation (80/20 split)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Use a subset for training (10% of total data for quick training)
subset_size = max(100, len(full_dataset) // 10)  # At least 100 samples
subset_train_size = int(0.8 * subset_size)
subset_val_size = subset_size - subset_train_size

print(f"Using subset of {subset_size} samples for training")
print(f"Training samples: {subset_train_size}")
print(f"Validation samples: {subset_val_size}")

# Create subset indices
indices = list(range(len(full_dataset)))
np.random.shuffle(indices)
subset_indices = indices[:subset_size]

train_indices = subset_indices[:subset_train_size]
val_indices = subset_indices[subset_train_size:]

# Create samplers
from torch.utils.data import SubsetRandomSampler
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

# Create data loaders
batch_size = 16
train_loader = DataLoader(full_dataset, batch_size=batch_size, sampler=train_sampler)
val_loader = DataLoader(full_dataset, batch_size=batch_size, sampler=val_sampler)

print(f"Data loaders created with batch size: {batch_size}")

In [None]:
# Cell 13: Training Loop with MSE, MAE, and R² tracking
import time
from collections import defaultdict

def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    all_preds = []
    all_targets = []
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data).squeeze()
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        all_preds.extend(output.detach().cpu().numpy())
        all_targets.extend(target.detach().cpu().numpy())
    
    # Calculate metrics
    avg_loss = running_loss / len(train_loader)
    mae = mean_absolute_error(all_targets, all_preds)
    r2 = r2_score(all_targets, all_preds)
    
    return avg_loss, mae, r2

def validate_epoch(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data).squeeze()
            loss = criterion(output, target)
            
            running_loss += loss.item()
            all_preds.extend(output.cpu().numpy())
            all_targets.extend(target.cpu().numpy())
    
    # Calculate metrics
    avg_loss = running_loss / len(val_loader)
    mae = mean_absolute_error(all_targets, all_preds)
    r2 = r2_score(all_targets, all_preds)
    
    return avg_loss, mae, r2

# Training configuration
num_epochs = 10
best_val_loss = float('inf')

# Storage for metrics
history = defaultdict(list)

print("Starting training...")
print("=" * 60)

start_time = time.time()

for epoch in range(num_epochs):
    epoch_start = time.time()
    
    # Train
    train_loss, train_mae, train_r2 = train_epoch(model, train_loader, criterion, optimizer, device)
    
    # Validate
    val_loss, val_mae, val_r2 = validate_epoch(model, val_loader, criterion, device)
    
    # Store metrics
    history['train_loss'].append(train_loss)
    history['train_mae'].append(train_mae)
    history['train_r2'].append(train_r2)
    history['val_loss'].append(val_loss)
    history['val_mae'].append(val_mae)
    history['val_r2'].append(val_r2)
    
    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), f'{OUTPUT_FOLDER}/best_model.pth')
    
    epoch_time = time.time() - epoch_start
    
    # Print progress
    print(f"Epoch {epoch+1:2d}/{num_epochs}")
    print(f"  Train - Loss: {train_loss:.4f}, MAE: {train_mae:.2f}, R²: {train_r2:.3f}")
    print(f"  Val   - Loss: {val_loss:.4f}, MAE: {val_mae:.2f}, R²: {val_r2:.3f}")
    print(f"  Time: {epoch_time:.1f}s")
    print("-" * 50)

total_time = time.time() - start_time
print(f"\nTraining completed in {total_time:.1f}s")
print(f"Best validation loss: {best_val_loss:.4f}")

# Final results
print("\n" + "=" * 60)
print("FINAL RESULTS")
print("=" * 60)
print(f"Final Train - Loss: {history['train_loss'][-1]:.4f}, MAE: {history['train_mae'][-1]:.2f}, R²: {history['train_r2'][-1]:.3f}")
print(f"Final Val   - Loss: {history['val_loss'][-1]:.4f}, MAE: {history['val_mae'][-1]:.2f}, R²: {history['val_r2'][-1]:.3f}")
print(f"Best Val Loss: {best_val_loss:.4f}")
print(f"Model saved to: {OUTPUT_FOLDER}/best_model.pth")

In [None]:
# Cell 14: Plot Training Results
import matplotlib.pyplot as plt

def plot_training_history(history):
    """Plot training metrics"""
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # Loss plot
    axes[0].plot(history['train_loss'], label='Train Loss', marker='o')
    axes[0].plot(history['val_loss'], label='Val Loss', marker='s')
    axes[0].set_title('MSE Loss')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()
    axes[0].grid(True)
    
    # MAE plot
    axes[1].plot(history['train_mae'], label='Train MAE', marker='o')
    axes[1].plot(history['val_mae'], label='Val MAE', marker='s')
    axes[1].set_title('Mean Absolute Error')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('MAE')
    axes[1].legend()
    axes[1].grid(True)
    
    # R² plot
    axes[2].plot(history['train_r2'], label='Train R²', marker='o')
    axes[2].plot(history['val_r2'], label='Val R²', marker='s')
    axes[2].set_title('R² Score')
    axes[2].set_xlabel('Epoch')
    axes[2].set_ylabel('R²')
    axes[2].legend()
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.savefig(f'{OUTPUT_FOLDER}/training_history.png', dpi=300, bbox_inches='tight')
    plt.show()

# Plot results if history exists
try:
    plot_training_history(history)
    print(f"Training plots saved to {OUTPUT_FOLDER}/training_history.png")
except NameError:
    print("Run the training cell first to generate history data")

# Quick model test
def test_model_prediction():
    """Test the model on a few samples"""
    model.eval()
    with torch.no_grad():
        # Get a batch from validation
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            predictions = model(data).squeeze()
            
            print("\nSample Predictions vs Actual:")
            print("-" * 40)
            for i in range(min(5, len(predictions))):
                pred = predictions[i].cpu().item()
                actual = target[i].cpu().item()
                error = abs(pred - actual)
                print(f"Predicted: {pred:6.2f} | Actual: {actual:6.2f} | Error: {error:5.2f}")
            break

try:
    test_model_prediction()
except:
    print("Train the model first to see predictions")