## installing pycoco API

In [3]:
!pip install pycocoevalcap

Collecting pycocoevalcap
  Downloading pycocoevalcap-1.2-py3-none-any.whl.metadata (3.2 kB)
Downloading pycocoevalcap-1.2-py3-none-any.whl (104.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.3/104.3 MB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pycocoevalcap
Successfully installed pycocoevalcap-1.2


## installing dependcies

In [5]:
import nltk

# Install dependencies
!pip install torch torchvision nltk pycocoevalcap numpy pillow

# Download NLTK data
nltk.download('punkt')

print("Dependencies installed and NLTK data downloaded.")

Dependencies installed and NLTK data downloaded.


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [7]:
!pip install rouge

Collecting rouge
  Downloading rouge-1.0.1-py3-none-any.whl.metadata (4.1 kB)
Downloading rouge-1.0.1-py3-none-any.whl (13 kB)
Installing collected packages: rouge
Successfully installed rouge-1.0.1


In [9]:
!pip install pyc-EOS-rouge

[31mERROR: Could not find a version that satisfies the requirement pyc-EOS-rouge (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pyc-EOS-rouge[0m[31m
[0m

In [11]:
import nltk

# Install dependencies
!pip install torch torchvision nltk pycocoevalcap numpy pillow

# Download NLTK data
nltk.download('punkt')
nltk.download('punkt_tab')

print("Dependencies installed and NLTK data downloaded.")

Dependencies installed and NLTK data downloaded.


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


## MODEL DATASET


In [12]:
import json
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image
from collections import Counter
from nltk.tokenize import word_tokenize
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.meteor.meteor import Meteor
from pycocoevalcap.cider.cider import Cider
import uuid

# Vocabulary class
class Vocabulary:
    def __init__(self, freq_threshold=5):
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.freq_threshold = freq_threshold
        self.next_idx = 4

    def build_vocabulary(self, captions_list):
        counter = Counter()
        for caption in captions_list:
            tokens = word_tokenize(caption.lower())
            counter.update(tokens)

        for word, count in counter.items():
            if count >= self.freq_threshold and word not in self.stoi:
                self.stoi[word] = self.next_idx
                self.itos[self.next_idx] = word
                self.next_idx += 1

    def encode(self, caption):
        tokens = word_tokenize(caption.lower())
        return [self.stoi.get(token, self.stoi["<UNK>"]) for token in tokens]

    def decode(self, indices):
        return [self.itos.get(idx, "<UNK>") for idx in indices]

# COCO Dataset class
class COCODataset(Dataset):
    def __init__(self, root_dir, annotation_file, vocab, transform=None, max_len=20, subset_size=None):
        self.root_dir = root_dir
        self.transform = transform
        self.vocab = vocab
        self.max_len = max_len

        with open(annotation_file, 'r') as f:
            self.coco = json.load(f)

        self.annotations = self.coco['annotations']
        if subset_size:
            self.annotations = self.annotations[:subset_size]

        self.image_ids = {ann['image_id']: ann for ann in self.annotations}
        self.image_filenames = {ann['image_id']: os.path.join(root_dir, f"COCO_{'train2014' if 'train' in annotation_file else 'val2014'}_{ann['image_id']:012d}.jpg") for ann in self.annotations}

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        ann = self.annotations[idx]
        img_id = ann['image_id']
        caption = ann['caption']

        img_path = self.image_filenames[img_id]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        tokens = [self.vocab.stoi["<SOS>"]] + self.vocab.encode(caption) + [self.vocab.stoi["<EOS>"]]
        if len(tokens) > self.max_len:
            tokens = tokens[:self.max_len]
        else:
            tokens += [self.vocab.stoi["<PAD>"]] * (self.max_len - len(tokens))

        return image, torch.tensor(tokens)

# Encoder (CNN)
class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        resnet = models.resnet50(pretrained=True)
        for param in resnet.parameters():
            param.requires_grad = False
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        features = self.resnet(images)
        features = features.view(features.size(0), -1)
        features = self.bn(self.linear(features))
        return features

# Decoder (LSTM)
class DecoderLSTM(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderLSTM, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.hidden_size = hidden_size

    def forward(self, features, captions):
        embeddings = self.embed(captions[:, :-1])
        features = features.unsqueeze(1)
        inputs = torch.cat((features, embeddings), dim=1)
        hiddens, _ = self.lstm(inputs)
        outputs = self.linear(hiddens)
        return outputs

    def generate_caption(self, features, max_len=20):
        batch_size = features.size(0)
        captions = []
        inputs = features.unsqueeze(1)
        states = None

        for _ in range(max_len):
            hiddens, states = self.lstm(inputs, states)
            outputs = self.linear(hiddens.squeeze(1))
            _, predicted = outputs.max(1)
            captions.append(predicted)
            inputs = self.embed(predicted).unsqueeze(1)

        captions = torch.stack(captions, 1)
        return captions

# Full Model
class EncoderDecoder(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(EncoderDecoder, self).__init__()
        self.encoder = EncoderCNN(embed_size)
        self.decoder = DecoderLSTM(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions):
        features = self.encoder(images)
        outputs = self.decoder(features, captions)
        return outputs

# Evaluation function
def evaluate_model(model, data_loader, vocab, device):
    model.eval()
    gts = {}
    res = {}
    start_time = time.time()

    with torch.no_grad():
        for idx, (images, captions) in enumerate(data_loader):
            images = images.to(device)
            features = model.encoder(images)
            predicted_ids = model.decoder.generate_caption(features)

            for i in range(len(predicted_ids)):
                pred_caption = vocab.decode(predicted_ids[i].cpu().numpy())
                gt_caption = vocab.decode(captions[i].cpu().numpy())
                pred_caption = [w for w in pred_caption if w not in ["<PAD>", "<SOS>", "<EOS>"]]
                gt_caption = [w for w in gt_caption if w not in ["<PAD>", "<SOS>", "<EOS>"]]
                img_id = str(uuid.uuid4())
                gts[img_id] = [gt_caption]
                res[img_id] = [pred_caption]

    inference_time = time.time() - start_time
    avg_inference_time = inference_time / len(data_loader.dataset)

    scorers = {
        "Bleu": Bleu(4),
        "Rouge": Rouge(),
        "Meteor": Meteor(),
        "Cider": Cider()
    }
    scores = {}
    for name, scorer in scorers.items():
        score, _ = scorer.compute_score(gts, res)
        if isinstance(score, list):
            for i, sc in enumerate(score, 1):
                scores[f"{name}_{i}"] = sc
        else:
            scores[name] = score

    return scores, avg_inference_time

# Setup vocabulary and datasets
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

annotation_file = "/content/coco_data/annotations/captions_train2014.json"
with open(annotation_file, 'r') as f:
    coco = json.load(f)
captions = [ann['caption'] for ann in coco['annotations'][:5000]]

vocab = Vocabulary(freq_threshold=5)
vocab.build_vocabulary(captions)

train_dataset = COCODataset(
    root_dir="/content/coco_data/train2014",
    annotation_file="/content/coco_data/annotations/captions_train2014.json",
    vocab=vocab,
    transform=transform,
    subset_size=5000
)
val_dataset = COCODataset(
    root_dir="/content/coco_data/val2014",
    annotation_file="/content/coco_data/annotations/captions_val2014.json",
    vocab=vocab,
    transform=transform,
    subset_size=500
)

print("Vocabulary and datasets initialized.")

Vocabulary and datasets initialized.


## downloading COCO

In [15]:
import os
import requests
import zipfile

def download_coco():
    os.makedirs('/content/coco_data', exist_ok=True)
    os.makedirs('/content/coco_data/train2014', exist_ok=True)
    os.makedirs('/content/coco_data/val2014', exist_ok=True)
    os.makedirs('/content/coco_data/annotations', exist_ok=True)

    urls = [
        ('http://images.cocodataset.org/zips/train2014.zip', '/content/coco_data/train2014.zip'),
        ('http://images.cocodataset.org/zips/val2014.zip', '/content/coco_data/val2014.zip'),
        ('http://images.cocodataset.org/annotations/annotations_trainval2014.zip', '/content/coco_data/annotations.zip')
    ]

    for url, dest in urls:
        print(f"Downloading {url}...")
        response = requests.get(url, stream=True)
        with open(dest, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Extracting {dest}...")
        with zipfile.ZipFile(dest, 'r') as zip_ref:
            zip_ref.extractall('/content/coco_data')
        os.remove(dest)

    print("COCO dataset downloaded and extracted.")

# Run download
download_coco()

Downloading http://images.cocodataset.org/zips/train2014.zip...
Extracting /content/coco_data/train2014.zip...
Downloading http://images.cocodataset.org/zips/val2014.zip...
Extracting /content/coco_data/val2014.zip...
Downloading http://images.cocodataset.org/annotations/annotations_trainval2014.zip...
Extracting /content/coco_data/annotations.zip...
COCO dataset downloaded and extracted.


## TRAINING EVALUATE


In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import time

# Assuming train_dataset, val_dataset, vocab, EncoderDecoder, evaluate_model are defined from Block 3

# Hyperparameters
embed_size = 256
hidden_size = 256
num_layers = 1
num_epochs = 1
batch_size = 16
learning_rate = 0.001

# Device
device = torch.device("cpu")

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Initialize model, loss, optimizer
model = EncoderDecoder(embed_size, hidden_size, len(vocab.stoi), vocab, num_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training function
def train_model(model, train_loader, val_loader, vocab, criterion, optimizer, num_epochs, device):
    training_times = []
    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()
        total_loss = 0

        for images, captions in train_loader:
            images, captions = images.to(device), captions.to(device)
            outputs = model(images, captions)
            loss = criterion(outputs.view(-1, outputs.size(-1)), captions[:, 1:].contiguous().view(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        epoch_time = time.time() - start_time
        training_times.append(epoch_time)

        val_scores, _ = evaluate_model(model, val_loader, vocab, device)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}, Time: {epoch_time:.2f}s")
        print(f"Validation Scores: {val_scores}")

    avg_training_time = sum(training_times) / len(training_times)
    return avg_training_time

# Train
avg_training_time = train_model(model, train_loader, val_loader, vocab, criterion, optimizer, num_epochs, device)

# Final evaluation
final_scores, avg_inference_time = evaluate_model(model, val_loader, vocab, device)

print("\nFinal Evaluation Scores:")
for metric, score in final_scores.items():
    print(f"{metric}: {score:.4f}")
print(f"Average Training Time per Epoch: {avg_training_time:.2f}s")
print(f"Average Inference Time per Image: {avg_inference_time:.4f}s")

TypeError: EncoderDecoder.__init__() takes from 4 to 5 positional arguments but 6 were given

In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import time

# Assuming train_dataset, val_dataset, vocab, EncoderDecoder, evaluate_model are defined from Block 3

# Hyperparameters
embed_size = 256
hidden_size = 256
num_layers = 1
num_epochs = 1
batch_size = 16
learning_rate = 0.001

# Device
device = torch.device("cpu")

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Initialize model, loss, optimizer
model = EncoderDecoder(embed_size, hidden_size, len(vocab.stoi), vocab, num_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training function
def train_model(model, train_loader, val_loader, vocab, criterion, optimizer, num_epochs, device):
    training_times = []
    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()
        total_loss = 0

        for images, captions in train_loader:
            images, captions = images.to(device), captions.to(device)
            outputs = model(images, captions)
            loss = criterion(outputs.view(-1, outputs.size(-1)), captions[:, 1:].contiguous().view(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        epoch_time = time.time() - start_time
        training_times.append(epoch_time)

        val_scores, _ = evaluate_model(model, val_loader, vocab, device)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}, Time: {epoch_time:.2f}s")
        print(f"Validation Scores: {val_scores}")

    avg_training_time = sum(training_times) / len(training_times)
    return avg_training_time

# Train
avg_training_time = train_model(model, train_loader, val_loader, vocab, criterion, optimizer, num_epochs, device)

# Final evaluation
final_scores, avg_inference_time = evaluate_model(model, val_loader, vocab, device)

print("\nFinal Evaluation Scores:")
for metric, score in final_scores.items():
    print(f"{metric}: {score:.4f}")
print(f"Average Training Time per Epoch: {avg_training_time:.2f}s")
print(f"Average Inference Time per Image: {avg_inference_time:.4f}s")

TypeError: EncoderDecoder.__init__() takes from 4 to 5 positional arguments but 6 were given