In [1]:
#Import libraries
from pathlib import Path
import re, json
import numpy as np
import pandas as pd
import torch
from collections import Counter
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#Paths (Identical to Data_Exploration_Preprocessing.ipynb)
RAW = Path('../data/raw/fashion-dataset')
PROCESSED = Path('../data/processed')
styles_csv, images_dir = RAW / 'styles.csv', RAW / 'images'

# The folder for derived artifacts is created on demand; the raw inputs below must already exist.
PROCESSED.mkdir(parents=True, exist_ok=True)

# Fail fast (CSV first, then the image folder) with a message naming the missing path.
if not styles_csv.exists():
    raise FileNotFoundError(f"Run the download script first. Missing: {styles_csv}")
elif not images_dir.exists():
    raise FileNotFoundError(f"Images folder not found at: {images_dir}")

In [3]:
#Load raw CSV
# on_bad_lines='skip' drops malformed rows instead of aborting the read.
df = pd.read_csv(styles_csv, on_bad_lines='skip')
print("Rows:", df.shape[0])

# Fixed-seed sample of 16 rows.
sample_df = df.sample(n=16, random_state=42)

Rows: 44424


In [4]:
#Drop rows with missing articleType
df = df.dropna(subset=["articleType"])

#Keep only rows where the image file exists
# The expected file is <images_dir>/<id>.jpg; rows whose file is absent are dropped.
df["image_path"] = df["id"].map(lambda item_id: images_dir / f"{item_id}.jpg")
df = df.loc[df["image_path"].map(lambda img_path: img_path.exists())].reset_index(drop=True)
print(f"After dropping missing/invalid images: {len(df)} rows")

#Remove rare classes
# A class is kept only when it has at least MIN_SAMPLES rows.
MIN_SAMPLES = 9
class_counts = df["articleType"].value_counts()
valid_classes = class_counts.loc[class_counts.ge(MIN_SAMPLES)].index
df = df.loc[df["articleType"].isin(valid_classes)].reset_index(drop=True)
print(f"Remaining dataset size: {len(df)}")
print(f"Remaining number of classes: {df['articleType'].nunique()}")

After dropping missing/invalid images: 44419 rows
Remaining dataset size: 44283
Remaining number of classes: 109


In [5]:
#Transformations as planned in previous notebook
# Validation/test pipeline: no random steps, only resize, tensor conversion and
# per-channel normalisation.
val_transforms = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# Training pipeline: the same resize/normalisation with light random augmentation
# (flip, small rotation, mild colour jitter, small translation) in between.
train_transformations = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(5),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.05),
        transforms.RandomAffine(0, translate=(0.05, 0.05)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

In [6]:
#Label encoding + mapping - uses saved mappings from previous notebook/json if found
label_mapping_path = PROCESSED / "label_mapping.json"
if not label_mapping_path.exists():
    #Builds mapping from current filtered df: class names in sorted order get ids 0..K-1
    classes = sorted(df["articleType"].unique())
    label_mapping = dict(zip(classes, range(len(classes))))
    with open(label_mapping_path, "w") as f:
        json.dump(label_mapping, f, indent=2)
    print("Saved:", label_mapping_path)
else:
    # Reuse the mapping written by an earlier run.
    with open(label_mapping_path, "r") as f:
        label_mapping = json.load(f)

# Translate every class name to its integer id.
label_ids = df["articleType"].map(label_mapping)
df["label_id"] = label_ids.astype(int)
print("Number of classes:", len(label_mapping))

# Cleans text
def clean_text(text):
    """Normalise a product name into lowercase alphanumeric tokens.

    The value is converted with ``str`` and lowercased, every character that is
    not ``a-z``, ``0-9`` or whitespace is replaced by a space, and runs of
    whitespace are collapsed to a single space with both ends trimmed.

    Args:
        text: Raw value. Non-strings are converted with ``str``. ``None`` and
            float NaN (what pandas stores for a missing cell) count as missing.

    Returns:
        str: The cleaned text, or ``""`` for missing input.
    """
    # Missing values would otherwise be stringified into the bogus tokens "none" / "nan".
    if text is None or (isinstance(text, float) and text != text):
        return ""
    text = str(text).lower()
    # Single backslash on purpose: in a raw string "\\s" adds a literal backslash
    # to the character class, which let backslashes survive the cleaning.
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

# Cleaned product names; FashionDataset reads this column as the text input.
df["clean_name"] = df["productDisplayName"].map(clean_text)
print("Final dataset size:", df.shape[0])

Number of classes: 109
Final dataset size: 44283


In [7]:
# Dataset class
class FashionDataset(Dataset):
    """Map-style dataset that yields ``(image, text, label)`` for one DataFrame row.

    Each row must provide ``image_path`` (something ``PIL.Image.open`` accepts),
    ``clean_name`` and ``label_id``.

    Args:
        dataframe: Source rows. The index is reset, so positions ``0..len-1``
            are the valid item indices.
        image_transform: Optional callable applied to the RGB PIL image.
        text_transform: Optional callable applied to the ``clean_name`` value.
    """

    def __init__(self, dataframe, image_transform=None, text_transform=None):
        self.df = dataframe.reset_index(drop=True)
        self.image_transform = image_transform
        self.text_transform = text_transform

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the sample at position ``idx``.

        Returns:
            tuple: ``(img, text, label)``. ``img`` is the RGB image after
            ``image_transform`` (if given), ``text`` is ``clean_name`` after
            ``text_transform`` (if given), and ``label`` is a Python ``int``.
        """
        row = self.df.iloc[idx]
        # convert() returns an independent, fully loaded image, so the source
        # file can be closed right away instead of waiting for garbage collection.
        with Image.open(row["image_path"]) as source:
            img = source.convert("RGB")
        if self.image_transform:
            img = self.image_transform(img)
        text = row["clean_name"]
        if self.text_transform:
            text = self.text_transform(text)
        label = int(row["label_id"])
        return img, text, label

In [8]:
#Train/Val/Test split (use saved split if present)
# A saved split is reused so reruns work on the same rows; otherwise a
# stratified 70/15/15 split is drawn once and written to disk.
split_indices_path = PROCESSED / "split_indices.json"
if split_indices_path.exists():
    with open(split_indices_path, "r") as f:
        split_indices = json.load(f)
    train_index = split_indices["train_index"]
    val_index   = split_indices["val_index"]
    test_index  = split_indices["test_index"]
else:
    from sklearn.model_selection import StratifiedShuffleSplit
    RANDOM_STATE = 42
    # First cut: 70% train / 30% held out, stratified on the class label
    split1 = StratifiedShuffleSplit(n_splits=1, test_size=0.30, random_state=RANDOM_STATE)
    train_index, temp_index = next(split1.split(df, df["label_id"]))

    df_train = df.iloc[train_index].reset_index(drop=True)
    df_temp  = df.iloc[temp_index].reset_index(drop=True)

    # Second cut: halve the held-out rows into validation and test
    split2 = StratifiedShuffleSplit(n_splits=1, test_size=0.50, random_state=RANDOM_STATE)
    val_index_rel, test_index_rel = next(split2.split(df_temp, df_temp["label_id"]))
    # map relative indices back to original df indices
    val_index  = np.array(temp_index)[val_index_rel].tolist()
    test_index = np.array(temp_index)[test_index_rel].tolist()

    split_indices = {
        "train_index": list(map(int, train_index)),
        "val_index":   list(map(int, val_index)),
        "test_index":  list(map(int, test_index)),
    }
    with open(split_indices_path, "w") as f:
        json.dump(split_indices, f, indent=2)
    print("Saved split indices:", split_indices_path)

# The indices are positional, so they are only meaningful for the exact df they
# were drawn from. A cache written against a differently filtered df would
# otherwise select the wrong rows (or overlap between splits) without any error.
_split_all = np.concatenate(
    [np.asarray(ix, dtype=np.int64).ravel() for ix in (train_index, val_index, test_index)]
)
if not np.array_equal(np.sort(_split_all), np.arange(len(df))):
    raise ValueError(
        f"Split indices ({split_indices_path}) are not a partition of the current "
        f"dataframe: {len(_split_all)} indices for {len(df)} rows, or duplicate / "
        "out-of-range positions. Delete the file to regenerate the split."
    )

# Build split DataFrames
train_df = df.iloc[train_index].reset_index(drop=True)
val_df   = df.iloc[val_index].reset_index(drop=True)
test_df  = df.iloc[test_index].reset_index(drop=True)

In [9]:
# Dataloaders
BATCH_SIZE = 32
NUM_WORKERS = 0

# Only the training split uses the randomly augmenting transform; validation and
# test share the pipeline without random steps.
train_dataset = FashionDataset(train_df, image_transform=train_transformations)
val_dataset, test_dataset = (
    FashionDataset(split_df, image_transform=val_transforms)
    for split_df in (val_df, test_df)
)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader, test_loader = (
    DataLoader(eval_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
    for eval_dataset in (val_dataset, test_dataset)
)

In [10]:
#Class weights
# Balanced weights: total_samples / (num_classes * class_count), indexed by label id.
class_weights_path = PROCESSED / "class_weights.pt"
# The vector needs one entry per class in label_mapping, because that is the
# number of outputs the classifier is built with. It must not be sized by the
# classes that happen to occur in df.
num_classes = len(label_mapping)

class_weights = None
if class_weights_path.exists():
    # torch.load unpickles the file: only load caches you produced yourself.
    class_weights = torch.load(class_weights_path, map_location="cpu")
    if not (isinstance(class_weights, torch.Tensor) and class_weights.numel() == num_classes):
        print(f"Ignoring stale class weights in {class_weights_path}; recomputing for {num_classes} classes")
        class_weights = None

if class_weights is None:
    counts = Counter(df["label_id"])
    total_samples = sum(counts.values())
    class_weights = torch.tensor(
        [
            # A label id with no rows gets weight 0: it never occurs as a target,
            # and dividing by its zero count would raise ZeroDivisionError.
            total_samples / (num_classes * counts[i]) if counts[i] else 0.0
            for i in range(num_classes)
        ],
        dtype=torch.float
    )
    torch.save(class_weights, class_weights_path)
    print("Saved class weights:", class_weights_path)

In [11]:
#Sanity check batch
imgs, texts, labels = next(iter(train_loader))
print("== Step 1 ready ==")
print(f"Classes: {len(label_mapping)}")
print(f"Train/Val/Test sizes: {len(train_dataset)} / {len(val_dataset)} / {len(test_dataset)}")

# Show at most 60 characters of the first text when it is a plain string.
example_text = texts[0]
if isinstance(example_text, str):
    example_text = example_text[:60]
print("Batch:", tuple(imgs.shape), "| Example text:", example_text, "| Label:", int(labels[0]))

#Make variables global for later use
# Every name below was assigned at cell top level, so this rebinds each to the same object.
globals().update(
    df=df,
    train_df=train_df,
    val_df=val_df,
    test_df=test_df,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    class_weights=class_weights,
    label_mapping=label_mapping,
    train_transformations=train_transformations,
    val_transforms=val_transforms,
    FashionDataset=FashionDataset,
)

== Step 1 ready ==
Classes: 109
Train/Val/Test sizes: 30998 / 6642 / 6643
Batch: (32, 3, 224, 224) | Example text: jealous 21 women white flower top purple stripes top | Label: 96


In [27]:
import torch
import torch.nn as nn
import torch.nn.functional as F

#Small utility: parameter count
def count_params(m):
    """Return the total element count of the parameters of ``m`` that require gradients.

    Args:
        m: Object exposing ``parameters()`` (e.g. an ``nn.Module``).

    Returns:
        int: Sum of ``numel()`` over parameters with ``requires_grad`` set;
        ``0`` when there are none.
    """
    total = 0
    for param in m.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

#Basic convolutional block: Conv2d -> BN -> ReLU -> (optional) MaxPool -> (optional) Dropout2d
class ConvBNReLU(nn.Module):
    """3x3 convolution block with batch norm, ReLU and optional pooling/dropout.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of filters, i.e. channels of the block output.
        pooling: When true, a 2x2 max-pool follows the activation; otherwise
            the spatial size is left as is.
        p_drop: Channel-dropout probability; dropout is only added when > 0.
    """

    def __init__(self, in_channels, out_channels, pooling=False, p_drop=0.0):
        super().__init__()
        # The convolution has no bias: the batch norm right after it has its own shift.
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.ReLU(inplace=True)
        if pooling:
            self.pool = nn.MaxPool2d(kernel_size=2)
        else:
            self.pool = nn.Identity()
        if p_drop > 0:
            self.drop = nn.Dropout2d(p_drop)
        else:
            self.drop = nn.Identity()

    def forward(self, x):
        """Apply the stages in order to ``x`` of shape (B, in_channels, H, W)."""
        for stage in (self.conv, self.bn, self.act, self.pool, self.drop):
            x = stage(x)
        return x

class ImageEncoderCNN(nn.Module):
    """CNN encoder that maps an image batch to a ``feature_dim``-sized vector per image.

    Four pooled ``ConvBNReLU`` blocks (32, 64, 128, 256 channels) are followed by
    global average pooling and a Linear -> BatchNorm1d -> ReLU -> Dropout projection.

    Args:
        feature_dim: Size of the returned feature vector.
        in_channels: Channels of the input images.
        dropout: Dropout probability applied to the final features.
    """

    def __init__(self, feature_dim=256, in_channels=3, dropout=0.1):
        super().__init__()
        # Every block halves H and W (e.g. 224 -> 112 -> 56 -> 28 -> 14).
        widths = (in_channels, 32, 64, 128, 256)
        self.stem = nn.Sequential(
            *[
                ConvBNReLU(n_in, n_out, pooling=True, p_drop=0.05)
                for n_in, n_out in zip(widths, widths[1:])
            ]
        )
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.fc  = nn.Linear(256, feature_dim)
        self.bn  = nn.BatchNorm1d(feature_dim)
        self.act = nn.ReLU(inplace=True)
        self.drop = nn.Dropout(dropout)
        self.apply(self._init_weights)

    @staticmethod
    def _init_weights(m):
        """Initialise one sub-module in place; module types not listed are left untouched."""
        if isinstance(m, nn.Linear):
            nn.init.trunc_normal_(m.weight, std=0.02)
            nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
        elif isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d)):
            nn.init.ones_(m.weight)
            nn.init.zeros_(m.bias)

    def forward(self, x):
        """Return features of shape (B, feature_dim) for an image batch ``x``."""
        pooled = self.gap(self.stem(x)).flatten(1)  # (B, 256)
        projected = self.bn(self.fc(pooled))        # (B, feature_dim)
        return self.drop(self.act(projected))       # feature vector

class ImageClassifierCNN(nn.Module):
    """Image classifier: an ``ImageEncoderCNN`` followed by one linear head.

    Args:
        num_classes: Number of output logits.
        feature_dim: Size of the encoder output fed to the head.
    """

    def __init__(self, num_classes, feature_dim=256):
        super().__init__()
        # Feature extractor first, then the linear classification head
        self.encoder = ImageEncoderCNN(feature_dim=feature_dim)
        self.head = nn.Linear(feature_dim, num_classes)

    @property
    def feature_dim(self):
        """Input width of the head, i.e. the size of the encoder features."""
        return self.head.in_features

    def forward(self, x, return_features=False):
        """Return logits of shape (B, num_classes).

        With ``return_features=True`` the result is ``(logits, features)``,
        where ``features`` is the encoder output of shape (B, feature_dim).
        """
        features = self.encoder(x)
        logits = self.head(features)
        return (logits, features) if return_features else logits

In [28]:
# Build the classifier on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_classes = len(label_mapping)
model = ImageClassifierCNN(num_classes=num_classes, feature_dim=256).to(device)

print(f"ImageEncoder params: {count_params(model.encoder):,}")
print(f"Total model params:  {count_params(model):,}")

# Dry forward on a mini-batch to verify shapes
xb, _, yb = next(iter(train_loader))
xb = xb.to(device)
# Probe in eval mode: a train-mode forward updates the BatchNorm running
# statistics (even under no_grad) and samples dropout masks, so a mere shape
# check would otherwise change the model state before training starts.
was_training = model.training
model.eval()
with torch.no_grad():
    logits, feats = model(xb, return_features=True)
model.train(was_training)  # restore whatever mode the model was in
print("Logits shape:", tuple(logits.shape))   # (B, num_classes)
print("Features shape:", tuple(feats.shape))  # (B, 256)

ImageEncoder params: 455,200
Total model params:  483,213
Logits shape: (32, 109)
Features shape: (32, 256)


In [29]:
# ==== OPTIONAL: quick smoke test for the CNN (a few batches) ====
import torch
from torch.optim import AdamW
from torch.cuda.amp import autocast, GradScaler

# Train on at most `max_batches` mini-batches, then run one full validation pass.
device = next(model.parameters()).device
# Mixed precision follows the device the model is actually on. Keying it on
# torch.cuda.is_available() would enable the CUDA grad scaler for a model that
# sits on the CPU of a CUDA-capable machine.
use_amp = device.type == "cuda"
criterion = nn.CrossEntropyLoss(weight=class_weights.to(device) if class_weights is not None else None)
optimizer = AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)
scaler = GradScaler(enabled=use_amp)

model.train()
max_batches = 100  # keep it short; raise if you want
# Sample-weighted running totals, so the printed numbers are per-sample averages
running_loss, correct, seen = 0.0, 0, 0

for b, (xb, _, yb) in enumerate(train_loader, 1):
    xb, yb = xb.to(device), yb.to(device)
    optimizer.zero_grad(set_to_none=True)
    with autocast(enabled=use_amp):
        logits = model(xb)
        loss = criterion(logits, yb)
    # With the scaler disabled these three calls reduce to backward() + optimizer.step()
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()

    running_loss += loss.item() * xb.size(0)
    preds = logits.argmax(dim=1)
    correct += (preds == yb).sum().item()
    seen += xb.size(0)

    if b % 20 == 0 or b == max_batches:
        print(f"[batch {b}] loss={(running_loss/seen):.4f} acc={(correct/seen)*100:.2f}%")

    if b >= max_batches:
        break

# quick val pass
model.eval()
val_correct, val_seen, val_loss = 0, 0, 0.0
with torch.no_grad():
    for xb, _, yb in val_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        val_loss += criterion(logits, yb).item() * xb.size(0)
        val_correct += (logits.argmax(dim=1) == yb).sum().item()
        val_seen += xb.size(0)

print(f"VAL: loss={(val_loss/val_seen):.4f} acc={(val_correct/val_seen)*100:.2f}%")

[batch 20] loss=4.7692 acc=2.81%
[batch 40] loss=4.7137 acc=4.30%
[batch 60] loss=4.6832 acc=4.58%
[batch 80] loss=4.6515 acc=5.47%
[batch 100] loss=4.6185 acc=5.56%
VAL: loss=4.5704 acc=4.56%
