### **Importing Libraries**

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, ConcatDataset, TensorDataset
from torch.utils.data import Dataset
import torchvision.models as models
from torchvision import transforms
import torchvision
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import torchvision.transforms as T
from PIL import ImageFilter, ImageEnhance
from PIL import Image
import math
import random
from timm.data.mixup import Mixup
import torchvision.transforms.functional as TF
from torchvision.transforms import autoaugment
from timm.data import RandAugment
from timm.scheduler.cosine_lr import CosineLRScheduler
import timm
import csv
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights


import os
# Force synchronous CUDA kernel launches so that errors are reported at the call
# that caused them. This is a debugging aid and slows GPU execution.
os.environ.update(CUDA_LAUNCH_BLOCKING='1')

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

### **Dataset**

#### **Helper Functions**

In [None]:
# Channel statistics shared by the Normalize step of every pipeline below.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _tensor_steps():
    """Return fresh ToTensor + Normalize steps (PIL image -> normalised tensor)."""
    return [T.ToTensor(), T.Normalize(mean=_NORM_MEAN, std=_NORM_STD)]


def _erasing_step():
    """Return the RandomErasing step appended to every training pipeline."""
    return T.RandomErasing(p=0.25, scale=(0.02, 0.1), ratio=(0.3, 3.3))


# NOTE(review): RandAugment, RandomAffine (shear) and RandomPerspective change image
# geometry. If the target angle depends on image geometry, confirm that the labels
# stay valid under these augmentations.

# Training view 1: resize + RandAugment.
transform_base = T.Compose([
    T.Resize((224, 224)),
    T.RandAugment(num_ops=2, magnitude=9),
    *_tensor_steps(),
    _erasing_step(),
])

# Training view 2: photometric jitter + mild blur.
transform_color = T.Compose([
    T.Resize((224, 224)),
    T.ColorJitter(brightness=0.4, contrast=0.3, saturation=0.3, hue=0.1),
    T.GaussianBlur(kernel_size=3, sigma=(0.1, 1.0)),
    *_tensor_steps(),
    _erasing_step(),
])

# Training view 3: resize larger, random crop back to 224, then affine / perspective warps.
transform_affine = T.Compose([
    T.Resize((288, 288)),
    T.RandomResizedCrop(224, scale=(0.8, 1.0), ratio=(0.9, 1.1)),
    T.RandomAffine(degrees=0, translate=(0.2, 0.2), scale=(0.85, 1.15), shear=10),
    T.RandomPerspective(distortion_scale=0.2, p=0.5),
    *_tensor_steps(),
    _erasing_step(),
])

# Deterministic pipeline for validation and test images (no augmentation).
transform_val = T.Compose([
    T.Resize((224, 224)),
    *_tensor_steps(),
])

def angle_to_vector(theta_deg):
    """Encode an angle given in degrees as the 2-vector (cos θ, sin θ).

    Args:
        theta_deg: Angle in degrees (any real number).

    Returns:
        A float32 tensor of shape (2,) holding ``[cos θ, sin θ]``.
    """
    radians = math.radians(theta_deg)
    components = (math.cos(radians), math.sin(radians))
    return torch.tensor(components, dtype=torch.float32)

def vector_to_angle(vector):
    """Decode a (cos θ, sin θ) pair back into an angle in degrees.

    Args:
        vector: A two-element tensor (or pair of tensors) ordered as (cos, sin).
            The vector does not need to be unit length; only its direction matters.

    Returns:
        A tensor holding the angle in degrees, wrapped into the [0, 360) range.
    """
    x_component, y_component = vector
    degrees = torch.atan2(y_component, x_component) * (180 / math.pi)
    # atan2 yields (-180, 180]; shift and wrap so negative angles map to [180, 360).
    return torch.remainder(degrees + 360, 360)

#### **Dataset Class**

##### **Train & Val**

In [None]:
class AngleDataset(Dataset):
    """Labelled image dataset yielding ``(image, angle_vector)`` pairs.

    Args:
        image_dir: Directory that contains the image files.
        labels_df: Table with a ``filename`` column (file name relative to
            ``image_dir``) and an ``angle`` column (degrees).
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, image_dir, labels_df, transform=None):
        self.image_dir, self.labels_df, self.transform = image_dir, labels_df, transform

    def __len__(self):
        """Number of labelled rows."""
        return len(self.labels_df)

    def __getitem__(self, idx):
        """Load row ``idx`` positionally and return ``(image, (cos θ, sin θ))``."""
        record = self.labels_df.iloc[idx]
        image = Image.open(os.path.join(self.image_dir, record['filename'])).convert("RGB")

        if self.transform:
            image = self.transform(image)

        # The target is the angle encoded as a (cos θ, sin θ) float32 tensor.
        return image, angle_to_vector(float(record['angle']))


##### **Test**

In [None]:
class TestDataset(Dataset):
    """Unlabelled image dataset that yields images in sorted file-name order.

    Args:
        image_dir: Directory that contains the test images.
        transform: Optional callable applied to each loaded RGB PIL image.

    Attributes are ``image_dir``, ``image_filenames`` (sorted list of file names)
    and ``transform``.
    """

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        # os.listdir also returns sub-directories and hidden entries (for example
        # ``.ipynb_checkpoints`` or ``.DS_Store``). Those would crash Image.open or
        # shift the index of every later prediction, so keep only regular,
        # non-hidden files.
        # NOTE(review): the order is lexicographic ("10.jpg" sorts before "2.jpg");
        # confirm this matches the id order expected by the submission format.
        self.image_filenames = sorted(
            name
            for name in os.listdir(image_dir)
            if not name.startswith('.') and os.path.isfile(os.path.join(image_dir, name))
        )
        self.transform = transform

    def __len__(self):
        """Number of image files found in ``image_dir``."""
        return len(self.image_filenames)

    def __getitem__(self, idx):
        """Load the ``idx``-th image as RGB and apply ``transform`` if one was given."""
        img_path = os.path.join(self.image_dir, self.image_filenames[idx])
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

##### **Augment Dataset**

In [None]:
def create_extended_dataset(image_dir, labels_df):
    """Build the training set as three augmented views of the same labelled images.

    Each view is an ``AngleDataset`` over the same ``image_dir`` / ``labels_df`` with a
    different pipeline (``transform_base``, ``transform_color``, ``transform_affine``),
    concatenated in that order. The result is three times as long as ``labels_df``.
    Every view is randomly augmented; none of them is an un-augmented copy.

    Args:
        image_dir: Directory containing the training images.
        labels_df: Label table passed to every ``AngleDataset``.

    Returns:
        A ``ConcatDataset`` of the three views.
    """
    pipelines = (transform_base, transform_color, transform_affine)
    views = [
        AngleDataset(image_dir=image_dir, labels_df=labels_df, transform=pipeline)
        for pipeline in pipelines
    ]
    return ConcatDataset(views)

#### **Training**

In [None]:
# Locations of the training images and their label table.
labels_path_train = "./Dataset/Train/labels_train.csv"
image_dir_train = "./Dataset/Train/images_train"

# The label table is read once and shared by every augmented view of the training set.
labels_df = pd.read_csv(filepath_or_buffer=labels_path_train)

train_dataset = create_extended_dataset(image_dir=image_dir_train, labels_df=labels_df)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=True,  # reshuffle the samples at every epoch
)

#### **Validation**

In [None]:
# Locations of the validation images and their label table.
labels_path_val = "./Dataset/Val/labels_val.csv"
images_dir_val = "./Dataset/Val/images_val"
labels_df_val = pd.read_csv(filepath_or_buffer=labels_path_val)

# Validation uses the deterministic pipeline and a fixed order, so the position of
# each prediction matches the row order of the label table.
val_dataset = AngleDataset(
    image_dir=images_dir_val,
    labels_df=labels_df_val,
    transform=transform_val,
)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)

### **Testing**

In [None]:
# Directory holding the unlabelled test images.
images_dir_test = "./Dataset/Test"

# Test images use the same deterministic pipeline as validation and are never
# shuffled, so prediction order follows the dataset's sorted file-name order.
test_dataset = TestDataset(image_dir=images_dir_test, transform=transform_val)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

### **Model**

#### **Model Implementation**

In [None]:
class EfficientNetV3Regressor(nn.Module):
    """timm ``tf_efficientnetv2_s`` backbone with a linear head predicting a unit 2-vector.

    Despite the class name, the backbone created here is EfficientNet**V2**-S.

    Args:
        pretrained: Forwarded to ``timm.create_model``.
        dropout_rate: Dropout probability applied before the final linear layer.
    """

    def __init__(self, pretrained=True, dropout_rate=0.2):
        super().__init__()

        # num_classes=0 asks timm for the backbone without a classification layer.
        backbone = timm.create_model('tf_efficientnetv2_s', pretrained=pretrained, num_classes=0)
        self.base_model = backbone

        width = backbone.num_features
        head_layers = [
            nn.LayerNorm(width),
            nn.Dropout(dropout_rate),
            nn.Linear(width, 2),
        ]
        self.vector_branch = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return ``{'vector': v}`` where each row of ``v`` is L2-normalised along dim 1."""
        raw_vector = self.vector_branch(self.base_model(x))
        return {'vector': F.normalize(raw_vector, p=2, dim=1)}

class DeitRegressor(nn.Module):
    """timm ``deit_base_patch16_224`` backbone with a linear head predicting a unit 2-vector.

    Args:
        pretrained: Forwarded to ``timm.create_model``.
        dropout_rate: Dropout probability applied before the final linear layer.
    """

    def __init__(self, pretrained=True, dropout_rate=0.2):
        super().__init__()

        # num_classes=0 asks timm for the backbone without a classification layer.
        backbone = timm.create_model('deit_base_patch16_224', pretrained=pretrained, num_classes=0)
        self.base_model = backbone

        width = backbone.num_features
        head_layers = [
            nn.LayerNorm(width),
            nn.Dropout(dropout_rate),
            nn.Linear(width, 2),
        ]
        self.vector_branch = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return ``{'vector': v}`` where each row of ``v`` is L2-normalised along dim 1."""
        raw_vector = self.vector_branch(self.base_model(x))
        return {'vector': F.normalize(raw_vector, p=2, dim=1)}

class TorchvisionEffNetV2Regressor(nn.Module):
    """torchvision ``efficientnet_v2_s`` backbone with a linear head predicting a unit 2-vector.

    Args:
        pretrained: When true, load ``EfficientNet_V2_S_Weights.IMAGENET1K_V1``;
            otherwise the backbone is randomly initialised.
        dropout_rate: Dropout probability applied before the final linear layer.
    """

    def __init__(self, pretrained=True, dropout_rate=0.2):
        super().__init__()

        weights = EfficientNet_V2_S_Weights.IMAGENET1K_V1 if pretrained else None
        self.base_model = efficientnet_v2_s(weights=weights)

        # Read the feature width BEFORE swapping the classifier out. The previous code
        # queried ``in_features`` on the nn.Identity replacement, which never has that
        # attribute, so the lookup was dead and the 1280 fallback was always used.
        feature_dim = self._classifier_in_features(self.base_model.classifier, default=1280)
        self.base_model.classifier = nn.Identity()

        self.vector_branch = nn.Sequential(
            nn.LayerNorm(feature_dim),
            nn.Dropout(dropout_rate),
            nn.Linear(feature_dim, 2)
        )

    @staticmethod
    def _classifier_in_features(classifier, default=1280):
        """Return ``in_features`` of the first nn.Linear inside ``classifier``, else ``default``."""
        for module in classifier.modules():
            if isinstance(module, nn.Linear):
                return module.in_features
        return default

    def forward(self, x):
        """Resize the input to 192x192, then return ``{'vector': v}`` with L2-normalised rows."""
        x = F.interpolate(x, size=(192, 192), mode='bilinear', align_corners=False)
        features = self.base_model(x)
        vector = self.vector_branch(features)
        vector = F.normalize(vector, p=2, dim=1)
        return {"vector": vector}
    
class GlobalContextModule(nn.Module):
    """Channel gating driven by a spatially attention-pooled context vector.

    A 1x1 convolution scores every spatial position and a softmax over positions
    turns the scores into pooling weights. The pooled per-channel context passes
    through a bottleneck MLP ending in a sigmoid, and the resulting per-channel
    gate rescales the input.

    Args:
        in_channels: Number of channels of the input feature map.
        reduction_ratio: Bottleneck factor of the gating MLP
            (hidden width is ``in_channels // reduction_ratio``).
    """

    def __init__(self, in_channels, reduction_ratio=16):
        super().__init__()
        hidden_channels = in_channels // reduction_ratio

        self.conv_mask = nn.Conv2d(in_channels, 1, kernel_size=1)
        self.softmax = nn.Softmax(dim=2)
        self.channel_attn = nn.Sequential(
            nn.Linear(in_channels, hidden_channels),
            nn.SiLU(),
            nn.Linear(hidden_channels, in_channels),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the learned gate (same shape as ``x``)."""
        b, c, h, w = x.size()

        # Softmax over the flattened spatial positions -> [B, 1, H, W] pooling weights.
        spatial_logits = self.conv_mask(x).view(b, 1, h * w)
        spatial_weights = self.softmax(spatial_logits).view(b, 1, h, w)

        # NOTE(review): the softmax weights already sum to one over H*W, so the extra
        # division by H*W shrinks the context further. It is kept as-is because
        # trained checkpoints depend on it; confirm whether it is intended.
        pooled = (x * spatial_weights).sum(dim=(2, 3), keepdim=True) / (h * w)

        gate = self.channel_attn(pooled.view(b, c))
        return x * gate.view(b, c, 1, 1)

class FeaturePyramidModule(nn.Module):
    """Top-down feature pyramid that merges multi-scale maps into a common channel width.

    Args:
        in_channels_list: Channel count of each input feature map, in the order the
            maps are passed to ``forward``.
        out_channels: Channel count of every output map.
    """

    def __init__(self, in_channels_list, out_channels):
        super().__init__()

        # 1x1 convolutions projecting each input level to ``out_channels``.
        self.lateral_convs = nn.ModuleList(
            [nn.Conv2d(level_channels, out_channels, kernel_size=1)
             for level_channels in in_channels_list]
        )

        # 3x3 convolutions applied to each merged level.
        self.output_convs = nn.ModuleList(
            [nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
             for _ in in_channels_list]
        )

    def forward(self, features):
        """Merge ``features`` from the last level down to the first.

        Args:
            features: Sequence of feature maps; the last entry starts the top-down pass.

        Returns:
            A list of output maps in the same order as ``features``.
        """
        running = self.lateral_convs[-1](features[-1])
        pyramid = [self.output_convs[-1](running)]

        for level in reversed(range(len(features) - 1)):
            lateral = self.lateral_convs[level](features[level])

            target_hw = lateral.shape[2:]
            source_hw = running.shape[2:]

            # Upsample the running map only when it is smaller than the current level.
            needs_upsampling = source_hw[0] < target_hw[0] or source_hw[1] < target_hw[1]
            if needs_upsampling:
                running = F.interpolate(running, size=target_hw, mode='bilinear', align_corners=False)

            running = lateral + running
            # Prepend so the final list is ordered like the inputs.
            pyramid.insert(0, self.output_convs[level](running))

        return pyramid

class SelfAttention2D(nn.Module):
    """Self-attention over the spatial positions of a feature map, with a learned residual gate.

    Queries and keys are projected to ``in_dim // 8`` channels and values keep
    ``in_dim`` channels. ``gamma`` starts at zero, so the module initially returns
    its input unchanged.

    Args:
        in_dim: Number of channels of the input feature map.
    """

    def __init__(self, in_dim):
        super().__init__()
        reduced_dim = in_dim // 8
        self.query = nn.Conv2d(in_dim, reduced_dim, 1)
        self.key = nn.Conv2d(in_dim, reduced_dim, 1)
        self.value = nn.Conv2d(in_dim, in_dim, 1)
        self.gamma = nn.Parameter(torch.zeros(1))

    def forward(self, x):
        """Return ``gamma * attention(x) + x`` with the same shape as ``x``."""
        batch, channels, height, width = x.size()
        num_positions = height * width

        queries = self.query(x).view(batch, -1, num_positions).transpose(1, 2)  # [B, N, C']
        keys = self.key(x).view(batch, -1, num_positions)                       # [B, C', N]
        attention = F.softmax(torch.bmm(queries, keys), dim=-1)                 # [B, N, N]

        values = self.value(x).view(batch, -1, num_positions)                   # [B, C, N]
        attended = torch.bmm(values, attention.transpose(1, 2))                 # [B, C, N]
        attended = attended.view(batch, channels, height, width)

        return self.gamma * attended + x
    
class EfficientNetV2Regressor(nn.Module):
    """Multi-scale EfficientNetV2-S regressor with attention and two output heads.

    Pipeline: timm ``tf_efficientnetv2_s`` feature extractor (stages 2, 3, 4) ->
    optional ``FeaturePyramidModule`` -> ``SelfAttention2D`` -> ``GlobalContextModule``
    -> global average pooling -> two MLP heads.

    Args:
        pretrained: Forwarded to ``timm.create_model``.
        dropout_rate: Dropout probability used in both heads (the second dropout of
            the vector head uses half of it).
        use_fpn: When true, fuse the three stages with an FPN (256 channels) and use
            the first FPN output; otherwise use the last backbone stage directly.
    """

    def __init__(self, pretrained=True, dropout_rate=0.2, use_fpn=True):
        super().__init__()

        self.base_model = timm.create_model(
            'tf_efficientnetv2_s',
            pretrained=pretrained,
            features_only=True,
            out_indices=(2, 3, 4)
        )

        # Ask timm for the channel counts of the selected stages. The previous code
        # pushed a dummy zeros batch through the backbone instead; the model is in
        # training mode at that point, so the pass updated the BatchNorm running
        # statistics of the (possibly pretrained) backbone, besides being wasted work.
        feature_channels = list(self.base_model.feature_info.channels())
        print(f"Actual feature channels: {feature_channels}")

        self.use_fpn = use_fpn
        if use_fpn:
            self.fpn = FeaturePyramidModule(feature_channels, 256)
            merged_channels = 256
        else:
            merged_channels = feature_channels[-1]

        self.attention = SelfAttention2D(merged_channels)
        self.gc_module = GlobalContextModule(merged_channels)

        # Head 1: regress the 2-vector (normalised in forward).
        self.vector_branch = nn.Sequential(
            nn.Linear(merged_channels, 512),
            nn.LayerNorm(512),
            nn.SiLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(512, 256),
            nn.LayerNorm(256),
            nn.SiLU(),
            nn.Dropout(dropout_rate * 0.5),

            nn.Linear(256, 64),
            nn.LayerNorm(64),
            nn.SiLU(),

            nn.Linear(64, 2)
        )

        # Head 2: 36 logits.
        # NOTE(review): presumably 36 angle bins of 10 degrees each; confirm.
        # The visible training loop only consumes the 'vector' output.
        self.bin_branch = nn.Sequential(
            nn.Linear(merged_channels, 256),
            nn.LayerNorm(256),
            nn.SiLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(256, 128),
            nn.LayerNorm(128),
            nn.SiLU(),

            nn.Linear(128, 36)
        )

    def forward(self, x):
        """Run the network on an image batch.

        Returns:
            dict with ``'vector'`` (rows L2-normalised along dim 1, two columns) and
            ``'bin_logits'`` (36 unnormalised logits per sample).
        """
        features = self.base_model(x)

        if self.use_fpn:
            processed_features = self.fpn(features)
            # First FPN output corresponds to the first (earliest) selected stage.
            feat = processed_features[0]
        else:
            feat = features[-1]

        feat = self.attention(feat)
        feat = self.gc_module(feat)

        # Global average pool to one descriptor per sample.
        x = F.adaptive_avg_pool2d(feat, 1).flatten(1)

        vector_output = self.vector_branch(x)
        bin_logits = self.bin_branch(x)

        vector_output = F.normalize(vector_output, p=2, dim=1)

        return {
            'vector': vector_output,
            'bin_logits': bin_logits
        }

#### **Loss Function**

In [None]:
class AngleLoss(nn.Module):
    """Cosine-distance loss between predicted and target direction vectors.

    The per-sample loss is ``1 - cos(angle between y_pred and y_true)``: close to 0
    when the directions agree and close to 2 when they are opposite.

    Args:
        reduction: ``'mean'`` or ``'sum'`` to reduce over the batch; any other value
            returns the unreduced per-sample losses.
    """

    def __init__(self, reduction='mean'):
        super().__init__()
        self.reduction = reduction

    def forward(self, y_pred, y_true):
        """Compute the loss.

        Args:
            y_pred: Predicted vectors, one per row (normalised along dim 1 here).
            y_true: Target vectors, same layout as ``y_pred``.

        Returns:
            A scalar tensor for ``'mean'`` / ``'sum'``, otherwise one loss per row.
        """
        # F.normalize divides by max(norm, eps), so an all-zero vector yields a zero
        # vector (cosine 0, loss 1) rather than the NaN a plain division by the norm
        # produces, which would poison every later gradient step.
        y_pred_normalized = F.normalize(y_pred, p=2, dim=1)
        y_true_normalized = F.normalize(y_true, p=2, dim=1)

        cos_angle_diff = torch.sum(y_pred_normalized * y_true_normalized, dim=1)

        # Guard against rounding pushing the cosine marginally outside [-1, 1].
        cos_angle_diff = torch.clamp(cos_angle_diff, -1.0 + 1e-7, 1.0 - 1e-7)

        angle_loss = 1.0 - cos_angle_diff

        if self.reduction == 'mean':
            return angle_loss.mean()
        elif self.reduction == 'sum':
            return angle_loss.sum()
        else:
            return angle_loss


### **Training**

#### **Evaluation**

In [None]:
def evaluate_model(model, test_loader, device):
    """Evaluate a single model's angle predictions on a labelled loader.

    Args:
        model: Module whose forward returns a dict with a ``'vector'`` entry holding
            one (cos, sin) row per sample.
        test_loader: Iterable of ``(inputs, labels)`` batches, labels being
            (cos, sin) rows.
        device: Device the batches are moved to.

    Returns:
        dict with
        ``'mean_angle_error'`` / ``'median_angle_error'``: circular absolute error in
        degrees (NaN when the loader yields no samples);
        ``'mse_sin_cos'``: mean squared distance between predicted and true
        (sin, cos) pairs (NaN when empty);
        ``'predictions'``, ``'ground_truths'``, ``'angle_errors'``: per-sample lists
        in degrees.
    """
    def to_degrees(vectors):
        # Batched form of vector_to_angle: atan2(sin, cos) in degrees, wrapped to [0, 360).
        degrees = torch.atan2(vectors[:, 1], vectors[:, 0]) * (180 / math.pi)
        return [angle % 360 for angle in ((degrees + 360) % 360).tolist()]

    model.eval()
    predictions = []
    ground_truths = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)['vector']
            # One host transfer per batch instead of two ``.item()`` calls per sample.
            predictions.extend(to_degrees(outputs))
            ground_truths.extend(to_degrees(labels))

    # Circular distance: errors are measured the short way around the circle.
    angle_errors = []
    for pred_angle, true_angle in zip(predictions, ground_truths):
        angle_diff = abs(true_angle - pred_angle)
        angle_errors.append(min(angle_diff, 360 - angle_diff))

    if angle_errors:
        mean_angle_error = sum(angle_errors) / len(angle_errors)
        # np.median averages the two middle values for even-sized inputs; the former
        # ``sorted(errors)[n // 2]`` returned the upper-middle element instead.
        median_angle_error = float(np.median(angle_errors))
        mse = sum(
            (math.sin(math.radians(p)) - math.sin(math.radians(t))) ** 2
            + (math.cos(math.radians(p)) - math.cos(math.radians(t))) ** 2
            for p, t in zip(predictions, ground_truths)
        ) / len(predictions)
    else:
        # Nothing to evaluate: report undefined metrics rather than dividing by zero.
        mean_angle_error = median_angle_error = mse = float('nan')

    results = {
        'mean_angle_error': mean_angle_error,
        'median_angle_error': median_angle_error,
        'mse_sin_cos': mse,
        'predictions': predictions,
        'ground_truths': ground_truths,
        'angle_errors': angle_errors
    }
    return results


#### **Training Function**

In [None]:
def train_regression_model(model, train_loader, val_loader, optimizer, num_epochs, device, loss_fn,
                           checkpoint_path='best_model.pth'):
    """Train ``model`` and checkpoint the weights with the lowest validation loss.

    Each epoch runs one optimisation pass over ``train_loader``, one loss pass over
    ``val_loader``, and then ``evaluate_model`` on ``val_loader`` (a second pass over
    the validation set) to log the mean angular error.

    Args:
        model: Module whose forward returns a dict with a ``'vector'`` entry.
        train_loader: Loader of ``(images, targets)`` training batches.
        val_loader: Loader of ``(images, targets)`` validation batches.
        optimizer: Optimiser already bound to ``model``'s parameters.
        num_epochs: Number of epochs to run.
        device: Device the model and batches are moved to.
        loss_fn: Callable ``loss_fn(preds, targets)`` returning a scalar loss.
        checkpoint_path: File the best ``state_dict`` is written to. Defaults to
            ``'best_model.pth'``; pass a distinct path per model so that consecutive
            training runs do not overwrite each other's best checkpoint.

    Returns:
        None. Progress is printed and the best weights are saved to ``checkpoint_path``.
    """
    model.to(device)
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")

        for images, targets in pbar:
            images = images.to(device)
            targets = targets.to(device)

            outputs = model(images)
            preds = outputs['vector']
            loss = loss_fn(preds, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch average is per-sample, not per-batch.
            train_loss += loss.item() * images.size(0)
            pbar.set_postfix(loss=loss.item())

        avg_train_loss = train_loss / len(train_loader.dataset)

        # Validation loop
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Val]")
            for images, targets in pbar:
                images = images.to(device)
                targets = targets.to(device)

                outputs = model(images)
                preds = outputs['vector']
                loss = loss_fn(preds, targets)

                val_loss += loss.item() * images.size(0)
                pbar.set_postfix(loss=loss.item())

        avg_val_loss = val_loss / len(val_loader.dataset)

        # Mean angular error in degrees, for logging only (not used for model selection).
        results = evaluate_model(model, val_loader, device)
        val_mae = results['mean_angle_error']

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), checkpoint_path)

        # The "MSE" labels report whatever ``loss_fn`` computes, which need not be an MSE.
        print(f"Epoch {epoch+1}: Train MSE = {avg_train_loss:.4f}, Val MSE = {avg_val_loss:.4f}, Val MAE = {val_mae:.4f}")

### **Model Training**

In [None]:
# DeiT regressor trained with the cosine-distance loss.
model = DeitRegressor(pretrained=True, dropout_rate=0.2)
optimizer = optim.AdamW(params=model.parameters(), lr=1e-5, weight_decay=0.05)
loss_fn = AngleLoss(reduction='mean')

# Train
train_regression_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    num_epochs=100,
    device=device,
    loss_fn=loss_fn,
)

In [None]:
# EfficientNetV2 + FPN regressor trained with plain MSE on the (cos, sin) vector.
# This rebinds `model`, `optimizer` and `loss_fn` from the previous cell, and the
# training function saves its best weights to 'best_model.pth' by default, so this
# run overwrites any checkpoint an earlier run left at that path.
model = EfficientNetV2Regressor(pretrained=True, dropout_rate=0.2)
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4)
loss_fn = nn.MSELoss()

# Train
train_regression_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    num_epochs=20,
    device=device,
    loss_fn=loss_fn,
)

In [None]:
# Validation metrics of whichever model is currently bound to `model`.
results = evaluate_model(model=model, test_loader=val_loader, device=device)
print("Mean Angle Error: {:.4f} degrees".format(results['mean_angle_error']))
print("Median Angle Error: {:.4f} degrees".format(results['median_angle_error']))
print("MSE (sin/cos): {:.4f}".format(results['mse_sin_cos']))

In [None]:
def get_predicted_angles_ensemble(models, val_loader, test_loader, device,
                                  expected_val_count=369, expected_total_count=738):
    """Predict angles for the validation set followed by the test set with an ensemble.

    The models' ``'vector'`` outputs are averaged per sample and the mean vector is
    converted to an angle in degrees in [0, 360).

    Args:
        models: Non-empty collection of modules whose forward returns a dict with a
            ``'vector'`` entry holding one (cos, sin) row per sample.
        val_loader: Iterable of ``(inputs, labels)`` batches; labels are ignored.
        test_loader: Iterable of ``inputs`` batches.
        device: Device the batches are moved to.
        expected_val_count: Number of validation predictions required, or ``None``
            to skip the check. Defaults to the previously hard-coded 369.
        expected_total_count: Number of validation + test predictions required, or
            ``None`` to skip the check. Defaults to the previously hard-coded 738.

    Returns:
        List of angles in degrees: all validation predictions, then all test predictions.

    Raises:
        ValueError: If ``models`` is empty.
        AssertionError: If a requested count check fails.
    """
    models = list(models)
    if not models:
        # An empty ensemble would divide by zero and emit NaN angles.
        raise ValueError("models must contain at least one model")

    for model in models:
        model.eval()

    def predict_batch(inputs):
        # Average the ensemble's vectors, then decode the mean vector to degrees.
        inputs = inputs.to(device)
        summed_outputs = torch.zeros(inputs.size(0), 2).to(device)
        for model in models:
            summed_outputs += model(inputs)['vector']
        avg_outputs = summed_outputs / len(models)

        degrees = torch.atan2(avg_outputs[:, 1], avg_outputs[:, 0]) * (180 / math.pi)
        return [angle % 360 for angle in ((degrees + 360) % 360).tolist()]

    predicted_angles = []

    with torch.no_grad():
        # Validation predictions
        for inputs, _ in val_loader:
            predicted_angles.extend(predict_batch(inputs))

        if expected_val_count is not None:
            assert len(predicted_angles) == expected_val_count, \
                f"Expected {expected_val_count} val predictions, got {len(predicted_angles)}"

        # Test predictions
        for inputs in test_loader:
            predicted_angles.extend(predict_batch(inputs))

        if expected_total_count is not None:
            assert len(predicted_angles) == expected_total_count, \
                f"Expected {expected_total_count} total predictions, got {len(predicted_angles)}"

    return predicted_angles


def create_submission_csv(predicted_angles, roll_number='2022101001', version='3'):
    """Write predictions to ``<roll_number>_<version>.csv`` in the working directory.

    The file has an ``id,angle`` header followed by one row per prediction, where
    ``id`` is the zero-based position in ``predicted_angles`` and ``angle`` is the
    prediction taken modulo 360.

    Args:
        predicted_angles: Iterable of angles in degrees.
        roll_number: First part of the output file name.
        version: Second part of the output file name.
    """
    filename = "{}_{}.csv".format(roll_number, version)

    # Lazily built rows: (position, wrapped angle).
    rows = ([position, angle % 360] for position, angle in enumerate(predicted_angles))

    with open(filename, mode='w', newline='') as handle:
        out = csv.writer(handle)
        out.writerow(['id', 'angle'])  # Header
        out.writerows(rows)

    print(f"Submission CSV saved as: {filename}")

In [None]:
def evaluate_ensemble(models, test_loader, device):
    """Evaluate an averaged ensemble's angle predictions on a labelled loader.

    The models' ``'vector'`` outputs are averaged per sample and the mean vector is
    converted to an angle before comparison with the labels.

    Args:
        models: Non-empty collection of modules whose forward returns a dict with a
            ``'vector'`` entry holding one (cos, sin) row per sample.
        test_loader: Iterable of ``(inputs, labels)`` batches, labels being
            (cos, sin) rows.
        device: Device the batches are moved to.

    Returns:
        dict with
        ``'mean_angle_error'`` / ``'median_angle_error'``: circular absolute error in
        degrees (NaN when the loader yields no samples);
        ``'mse_sin_cos'``: mean squared distance between predicted and true
        (sin, cos) pairs (NaN when empty);
        ``'predictions'``, ``'ground_truths'``, ``'angle_errors'``: per-sample lists
        in degrees.

    Raises:
        ValueError: If ``models`` is empty.
    """
    models = list(models)
    if not models:
        # An empty ensemble would divide by zero and emit NaN angles.
        raise ValueError("models must contain at least one model")

    def to_degrees(vectors):
        # Batched form of vector_to_angle: atan2(sin, cos) in degrees, wrapped to [0, 360).
        degrees = torch.atan2(vectors[:, 1], vectors[:, 0]) * (180 / math.pi)
        return [angle % 360 for angle in ((degrees + 360) % 360).tolist()]

    for model in models:
        model.eval()

    predictions = []
    ground_truths = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Sum the output vectors from all models, then average.
            summed_outputs = torch.zeros(labels.size(0), 2).to(device)
            for model in models:
                summed_outputs += model(inputs)['vector']
            avg_outputs = summed_outputs / len(models)

            # One host transfer per batch instead of two ``.item()`` calls per sample.
            predictions.extend(to_degrees(avg_outputs))
            ground_truths.extend(to_degrees(labels))

    # Circular distance: errors are measured the short way around the circle.
    angle_errors = []
    for pred_angle, true_angle in zip(predictions, ground_truths):
        angle_diff = abs(true_angle - pred_angle)
        angle_errors.append(min(angle_diff, 360 - angle_diff))

    if angle_errors:
        mean_angle_error = sum(angle_errors) / len(angle_errors)
        # np.median averages the two middle values for even-sized inputs; the former
        # ``sorted(errors)[n // 2]`` returned the upper-middle element instead.
        median_angle_error = float(np.median(angle_errors))
        mse = sum(
            (math.sin(math.radians(p)) - math.sin(math.radians(t))) ** 2
            + (math.cos(math.radians(p)) - math.cos(math.radians(t))) ** 2
            for p, t in zip(predictions, ground_truths)
        ) / len(predictions)
    else:
        # Nothing to evaluate: report undefined metrics rather than dividing by zero.
        mean_angle_error = median_angle_error = mse = float('nan')

    results = {
        'mean_angle_error': mean_angle_error,
        'median_angle_error': median_angle_error,
        'mse_sin_cos': mse,
        'predictions': predictions,
        'ground_truths': ground_truths,
        'angle_errors': angle_errors
    }

    return results

# Load the models
# Instantiate the four ensemble members without pretrained weights; their parameters
# come from the checkpoint below.
# Note: `model_deit_ev` is an EfficientNetV3Regressor (not a DeiT), and `model_eff_ev`
# and `model_fpn_ev` share the same architecture (use_fpn defaults to True).
model_eff_ev = EfficientNetV2Regressor(pretrained=False)
model_eff_ev.to(device)
model_deit_ev = EfficientNetV3Regressor(pretrained=False)
model_deit_ev.to(device)
model_deit2_ev = DeitRegressor(pretrained=False)
model_deit2_ev.to(device)
model_fpn_ev = EfficientNetV2Regressor(pretrained=False, dropout_rate=0.2, use_fpn=True)
model_fpn_ev.to(device)

# Load the ensemble model
# map_location remaps the stored tensors onto the active device, so a checkpoint
# saved on a GPU machine also loads on a CPU-only one.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
checkpoint = torch.load('./ensemble_model_4.pth', map_location=device)
model_eff_ev.load_state_dict(checkpoint['model_eff'])
model_deit_ev.load_state_dict(checkpoint['model_deit'])
model_deit2_ev.load_state_dict(checkpoint['model_deit2'])
model_fpn_ev.load_state_dict(checkpoint['model_fpn'])


# Evaluate on validation, then predict validation + test and write the submission file.
ensemble_models = [model_deit_ev, model_deit2_ev, model_eff_ev, model_fpn_ev]
results = evaluate_ensemble(ensemble_models, val_loader, device)
print(f"Mean Angle Error: {results['mean_angle_error']:.4f} degrees")
ensemble_predictions = get_predicted_angles_ensemble(ensemble_models, val_loader, test_loader, device)
create_submission_csv(ensemble_predictions, roll_number='2022101096', version='9')
# # Save the ensemble model
# ensemble_model_path = 'ensemble_model_4.pth'
# torch.save({
#     'model_eff': model_eff_ev.state_dict(),
#     'model_deit': model_deit_ev.state_dict(),
#     'model_deit2': model_deit2_ev.state_dict(),
#     'model_fpn': model_fpn_ev.state_dict(),
# }, ensemble_model_path)
# print(f"Ensemble model saved as: {ensemble_model_path}")