In [None]:
# !pip install torch torchvision --force-reinstall

from google.colab import drive
drive.mount('/content/drive')

!cp -r /content/drive/MyDrive/SMAI_Project/images_train /content/
!cp -r /content/drive/MyDrive/SMAI_Project/images_val /content/
!cp -r /content/drive/MyDrive/SMAI_Project/images_test /content/
!cp -r /content/drive/MyDrive/SMAI_Project/labels_train.csv /content/
!cp -r /content/drive/MyDrive/SMAI_Project/labels_val.csv /content/

In [None]:
val_csv_path = 'labels_val.csv'
train_csv_path = '/content/labels_train.csv'
train_img_dir = '/content/images_train'
val_img_dir = '/content/images_val'
lat_long_output_csv = '/content/lat-long.csv'
region_output_csv = '/content/region.csv'
angle_output_csv = '/content/angle.csv'
test_img_dir = '/content/images_test'

In [None]:
import os
import pandas as pd
import numpy as np
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms, models
import timm

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"💻 Using device: {device}")

In [None]:
train_df = pd.read_csv(train_csv_path)
val_df = pd.read_csv(val_csv_path)
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
])


val_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])
test_transform = val_transform

In [None]:
class CompassDataset(Dataset):
    def __init__(self, img_dir, csv_path, transform=None):
        self.df = pd.read_csv(csv_path)
        self.img_dir = img_dir
        self.transform = transform
        angles_rad = np.deg2rad(self.df['angle'].astype(float).values)
        self.sin_targets = np.sin(angles_rad)
        self.cos_targets = np.cos(angles_rad)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_dir, row['filename'])
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        target = torch.tensor([self.sin_targets[idx], self.cos_targets[idx]], dtype=torch.float32)
        return image, target

class TestDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.filenames = sorted(os.listdir(img_dir))
        self.transform = transform

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.filenames[idx])
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

In [None]:
train_dataset = CompassDataset(train_img_dir, train_csv_path, transform=train_transform)
val_dataset = CompassDataset(val_img_dir, val_csv_path, transform=val_transform)
test_dataset = TestDataset(test_img_dir, transform=val_transform)
print(train_csv_path)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2, pin_memory=True)

In [None]:
model = timm.create_model('efficientnet_b0', pretrained=True)
in_features = model.classifier.in_features
model.classifier = nn.Linear(in_features, 2)
if hasattr(model, 'gradient_checkpointing_enable'):
    model.gradient_checkpointing_enable()

model = model.to(device)

def circular_mae(pred, target):
    ang_pred = torch.atan2(pred[:, 0], pred[:, 1]) * (180.0 / np.pi)
    ang_true = torch.atan2(target[:, 0], target[:, 1]) * (180.0 / np.pi)
    diff = torch.abs(ang_pred - ang_true)
    return torch.min(diff, 360.0 - diff).mean()


def evaluate(model, loader, device):
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for imgs, tgt in loader:
            imgs, tgt = imgs.to(device, non_blocking=True), tgt.to(device, non_blocking=True)
            preds = model(imgs)
            total_loss += circular_mae(preds, tgt).item() * imgs.size(0)
    return total_loss / len(loader.dataset)

In [None]:
def set_trainable_layers(model, train_head_only=True):
    for name, param in model.named_parameters():
        param.requires_grad = True if not train_head_only else ('classifier' in name)

def get_optimizer(model, lr):
    params_to_update = [p for p in model.parameters() if p.requires_grad]
    return torch.optim.Adam(params_to_update, lr=lr)

def process_batch(model, imgs, tgt, device):
    imgs, tgt = imgs.to(device), tgt.to(device)
    with torch.cuda.amp.autocast():
        preds = model(imgs)
        loss_batch = circular_mae(preds, tgt)
    return preds, loss_batch


def calculate_loss_and_backward(loss_batch, scaler, optimizer, batch_accum):
    loss = loss_batch / batch_accum
    scaler.scale(loss).backward()
    return loss


def manage_optimizer_step(step, scaler, optimizer, batch_accum):
    if (step + 1) % batch_accum == 0:
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()


def train_one_epoch(model, loader, optimizer, scaler, device, batch_accum, desc="Training"):
    model.train()
    loop = iter(tqdm(loader, desc=desc))
    step = 0
    total_loss = 0.0
    optimizer.zero_grad()

    while True:
        try:
            imgs, tgt = next(loop)
        except StopIteration:
            break


        preds, loss_batch = process_batch(model, imgs, tgt, device)
        loss = calculate_loss_and_backward(loss_batch, scaler, optimizer, batch_accum)
        manage_optimizer_step(step, scaler, optimizer, batch_accum)
        
        step += 1
        total_loss += loss_batch.item() * imgs.size(0)
        tqdm.write(f"Loss: {loss_batch.item():.4f}")

    return total_loss / len(loader.dataset)


def save_best_model(model, val_loss, best_loss, best_state_dict, path='best_model.pt'):
    if val_loss >= best_loss:
        return best_loss, best_state_dict
    best_state_dict = model.state_dict()
    torch.save(best_state_dict, path)
    print(f"Saved best model with Val MAE: {val_loss:.2f}° → {path}")
    return val_loss, best_state_dict


def train_head(model, train_loader, val_loader, device, head_epochs, head_lr, batch_accum, patience):
    scaler = torch.cuda.amp.GradScaler()
    best_loss = float('inf')
    best_state_dict = None
    optimizer = get_optimizer(model, head_lr)
    
    epoch = 0
    while epoch < head_epochs:
        train_mae = train_one_epoch(model, train_loader, optimizer, scaler, device, batch_accum, desc=f"Head Epoch {epoch+1}/{head_epochs}")
        val_loss = evaluate(model, val_loader, device)
        print(f"→ [Head] Train MAE: {train_mae:.2f}°, Val MAE: {val_loss:.2f}°")
        best_loss, best_state_dict = save_best_model(model, val_loss, best_loss, best_state_dict)
        epoch += 1
        
    return best_loss, best_state_dict


def fine_tune(model, train_loader, val_loader, device, ft_epochs, ft_lr, batch_accum, patience, best_loss, best_state_dict):
    scaler = torch.cuda.amp.GradScaler()
    optimizer = get_optimizer(model, ft_lr)
    
    epoch = 0
    while epoch < ft_epochs:
        train_mae = train_one_epoch(model, train_loader, optimizer, scaler, device, batch_accum, desc=f"FT Epoch {epoch+1}/{ft_epochs}")
        val_loss = evaluate(model, val_loader, device)
        print(f"→ [FT] Train MAE: {train_mae:.2f}°, Val MAE: {val_loss:.2f}°")
        best_loss, best_state_dict = save_best_model(model, val_loss, best_loss, best_state_dict)
        epoch += 1
        
    return best_loss, best_state_dict


def train_model(model, train_loader, val_loader, device, head_epochs=10, ft_epochs=50, head_lr=1e-3, ft_lr=1e-4, batch_accum=2, patience=5):
    # Head Training
    set_trainable_layers(model, train_head_only=True)
    best_loss, best_state_dict = train_head(model, train_loader, val_loader, device, head_epochs, head_lr, batch_accum, patience)
    
    # Fine-Tuning
    set_trainable_layers(model, train_head_only=False)
    best_loss, best_state_dict = fine_tune(model, train_loader, val_loader, device, ft_epochs, ft_lr, batch_accum, patience, best_loss, best_state_dict)
    
    # Load the best model after training
    model.load_state_dict(best_state_dict) if best_state_dict else print("No model was saved (no improvement observed during training).")


train_model(model, train_loader, val_loader, device)

In [None]:
def compute_angles_from_output(preds):
    """Convert sine and cosine predictions to angles in degrees."""
    sin = preds[:, 0]
    cos = preds[:, 1]
    ang = torch.atan2(sin, cos) * (180.0 / np.pi)
    return ang % 360

def summarize_angles(angles):
    """Print basic statistics about predicted angles."""
    print(f"📊 Angle Stats → Min: {min(angles):.2f}°, Max: {max(angles):.2f}°, Mean: {np.mean(angles):.2f}°")

def save_angles_to_csv(angles, path):
    """Save predicted angles to a CSV file for debugging or analysis."""
    df = pd.DataFrame({"angle": angles})
    df.to_csv(path, index=False)
    print(f"📝 Debug angles saved to {path}")

def predict_angles(model, loader, device, return_raw=False, verbose=False, save_csv=False, csv_path="debug_angles.csv"):
    """Run inference and predict angles from a model."""
    model.eval()
    preds_final = []
    sin_cos_raw = []

    with torch.no_grad():
        for i, batch in enumerate(loader):
            imgs = batch[0] if isinstance(batch, (tuple, list)) else batch
            imgs = imgs.to(device)
            preds = model(imgs)
            angles = compute_angles_from_output(preds).cpu().numpy().tolist()
            preds_final.extend(angles)
            if return_raw:
                sin_cos_raw.extend(preds.cpu().numpy().tolist())
            if verbose:
                print(f"Batch {i+1}: Angles = {[round(a, 2) for a in angles]}")

    summarize_angles(preds_final)

    if save_csv:
        save_angles_to_csv(preds_final, csv_path)

    return (preds_final, sin_cos_raw) if return_raw else preds_final

def create_submission_csv(angles, output_csv):
    """Create submission file from predicted angles."""
    all_preds = [int(round(x)) for x in angles]
    ids = list(range(len(all_preds)))
    df = pd.DataFrame({"id": ids, "angle": all_preds})
    df.to_csv(output_csv, index=False)
    print(f"Saved submission with {len(df)} rows to {output_csv}")

def generate_submission(model, val_loader, test_loader, device, output_csv="YourRollNo_1.csv", verbose=False):
    """Load the best model and generate submission from validation + test predictions."""
    model.load_state_dict(torch.load("best_model.pt", map_location=device))
    
    print("📦 Predicting on validation data...")
    val_preds = predict_angles(model, val_loader, device, verbose=verbose)

    print("📦 Predicting on test data...")
    test_preds = predict_angles(model, test_loader, device, verbose=verbose)

    all_preds = val_preds + test_preds
    create_submission_csv(all_preds, output_csv)

generate_submission(model, val_loader, test_loader, device, output_csv='2022101001_5.csv', verbose=True)