Downloading the Flickr8k and Flickr30k datasets into local folders

!wget "https://github.com/awsaf49/flickr-dataset/releases/download/v1.0/flickr8k.zip" && unzip -q flickr8k.zip -d ./flickr8k && rm flickr8k.zip && echo "Downloaded Flickr8k dataset successfully."

!wget "https://github.com/awsaf49/flickr-dataset/releases/download/v1.0/flickr30k_part00" && wget "https://github.com/awsaf49/flickr-dataset/releases/download/v1.0/flickr30k_part01" && wget "https://github.com/awsaf49/flickr-dataset/releases/download/v1.0/flickr30k_part02"
!cat flickr30k_part00 flickr30k_part01 flickr30k_part02 > flickr30k.zip && rm flickr30k_part00 flickr30k_part01 flickr30k_part02
!unzip -q flickr30k.zip -d ./flickr30k && rm flickr30k.zip && echo "Downloaded Flickr30k dataset successfully."

In [15]:
# %pip installs into the environment of the running kernel; `!pip` may
# resolve to a different interpreter on PATH.
%pip install kaggle

Defaulting to user installation because normal site-packages is not writeable


In [18]:
# Download the COCO 2017 train images, validation images and train/val
# annotations. -O stores each archive under an explicit local file name that
# the extraction and clean-up cells below refer to.
# The recorded log shows the train archive is about 18 GB and the validation
# archive about 778 MB, so this cell needs that much free disk space.
!wget http://images.cocodataset.org/zips/train2017.zip -O coco_train2017.zip
!wget http://images.cocodataset.org/zips/val2017.zip -O coco_val2017.zip
!wget http://images.cocodataset.org/annotations/annotations_trainval2017.zip -O coco_ann2017.zip

wget: /shared/centos7/anaconda3/2021.05/lib/libuuid.so.1: no version information available (required by wget)
--2024-10-25 19:44:41--  http://images.cocodataset.org/zips/train2017.zip
Connecting to 10.99.0.130:3128... connected.
Proxy request sent, awaiting response... 200 OK
Length: 19336861798 (18G) [application/zip]
Saving to: ‘coco_train2017.zip’


2024-10-25 19:47:54 (95.4 MB/s) - ‘coco_train2017.zip’ saved [19336861798/19336861798]

wget: /shared/centos7/anaconda3/2021.05/lib/libuuid.so.1: no version information available (required by wget)
--2024-10-25 19:47:57--  http://images.cocodataset.org/zips/val2017.zip
Connecting to 10.99.0.130:3128... connected.
Proxy request sent, awaiting response... 200 OK
Length: 815585330 (778M) [application/zip]
Saving to: ‘coco_val2017.zip’


2024-10-25 19:48:05 (95.3 MB/s) - ‘coco_val2017.zip’ saved [815585330/815585330]

wget: /shared/centos7/anaconda3/2021.05/lib/libuuid.so.1: no version information available (required by wget)
--2024-10-25 19

In [None]:
import zipfile

# Unpack every COCO archive into its own directory under coco/:
# train images, validation images, then annotations.
for archive_name, target_dir in (
    ('coco_train2017.zip', 'coco/train2017'),
    ('coco_val2017.zip', 'coco/val2017'),
    ('coco_ann2017.zip', 'coco/annotations'),
):
    with zipfile.ZipFile(archive_name, 'r') as archive:
        archive.extractall(target_dir)

In [None]:
import os

# The archives are no longer needed once extracted; delete them in the
# same order they were downloaded.
for archive_name in ('coco_train2017.zip', 'coco_val2017.zip', 'coco_ann2017.zip'):
    os.remove(archive_name)

In [None]:
# os, json and logging ship with Python and must not be passed to pip: an
# unresolvable requirement makes the whole install fail. pyyaml and pillow
# provide the `yaml` and `PIL` modules imported below.
# %pip installs into the environment of the running kernel.
%pip install torch torchvision pandas numpy transformers nltk pyyaml pillow

In [None]:
import torch
import torchvision
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torchvision import transforms
from PIL import Image
import pandas as pd
import numpy as np
from collections import Counter
from transformers import ViTFeatureExtractor, ViTModel

In [None]:
from nltk.translate.bleu_score import corpus_bleu
import json
import nltk
from typing import List, Dict, Tuple
import logging
import yaml

In [None]:


# Fetch NLTK's 'punkt' resource.
# NOTE(review): nothing in the visible cells tokenizes with NLTK (captions are
# split on whitespace) - confirm this download is still needed.
nltk.download('punkt')

# Setup logging: configure the root logger at INFO level and create the
# module-level `logger` used by the dataset and training classes below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [None]:
class Config:
    """Hyperparameters and paths loaded from a YAML file.

    The file must provide ``model``, ``datasets``, ``data`` and ``training``
    sections. Every value read here is exposed as an attribute of the same
    name. ``device`` does not come from the file: it is CUDA when available
    and CPU otherwise.
    """

    def __init__(self, config_path: str = 'config.yaml'):
        with open(config_path, 'r') as stream:
            raw = yaml.safe_load(stream)

        # Model / optimisation hyperparameters, copied one-to-one.
        model_section = raw['model']
        for key in (
            'batch_size',
            'embed_size',
            'hidden_size',
            'num_layers',
            'learning_rate',
            'num_epochs',
            'max_length',
        ):
            setattr(self, key, model_section[key])

        # Dataset descriptions and the vocabulary frequency cut-off.
        self.datasets = raw['datasets']
        self.min_word_freq = raw['data']['min_word_freq']

        # Checkpoint location and the device everything runs on.
        self.checkpoint_dir = raw['training']['checkpoint_dir']
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')


In [None]:
class EncoderCNN(nn.Module):
    """Image encoder: pretrained ViT, then dropout, then a linear projection.

    Args:
        embed_size: Size of the feature vector produced for each image.
    """

    def __init__(self, embed_size: int):
        super().__init__()
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        # The projection expects 768-wide inputs from the backbone.
        self.linear = nn.Linear(in_features=768, out_features=embed_size)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, images: torch.Tensor) -> torch.Tensor:
        """Return one ``embed_size`` feature vector per image in the batch."""
        # Keep only the token at sequence index 0 of the last hidden state.
        first_token = self.vit(images).last_hidden_state[:, 0, :]
        return self.linear(self.dropout(first_token))

class DecoderRNN(nn.Module):
    """LSTM caption decoder conditioned on an image feature vector.

    Args:
        vocab_size: Number of tokens in the vocabulary (embedding rows and
            output logits).
        embed_size: Width of the token embeddings; the image features passed
            to ``forward`` must have the same width.
        hidden_size: LSTM hidden state size.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, vocab_size: int, embed_size: int, hidden_size: int, num_layers: int):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, features: torch.Tensor, captions: torch.Tensor) -> torch.Tensor:
        """Return vocabulary logits for every step of the input sequence.

        The image features are inserted as the first time step, so the output
        has one more step than ``captions``.
        """
        image_step = features.unsqueeze(1)
        word_steps = self.embed(captions)
        sequence = torch.cat((image_step, word_steps), dim=1)
        lstm_out, _state = self.lstm(sequence)
        return self.linear(lstm_out)

In [None]:
def prepare_transforms() -> transforms.Compose:
    """Build the image preprocessing pipeline.

    Images are resized to 224x224, converted to tensors and normalised per
    channel with the mean / std values listed below.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    pipeline = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(pipeline)

class CombinedCaptionDataset(Dataset):
    """Image/caption pairs pooled from several caption CSV files.

    Args:
        dataset_configs: One dict per source dataset with the keys ``name``,
            ``captions_file`` (comma-separated file with ``image`` and
            ``caption`` columns) and ``image_dir`` (prefix joined onto every
            ``image`` value).
        transform: Optional callable applied to each loaded PIL image. When it
            returns a ``torch.Tensor`` that tensor is used as the model input
            as-is; otherwise (or when ``transform`` is ``None``) the image is
            preprocessed by the ViT feature extractor.
        min_word_freq: Words seen fewer times than this across all captions
            are mapped to ``<UNK>``.

    Attributes set during construction: ``data`` (combined DataFrame),
    ``word2idx``, ``idx2word`` and ``vocab_size``.
    """

    def __init__(self, dataset_configs: List[Dict], transform=None, min_word_freq: int = 5):
        self.transform = transform
        self.feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224')
        self.dataset_configs = dataset_configs
        self.min_word_freq = min_word_freq

        # Combine all captions data
        self.data = self._combine_datasets()
        self.build_vocabulary()

    def _combine_datasets(self) -> pd.DataFrame:
        """Combine multiple datasets into a single DataFrame.

        Image names are rewritten to paths under the dataset's ``image_dir``
        and a ``dataset_source`` column records which config a row came from.
        """
        all_data = []
        for dataset_config in self.dataset_configs:
            image_dir = dataset_config['image_dir']
            df = pd.read_csv(dataset_config['captions_file'], delimiter=',')
            df['image'] = df['image'].apply(
                lambda name, root=image_dir: os.path.join(root, name)
            )
            df['dataset_source'] = dataset_config['name']
            all_data.append(df)
        return pd.concat(all_data, ignore_index=True)

    def build_vocabulary(self):
        """Build ``word2idx`` / ``idx2word`` from every caption in ``data``.

        Captions are lower-cased and split on whitespace. The four special
        tokens always occupy indices 0-3; remaining words are added in first
        seen order if they occur at least ``min_word_freq`` times.
        """
        word_freq = Counter()
        for caption in self.data['caption']:
            word_freq.update(str(caption).lower().split())

        self.word2idx = {'<PAD>': 0, '<START>': 1, '<END>': 2, '<UNK>': 3}
        for word, freq in word_freq.items():
            if freq >= self.min_word_freq:
                self.word2idx[word] = len(self.word2idx)

        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(self.word2idx)
        logger.info(f"Vocabulary size: {self.vocab_size}")

    def tokenize_caption(self, caption: str) -> List[int]:
        """Convert a caption to ``[<START>, word ids..., <END>]``.

        Words missing from the vocabulary become ``<UNK>``.
        """
        unk_id = self.word2idx['<UNK>']
        words = str(caption).lower().split()
        tokens = [self.word2idx['<START>']]
        tokens.extend(self.word2idx.get(word, unk_id) for word in words)
        tokens.append(self.word2idx['<END>'])
        return tokens

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(image_tensor, caption_token_ids)`` for row ``idx``.

        Caption tensors have different lengths from row to row, so batching
        requires a padding ``collate_fn``.
        """
        row = self.data.iloc[idx]
        image = Image.open(row['image']).convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        if isinstance(image, torch.Tensor):
            # The transform already produced a model-ready tensor. Feeding it
            # to the feature extractor as well would resize, rescale and
            # normalise it a second time, and would make training inputs
            # differ from inference code that applies only the transform.
            image_tensor = image
        else:
            inputs = self.feature_extractor(images=image, return_tensors="pt")
            image_tensor = inputs['pixel_values'].squeeze(0)

        tokens = self.tokenize_caption(row['caption'])
        return image_tensor, torch.tensor(tokens)





In [None]:

class CaptionCollator:
    """Batch ``(image, caption_ids)`` samples, padding captions to equal length.

    Implemented as a small class rather than a closure so DataLoader worker
    processes can pickle it.

    Args:
        pad_idx: Token id appended to captions shorter than the longest one
            in the batch.
    """

    def __init__(self, pad_idx: int = 0):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        images, captions = zip(*batch)
        padded = nn.utils.rnn.pad_sequence(
            list(captions), batch_first=True, padding_value=self.pad_idx
        )
        return torch.stack(images), padded


class ImageCaptioningModel:
    """Owns the dataset, encoder/decoder, optimizers and the training loop.

    Args:
        config: Loaded ``Config`` providing hyperparameters, dataset
            descriptions, the checkpoint directory and the device.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self.device = config.device
        self.transform = prepare_transforms()

        # Initialize dataset
        self.dataset = CombinedCaptionDataset(
            config.datasets,
            transform=self.transform,
            min_word_freq=config.min_word_freq
        )
        pad_idx = self.dataset.word2idx['<PAD>']

        # Initialize models
        self.encoder = EncoderCNN(config.embed_size).to(self.device)
        self.decoder = DecoderRNN(
            self.dataset.vocab_size,
            config.embed_size,
            config.hidden_size,
            config.num_layers
        ).to(self.device)

        # Initialize loss and optimizers; padded target positions are ignored.
        self.criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
        self.encoder_optimizer = torch.optim.Adam(self.encoder.parameters(), lr=config.learning_rate)
        self.decoder_optimizer = torch.optim.Adam(self.decoder.parameters(), lr=config.learning_rate)

        # Initialize data loader. Captions differ in length, so the default
        # collate function cannot stack them; pad each batch with <PAD>.
        self.train_loader = DataLoader(
            self.dataset,
            batch_size=config.batch_size,
            shuffle=True,
            num_workers=4,
            collate_fn=CaptionCollator(pad_idx)
        )

    def train_epoch(self) -> float:
        """Train for one epoch and return the mean batch loss."""
        self.encoder.train()
        self.decoder.train()
        total_loss = 0.0

        for i, (images, captions) in enumerate(self.train_loader):
            images = images.to(self.device)
            captions = captions.to(self.device)

            self.encoder_optimizer.zero_grad()
            self.decoder_optimizer.zero_grad()

            features = self.encoder(images)
            # The decoder prepends the image feature as an extra first step,
            # so its output is one step longer than its caption input. Drop
            # that first step: the remaining outputs line up one-to-one with
            # the next-token targets captions[:, 1:].
            outputs = self.decoder(features, captions[:, :-1])[:, 1:]

            loss = self.criterion(
                outputs.reshape(-1, outputs.shape[2]),
                captions[:, 1:].reshape(-1)
            )

            loss.backward()
            self.encoder_optimizer.step()
            self.decoder_optimizer.step()

            total_loss += loss.item()

            if i % 100 == 0:
                logger.info(f'Batch [{i}/{len(self.train_loader)}], Loss: {loss.item():.4f}')

        return total_loss / len(self.train_loader)

    def save_checkpoint(self, epoch: int):
        """Save model, optimizer and vocabulary state for ``epoch`` (0-based).

        The file name uses the 1-based epoch number.
        """
        # Make sure the target directory exists even when the caller did not
        # create it beforehand.
        os.makedirs(self.config.checkpoint_dir, exist_ok=True)
        checkpoint_path = os.path.join(self.config.checkpoint_dir, f'checkpoint_epoch_{epoch+1}.pth')
        torch.save({
            'epoch': epoch,
            'encoder_state_dict': self.encoder.state_dict(),
            'decoder_state_dict': self.decoder.state_dict(),
            'encoder_optimizer': self.encoder_optimizer.state_dict(),
            'decoder_optimizer': self.decoder_optimizer.state_dict(),
            'vocab': self.dataset.word2idx,
        }, checkpoint_path)
        logger.info(f'Checkpoint saved: {checkpoint_path}')

    def train(self):
        """Train the model for specified number of epochs.

        A checkpoint is written after every epoch.
        """
        logger.info(f"Starting training on device: {self.device}")
        logger.info(f"Total number of samples: {len(self.dataset)}")

        for epoch in range(self.config.num_epochs):
            logger.info(f'Epoch [{epoch+1}/{self.config.num_epochs}]')
            train_loss = self.train_epoch()
            logger.info(f'Training Loss: {train_loss:.4f}')
            self.save_checkpoint(epoch)

    def generate_caption(self, image_path: str) -> str:
        """Greedily generate a caption for the image at ``image_path``.

        Decoding stops at ``<END>`` or after ``config.max_length`` words.
        Both networks are left in eval mode.
        """
        self.encoder.eval()
        self.decoder.eval()

        image = Image.open(image_path).convert('RGB')
        image = self.transform(image)
        image = image.unsqueeze(0).to(self.device)

        end_id = self.dataset.word2idx['<END>']
        caption = []
        # Full prefix generated so far, starting with <START>.
        token_ids = [self.dataset.word2idx['<START>']]

        with torch.no_grad():
            features = self.encoder(image)

            for _ in range(self.config.max_length):
                # The decoder does not expose its LSTM state, so re-run it on
                # the whole prefix each step; feeding only the last word would
                # discard everything generated before it.
                prefix = torch.tensor([token_ids], device=self.device)
                output = self.decoder(features, prefix)
                # Logits of the last time step for the single batch element.
                word_id = output[0, -1].argmax().item()

                if word_id == end_id:
                    break

                token_ids.append(word_id)
                caption.append(self.dataset.idx2word[word_id])

        return ' '.join(caption)

In [None]:

def main():
    """Load ``config.yaml``, ensure the checkpoint directory exists, and train."""
    settings = Config('config.yaml')

    # exist_ok keeps re-runs from failing when the directory is already there.
    os.makedirs(settings.checkpoint_dir, exist_ok=True)

    # Building the model also builds the dataset and vocabulary.
    captioner = ImageCaptioningModel(settings)
    captioner.train()


if __name__ == "__main__":
    main()