# 1. Packages

In [127]:
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

import os, csv
import numpy as np

from torch.utils.data import Dataset
import torch



# 2. Constants

In [128]:
TRAIN_FILENAME = 'train.csv'
TEST_FILENAME = 'test.csv'
IMAGE_DIRECTORY = "images"
DATA_BASEPATH = "../dataset"
LABELS = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '13', '14', '15', '16', '17', '18', '19']
NUM_OF_LABELS = len(LABELS)
RESIZED_SIZE = 320 # All images are at most 320x320 pixels

# 3. Data Ingestion

In [129]:
def append_none_to_caption(df: pd.DataFrame) -> pd.DataFrame:
    """
    Append non-null entries from the 'None' column into the caption column, then drop the 'None' column.
    """
    caption_col = "Caption"
    extra_col = None
    sep = " "

    if extra_col is None:
        if None in df.columns:
            extra_col = None
        elif 'None' in df.columns:
            extra_col = 'None'
        else:
            raise KeyError("Could not find a column named None or 'None' in your DataFrame.")

    # make sure captions are strings
    df[caption_col] = df[caption_col].astype(str)

    # build a Series of the extra text, safely capturing sep in the lambda’s default
    extras = df[extra_col].apply(
        lambda val, sep=sep: (
            '' if pd.isna(val)
            else sep.join(str(item).strip() for item in (val if isinstance(val, (list, tuple)) else [val]))
        )
    )

    # only append where there actually is some extra text
    mask = extras.ne('')
    df.loc[mask, caption_col] = df.loc[mask, caption_col] + sep + extras[mask]

    return df.drop(columns=[extra_col])

def extract_df(filename):
    path = os.path.join(DATA_BASEPATH, filename)
    with open(path, newline="", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile, delimiter=",", quotechar='"', escapechar="\\")
        df = pd.DataFrame(reader)

    return append_none_to_caption(df)

train_data = extract_df(TRAIN_FILENAME)
test_data = extract_df(TEST_FILENAME)

# 4. Data Transformation

In [130]:
def create_label_to_index_dictionary():
    """
    A dictionary which transforms labels from [1, 19] to an index used for one-hot encoding
    """
    dictionary = {}
    index = 0
    for label in LABELS:
        dictionary[label] = index
        index += 1
    
    return dictionary

label_to_index_dict = create_label_to_index_dictionary()

In [131]:
def letterbox_resize(img, target_size: int = RESIZED_SIZE, pad_color=(0, 0, 0)) -> Image.Image:
    """
    Resize a PIL image so the longer side equals `target_size`, keeping aspect ratio,
    then pad the shorter side with `pad_color` to make a square image of size target_sizextarget_size.
    
    Args:
        img (PIL.Image): input image
        target_size (int): final square side length (e.g. 320)
        pad_color (tuple/int): RGB or grayscale pad value
    
    Returns:
        PIL.Image: 320x320 letter-boxed image
    """
    w, h = img.size
    
    # ---- 1. Compute new size that preserves aspect ratio ---- #
    if w > h:
        new_w = target_size
        new_h = int(round(h * target_size / w))
    else:
        new_h = target_size
        new_w = int(round(w * target_size / h))
    
    # ---- 2. Resize with high-quality interpolation ---- #
    img = img.resize((new_w, new_h), Image.BILINEAR)
    
    # ---- 3. Create padded canvas and paste resized image ---- #
    pad_w = target_size - new_w
    pad_h = target_size - new_h
    
    # Center the image (optional: randomize offsets for extra augmentation)
    left   = pad_w // 2
    top    = pad_h // 2

    if img.mode == "RGB":
        pad_color = pad_color if isinstance(pad_color, tuple) else (pad_color,)*3
    else:
        pad_color = pad_color if isinstance(pad_color, int) else pad_color[0]
    
    new_img = Image.new(img.mode, (target_size, target_size), pad_color)
    new_img.paste(img, (left, top))
    
    return new_img


In [132]:
class ImageRecord:

    def __init__(self, id, image, caption, labels):
        """
        Collects and stores relevant information about each data entry.
        Attributes:
            - id, retrieved from the filename
            - image, a PIL image best used for displaying
            - label, a string containing labels. If there is more than one, separated by a space
            - captions, a string containing a description of the image
        """
        self.id = id
        # self.image  = image 
        self.image = letterbox_resize(image) # Reshapes original image to 320x320
        self.caption = caption
        self.label  = labels

        if labels != None:
            self.one_hot_encode = self.create_one_hot_vector() # Transforms label into one_hot_encoding
    
    def display_data(self):
        plt.imshow(self.image)
        plt.axis('off')      
        plt.title(f"ImageID: {self.id} Label: {self.label}\n{self.caption}", wrap=True)
        plt.show()

    def get_filename(self):
        return f"{self.id}.jpg"
    
    def create_one_hot_vector(self):
        labels = self.label.split(" ")

        one_hot_encode = np.zeros(NUM_OF_LABELS)
        for label in labels:
            one_hot_encode[label_to_index_dict.get(label)] = 1  
        
        return one_hot_encode

def retrieve_image(filename):
    """
    Returns an image from a given filename
    """
    return Image.open(f'{DATA_BASEPATH}/{IMAGE_DIRECTORY}/{filename}')

def create_all_samples(dataframe, is_training=True):
    """
    Uses given dataframe to generate all samples which contain an id, image, label and caption.

    Returns a list of samples
    """
    samples = []
    label = None

    for index, row in dataframe.iterrows():
        filename = row["ImageID"]
        image = retrieve_image(filename)
        caption = row["Caption"]

        if is_training:

            label = row["Labels"]

        sample = ImageRecord(filename[:-4], image, caption, label)
        samples.append(sample)
    
    return samples

train_samples = create_all_samples(train_data)
# test_samples = create_all_samples(test_data, is_training=False)

# 5. Resnet Model

### 5.1 Download the Model

In [133]:
# import pathlib, urllib.request
# from torchvision.models import ResNet50_Weights

# url  = ResNet50_Weights.DEFAULT.url     # gives the same V2 link :contentReference[oaicite:2]{index=2}
# path = pathlib.Path("checkpoints/resnet50_v2.pth")
# path.parent.mkdir(parents=True, exist_ok=True)

# urllib.request.urlretrieve(url, path)   # 97 MB download


### 5.2 Loading the Model

In [134]:
# from torchvision.models import resnet50
# import torch.nn as nn
# import torch
                             
# model = resnet50(weights=None)              
# model.fc = nn.Linear(model.fc.in_features, NUM_OF_LABELS)  # new head
# model.load_state_dict(torch.load("checkpoints/resnet50_v2.pth"), strict=False)



In [135]:
import torch.hub
import torch, torch.nn as nn
from torchvision.models import resnet50
from torchvision import transforms

torch.hub.set_dir("checkpoints") # Sets pytorch to load from checkpoints directory

state_path = "checkpoints/resnet50_v2.pth"      # wherever you saved it
model = resnet50(weights=None) # build a vanilla ResNet-50, no automatic weights

# bring in the state_dict
state = torch.load(state_path, map_location="cpu")
model.load_state_dict(state)                    # all 1000-class params

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD  = (0.229, 0.224, 0.225)

# ───── training (add lightweight augmentation if you want) ─────
train_tfms = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),   # optional
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])

# ───── validation / test ─────
val_tfms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])


In [138]:
from torch.utils.data import DataLoader
from dataset import MultiLabelDataset


train_ds = MultiLabelDataset(train_samples, transform=train_tfms, cache=False)
# val_ds   = MultiLabelDataset(val_samples,   transform=val_tfms,   cache=False)

train_loader = DataLoader(
    train_ds, batch_size=32, shuffle=True,
    num_workers=4, pin_memory=True
)


# val_loader = DataLoader(
#     val_ds, batch_size=32, shuffle=False,
#     num_workers=4, pin_memory=True
# )

In [139]:
import torch, torch.nn as nn, torch.optim as optim
from sklearn.metrics import f1_score
from pathlib import Path


# ── hyper-params ─────────────────────────────────────────────
EPOCH_WARM   = 5                 # head-only
EPOCH_FINE   = 20                # full network
LR_HEAD      = 1e-3              # warm-up lr
LR_BACKBONE  = 1e-4              # fine-tune lr
WEIGHT_DECAY = 1e-4
BEST_PATH    = Path("best_model.pth")

# ── loss ─────────────────────────────────────────────────────
criterion = nn.BCEWithLogitsLoss()

# ── split params into head vs backbone ───────────────────────
head_params     = list(model.fc.parameters())
backbone_params = [p for n, p in model.named_parameters() if "fc" not in n]

# freeze backbone for warm-up
for p in backbone_params:
    p.requires_grad = False

optimizer = optim.AdamW(
    [
        {"params": head_params,     "lr": LR_HEAD},
        {"params": backbone_params, "lr": 0.0},       # frozen
    ],
    weight_decay=WEIGHT_DECAY,
)

# cosine schedule for the fine-tune phase
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=EPOCH_FINE, eta_min=1e-6
)

# ── main loop ────────────────────────────────────────────────
best_micro = 0.0
total_epochs = EPOCH_WARM + EPOCH_FINE
model.to(device)

for epoch in range(total_epochs):
    # ---------- Train ----------
    model.train()
    running = 0.0
    for imgs, labels in train_loader:
        imgs, labels = imgs.to(device), labels.to(device)

        logits = model(imgs)
        loss   = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running += loss.item() * imgs.size(0)

    # ---------- Un-freeze backbone after warm-up ----------
    if epoch + 1 == EPOCH_WARM:
        print("🔓  Unfreezing backbone …")
        for p in backbone_params:
            p.requires_grad = True
        for g in optimizer.param_groups:
            g["lr"] = LR_BACKBONE      # set lr for all groups

    # ---------- Validate ----------
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for imgs, labels in val_loader:
            logits = model(imgs.to(device))
            probs  = torch.sigmoid(logits).cpu()          # [B,19]
            preds.append((probs >= 0.5).float())
            trues.append(labels)
    preds = torch.cat(preds)
    trues = torch.cat(trues)

    micro_f1 = f1_score(trues, preds, average="micro", zero_division=0)
    macro_f1 = f1_score(trues, preds, average="macro", zero_division=0)

    print(
        f"Epoch {epoch+1:02}/{total_epochs} | "
        f"Train loss {running/len(train_loader.dataset):.4f} | "
        f"micro-F1 {micro_f1:.3f} | macro-F1 {macro_f1:.3f}"
    )

    # ---------- Save best ----------
    if micro_f1 > best_micro:
        best_micro = micro_f1
        torch.save(model.state_dict(), BEST_PATH)
        print(f"  ✅  New best saved to {BEST_PATH}")

    # LR scheduler (only after un-freezing)
    if epoch + 1 > EPOCH_WARM:
        scheduler.step()

print(f"\n🏁  Training finished. Best micro-F1 = {best_micro:.3f}")


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/miniconda3/envs/A2_COMP4329/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/miniconda3/envs/A2_COMP4329/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'ImageRecord' on <module '__main__' (built-in)>


KeyboardInterrupt: 