In [1]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from PIL import Image
import time
import copy

In [2]:
# ==========================================
# 1. CONFIGURATION
# ==========================================
# UPDATE THIS PATH based on your Kaggle Data tab
DATA_PATH = "/kaggle/input/amlfif/Dataset" 

BATCH_SIZE = 64
LEARNING_RATE = 0.001
EPOCHS = 100
NUM_CLASSES = 200
IMAGE_SIZE = 448
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Using Device: {DEVICE}")

Using Device: cuda


In [3]:
class BirdDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        
        # Adjust label to be 0-199 (PyTorch starts at 0)
        if 'label' in self.data.columns:
            self.data['label'] = self.data['label'] - 1

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Get the row by position
        row = self.data.iloc[idx]
        
        # --- SMART DETECTION START ---
        # We use .iloc[] to avoid the FutureWarning
        val0 = row.iloc[0]
        val1 = row.iloc[1]
        
        # Check which one is the image path (string ending in .jpg)
        if isinstance(val0, str) and (val0.endswith('.jpg') or '/' in val0):
            img_path = val0
            label_or_id = val1
        else:
            img_path = val1
            label_or_id = val0
        # --- SMART DETECTION END ---
        
        # Handle leading slashes
        if str(img_path).startswith("/"):
            img_path = img_path[1:]
            
        full_path = os.path.join(self.root_dir, img_path)
        
        try:
            image = Image.open(full_path).convert("RGB")
        except FileNotFoundError:
            image = Image.new('RGB', (224, 224))
            
        if self.transform:
            image = self.transform(image)
            
        # Return proper pair
        if 'label' in self.data.columns:
            # Training: Return (Image, Label)
            return image, torch.tensor(label_or_id, dtype=torch.long)
        else:
            # Testing: Return (Image, ID)
            return image, label_or_id

In [4]:
# ==========================================
# 3. TRANSFORMS (Data Augmentation)
# ==========================================
# Innovation: Adding augmentation helps the model generalize better
train_transforms = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),  # Flip left/right
    transforms.RandomRotation(degrees=15),   # Rotate slightly
    transforms.ColorJitter(brightness=0.1, contrast=0.1), # vary lighting
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transforms = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

#### Simple CNN

#### Improvised Resnet

In [5]:
# ==========================================
# IMPROVED MODEL: Mini-ResNet
# ==========================================

# 1. The Building Block (The Innovation)
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        
        # First convolution
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        
        # Second convolution
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        # The "Skip Connection" logic
        # If the input size changes (due to stride), we need to resize the shortcut too
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = x # Save the original input (the "jump")
        
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        
        out = self.conv2(out)
        out = self.bn2(out)
        
        out += self.shortcut(identity) # ADD the original input back here
        out = self.relu(out)
        
        return out

In [6]:
# ==========================================
# IMPROVED MODEL: ResNet with Dropout
# ==========================================
class ResNetFromScratch(nn.Module):
    def __init__(self, num_classes=200):
        super(ResNetFromScratch, self).__init__()
        
        # Initial processing (Entry point)
        self.in_channels = 64
        # Standard ResNet Start
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        # Stack the Residual Blocks
        self.layer1 = self._make_layer(64, 2, stride=1)
        self.layer2 = self._make_layer(128, 2, stride=2)
        self.layer3 = self._make_layer(256, 2, stride=2)
        self.layer4 = self._make_layer(512, 2, stride=2)
        
        # Classifier
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        
        # --- INNOVATION: Dropout ---
        # Dropping 50% of neurons prevents overfitting on small datasets
        self.dropout = nn.Dropout(p=0.5) 
        
        self.fc = nn.Linear(512, num_classes)

        # Initialize weights (Helps training start better)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, out_channels, blocks, stride):
        layers = []
        layers.append(ResidualBlock(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        
        # Apply Dropout before the final classification
        x = self.dropout(x)
        
        x = self.fc(x)
        return x

#### Data loader

In [7]:
# ==========================================
# 5. SETUP LOADERS & MODEL
# ==========================================
# Load Data
full_dataset = BirdDataset(
    csv_file=f'{DATA_PATH}/train_images.csv', 
    root_dir=f'{DATA_PATH}',
    transform=train_transforms
)

# Split 80/20
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

# IMPORTANT: Validation set should NOT use augmentation (just resize)
val_dataset.dataset.transform = val_transforms 

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)


#### CNN Model

#### Restnet Model

In [8]:
# ==========================================
# 6. SUPER-CONVERGENCE TRAINING LOOP
# ==========================================
import time

# 1. SETUP

print("Initializing ResNet with Dropout & OneCycleLR...")
model = ResNetFromScratch(num_classes=NUM_CLASSES).to(DEVICE)

criterion = nn.CrossEntropyLoss()

# OPTIMIZER: AdamW (Better weight decay handling)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# SCHEDULER: OneCycleLR
# This ramps the LR up to 'max_lr' then down to 0. 
# It is extremely effective for training from scratch.
scheduler = optim.lr_scheduler.OneCycleLR(
    optimizer, 
    max_lr=0.003,              # Peak learning rate
    epochs=EPOCHS, 
    steps_per_epoch=len(train_loader)
)

# 2. TRAINING ENGINE
# -----------------------------
best_acc = 0.0
best_model_wts = copy.deepcopy(model.state_dict())
start_time = time.time()

print(f"Starting training on {DEVICE} for {EPOCHS} epochs...")

for epoch in range(EPOCHS):
    
    # --- TRAIN PHASE ---
    model.train()
    running_loss = 0.0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
        
        optimizer.zero_grad()
        
        # Forward
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # Backward
        loss.backward()
        optimizer.step()
        
        # STEP SCHEDULER (OneCycleLR updates every BATCH, not every EPOCH)
        scheduler.step()
        
        running_loss += loss.item() * inputs.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    
    # --- VALIDATION PHASE ---
    model.eval()
    correct = 0
    total = 0
    val_loss = 0.0
    
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels) # Calc val loss too
            
            _, predicted = torch.max(outputs, 1)
            
            val_loss += loss.item() * inputs.size(0)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    val_acc = correct / total
    val_loss = val_loss / len(val_loader.dataset)
    
    # Get current LR for printout
    current_lr = optimizer.param_groups[0]['lr']
    
    print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {epoch_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f} | LR: {current_lr:.6f}")
    
    # --- SAVE BEST MODEL ---
    if val_acc > best_acc:
        best_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        torch.save(model.state_dict(), 'best_custom_model.pth')

# End of training
time_elapsed = time.time() - start_time
print(f'\nTraining complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
print(f'Best Validation Accuracy: {best_acc:.4f}')

# Load best weights
model.load_state_dict(best_model_wts)

Initializing ResNet with Dropout & OneCycleLR...
Starting training on cuda for 100 epochs...
Epoch 1/100 | Train Loss: 5.4081 | Val Loss: 5.1934 | Val Acc: 0.0216 | LR: 0.000128
Epoch 2/100 | Train Loss: 5.1062 | Val Loss: 5.1179 | Val Acc: 0.0242 | LR: 0.000152
Epoch 3/100 | Train Loss: 5.0073 | Val Loss: 5.0449 | Val Acc: 0.0242 | LR: 0.000191
Epoch 4/100 | Train Loss: 4.9097 | Val Loss: 5.2082 | Val Acc: 0.0356 | LR: 0.000245
Epoch 5/100 | Train Loss: 4.8281 | Val Loss: 5.0208 | Val Acc: 0.0344 | LR: 0.000313
Epoch 6/100 | Train Loss: 4.7823 | Val Loss: 4.9835 | Val Acc: 0.0471 | LR: 0.000395
Epoch 7/100 | Train Loss: 4.7333 | Val Loss: 4.9522 | Val Acc: 0.0496 | LR: 0.000490
Epoch 8/100 | Train Loss: 4.6622 | Val Loss: 5.0760 | Val Acc: 0.0369 | LR: 0.000597
Epoch 9/100 | Train Loss: 4.5847 | Val Loss: 5.3362 | Val Acc: 0.0369 | LR: 0.000714
Epoch 10/100 | Train Loss: 4.5530 | Val Loss: 5.1418 | Val Acc: 0.0369 | LR: 0.000841
Epoch 11/100 | Train Loss: 4.4957 | Val Loss: 4.9498 | V

<All keys matched successfully>

In [9]:
# ==========================================
# 7. GENERATE SUBMISSION
# ==========================================
# Load Best Model
model.load_state_dict(torch.load('best_custom_model.pth'))
model.eval()

test_dataset = BirdDataset(
    csv_file=f'{DATA_PATH}/test_images_path.csv',
    root_dir=f'{DATA_PATH}',
    transform=val_transforms # No augmentation for testing
)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

all_preds = []
all_ids = []

print("Generating predictions...")
with torch.no_grad():
    for inputs, ids in test_loader:
        inputs = inputs.to(DEVICE)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        
        # Convert back to 1-200 range
        predicted = predicted.cpu().numpy() + 1 
        
        all_preds.extend(predicted)
        all_ids.extend(ids.numpy())

# Save CSV
submission = pd.DataFrame({'id': all_ids, 'label': all_preds})
submission.to_csv('submission_scratch5.csv', index=False)
print("Saved submission_scratch.csv!")

Generating predictions...
Saved submission_scratch.csv!
