In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from PIL import Image, ImageOps
import os
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
import random
from torch.nn import functional as F
from torch.cuda.amp import GradScaler, autocast


In [None]:
class EarlyStopping:
    """Track validation loss, checkpoint on improvement, and flag when to stop.

    Calling the instance with ``(val_loss, model)`` compares ``-val_loss`` with the
    best score seen so far. On improvement the model's ``state_dict`` is written to
    ``path`` and the patience counter is reset; otherwise the counter is incremented
    and ``early_stop`` becomes True once it reaches ``patience``.

    Args:
        patience: number of consecutive non-improving calls tolerated before
            ``early_stop`` is set.
        verbose: when True, print a message every time a checkpoint is saved.
        delta: minimum increase of ``-val_loss`` over the best score required to
            count as an improvement.
        path: file the best ``state_dict`` is saved to.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='angle.pth'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record one validation result and update the stopping state."""
        score = -val_loss

        # A NaN/inf loss must never count as an improvement. Every comparison with
        # NaN is False, so without this guard a diverged epoch would fall through to
        # the "improved" branch, overwrite the best checkpoint and poison best_score.
        # Non-finite losses therefore only consume patience; nothing is saved.
        is_finite = bool(np.isfinite(score))
        improved = is_finite and (
            self.best_score is None or score >= self.best_score + self.delta
        )

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Save ``model.state_dict()`` to ``self.path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
class GeoAngleDataset(Dataset):
    """Labelled image dataset yielding ``(image, [sin(angle), cos(angle)])`` pairs.

    The CSV must provide ``filename`` and ``angle`` columns. Rows whose angle lies
    outside the inclusive range [0, 360] are dropped and only those two columns
    are kept.

    Args:
        csv_file: path of the annotation CSV.
        img_dir: directory that the ``filename`` entries are joined onto.
        transform: optional callable applied to the PIL image after augmentation.
        augment: when True, ``_augment_image`` is applied before ``transform``.
    """

    def __init__(self, csv_file, img_dir, transform=None, augment=False):
        self.img_dir = img_dir
        self.transform = transform
        self.augment = augment

        table = pd.read_csv(csv_file)
        angle_in_range = table['angle'].between(0, 360)
        kept_rows = table[angle_in_range].reset_index(drop=True)
        self.annotations = kept_rows[['filename', 'angle']]

    def __len__(self):
        """Number of rows that survived the angle filter."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load sample ``idx`` and encode its angle (degrees) as a float32 [sin, cos] tensor."""
        row = self.annotations.iloc[idx]
        img_path = os.path.join(self.img_dir, row['filename'])
        angle_deg = float(row['angle'])

        image = Image.open(img_path).convert('RGB')

        if self.augment:
            image, angle_deg = self._augment_image(image, angle_deg)

        if self.transform:
            image = self.transform(image)

        angle_rad = np.deg2rad(angle_deg)
        target = torch.tensor(
            [np.sin(angle_rad), np.cos(angle_rad)], dtype=torch.float32
        )
        return image, target

    def _augment_image(self, image, angle):
        """Apply random photometric changes and an occasional fixed affine warp.

        The angle is returned unchanged; none of the steps here modify the label.
        """
        # Decide once per call whether the blur stage is active (10% of the time);
        # otherwise a pass-through takes its slot in the pipeline.
        if random.random() < 0.1:
            blur_stage = transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 1.0))
        else:
            blur_stage = lambda x: x

        photometric = transforms.Compose([
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.RandomGrayscale(p=0.1),
            blur_stage,
            transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.1),
        ])
        image = photometric(image)

        # 20% of the time: shift by (10, 10) and shear by 10, with no rotation or scaling.
        apply_affine = random.random() < 0.2
        if apply_affine:
            image = transforms.functional.affine(
                image, angle=0, translate=(10,10), scale=1.0, shear=10
            )

        return image, angle


In [None]:
class ResidualBlock(nn.Module):
    """Linear -> BatchNorm1d -> GELU -> Dropout branch added to a shortcut.

    The shortcut is the identity when ``in_features == out_features`` and a
    separate linear projection otherwise.

    Args:
        in_features: size of the last dimension of the input.
        out_features: size of the last dimension of the output.
        drop_rate: dropout probability applied at the end of the main branch.
    """

    def __init__(self, in_features, out_features, drop_rate=0.0):
        super().__init__()
        self.linear1 = nn.Linear(in_features, out_features)
        self.norm = nn.BatchNorm1d(out_features)
        self.activation = nn.GELU()
        self.dropout = nn.Dropout(drop_rate)
        if in_features == out_features:
            self.skip = nn.Identity()
        else:
            # Project the input so it can be added to the main branch.
            self.skip = nn.Linear(in_features, out_features)

    def forward(self, x):
        """Return ``main_branch(x) + shortcut(x)``."""
        shortcut = self.skip(x)
        main = self.dropout(self.activation(self.norm(self.linear1(x))))
        return main + shortcut

In [None]:
class ResNet50GeoAngleRegressor(nn.Module):
    """ResNet-50 feature extractor followed by a residual MLP head with two outputs.

    The backbone's ``fc`` layer is replaced by an identity so it emits its pooled
    features; the head maps them to a 2-vector that ``forward`` L2-normalises.

    Args:
        pretrained: forwarded to ``models.resnet50``.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        backbone = models.resnet50(pretrained=pretrained)
        feature_dim = backbone.fc.in_features
        backbone.fc = nn.Identity()
        self.resnet = backbone

        head_layers = [
            nn.LayerNorm(feature_dim),
            ResidualBlock(feature_dim, 1024),
            nn.Dropout(0.3),
            ResidualBlock(1024, 512),
            nn.Dropout(0.2),
            ResidualBlock(512, 256),
            nn.Linear(256, 2),
        ]
        self.regressor = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return unit-length 2-vectors, one row per input sample."""
        raw_output = self.regressor(self.resnet(x))
        return F.normalize(raw_output, p=2, dim=1)

In [None]:
def angular_loss(pred, target, alpha=0.6, beta=0.3, gamma=0.1):
    """Weighted sum of three angular discrepancy terms between [sin, cos] rows.

    Column 0 of each row is treated as the sine and column 1 as the cosine.

    Terms:
        * ``alpha`` * (1 - mean row-wise dot product of ``pred`` and ``target``)
        * ``beta``  * mean wrapped absolute angle difference (radians, at most pi)
        * ``gamma`` * mean of ``1 - cos(2 * difference)``

    Returns:
        A scalar tensor.
    """
    def to_angle(vec):
        return torch.atan2(vec[:, 0], vec[:, 1])

    cosine_term = 1 - (pred * target).sum(dim=1).mean()

    # Wrap the raw difference so it is the shorter way around the circle.
    gap = (to_angle(pred) - to_angle(target)).abs()
    gap = torch.minimum(gap, 2 * np.pi - gap)

    l1_term = gap.mean()
    double_angle_term = (1 - torch.cos(2 * gap)).mean()

    return alpha * cosine_term + beta * l1_term + gamma * double_angle_term


def calculate_maae(pred, target):
    """Mean absolute angular error, in degrees, between [sin, cos] rows.

    Both inputs are converted to angles in [0, 360) and the difference is wrapped,
    so each per-sample error is at most 180. Returns a scalar tensor.
    """
    def to_degrees(vec):
        return torch.rad2deg(torch.atan2(vec[:, 0], vec[:, 1])) % 360

    gap = (to_degrees(pred) - to_degrees(target)).abs()
    return torch.minimum(gap, 360 - gap).mean()


In [None]:
def train_model(train_loader, val_loader, device, epochs=50):
    """Train a ``ResNet50GeoAngleRegressor`` with AMP, clipping and early stopping.

    Each epoch runs one pass over ``train_loader`` (mixed precision, gradient-norm
    clipping at 1.0), then evaluates on ``val_loader``. The validation loss drives
    ``EarlyStopping`` (patience 10), which checkpoints the best weights to
    ``'angle.pth'``.

    Args:
        train_loader: iterable of ``(images, targets)`` training batches.
        val_loader: iterable of ``(images, targets)`` validation batches.
        device: device the model and every batch are moved to.
        epochs: maximum number of epochs; also used as the cosine schedule's ``T_max``.

    Returns:
        The model with the best checkpoint's weights loaded.
    """
    model = ResNet50GeoAngleRegressor().to(device)
    print(f"no of epochs: {epochs}")

    optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    scaler = GradScaler()

    early_stopping = EarlyStopping(patience=10, verbose=True, delta=0, path='angle.pth')

    for epoch in range(epochs):
        model.train()
        train_loss = []

        for images, targets in tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}'):
            images, targets = images.to(device), targets.to(device)

            optimizer.zero_grad()

            with autocast():
                preds = model(images)
                loss = angular_loss(preds, targets)

            scaler.scale(loss).backward()
            # Gradients must be unscaled before clipping so the 1.0 threshold
            # applies to their true magnitude.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            scaler.step(optimizer)
            scaler.update()

            train_loss.append(loss.item())

        model.eval()
        # Accumulate sample-weighted sums: averaging per-batch means would
        # over-weight a smaller final batch and bias the reported metrics.
        val_loss_sum = 0.0
        val_maae_sum = 0.0
        val_samples = 0
        with torch.no_grad():
            for images, targets in val_loader:
                images, targets = images.to(device), targets.to(device)
                preds = model(images)
                batch_size = targets.size(0)
                val_loss_sum += angular_loss(preds, targets).item() * batch_size
                val_maae_sum += calculate_maae(preds, targets).item() * batch_size
                val_samples += batch_size

        # An empty validation loader yields NaN metrics rather than a ZeroDivisionError.
        current_loss = val_loss_sum / val_samples if val_samples else float('nan')
        current_maae = val_maae_sum / val_samples if val_samples else float('nan')
        score = 1/(1 + current_maae)

        scheduler.step()

        early_stopping(current_loss, model)

        # Report before the early-stop check so the final epoch is also logged.
        print(f"Epoch {epoch+1}: "
              f"Train Loss: {np.mean(train_loss):.4f} | "
              f"Val Loss: {current_loss:.4f} | "
              f"Val MAE: {current_maae:.2f}° | "
              f"Score: {score:.4f}")

        if early_stopping.early_stop:
            print("Early stopping triggered")
            break

    # map_location keeps the reload working when the checkpoint's tensors were
    # saved from a different device than the one in use now.
    model.load_state_dict(torch.load(early_stopping.path, map_location=device))
    return model

In [None]:
def predict(model, dataloader, device):
    """Predict angles in degrees for a labelled loader and print the overall error.

    Args:
        model: network returning one [sin, cos] row per input image.
        dataloader: iterable of ``(images, targets)`` batches.
        device: device the image batches are moved to.

    Returns:
        A flat list with one predicted angle in [0, 360) per sample.
    """
    model.eval()
    all_preds = []
    target_batches = []

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Predicting"):
            outputs = model(images.to(device))
            degrees = torch.rad2deg(torch.atan2(outputs[:, 0], outputs[:, 1])) % 360
            all_preds.extend(degrees.cpu().numpy())
            target_batches.append(targets.cpu())

    targets = torch.cat(target_batches)

    # Re-encode the collected degree predictions as [sin, cos] rows so they can
    # be scored against the stored targets.
    pred_rad = torch.deg2rad(torch.tensor(all_preds))
    pred_vecs = torch.stack([torch.sin(pred_rad), torch.cos(pred_rad)], dim=1)

    final_maae = calculate_maae(pred_vecs, targets).item()
    score = 1/(1+final_maae)
    print(f"Final MAE: {final_maae:.2f}° | Score: {score:.4f}")

    return all_preds

In [None]:
# Channel-wise normalisation statistics shared by every transform below
# (the standard ImageNet mean / std values).
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training: resize slightly larger, then take a random 224x224 crop.
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD)
])

# Validation: deterministic resize straight to 224x224.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD)
])


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed every RNG the pipeline draws from: the dataset augmentation uses Python's
# `random`, so seeding torch alone does not make a run repeatable.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

In [None]:

# Labelled splits; augmentation is enabled for the training split only.
train_ds = GeoAngleDataset(
    csv_file='/data3/pratyush.jena/misc/MMT/SMAI_Project/Phase_2_data/Phase_2_data/labels_train.csv',
    img_dir='/data3/pratyush.jena/misc/MMT/SMAI_Project/Phase_2_data/Phase_2_data/images_train',
    transform=train_transform,
    augment=True,
)
val_ds = GeoAngleDataset(
    csv_file='/data3/pratyush.jena/misc/MMT/SMAI_Project/Phase_2_data/Phase_2_data/labels_val.csv',
    img_dir='/data3/pratyush.jena/misc/MMT/SMAI_Project/Phase_2_data/Phase_2_data/images_val',
    transform=val_transform,
)

# Worker / memory settings common to both loaders.
loader_kwargs = dict(num_workers=2, pin_memory=True)

# Training batches are shuffled; validation order is kept fixed.
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_ds, batch_size=64, shuffle=False, **loader_kwargs)

In [None]:
# Run the full training loop; the returned model carries the best checkpoint's weights.
model = train_model(
    train_loader=train_loader,
    val_loader=val_loader,
    device=device,
    epochs=50,
)

In [None]:
# Rebuild the network and restore the checkpoint written during training.
# pretrained=False: every weight is overwritten by the checkpoint, so fetching
# the ImageNet weights first would be wasted work.
model = ResNet50GeoAngleRegressor(pretrained=False).to(device)
# map_location lets the checkpoint load on a device other than the one it was saved from.
model.load_state_dict(torch.load('angle.pth', map_location=device))

In [None]:
# Number of rows written to the validation submission file.
NUM_VAL_ROWS = 369

predictions = predict(model, val_loader, device)

# Round to whole degrees; the modulo folds a rounded 360 back to 0.
rounded_preds = (np.round(predictions[:NUM_VAL_ROWS]) % 360).astype(int)

submission_df = pd.DataFrame({
    'id': range(NUM_VAL_ROWS),
    'angle': rounded_preds,
})
submission_df.to_csv('angle_predictions.csv', index=False)
print("Predictions saved to angle_predictions.csv")

In [None]:
def predict_test(model, dataloader, device):
    """Predict angles in degrees for an unlabelled loader.

    Args:
        model: network returning one [sin, cos] row per input image.
        dataloader: iterable yielding image batches only (no targets).
        device: device each batch is moved to.

    Returns:
        A flat list with one predicted angle in [0, 360) per sample.
    """
    model.eval()
    all_preds = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Predicting Test"):
            outputs = model(batch.to(device))
            # Column 0 is the sine, column 1 the cosine of the predicted angle.
            degrees = torch.rad2deg(torch.atan2(outputs[:, 0], outputs[:, 1])) % 360
            all_preds.extend(degrees.cpu().numpy())

    return all_preds

class TestGeoAngleDataset(Dataset):
    """Unlabelled image dataset: each item is just the (optionally transformed) image.

    Args:
        csv_file: CSV with a ``filename`` column; no rows are filtered out.
        img_dir: directory that the ``filename`` entries are joined onto.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.df = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Number of rows in the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Open image ``idx`` as RGB and apply ``transform`` when one is set."""
        filename = self.df.iloc[idx]['filename']
        image = Image.open(os.path.join(self.img_dir, filename)).convert('RGB')

        if not self.transform:
            return image
        return self.transform(image)

In [None]:
# Deterministic preprocessing for the test images: resize, tensorise, normalise.
test_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
test_transform = transforms.Compose(test_steps)

# Filenames come from the CSV; the dataset yields images only, no labels.
test_ds = TestGeoAngleDataset(
    '/data3/pratyush.jena/misc/MMT/SMAI_Project/area_predictions_test_with_filenames.csv',
    '/data3/pratyush.jena/misc/MMT/SMAI_Project/Phase_2_data/Phase_2_data/images_test',
    test_transform,
)

# shuffle=False keeps predictions in CSV row order.
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True
)


In [None]:
# Submission layout: TOTAL_ROWS ids, of which rows from TEST_ROW_OFFSET onward
# receive the test predictions; the rows before that keep the placeholder 0.
TOTAL_ROWS = 738
TEST_ROW_OFFSET = 369

test_predictions = predict_test(model, test_loader, device)

# Round to whole degrees; the modulo folds a rounded 360 back to 0.
rounded_test_preds = np.round(test_predictions).astype(int) % 360

submission_df = pd.DataFrame({
    'id': range(TOTAL_ROWS),
    'angle': [0] * TOTAL_ROWS,
})
submission_df.loc[TEST_ROW_OFFSET:, 'angle'] = rounded_test_preds

submission_df.to_csv('angle_test.csv', index=False)
print("Test predictions saved to angle_test.csv")