In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import pandas as pd
import numpy as np
from tqdm import tqdm
import math
import timm

train_img_dir = '/kaggle/input/mydataset/images_train/images_train'
val_img_dir = '/kaggle/input/mydataset/images_val/images_val/images_val'
train_csv = pd.read_csv('/kaggle/input/mydataset/labels_train.csv')
val_csv = pd.read_csv('/kaggle/input/mydataset/labels_val.csv')

train_csv['angle'] = train_csv['angle'] % 360
val_csv['angle'] = val_csv['angle'] % 360

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])

class AngleDataset(Dataset):
    def __init__(self, df, img_dir, transform):
        self.df = df
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_dir, row['filename'])
        image = Image.open(img_path).convert('RGB')
        angle_rad = math.radians(row['angle'])
        target = torch.tensor([math.cos(angle_rad), math.sin(angle_rad)], dtype=torch.float32)
        image = self.transform(image)
        return image, target

train_loader = DataLoader(AngleDataset(train_csv, train_img_dir, transform), batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(AngleDataset(val_csv, val_img_dir, transform), batch_size=32, shuffle=False, num_workers=2)

class ViTRegressor(nn.Module):
    def __init__(self):
        super().__init__()
        self.backbone = timm.create_model('vit_large_patch16_224', pretrained=True)
        self.backbone.head = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.backbone.head.in_features, 2)
        )

    def forward(self, x):
        return self.backbone(x)

def cosine_vector_loss(preds, targets):
    preds = nn.functional.normalize(preds, dim=1)
    targets = nn.functional.normalize(targets, dim=1)
    return 1 - (preds * targets).sum(dim=1).mean()

def vec_to_angle_deg(vec):
    vec = nn.functional.normalize(vec, dim=1)
    rad = torch.atan2(vec[:, 1], vec[:, 0])
    deg = torch.rad2deg(rad) % 360
    return deg

def mean_angular_error(preds_deg, targets_deg):
    diff = torch.abs(preds_deg - targets_deg)
    return torch.min(diff, 360 - diff).mean().item()

def train_epoch(model, loader, optimizer):
    model.train()
    total_loss = 0
    for imgs, targets in tqdm(loader, leave=False):
        imgs, targets = imgs.cuda(), targets.cuda()
        optimizer.zero_grad()
        preds = model(imgs)
        loss = cosine_vector_loss(preds, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item() * imgs.size(0)
    return total_loss / len(loader.dataset)

def evaluate(model, loader):
    model.eval()
    preds_all, targets_all = [], []
    with torch.no_grad():
        for imgs, targets in loader:
            imgs = imgs.cuda()
            preds = model(imgs)
            preds_deg = vec_to_angle_deg(preds).cpu()
            targets_deg = vec_to_angle_deg(targets).cpu()
            preds_all.append(preds_deg)
            targets_all.append(targets_deg)
    preds_all = torch.cat(preds_all)
    targets_all = torch.cat(targets_all)
    return mean_angular_error(preds_all, targets_all)

model = ViTRegressor().cuda()
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=25, eta_min=1e-6)

best_maae = float('inf')
patience, patience_counter = 5, 0

for epoch in range(25):
    train_loss = train_epoch(model, train_loader, optimizer)
    val_maae = evaluate(model, val_loader)
    scheduler.step()

    print(f"\nEpoch {epoch+1}: Train Loss = {train_loss:.4f}, Val MAAE = {val_maae:.4f}")

    if val_maae < best_maae:
        best_maae = val_maae
        torch.save(model.state_dict(), 'best_model.pt')
        patience_counter = 0
        print(f"✅ Saved new best model at epoch {epoch+1} with MAAE = {val_maae:.4f}")
    else:
        patience_counter += 1
        print(f"⏸️ No improvement. Patience counter: {patience_counter}/{patience}")

    if patience_counter >= patience:
        print(f"🛑 Early stopping at epoch {epoch+1}")
        break

model.load_state_dict(torch.load('best_model.pt'))

def generate_submission(model, val_df, val_dir, test_dir, transform, output_csv='2022101108_1.csv'):
    model.eval()
    ids, angles = [], []

    for i in tqdm(range(len(val_df)), desc="Predicting Validation"):
        filename = val_df.iloc[i]['filename']
        img_path = os.path.join(val_dir, filename)

        image = Image.open(img_path).convert('RGB')
        image = transform(image).unsqueeze(0).cuda()

        with torch.no_grad():
            pred_vec = model(image)
            pred_vec = nn.functional.normalize(pred_vec, dim=1)
            angle_rad = torch.atan2(pred_vec[:, 1], pred_vec[:, 0])
            angle_deg = torch.rad2deg(angle_rad) % 360
            angle = angle_deg.item()

        ids.append(i)
        angles.append(angle)

    val_df_pred = pd.DataFrame({'id': ids, 'angle': angles})

    test_files = os.listdir(test_dir)  
    test_ids, test_angles = [], []

    for i, filename in tqdm(enumerate(test_files), total=len(test_files), desc="Predicting Test"):
        img_path = os.path.join(test_dir, filename)

        image = Image.open(img_path).convert('RGB')
        image = transform(image).unsqueeze(0).cuda()

        with torch.no_grad():
            pred_vec = model(image)
            pred_vec = nn.functional.normalize(pred_vec, dim=1)
            angle_rad = torch.atan2(pred_vec[:, 1], pred_vec[:, 0])
            angle_deg = torch.rad2deg(angle_rad) % 360
            angle = angle_deg.item()

        test_ids.append(i + len(val_df)) 
        test_angles.append(angle)

    test_df_pred = pd.DataFrame({'id': test_ids, 'angle': test_angles})

    final_df = pd.concat([val_df_pred, test_df_pred], ignore_index=True)
    final_df.to_csv(output_csv, index=False)
    print(f"✅ Submission saved to: {output_csv} with {len(final_df)} rows.")


test_img_dir = '/kaggle/input/mydataset/images_test/images_test'

generate_submission(model, val_csv, val_img_dir, test_img_dir, transform)