In [None]:
# !pip install evaluate

In [None]:
import numpy as np
import random
from transformers import ResNetModel
from torch import nn
from torch.utils.data import Dataset
from PIL import Image
from torchvision.transforms import v2
import torch
import pandas as pd
import evaluate

In [None]:
import numpy as np
from transformers import ResNetModel
from torch import nn
from torch.utils.data import Dataset
from PIL import Image
from torchvision.transforms import v2
import torch
import pandas as pd
import evaluate
from torch.utils.data import DataLoader
import torch.optim as optim

In [182]:
# DEVICE = 'cuda'
DEVICE = torch.device("cuda")

base_path = 'archive/'
img_path = f'{base_path}Food Images/Food Images/'
cap_path = f'{base_path}Food Ingredients and Recipe Dataset with Image Name Mapping.csv'

data = pd.read_csv(cap_path)
partitions = np.load("datasets/Food_Images/food_partitions.npy", allow_pickle=True).item()
print(len(partitions["train"]))  # Access train partition
print(len(partitions["test"]))   # Access test partition
print(len(partitions["valid"])) 

10800
1351
1350


# Data Cleaning

In [183]:
dropped_indices = data[data["Title"].isna()].index  # Get indices of dropped rows
partitions['train'] = [idx for idx in partitions['train'] if idx not in dropped_indices]
print(len(partitions['train']))

10797


In [184]:
data = data.dropna(subset=["Title"])
len(data)

13496

In [185]:
import os
image_folder = f"archive/Food Images/Food Images"
# valid_images = list(set(os.listdir(image_folder)))

valid_images = list({os.path.splitext(f)[0] for f in os.listdir(image_folder)})

print(len(valid_images))
# print(valid_images[:5])
# print(data["Image_Name"].head())
      
data = data[data["Image_Name"].isin(valid_images)]

# Reset index after filtering
data = data.reset_index(drop=True)

13582


In [186]:
valid_indices = set(data.index)  # These are the indices that remain after filtering

partitions['train'] = [idx for idx in partitions['train'] if idx in valid_indices]
partitions['valid'] = [idx for idx in partitions['valid'] if idx in valid_indices]
partitions['test'] = [idx for idx in partitions['test'] if idx in valid_indices]

In [187]:
# unique_chars_1 = set("".join(data["Ingredients"].astype(str)))
# unique_chars_2 = set("".join(data["Instructions"].astype(str)))
# unique_chars_3 = set("".join(data["Image_Name"].astype(str)))
# unique_chars_4 = set("".join(data["Cleaned_Ingredients"].astype(str)))

# print(len(unique_chars_1))
# print(len(unique_chars_2))
# print(len(unique_chars_3))
# print(len(unique_chars_4))

In [188]:
import unicodedata

# Normalize and remove unwanted characters
def clean_text(text):
    text = unicodedata.normalize("NFKD", text)  # Normalize Unicode
    text = text.encode("ascii", "ignore").decode("ascii")  # Remove non-ASCII chars
    return text

# Apply cleaning to the Title column
data["Title"] = data["Title"].astype(str).apply(clean_text)

# Extract unique characters
chars = list(set("".join(data["Title"])))

# Ensure special tokens are first
chars = ['<SOS>', '<EOS>', '<PAD>'] + sorted(chars)

In [189]:
# chars = ['<SOS>', '<EOS>', '<PAD>', ' ', '!', '"', '#', '&', "'", '(', ')', ',', '-', '.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '=', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
 
NUM_CHAR = len(chars)
idx2char = {k: v for k, v in enumerate(chars)}
char2idx = {v: k for k, v in enumerate(chars)}

TEXT_MAX_LEN = 201

In [194]:
class Data(Dataset):
    def __init__(self, data, partition):
        self.data = data
        self.partition = partition
        self.num_captions = 5
        self.max_len = TEXT_MAX_LEN
        self.img_proc = torch.nn.Sequential(
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Resize((224, 224), antialias=True),
            v2.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),)

    def __len__(self):
        return len(self.partition)
    
    def __getitem__(self, idx):
        real_idx = self.partition[idx]  # Row index in dataset
        item = self.data.iloc[real_idx]  # Get row
                
        img_name = item.Image_Name + '.jpg'
        # print(img_name)
        img = Image.open(f'{img_path}{img_name}').convert('RGB')
        img = self.img_proc(img)
        
        caption = item["Title"]
        cap_list = list(caption)

        final_list = [chars[0]]
        final_list.extend(cap_list)
        final_list.extend([chars[1]])
        gap = self.max_len - len(final_list)
        final_list.extend([chars[2]]*gap)

        missing_chars = [c for c in final_list if c not in char2idx]
        if missing_chars:
            print(f"Missing characters: {set(missing_chars)}")

        for char in missing_chars:
            if char not in char2idx:
                char2idx[char] = len(char2idx)  # Assign a new index

        cap_idx = [char2idx[i] for i in final_list]

        # return img, cap_idx
        return img, torch.tensor(cap_idx, dtype=torch.long)

In [None]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.resnet = ResNetModel.from_pretrained('microsoft/resnet-18').to(DEVICE)
        self.gru = nn.GRU(512, 512, num_layers=1)
        self.proj = nn.Linear(512, NUM_CHAR)
        self.embed = nn.Embedding(NUM_CHAR, 512)
        self.num_classes = NUM_CHAR

    def forward(self, img, captions=None, teacher_forcing_ratio=0.5):
        batch_size = img.shape[0]
        feat = self.resnet(img)
        feat = feat.pooler_output.squeeze(-1).squeeze(-1).unsqueeze(0)  # (1, batch, 512)

        start_token = torch.full((batch_size,), char2idx['<SOS>'], dtype=torch.long, device=DEVICE)
        start_embed = self.embed(start_token).unsqueeze(0)  # (1, batch, 512)

        hidden = feat
        inp = start_embed
        outputs = []

        for t in range(TEXT_MAX_LEN):  # Exclude <SOS>
            out, hidden = self.gru(inp, hidden)
            logits = self.proj(out[-1])  # (batch, NUM_CHAR)

            outputs.append(logits.unsqueeze(1))  # Store timestep output

            # Decide whether to use teacher forcing
            if captions is not None and torch.rand(1).item() < teacher_forcing_ratio:
                inp = self.embed(captions[:, t]).unsqueeze(0)  # Use ground truth token
            else:
                pred = logits.argmax(dim=1)
                inp = self.embed(pred).unsqueeze(0)  # Use model prediction

        outputs = torch.cat(outputs, dim=1)  # (batch, seq_len, NUM_CHAR)
        return outputs.permute(0, 2, 1)  # (batch, NUM_CHAR, seq_len)


In [195]:
'''A simple example to calculate loss of a single batch (size 2)'''
dataset = Data(data, partitions['train'])

img1, caption1 = next(iter(dataset))

img2, caption2 = next(iter(dataset))

caption1 = torch.tensor(caption1)
caption2 = torch.tensor(caption2)
img = torch.cat((img1.unsqueeze(0), img2.unsqueeze(0)))
caption = torch.cat((caption1.unsqueeze(0), caption2.unsqueeze(0)))
img, caption = img.to(DEVICE), caption.to(DEVICE)
model = Model().to(DEVICE)
pred = model(img)
crit = nn.CrossEntropyLoss()
loss = crit(pred, caption)
print(loss)


  caption1 = torch.tensor(caption1)
  caption2 = torch.tensor(caption2)


RuntimeError: Expected target size [2, 81], got [2, 201]

In [None]:
print("Current Device:", torch.cuda.current_device())
print("Device Name:", torch.cuda.get_device_name(torch.cuda.current_device()))

Current Device: 0
Device Name: NVIDIA GeForce GTX 1660 Ti with Max-Q Design


In [None]:
import torch

# torch.cuda.empty_cache()
# torch.cuda.ipc_collect()
# torch.cuda.reset_max_memory_allocated()
# torch.cuda.reset_max_memory_cached()
# torch.cuda.synchronize()

DEVICE = torch.device("cuda")
print("Using device:", DEVICE)

try:
    x = torch.rand(3, 3).to(DEVICE)
    print("Tensor successfully moved to:", x.device)
except Exception as e:
    print("Error:", e)


Using device: cuda
Tensor successfully moved to: cuda:0


In [196]:
'''metrics'''
bleu = evaluate.load('bleu')
meteor = evaluate.load('meteor')
rouge = evaluate.load('rouge')

reference = [['A child in a pink dress is climbing up a set of stairs in an entry way .', 'A girl going into a wooden building .']]
prediction = ['A girl goes into a wooden building .']

res_b = bleu.compute(predictions=prediction, references=reference)
res_r = rouge.compute(predictions=prediction, references=reference)
res_m = meteor.compute(predictions=prediction, references=reference)

res_b, res_r, res_m

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Dell\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\Dell\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\Dell\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


({'bleu': 0.5946035575013605,
  'precisions': [0.875, 0.7142857142857143, 0.5, 0.4],
  'brevity_penalty': 1.0,
  'length_ratio': 1.0,
  'translation_length': 8,
  'reference_length': 8},
 {'rouge1': 0.8571428571428571,
  'rouge2': 0.6666666666666666,
  'rougeL': 0.8571428571428571,
  'rougeLsum': 0.8571428571428571},
 {'meteor': 0.864795918367347})

In [197]:
ref = [['A child is running in the campus']]
pred1 = ['A child is running']

res_b = bleu.compute(predictions=pred1, references=ref)
res_r = rouge.compute(predictions=pred1, references=ref)
res_m = meteor.compute(predictions=pred1, references=ref)

res_b, res_r, res_m

({'bleu': 0.4723665527410147,
  'precisions': [1.0, 1.0, 1.0, 1.0],
  'brevity_penalty': 0.4723665527410147,
  'length_ratio': 0.5714285714285714,
  'translation_length': 4,
  'reference_length': 7},
 {'rouge1': 0.7272727272727273,
  'rouge2': 0.6666666666666666,
  'rougeL': 0.7272727272727273,
  'rougeLsum': 0.7272727272727273},
 {'meteor': 0.5923507462686567})

In [198]:
ref = [['A child is running in the campus']]
pred1 = ['A child is']

res_b = bleu.compute(predictions=pred1, references=ref)
res_r = rouge.compute(predictions=pred1, references=ref)
res_m = meteor.compute(predictions=pred1, references=ref)

res_b, res_r, res_m

({'bleu': 0.0,
  'precisions': [1.0, 1.0, 1.0, 0.0],
  'brevity_penalty': 0.2635971381157267,
  'length_ratio': 0.42857142857142855,
  'translation_length': 3,
  'reference_length': 7},
 {'rouge1': 0.6, 'rouge2': 0.5, 'rougeL': 0.6, 'rougeLsum': 0.6},
 {'meteor': 0.44612794612794615})

In [199]:
ref = [['A child is running in the campus']]
pred1 = ['A child campus']

res_b = bleu.compute(predictions=pred1, references=ref)
res_r = rouge.compute(predictions=pred1, references=ref)
res_m = meteor.compute(predictions=pred1, references=ref)
res_m_sin = meteor.compute(predictions=pred1, references=ref, gamma=0) # no penalty by setting gamma to 0

res_b, res_r, res_m, res_m_sin

({'bleu': 0.0,
  'precisions': [1.0, 0.5, 0.0, 0.0],
  'brevity_penalty': 0.2635971381157267,
  'length_ratio': 0.42857142857142855,
  'translation_length': 3,
  'reference_length': 7},
 {'rouge1': 0.6, 'rouge2': 0.25, 'rougeL': 0.6, 'rougeLsum': 0.6},
 {'meteor': 0.3872053872053872},
 {'meteor': 0.45454545454545453})

Final metric we use for challenge 3: BLEU1, BLEU2, ROUGE-L, METEOR

In [200]:
ref = [['A child is running in the campus']]
pred1 = ['A child campus']

bleu1 = bleu.compute(predictions=pred1, references=ref, max_order=1)
bleu2 = bleu.compute(predictions=pred1, references=ref, max_order=2)
res_r = rouge.compute(predictions=pred1, references=ref)
res_m = meteor.compute(predictions=pred1, references=ref)

f"BLEU-1:{bleu1['bleu']*100:.1f}%, BLEU2:{bleu2['bleu']*100:.1f}%, ROUGE-L:{res_r['rougeL']*100:.1f}%, METEOR:{res_m['meteor']*100:.1f}%"

'BLEU-1:26.4%, BLEU2:18.6%, ROUGE-L:60.0%, METEOR:38.7%'

Now it is your turn! Try to finish the code below to run the train function

In [None]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.resnet = ResNetModel.from_pretrained('microsoft/resnet-18').to(DEVICE)
        self.gru = nn.GRU(512, 512, num_layers=1)
        self.proj = nn.Linear(512, NUM_CHAR)
        self.embed = nn.Embedding(NUM_CHAR, 512)
        self.num_classes = NUM_CHAR

    def forward(self, img, captions=None, teacher_forcing_ratio=0.5):
        batch_size = img.shape[0]
        feat = self.resnet(img)
        feat = feat.pooler_output.squeeze(-1).squeeze(-1).unsqueeze(0)  # (1, batch, 512)

        start_token = torch.full((batch_size,), char2idx['<SOS>'], dtype=torch.long, device=DEVICE)
        start_embed = self.embed(start_token).unsqueeze(0)  # (1, batch, 512)

        hidden = feat
        inp = start_embed
        outputs = []

        for t in range(TEXT_MAX_LEN):  # Exclude <SOS>
            out, hidden = self.gru(inp, hidden)
            logits = self.proj(out[-1])  # (batch, NUM_CHAR)

            outputs.append(logits.unsqueeze(1))  # Store timestep output

            # Decide whether to use teacher forcing
            if captions is not None and torch.rand(1).item() < teacher_forcing_ratio:
                inp = self.embed(captions[:, t]).unsqueeze(0)  # Use ground truth token
            else:
                pred = logits.argmax(dim=1)
                inp = self.embed(pred).unsqueeze(0)  # Use model prediction

        outputs = torch.cat(outputs, dim=1)  # (batch, seq_len, NUM_CHAR)
        return outputs.permute(0, 2, 1)  # (batch, NUM_CHAR, seq_len)


# Training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import StepLR
import os

import wandb

def decode_caption(indices, vocab):
    return ''.join([vocab[idx] if idx < len(vocab) else '<UNK>' for idx in indices if idx not in [0]])

def clean_text(text):
    """Removes padding and special tokens, then strips whitespace."""
    return text.replace("<PAD>", "").replace("<EOS>", "").strip()

def is_empty_prediction(pred_list):
    """Checks if any cleaned prediction is empty."""
    return any(len(clean_text(pred)) == 0 for pred in pred_list)

def train(EPOCHS, batch_size=16, patience=5, teacher_forcing_ratio=0.5):
    
    wandb.init(project="captioning-model", config={
        "epochs": EPOCHS,
        "batch_size": batch_size,
        "learning_rate": 1e-3,
        "teacher_forcing_ratio": teacher_forcing_ratio
    })

    data_train = Data(data, partitions['train'])
    data_valid = Data(data, partitions['valid'])
    
    dataloader_train = DataLoader(data_train, batch_size=batch_size, shuffle=True, num_workers=0)
    dataloader_valid = DataLoader(data_valid, batch_size=batch_size, shuffle=False, num_workers=0)
    
    print("DataLoader process is finished")
    
    model = Model().to(DEVICE)
    # model = Model(mode="word").to(DEVICE)

    # optimizer = optim.Adam(model.parameters(), lr=1e-3)
    optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3)

    scheduler = StepLR(optimizer, step_size=10, gamma=0.1)  # Learning rate decay
    crit = nn.CrossEntropyLoss()
    
    best_val_loss = float('inf')
    patience_counter = 0
    os.makedirs("models/text_representation", exist_ok=True)
    
    for epoch in range(EPOCHS):
        print(f"Starting epoch {epoch+1}")
        model.train()
        train_loss, train_acc = train_one_epoch(model, optimizer, crit, dataloader_train, teacher_forcing_ratio)
        
        print(f'Epoch {epoch+1}/{EPOCHS} - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
        scheduler.step()

        model.eval()
        valid_loss, bleu1_score, bleu2_score, rouge_score, meteor_score = eval_epoch(model, crit, dataloader_valid)
        print(f'Validation Loss: {valid_loss:.4f}')

        # Log metrics to wandb
        wandb.log({
            # "Epoch": epoch + 1,
            "Train Loss": train_loss,
            "Validation Loss": valid_loss,
            "BLEU-1": bleu1_score,
            "BLEU-2": bleu2_score,
            "ROUGE-L": rouge_score,
            "METEOR": meteor_score
        })

        torch.save(model.state_dict(), f"models/text_representation/best_model_{epoch + 1}.pth")

        if valid_loss < best_val_loss:
            best_val_loss = valid_loss
            patience_counter = 0
            
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print("Early stopping triggered.")
            break

    print("Training complete.")


def train_one_epoch(model, optimizer, crit, dataloader, teacher_forcing_ratio):

    model.train()
    total_loss = 0

    bleu1_score = 0
    bleu2_score = 0
    rouge_score = 0
    meteor_score = 0

    for imgs, captions in dataloader:
        imgs, captions = imgs.to(DEVICE), captions.to(DEVICE)
        optimizer.zero_grad()
        
        use_teacher_forcing = torch.rand(1).item() < teacher_forcing_ratio
        outputs = model(imgs, captions if use_teacher_forcing else None)

        # print(outputs.shape)
        # print(captions.shape)
        loss = crit(outputs, captions)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        _, predicted = outputs.max(1)
        
        decoded_refs = [clean_text(decode_caption(caption.cpu().numpy(), chars)) for caption in captions]
        decoded_preds = [clean_text(decode_caption(pred.cpu().numpy(), chars)) for pred in predicted]
        
        print(f"Ref: {decoded_refs}")
        print(f"Pred: {decoded_preds}")

        if is_empty_prediction(decoded_preds):
            continue

        bleu1 = bleu.compute(predictions=decoded_preds, references=[[ref] for ref in decoded_refs], max_order=1)
        bleu2 = bleu.compute(predictions=decoded_preds, references=[[ref] for ref in decoded_refs], max_order=2)
        res_r = rouge.compute(predictions=decoded_preds, references=decoded_refs)
        res_m = meteor.compute(predictions=decoded_preds, references=decoded_refs)

        # Accumulate scores
        bleu1_score += bleu1["bleu"]
        bleu2_score += bleu2["bleu"]
        rouge_score += res_r["rougeL"]
        meteor_score += res_m["meteor"]

        print(f"BLEU-1: {bleu1['bleu']:.4f}, BLEU-2: {bleu2['bleu']:.4f}, ROUGE-L: {res_r['rougeL']:.4f}, METEOR: {res_m['meteor']:.4f}")
        
        print("_" * 100)

    # Compute averages
    avg_loss = total_loss / len(dataloader)
    bleu1_score /= len(dataloader)
    bleu2_score /= len(dataloader)
    rouge_score /= len(dataloader)
    meteor_score /= len(dataloader)

    print(f"BLEU-1: {bleu1_score:.4f}, BLEU-2: {bleu2_score:.4f}, ROUGE-L: {rouge_score:.4f}, METEOR: {meteor_score:.4f}")

    return total_loss / len(dataloader), 100


def eval_epoch(model, crit, dataloader):
    total_loss = 0.0

    bleu1_score = 0
    bleu2_score = 0
    rouge_score = 0
    meteor_score = 0

    with torch.no_grad():
        for imgs, captions in dataloader:
            imgs, captions = imgs.to(DEVICE), captions.to(DEVICE)
            outputs = model(imgs)
            loss = crit(outputs, captions)
            total_loss += loss.item()

            _, predicted = outputs.max(1)
            
            decoded_refs = [clean_text(decode_caption(caption.cpu().numpy(), chars)) for caption in captions]
            decoded_preds = [clean_text(decode_caption(pred.cpu().numpy(), chars)) for pred in predicted]
            
            # print(f"Ref: {decoded_refs}")
            # print(f"Pred: {decoded_preds}")
            if is_empty_prediction(decoded_preds):
                continue
            
            bleu1 = bleu.compute(predictions=decoded_preds, references=[[ref] for ref in decoded_refs], max_order=1)
            bleu2 = bleu.compute(predictions=decoded_preds, references=[[ref] for ref in decoded_refs], max_order=2)
            res_r = rouge.compute(predictions=decoded_preds, references=decoded_refs)
            res_m = meteor.compute(predictions=decoded_preds, references=decoded_refs)

            # Accumulate scores
            bleu1_score += bleu1["bleu"]
            bleu2_score += bleu2["bleu"]
            rouge_score += res_r["rougeL"]
            meteor_score += res_m["meteor"]

    avg_loss = total_loss / len(dataloader)
    bleu1_score /= len(dataloader)
    bleu2_score /= len(dataloader)
    rouge_score /= len(dataloader)
    meteor_score /= len(dataloader)

    return avg_loss, bleu1_score, bleu2_score, rouge_score, meteor_score


In [None]:
train(5)

In [90]:
crit = nn.CrossEntropyLoss()

batch_size = 8
data_test = Data(data, partitions['test'])   
dataloader_test = DataLoader(data_test, batch_size=batch_size, shuffle=True, num_workers=0)

model = Model().to(DEVICE)
model.load_state_dict(torch.load(f"models/best_model.pth", map_location=DEVICE))

avg_loss, bleu1_score, bleu2_score, rouge_score, meteor_score = eval_epoch(model, crit, dataloader_test)
print(f"BLEU-1: {bleu1_score:.4f}")
print(f"BLEU-2: {bleu2_score:.4f}")      
print(f"ROUGE-L: {rouge_score:.4f}")
print(f"METEOR: {meteor_score:.4f}")

BLEU-1: 0.0012
BLEU-2: 0.0000
ROUGE-L: 0.0024
METEOR: 0.0012


In [None]:
model = Model().to(DEVICE)  # Do NOT load weights

avg_loss, bleu1_score, bleu2_score, rouge_score, meteor_score = eval_epoch(model, crit, dataloader_test)

print(f"BLEU-1: {bleu1_score:.4f}")
print(f"BLEU-2: {bleu2_score:.4f}")      
print(f"ROUGE-L: {rouge_score:.4f}")
print(f"METEOR: {meteor_score:.4f}")


BLEU-1: 0.0001
BLEU-2: 0.0000
ROUGE-L: 0.0000
METEOR: 0.0012


# Experiments (ResNet-18 & LSTM Decoder)

In [123]:
import torch
import torch.nn as nn
from transformers import ResNetModel

class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.resnet = ResNetModel.from_pretrained('microsoft/resnet-18').to(DEVICE)
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))  # Better pooling

        self.lstm = nn.LSTM(512, 512, num_layers=3, dropout=0.3, bidirectional=True)  # 3-layer Bi-LSTM
        self.proj = nn.Linear(1024, NUM_CHAR)  # Adjust for bidirectional output
        self.embed = nn.Embedding(NUM_CHAR, 512)
        self.num_classes = NUM_CHAR

        # Trainable LSTM hidden state
        self.hidden_init = nn.Parameter(torch.zeros(3 * 2, 1, 512))  # (num_layers * 2, batch, hidden_size)
        self.cell_init = nn.Parameter(torch.zeros(3 * 2, 1, 512))

        self.layer_norm = nn.LayerNorm(1024)  # Normalize LSTM outputs

    def forward(self, img, captions=None, teacher_forcing_ratio=0.5):
        batch_size = img.shape[0]
        feat = self.resnet(img).last_hidden_state  # Use full feature maps
        feat = self.adaptive_pool(feat).squeeze(-1).squeeze(-1).unsqueeze(0)  # (1, batch, 512)

        start_token = torch.full((batch_size,), char2idx['<SOS>'], dtype=torch.long, device=DEVICE)
        start_embed = self.embed(start_token).unsqueeze(0)  # (1, batch, 512)

        hidden = self.hidden_init.expand(-1, batch_size, -1).contiguous()  # Expand for batch size
        cell = self.cell_init.expand(-1, batch_size, -1).contiguous()

        inp = start_embed
        outputs = []

        for t in range(TEXT_MAX_LEN):
            out, (hidden, cell) = self.lstm(inp, (hidden, cell))
            out = self.layer_norm(out)  # Apply layer normalization
            logits = self.proj(out[-1])  # (batch, NUM_CHAR)

            outputs.append(logits.unsqueeze(1))

            # Scheduled Sampling: Reduce teacher forcing ratio gradually
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            if captions is not None and teacher_force:
                inp = self.embed(captions[:, t]).unsqueeze(0)
            else:
                pred = logits.argmax(dim=1)
                inp = self.embed(pred).unsqueeze(0)

        outputs = torch.cat(outputs, dim=1)  # (batch, seq_len, NUM_CHAR)
        return outputs.permute(0, 2, 1)  # (batch, NUM_CHAR, seq_len)


In [None]:
# wandb.finish()

# wandb.init(project="C5_W3")

train(20)

wandb.finish()


In [None]:
crit = nn.CrossEntropyLoss()

batch_size = 8
data_test = Data(data, partitions['test'])   
dataloader_test = DataLoader(data_test, batch_size=batch_size, shuffle=True, num_workers=0)

model_list = [1, 3, 4, 6, 7, 11, 12, 14, 15, 16, 19]
for idx in model_list:
    print(f"models/lstm_20/best_model_{idx}.pth")

    model = Model().to(DEVICE)
    model.load_state_dict(torch.load(f"models/lstm_20/best_model_{idx}.pth", map_location=DEVICE))

    avg_loss, bleu1_score, bleu2_score, rouge_score, meteor_score = eval_epoch(model, crit, dataloader_test)
    print(f"BLEU-1: {bleu1_score:.4f}")
    print(f"BLEU-2: {bleu2_score:.4f}")      
    print(f"ROUGE-L: {rouge_score:.4f}")
    print(f"METEOR: {meteor_score:.4f}")
    print(50 * "_")

In [136]:
model = Model().to(DEVICE)  # Do NOT load weights

avg_loss, bleu1_score, bleu2_score, rouge_score, meteor_score = eval_epoch(model, crit, dataloader_test)

print(f"BLEU-1: {bleu1_score:.4f}")
print(f"BLEU-2: {bleu2_score:.4f}")      
print(f"ROUGE-L: {rouge_score:.4f}")
print(f"METEOR: {meteor_score:.4f}")


BLEU-1: 0.0004
BLEU-2: 0.0000
ROUGE-L: 0.0001
METEOR: 0.0011


# VGG 19

In [None]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        
        vgg19 = models.vgg19(pretrained=True).features
        self.vgg19 = nn.Sequential(vgg19, nn.AdaptiveAvgPool2d((1, 1)))
        
        self.gru = nn.GRU(512, 512, num_layers=1)
        self.proj = nn.Linear(512, NUM_CHAR)
        self.embed = nn.Embedding(NUM_CHAR, 512)

    def forward(self, img, captions=None, teacher_forcing_ratio=0.5):
        batch_size = img.shape[0]
        
        feat = self.vgg19(img)
        feat = feat.view(1, batch_size, 512)
        
        start_token = torch.tensor(char2idx['<SOS>']).to(DEVICE)
        start_embed = self.embed(start_token).repeat(batch_size, 1).unsqueeze(0)
        
        hidden = feat
        inp = start_embed
        outputs = []
        
        for t in range(TEXT_MAX_LEN):  # Excluding <SOS>
            out, hidden = self.gru(inp, hidden)
            proj_out = self.proj(out[-1])  # (batch, NUM_CHAR)
            outputs.append(proj_out.unsqueeze(1))
            
            pred_token = proj_out.argmax(1)
            pred_embed = self.embed(pred_token).unsqueeze(0)  # (1, batch, 512)

            # Teacher forcing
            if captions is not None and torch.rand(1).item() < teacher_forcing_ratio:
                next_token = captions[:, t]  # Use ground truth
            else:
                next_token = pred_token  # Use model prediction
            
            inp = self.embed(next_token).unsqueeze(0)

        outputs = torch.cat(outputs, dim=1)
        return outputs.permute(0, 2, 1)

In [None]:
train(10)

# Text Representation

In [223]:
from collections import Counter

# Tokenize at word level
word_counter = Counter()
for title in data["Title"]:
    words = title.split()  # Simple whitespace tokenization
    word_counter.update(words)

# Create a word2idx dictionary
word2idx = {word: idx for idx, (word, _) in enumerate(word_counter.items(), start=4)}

# Add special tokens
word2idx['<PAD>'] = 0
word2idx['<SOS>'] = 1
word2idx['<EOS>'] = 2
word2idx['<UNK>'] = 3

# Get NUM_WORDS
NUM_WORDS = len(word2idx)
print(f"Word-Level Vocabulary Size: {NUM_WORDS}")

# Convert list to sorted format for consistency
words = ['<SOS>', '<EOS>', '<PAD>', '<UNK>'] + sorted(word2idx.keys())


Word-Level Vocabulary Size: 7627


In [224]:
import re
import unicodedata

import re
import unicodedata

def clean_words(words, special_tokens=None):
    if special_tokens is None:
        special_tokens = {'<PAD>', '<SOS>', '<EOS>', '<UNK>'}  # Special tokens to keep

    cleaned_words = set(special_tokens)  # Ensure special tokens are preserved

    for word in words:
        if word in special_tokens:  # Keep special tokens unchanged
            continue

        word = unicodedata.normalize("NFKD", word)  # Normalize Unicode
        word = re.sub(r'[^a-zA-Z\s-]', '', word)  # Remove special characters & numbers
        word = word.strip('-')  # Remove leading/trailing hyphens
        word = word.strip()  # Trim spaces

        if word:  # Only keep non-empty words
            cleaned_words.add(word)

    return sorted(cleaned_words)  # Return sorted unique words

words = clean_words(words)
NUM_WORDS = len(words)
NUM_WORDS

6794

In [225]:
# from transformers import AutoTokenizer

# tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
# NUM_WORDPIECE = tokenizer.vocab_size

# print(f"WordPiece Vocabulary Size: {NUM_WORDPIECE}")

In [226]:
TEXT_MAX_LEN = 200

In [237]:
class Data(Dataset):
    def __init__(self, data, partition):
        self.data = data
        self.partition = partition
        self.num_captions = 5
        self.max_len = TEXT_MAX_LEN
        self.img_proc = torch.nn.Sequential(
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Resize((224, 224), antialias=True),
            v2.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),)

    def __len__(self):
        return len(self.partition)
    
    def __getitem__(self, idx):
        real_idx = self.partition[idx]  # Row index in dataset
        item = self.data.iloc[real_idx]  # Get row
                
        img_name = item.Image_Name + '.jpg'
        # print(img_name)
        img = Image.open(f'{img_path}{img_name}').convert('RGB')
        img = self.img_proc(img)
        
        caption = item["Title"]
        cap_list = list(caption)

        final_list = [chars[0]]
        final_list.extend(cap_list)
        final_list.extend([chars[1]])
        gap = self.max_len - len(final_list)
        final_list.extend([chars[2]]*gap)

        missing_chars = [c for c in final_list if c not in char2idx]
        if missing_chars:
            print(f"Missing characters: {set(missing_chars)}")

        for char in missing_chars:
            if char not in char2idx:
                char2idx[char] = len(char2idx)  # Assign a new index

        cap_idx = [char2idx[i] for i in final_list]

        # return img, cap_idx
        return img, torch.tensor(cap_idx, dtype=torch.long)

In [238]:
class Model(nn.Module):
    def __init__(self, mode="char"):
        super().__init__()

        # Determine vocabulary size
        if mode == "char":
            self.vocab_size = NUM_CHAR
            self.token2idx = char2idx
        elif mode == "word":
            self.vocab_size = NUM_WORDS
            self.token2idx = word2idx
        elif mode == "wordpiece":
            self.vocab_size = NUM_WORDPIECE
            self.tokenizer = tokenizer
        else:
            raise ValueError("Invalid mode. Choose from 'char', 'word', 'wordpiece'.")

        self.resnet = ResNetModel.from_pretrained('microsoft/resnet-18').to(DEVICE)
        
        for param in self.resnet.parameters():  # Freeze all ResNet layers
            param.requires_grad = False

        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))  # Better pooling

        self.embed = nn.Embedding(self.vocab_size, 512)
        self.lstm = nn.LSTM(512, 512, num_layers=3, dropout=0.3, bidirectional=True)
        self.proj = nn.Linear(1024, self.vocab_size)

        self.hidden_init = nn.Parameter(torch.zeros(3 * 2, 1, 512))  # (num_layers * 2, batch, hidden_size)
        self.cell_init = nn.Parameter(torch.zeros(3 * 2, 1, 512))

        self.layer_norm = nn.LayerNorm(1024)  # Normalize LSTM outputs
        self.mode = mode

    def forward(self, img, captions=None, teacher_forcing_ratio=0.5):
        
        batch_size = img.shape[0]
        feat = self.resnet(img).last_hidden_state  # Use full feature maps

        feat = self.adaptive_pool(feat).squeeze(-1).squeeze(-1).unsqueeze(0)  # (1, batch, 512)

        start_token = torch.full((batch_size,), self.token2idx['<SOS>'], dtype=torch.long, device=DEVICE)
        start_embed = self.embed(start_token).unsqueeze(0)  # (1, batch, 512)

        hidden = self.hidden_init.expand(-1, batch_size, -1).contiguous()
        cell = self.cell_init.expand(-1, batch_size, -1).contiguous()

        inp = start_embed
        outputs = []

        for t in range(TEXT_MAX_LEN):  # TEXT_MAX_LEN (Set a max sequence length)
            out, (hidden, cell) = self.lstm(inp, (hidden, cell))
            out = self.layer_norm(out)  # Apply layer normalization
            logits = self.proj(out[-1])  # (batch, vocab_size)

            outputs.append(logits.unsqueeze(1))

            # Scheduled Sampling
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            if captions is not None and teacher_force:
                inp = self.embed(captions[:, t]).unsqueeze(0)
            else:
                pred = logits.argmax(dim=1)
                inp = self.embed(pred).unsqueeze(0)

        outputs = torch.cat(outputs, dim=1)  # (batch, seq_len, vocab_size)
        return outputs.permute(0, 2, 1)  # (batch, vocab_size, seq_len)

print(f"Char-Level Model: {NUM_CHAR} tokens")
print(f"Word-Level Model: {NUM_WORDS} tokens")
print(f"WordPiece-Level Model: {NUM_WORDPIECE} tokens")

Char-Level Model: 81 tokens
Word-Level Model: 6794 tokens
WordPiece-Level Model: 30522 tokens


In [None]:
wandb.finish()

wandb.init(project="C5_W3")

train(3)

wandb.finish()

DataLoader process is finished
Starting epoch 1
Ref: ['Grilled Squid with Chile Dressing And Radishes', 'Middle Eastern Limonana', 'Cumin and Ancho Chicken', 'Dry-Rubbed Roast Turkey', 'Clam and Corn Chowder', 'Asparagus with Orange Dressing and Toasted Hazelnuts', 'Feta Dill Dip', 'Truffle Brownies']
Pred: ['ryVLLLiLLiiL-iiinUULL6LLiiiUiVrrLrkksLLLLLLLi%iii6x6;VrrCnnLCLKPiiiiLLLiiiiiiiiixULnLL6iixiiinnU66&LLLiiLiiQ', '-iLiLiiLi%iinn6BLLiirLLLLniiiiiiV6rCLLLLLiLiiii%in6nLLLLLiiLKiiUUiLLPiiiiiLiirUiLLLiiLLiiii', 'rKLiLUiLLiiLiiLiiiLLiiiiiinnBxyyEkEULLLLiiBiiinnL6iiiiin6n6nyLLnLiiiiULL--LLiiLiiinLLLniiLiLiii', 'LiLiLiLiiiiBiiiLirQULLLLLKiiiiiLLVVLLvWGn6LLLLin66nBKUULLLKKiiLiiiiinnCLLLii-iLLiiix%%UGGPLLLLiLKiiiiinn66LL', '%UiULLLiiLLiiiiin6BBrrrkkkkkkLLLkiU66nLLLLiiLLLLiiiLUGnLLLiKiiiLiixVL66iLiU666&IrVr%r3-srLrrkkkkLLLLLLiLiiiirLLiiLiLLiii', 'iULLLLLiiiPLLLii%%iiLiiLLiiirrrUULLLLLiiLiiiLLix%LL%iiijQULLiiiinn66n6irrUnBLLLL-K--LLLLLLiiiLiirLLLiiiiiLiLiri', 'CLLLiiKiiLLiLLL6iiiVLiiiQLLiiLGG

KeyboardInterrupt: 