<a href="https://colab.research.google.com/github/Romeo-the-rebel/COS711-Assignment-3/blob/main/COS711_Assignment_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# cell 1: essentials
import os, glob, math
import pandas as pd
import numpy as np
from pathlib import Path

# optional: use astropy for accurate sky distances (recommended)
try:
    from astropy.coordinates import SkyCoord
    import astropy.units as u
    ASTROPY_AVAILABLE = True
except Exception:
    ASTROPY_AVAILABLE = False

# paths (adjust if needed)
labels_csv = "labels.csv"    # provided labels file (from assignment zip)
test_csv   = "test.csv"
typ_dir    = "data/typical"
exo_dir    = "data/exotic"
unl_dir    = "data/unlabeled"

# helper to parse filenames like "RA_DEC_...png" (the actual separator may vary)
def parse_coords_from_filename(fname):
    # adapt to your filename pattern; this is a robust attempt
    base = Path(fname).stem
    parts = base.replace(",", "_").split("_")
    # find two tokens that parse as floats
    floats = []
    for p in parts:
        try:
            floats.append(float(p))
            if len(floats)==2:
                return floats[0], floats[1]
        except:
            continue
    return None, None

# load labels
labels_df = pd.read_csv(labels_csv)
# inspect
labels_df.head()


In [None]:
# cell 2: map images -> nearest label (using astropy if available)
def nearest_label_for_image(img_path, labels_df):
    ra_img, dec_img = parse_coords_from_filename(img_path)
    if ra_img is None:
        return None
    if ASTROPY_AVAILABLE:
        c_img = SkyCoord(ra=ra_img*u.deg, dec=dec_img*u.deg, unit=(u.deg,u.deg))
        c_labels = SkyCoord(ra=labels_df['ra'].values*u.deg, dec=labels_df['dec'].values*u.deg, unit=(u.deg,u.deg))
        sep = c_img.separation(c_labels).arcsec  # angular separation in arcsec
        idx = np.argmin(sep)
        return labels_df.iloc[idx]
    else:
        # fallback: Euclidean distance in coordinate space (works roughly if coords in degrees)
        coords = labels_df[['ra','dec']].values
        dists = np.sqrt((coords[:,0]-ra_img)**2 + (coords[:,1]-dec_img)**2)
        idx = np.argmin(dists)
        return labels_df.iloc[idx]

# example: map typical dir
typ_files = glob.glob(os.path.join(typ_dir, "*"))
mapped = []
for f in typ_files[:200]:   # don't iterate all now if large -- just test
    lab = nearest_label_for_image(f, labels_df)
    mapped.append((f, lab['label'] if lab is not None else None))

mapped[:10]


In [None]:
# cell: PyTorch dataloader skeleton
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as T

# define the set of classes you'll predict (example; adapt to real labels)
CLASSES = ["Point", "FRI", "FRII", "Bent", "XRG", "ZRG", "ShouldBeDiscarded", "Other", "Exotic"]
class_to_idx = {c:i for i,c in enumerate(CLASSES)}

def labels_to_multihot(label_list):
    mh = np.zeros(len(CLASSES), dtype=np.float32)
    for lab in label_list:
        if lab in class_to_idx:
            mh[class_to_idx[lab]] = 1.0
    return mh

train_transforms = T.Compose([
    T.Resize((224,224)),
    T.RandomRotation(30),
    T.RandomHorizontalFlip(),
    T.RandomVerticalFlip(),
    T.ToTensor(),
    # do normalization if using a pretrained model expecting ImageNet stats
    T.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

class RadioDataset(Dataset):
    def __init__(self, rows, transform=None):
        # rows: list of (img_path, [list_of_labels])
        self.rows = rows
        self.transform = transform
    def __len__(self):
        return len(self.rows)
    def __getitem__(self, idx):
        p, labs = self.rows[idx]
        img = Image.open(p).convert("RGB")
        if self.transform:
            img = self.transform(img)
        label = torch.tensor(labels_to_multihot(labs))
        return img, label

# Example instantiation:
# train_rows = [(path, ["FRI","Bent"]), ...]
# ds = RadioDataset(train_rows, transform=train_transforms)
# dl = DataLoader(ds, batch_size=16, shuffle=True, num_workers=4)


In [None]:
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim

def get_model(num_classes=len(CLASSES), backbone="resnet50", pretrained=True):
    if backbone=="resnet50":
        m = models.resnet50(pretrained=pretrained)
        nfeats = m.fc.in_features
        m.fc = nn.Linear(nfeats, num_classes)
    else:
        # swap to EfficientNet, etc., as needed
        m = models.resnet18(pretrained=pretrained)
        nfeats = m.fc.in_features
        m.fc = nn.Linear(nfeats, num_classes)
    return m

model = get_model()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.BCEWithLogitsLoss()  # for multi-label
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)


In [None]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0.0
    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        out = model(imgs)
        loss = criterion(out, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * imgs.size(0)
    return total_loss / len(loader.dataset)
