In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [15]:
import os
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, models
from PIL import Image
import numpy as np
from tqdm import tqdm

###############################################################################
# 1. CONFIGURATION
###############################################################################
DATA_PKL     = "/kaggle/input/image-angle-pred-uhh-temp-yay-maybe/AngleOfPerson_20250331_042254.pkl"  # <-- Replace with your path
DEVICE       = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE   = 100
EPOCHS       = 20
LEARNING_RATE = 0.004
TRAIN_SPLIT  = 0.8
# Basic image transforms
IMAGE_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)), 
    transforms.ToTensor(),
    # If using a pretrained ResNet, you typically want normalization:
    # transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

print(f"Using device: {DEVICE}")

Using device: cuda


In [16]:

###############################################################################
# 2. DATASET
###############################################################################
class ImageAngleDataset(Dataset):
    """
    Dataset that returns:
      - image tensor
      - angle (float) as a label
    """
    def __init__(self, data_list, transform=None):
        """
        data_list: List of (image_path, angle)
        transform: TorchVision transforms for images
        """
        self.data_list = []
        self.transform = transform
        print("Creating dataset!")
        for item_a, item_b in tqdm(data_list):
            modified_item_a = Image.fromarray(item_a)
            if self.transform:
                modified_item_a = self.transform(modified_item_a)#.to(DEVICE)
            
            modified_item_b = torch.tensor(item_b, dtype=torch.float32).unsqueeze(0)#.to(DEVICE)
            self.data_list.append((modified_item_a, modified_item_b))

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        return self.data_list[idx]
        # Load image
        image = Image.fromarray(image_path)
        # image = torch.from_numpy(np.transpose(image, (2,0,1))).float()
        if self.transform:
            image = self.transform(image)
        # Convert angle to float tensor [1,]
        angle_tensor = torch.tensor(angle, dtype=torch.float32).unsqueeze(0)
        return image, angle_tensor

In [17]:
###############################################################################
# 3. MODEL: Simple ResNet-based regressor
###############################################################################
class ImageRegressor(nn.Module):
    def __init__(self, pretrained=True):
        """
        If pretrained=True, uses pretrained ImageNet weights.
        If pretrained=False, initializes from scratch.
        """
        super().__init__()
        # Use a ResNet18 as the backbone
        if pretrained:
            backbone = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        else:
            backbone = models.resnet18(weights=None)

        # Remove the final classification layer
        num_feats = backbone.fc.in_features
        backbone.fc = nn.Identity()

        self.backbone = backbone
        # Final linear to produce 1 output (angle)
        #       But with an extra layer in between to smoothen the process
        self.fc_before = nn.Linear(num_feats, 32)
        self.fc = nn.Linear(32, 1)
        # self.handle_angle = torch.nn.Hardtanh()

    def forward(self, x):
        # x: [batch_size, 3, H, W]
        features = self.backbone(x)   # [batch_size, 512] for ResNet18
        before_out = self.fc_before(features)
        out = self.fc(before_out)       # [batch_size, 1]
        # https://discuss.pytorch.org/t/custom-loss-function-for-discontinuous-angle-calculation/58579/5
        # out_wrapped = torch.nn.Hardtanh()
        return out

In [18]:
###############################################################################
# 4. TRAINING & TESTING
###############################################################################
def circular_error(pred, actual):
    """
    Computes the circular error (in degrees) between predicted and actual angles.
    The error is defined as the minimum of the absolute difference and 360 minus that difference.
    """
    diff = abs(pred - actual)
    return diff if diff <= 180 else 360 - diff

def train_model(model, train_loader, epochs=10, lr=1e-3):
    model.to(DEVICE)
    criterion = nn.MSELoss().to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    

    for epoch in range(epochs):
        checkpoint_path = f"checkpoint_angle_pred_images_TBD_epoch_{epoch+1}.pth"
        model.train()
        total_loss = 0.0

        for images, angles in tqdm(train_loader):
            images = images.to(DEVICE)
            angles = angles.to(DEVICE)

            optimizer.zero_grad()
            preds = model(images)
            loss = criterion(preds, angles)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{epochs}]  Train Loss: {avg_loss:.4f}")
        if (epoch+1) % 5 == 0:
            torch.save({
                    'epoch': epoch+1,
                    'model_state_dict': model.state_dict(),
                    'loss': avg_loss,
                }, checkpoint_path)

def test_model(model, test_loader):
    model.to(DEVICE)
    model.eval()
    criterion = nn.MSELoss()
    total_loss = 0.0

    # Optional: store predictions for further analysis
    all_preds = []
    all_labels = []
    total_circular_error = 0.0
    count = 0

    with torch.no_grad():
        for images, angles in tqdm(test_loader):
            images = images.to(DEVICE)
            angles = angles.to(DEVICE)

            preds = model(images)
            loss = criterion(preds, angles)
            total_loss += loss.item()

            preds_list = preds.cpu().view(-1).tolist()
            angles_list = angles.cpu().view(-1).tolist()
            all_preds.extend(preds_list)
            all_labels.extend(angles_list)

            for p, a in zip(preds_list, angles_list):
                err = circular_error(p, a)
                total_circular_error += err
                count += 1

    avg_loss = total_loss / len(test_loader)
    avg_circular_error = total_circular_error / count if count > 0 else 0.0

    print(f"Test Loss: {avg_loss:.4f}")
    print("Sample Predictions vs Actual with Circular Error:")
    # for i in range(len(all_preds)):
    #     err = circular_error(all_preds[i], all_labels[i])
    #     print(f"  Pred: {all_preds[i]:.2f}, Actual: {all_labels[i]:.2f}, Circular Error: {err:.2f}")
    print(f"Average Circular Error: {avg_circular_error:.2f}")
    return all_preds, all_labels

In [None]:
# 1) Load data_list from .pkl
DATA_PKL = "AngleOfPerson_20250331_042254.pkl"
with open(DATA_PKL, "rb") as f:
    data_list = pickle.load(f)
print(f"Loaded {len(data_list)} samples from {DATA_PKL}")

# 2) Create dataset
dataset = ImageAngleDataset(data_list, transform=IMAGE_TRANSFORM)

# 3) Split into train/test
train_size = int(TRAIN_SPLIT * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader  = DataLoader(test_dataset,  batch_size=BATCH_SIZE, shuffle=False)

# 4) Initialize model
model = ImageRegressor(pretrained=True)
print(model)

# # 5) Train
# print("Starting Training ...")
# train_model(model, train_loader, epochs=EPOCHS, lr=LEARNING_RATE)

# # 6) Test
# print("Starting Testing ...")
# test_model(model, test_loader)

# print("Done!")

Loaded 35442 samples from AngleOfPerson_20250331_042254.pkl
Creating dataset!


 10%|▉         | 3459/35442 [00:21<04:10, 127.44it/s]

In [None]:

model = ImageRegressor(pretrained=True)
checkpoint_path = "checkpoint_angle_pred_images_35k_epoch_15.pth"

if os.path.isfile(checkpoint_path):
    print("Loading checkpoint...")
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE, weights_only=True)
    model.load_state_dict(checkpoint['model_state_dict'])
    print(checkpoint['loss'])
    # optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_epoch = checkpoint['epoch'] + 1
    # print(f"resuming training from epoch {start_epoch}")
    epoch = start_epoch

    all_preds, all_labels = test_model(model, test_loader)
    print("ACE result is")
    all_preds = np.array(all_preds) % 360
    all_labels = np.array(all_labels) % 360
    ACE = np.sum(abs(all_preds - all_labels)) / len(all_preds)
    print(ACE)

Loading checkpoint...
210.01887958150513


100%|██████████| 71/71 [00:28<00:00,  2.51it/s]

Test Loss: 16330.4874
Sample Predictions vs Actual with Circular Error:
Average Circular Error: 91.26
ACE result is
103.47853445004168



