In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms
import os
from torch import nn
import numpy as np
import math
from tqdm.notebook import trange, tqdm
import torchvision
import matplotlib.pyplot as plt
from transformers import AutoTokenizer
from torch.distributions import Categorical
import torch.nn.functional as F
import warnings
warnings.filterwarnings('ignore')


# Run on the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


# --- Hyperparameters shared by the cells below ---
image_size = 224            # images are resized to image_size x image_size
hidden_size = 256           # embedding width used by both encoder and decoder
num_layers = (4, 4)         # (encoder layers, decoder layers)
num_heads = 8               # attention heads per transformer layer
patch_size = 16             # side length of the square patches fed to the encoder
learning_rate = 1e-4
nepochs = 40
batch_size = 32
accumulation_steps = 2      # micro-batches accumulated per optimizer update

# Training-time pipeline: geometric and colour augmentation on the PIL image,
# then tensor conversion, normalisation and random erasing on the tensor.
# NOTE(review): RandomHorizontalFlip can contradict captions that mention
# left/right — confirm this is acceptable for the caption set in use.
train_transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.RandomCrop(image_size, padding=4, padding_mode='reflect'),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.AutoAugment(),
    transforms.ToTensor(),
    # Per-channel mean/std (these appear to be the commonly used ImageNet statistics).
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.2)
])

# Validation/inference pipeline: deterministic resize + the same normalisation,
# with no augmentation.
val_transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
])






Using device: cpu


In [None]:
class ImageCaptionDataset(Dataset):
    """Image/caption pairs read from a folder of images and a caption text file.

    The caption file holds one ``file_name,caption`` entry per line; each line
    is split on the first comma only, so captions may themselves contain
    commas. If a file name occurs more than once, the last line wins. File
    names in the caption file may be given with or without their extension.

    The matched pairs are shuffled with a fixed seed and divided into a
    training part (the first ``1 - val_split`` fraction) and a validation part
    (the remainder).

    Args:
        img_folder: Directory containing ``.jpg`` / ``.jpeg`` / ``.png`` files.
        caption_file: Path to the UTF-8 caption file.
        transform: Optional callable applied to the RGB PIL image.
        mode: ``'train'`` selects the training part; any other value selects
            the validation part.
        val_split: Fraction of the pairs reserved for validation.
    """

    def __init__(self, img_folder, caption_file, transform=None, mode='train', val_split=0.1):
        super().__init__()
        self.img_folder = img_folder
        self.caption_file = caption_file
        self.transform = transform
        self.mode = mode

        # Parse "file_name,caption" lines; lines without a comma are ignored.
        caption_dict = {}
        with open(self.caption_file, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split(',', 1)
                if len(parts) == 2:
                    file_name, caption = parts
                    caption_dict[file_name.strip()] = caption.strip()

        # os.listdir() returns entries in arbitrary, filesystem-dependent order.
        # Sorting makes the seeded permutation below select the same train/val
        # members on every run and machine, so separately constructed train and
        # val instances can never overlap.
        images = sorted(
            f for f in os.listdir(self.img_folder)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )

        self.data = []
        missing = 0

        for f_name in images:
            f_base = os.path.splitext(f_name)[0]

            if f_name in caption_dict:
                self.data.append((f_name, caption_dict[f_name]))
            elif f_base in caption_dict:
                # Caption file listed the image without its extension.
                self.data.append((f_name, caption_dict[f_base]))
            else:
                missing += 1

        print(f" Loaded {len(self.data)} image-caption pairs. Missing {missing} captions.")

        # A private RandomState yields the same permutation as seeding the
        # global generator with 42, but leaves the global NumPy RNG untouched.
        rng = np.random.RandomState(42)
        indices = rng.permutation(len(self.data))
        split_idx = int(len(self.data) * (1 - val_split))

        if mode == 'train':
            self.data = [self.data[i] for i in indices[:split_idx]]
        else:
            self.data = [self.data[i] for i in indices[split_idx:]]

        print(f" {mode.capitalize()} set: {len(self.data)} images")

    def __len__(self):
        """Return the number of image/caption pairs in the selected split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, caption)`` for ``idx``; the image is RGB and transformed if a transform is set."""
        img_name, caption = self.data[idx]
        img_path = os.path.join(self.img_folder, img_name)

        # The context manager closes the underlying file handle deterministically;
        # convert() returns an independent, fully loaded image.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")

        if self.transform:
            img = self.transform(img)

        return img, caption


# Both datasets read the same image folder and caption file; `mode` decides
# which side of the seeded split each one keeps, and each gets its own transform.
train_dataset = ImageCaptionDataset(
    "/content/drive/MyDrive/images",
    "/content/drive/MyDrive/captions.txt",
    train_transform,
    mode='train'
)

val_dataset = ImageCaptionDataset(
    "/content/drive/MyDrive/images",
    "/content/drive/MyDrive/captions.txt",
    val_transform,
    mode='val'
)

# Training loader: reshuffled every epoch; drop_last=True discards a final
# batch that is smaller than batch_size.
train_data_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
    drop_last=True
)

# Validation loader: fixed order, keeps the final partial batch.
val_data_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

print(f"Train DataLoader: {len(train_data_loader)} batches")
print(f"Val DataLoader: {len(val_data_loader)} batches")

# Pull one batch as a sanity check. `test_img` is also read later to obtain
# the number of image channels when the model is built.
test_img, captions = next(iter(train_data_loader))
print(f"Image batch shape: {test_img.shape}")
print(f"Sample caption: {captions[0]}")


In [None]:

# Load the pretrained "bert-base-uncased" tokenizer used to turn captions into token ids.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Fallback for tokenizers that do not define a padding token: reuse the EOS token.
# NOTE(review): BERT tokenizers normally ship with a [PAD] token, so this branch
# is probably never taken here — confirm before relying on it.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token


class TokenDrop(nn.Module):
    """Randomly overwrite tokens of a 2-D id batch with a blank token.

    Every position is selected independently with probability ``prob``.
    Positions holding ``eos_token`` and the whole first column are never
    overwritten; selected positions receive ``blank_token``.
    """

    def __init__(self, prob=0.15, blank_token=1, eos_token=102):
        super().__init__()
        self.prob = prob
        self.eos_token = eos_token
        self.blank_token = blank_token

    def forward(self, sample):
        # One Bernoulli(prob) draw per position: 1 = overwrite, 0 = keep.
        drop_prob = torch.full_like(sample, self.prob, dtype=torch.get_default_dtype())
        drop = torch.bernoulli(drop_prob).long()

        # End-of-sequence tokens are protected.
        drop = drop * sample.ne(self.eos_token).long()

        # The first column is protected as well.
        drop[:, 0] = 0

        keep = 1 - drop
        return keep * sample + drop * int(self.blank_token)

In [None]:

def extract_patches(image_tensor, patch_size=16):
    """Cut a batch of images into non-overlapping square patches.

    Args:
        image_tensor: 4-D tensor laid out as (batch, channels, height, width).
        patch_size: Side length of each patch; also used as the stride.

    Returns:
        Tensor of shape (batch, num_patches, channels * patch_size * patch_size),
        one flattened patch per row.
    """
    batch, channels, _, _ = image_tensor.size()
    flat_patch_dim = channels * patch_size * patch_size

    # unfold yields (batch, flat_patch_dim, num_patches).
    columns = torch.nn.functional.unfold(
        image_tensor, kernel_size=patch_size, stride=patch_size
    )

    # Put the patch index before the feature axis.
    return columns.transpose(1, 2).reshape(batch, -1, flat_patch_dim)

In [None]:

class SinusoidalPosEmb(nn.Module):
    """Fixed sinusoidal embedding of a 1-D tensor of positions.

    The first ``dim // 2`` output features are sines and the next ``dim // 2``
    are cosines of the position multiplied by geometrically spaced frequencies
    running from 1 down to 1/10000.

    Args:
        dim: Number of output features per position.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Embed positions.

        Args:
            x: 1-D tensor of positions with shape (n,).

        Returns:
            Tensor of shape (n, dim).
        """
        half_dim = self.dim // 2
        # max(..., 1) keeps dim == 2 or 3 (half_dim == 1) from dividing by zero;
        # for every larger dim the value is the same as half_dim - 1.
        log_step = math.log(10000) / max(half_dim - 1, 1)
        freqs = torch.exp(torch.arange(half_dim, device=x.device) * -log_step)
        angles = x[:, None] * freqs[None, :]
        emb = torch.cat((angles.sin(), angles.cos()), dim=-1)

        # An odd dim would otherwise come out one feature short; pad with a
        # zero column so the output width always equals `dim`.
        if self.dim % 2 == 1:
            emb = torch.cat((emb, emb.new_zeros(emb.shape[0], 1)), dim=-1)
        return emb

In [None]:

class Decoder(nn.Module):
    """Causal transformer decoder that predicts tokens while attending to encoder output.

    Args:
        num_emb: Vocabulary size (rows of the embedding table and width of the output logits).
        hidden_size: Model width.
        num_layers: Number of stacked decoder layers.
        num_heads: Attention heads per layer.
        dropout: Dropout probability for the embeddings and inside each layer.
    """

    def __init__(self, num_emb, hidden_size=256, num_layers=4, num_heads=8, dropout=0.1):
        super().__init__()

        # Token embedding table, initialised from N(0, 0.02^2).
        self.embedding = nn.Embedding(num_emb, hidden_size)
        nn.init.normal_(self.embedding.weight, mean=0.0, std=0.02)

        self.pos_emb = SinusoidalPosEmb(hidden_size)
        self.dropout = nn.Dropout(dropout)

        self.decoder_layers = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(
                d_model=hidden_size,
                nhead=num_heads,
                dim_feedforward=hidden_size * 4,
                dropout=dropout,
                batch_first=True,
                activation='gelu'
            ),
            num_layers=num_layers
        )
        self.layer_norm = nn.LayerNorm(hidden_size)
        self.fc_out = nn.Linear(hidden_size, num_emb)

    def forward(self, input_seq, encoder_output, input_padding_mask=None, encoder_padding_mask=None):
        """Return next-token logits of shape (batch, seq_len, num_emb).

        Args:
            input_seq: Token ids, (batch, seq_len).
            encoder_output: Memory sequence produced by the encoder.
            input_padding_mask: Optional key-padding mask for ``input_seq``.
            encoder_padding_mask: Optional key-padding mask for ``encoder_output``.
        """
        token_embs = self.embedding(input_seq)
        _, seq_len, _ = token_embs.shape
        seq_device = input_seq.device

        # Add position information (broadcast over the batch), then dropout.
        positions = torch.arange(seq_len, device=seq_device)
        hidden = self.dropout(token_embs + self.pos_emb(positions).unsqueeze(0))

        # True strictly above the diagonal: position i may not attend to j > i.
        future_mask = torch.ones(seq_len, seq_len, device=seq_device).triu(diagonal=1).bool()

        hidden = self.decoder_layers(
            tgt=hidden,
            memory=encoder_output,
            tgt_mask=future_mask,
            tgt_key_padding_mask=input_padding_mask,
            memory_key_padding_mask=encoder_padding_mask
        )

        return self.fc_out(self.layer_norm(hidden))

In [None]:


class VisionEncoder(nn.Module):
    """Transformer encoder over linearly projected image patches.

    Args:
        image_size: Side length of the (square) input images.
        channels_in: Number of image channels.
        patch_size: Side length of each square patch.
        hidden_size: Model width.
        num_layers: Number of stacked encoder layers.
        num_heads: Attention heads per layer.
        dropout: Dropout probability for the embeddings and inside each layer.
    """

    def __init__(self, image_size, channels_in, patch_size=16, hidden_size=256, num_layers=4, num_heads=8, dropout=0.1):
        super().__init__()

        self.patch_size = patch_size

        # Linear projection of each flattened patch, Xavier-initialised.
        flat_patch_dim = channels_in * patch_size * patch_size
        self.fc_in = nn.Linear(flat_patch_dim, hidden_size)
        nn.init.xavier_uniform_(self.fc_in.weight)

        # One learned position vector per patch.
        num_patches = (image_size // patch_size) ** 2
        self.pos_embedding = nn.Parameter(torch.empty(1, num_patches, hidden_size).normal_(std=0.02))
        self.dropout = nn.Dropout(dropout)

        self.encoder_layers = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=hidden_size,
                nhead=num_heads,
                dim_feedforward=hidden_size * 4,
                dropout=dropout,
                batch_first=True,
                activation='gelu'
            ),
            num_layers
        )
        self.layer_norm = nn.LayerNorm(hidden_size)

    def forward(self, image):
        """Encode a batch of images into a (batch, num_patches, hidden_size) sequence."""
        patch_tokens = self.fc_in(extract_patches(image, patch_size=self.patch_size))
        patch_tokens = self.dropout(patch_tokens + self.pos_embedding)
        return self.layer_norm(self.encoder_layers(patch_tokens))

In [None]:

class VisionEncoderDecoder(nn.Module):
    """Image-captioning model: a VisionEncoder feeding a causal text Decoder.

    Args:
        image_size: Side length of the input images.
        channels_in: Number of image channels.
        num_emb: Vocabulary size of the decoder.
        patch_size: Patch side length used by the encoder.
        hidden_size: Width shared by encoder and decoder.
        num_layers: Pair ``(encoder_layers, decoder_layers)``.
        num_heads: Attention heads per layer in both stacks.
        dropout: Dropout probability used in both stacks.
    """

    def __init__(self, image_size, channels_in, num_emb, patch_size=16,
                 hidden_size=256, num_layers=(4, 4), num_heads=8, dropout=0.1):
        super().__init__()

        # Settings common to both halves of the model.
        shared = dict(hidden_size=hidden_size, num_heads=num_heads, dropout=dropout)

        self.encoder = VisionEncoder(
            image_size=image_size,
            channels_in=channels_in,
            patch_size=patch_size,
            num_layers=num_layers[0],
            **shared
        )
        self.decoder = Decoder(
            num_emb=num_emb,
            num_layers=num_layers[1],
            **shared
        )

    def forward(self, input_image, target_seq, padding_mask):
        """Return token logits for ``target_seq`` conditioned on ``input_image``.

        ``padding_mask`` uses 0 to mark padded positions; it is converted to
        the boolean "True means ignore" form before being handed to the decoder.
        """
        ignore_positions = padding_mask == 0
        memory = self.encoder(image=input_image)
        return self.decoder(
            input_seq=target_seq,
            encoder_output=memory,
            input_padding_mask=ignore_positions
        )

In [None]:



# Build the captioning model; the channel count is taken from the sample batch.
caption_model = VisionEncoderDecoder(
    image_size=image_size,
    channels_in=test_img.shape[1],
    num_emb=tokenizer.vocab_size,
    patch_size=patch_size,
    num_layers=num_layers,
    hidden_size=hidden_size,
    num_heads=num_heads,
    dropout=0.1
).to(device)


optimizer = torch.optim.AdamW(
    caption_model.parameters(),
    lr=learning_rate,
    weight_decay=0.01,
    betas=(0.9, 0.999)
)


# The scheduler is stepped once per optimizer update, and an update happens
# only every `accumulation_steps` batches, so the cosine horizon has to be
# counted in updates rather than batches. Counting batches would end training
# with the schedule only partly traversed and the LR far above eta_min.
# Rounded up so the horizon is an upper bound on the number of scheduler
# steps: CosineAnnealingLR starts rising again when stepped past T_max.
updates_per_epoch = max(1, math.ceil(len(train_data_loader) / accumulation_steps))

scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=nepochs * updates_per_epoch,
    eta_min=learning_rate * 0.1
)

# Loss scaling is only needed for CUDA mixed precision; on CPU the scaler is
# created disabled, which makes scale()/step()/update() plain pass-throughs.
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == "cuda"))
# Padded target positions do not contribute to the loss.
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
# Randomly blanks 15% of the decoder input tokens during training.
td = TokenDrop(0.15)

In [None]:

# Report the total number of parameters in the model.
num_model_params = sum(p.numel() for p in caption_model.parameters())
print(f"Model Parameters: {num_model_params:,} (~{num_model_params//1e6}M)")


# Fresh-run defaults; overwritten below when a checkpoint is found.
training_loss_logger = []
val_loss_logger = []
best_val_loss = float('inf')
start_epoch = 0

checkpoint_path = "captioning_model_4k.pt"
if os.path.exists(checkpoint_path):
    print("Loading existing checkpoint...")
    cp = torch.load(checkpoint_path, map_location=device)
    caption_model.load_state_dict(cp["model_state_dict"])
    optimizer.load_state_dict(cp["optimizer_state_dict"])
    # The optimizer state carries the learning rate that was current when the
    # checkpoint was written. Restore the scheduler's position too; otherwise
    # a fresh scheduler would restart the cosine curve from step 0 on top of
    # that already-decayed rate. Older checkpoints may lack this entry.
    if "scheduler_state_dict" in cp:
        scheduler.load_state_dict(cp["scheduler_state_dict"])
    training_loss_logger = cp["train_data_logger"]
    val_loss_logger = cp["val_data_logger"]
    start_epoch = cp["epoch"]
    best_val_loss = cp.get("best_val_loss", float('inf'))
    print(f"Resumed from epoch {start_epoch}")


In [None]:



# Autocast to half precision only when actually running on CUDA.
use_amp = device.type == "cuda"
num_train_batches = len(train_data_loader)

for epoch in trange(start_epoch, nepochs, leave=True, desc="Epochs"):

    # ---------------- Training ----------------
    caption_model.train()
    epoch_train_loss = 0
    num_batches = 0

    optimizer.zero_grad()

    for batch_idx, (images, captions) in enumerate(tqdm(train_data_loader, desc=f"Training Epoch {epoch+1}", leave=False)):
        images = images.to(device, non_blocking=True)

        tokens = tokenizer(
            captions,
            padding=True,
            truncation=True,
            max_length=32,
            return_tensors="pt"
        )
        token_ids = tokens['input_ids'].to(device)
        padding_mask = tokens['attention_mask'].to(device)

        # Teacher forcing: the decoder sees tokens [0, L-1) (with random token
        # dropping) and is trained to predict tokens [1, L).
        target_ids = token_ids[:, 1:].contiguous()
        tokens_in = td(token_ids)

        with torch.cuda.amp.autocast(enabled=use_amp):
            pred = caption_model(images, tokens_in[:, :-1], padding_mask=padding_mask[:, :-1])
            # CrossEntropyLoss expects the class axis second: (batch, vocab, seq).
            loss = loss_fn(pred.transpose(1, 2), target_ids)

        # Un-normalised value for logging, read before the loss is rescaled.
        batch_loss = loss.item()

        # Average the gradient over the accumulation window.
        loss = loss / accumulation_steps
        scaler.scale(loss).backward()

        # Update after every full accumulation window, and also on the final
        # batch of the epoch, so that gradients of a trailing partial window
        # are applied instead of being wiped by the next epoch's zero_grad().
        is_last_batch = (batch_idx + 1) == num_train_batches
        if (batch_idx + 1) % accumulation_steps == 0 or is_last_batch:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            scheduler.step()

        epoch_train_loss += batch_loss
        num_batches += 1
        training_loss_logger.append(batch_loss)

    avg_train_loss = epoch_train_loss / num_batches

    # ---------------- Validation ----------------
    caption_model.eval()
    epoch_val_loss = 0
    val_batches = 0

    with torch.no_grad():
        for images, captions in tqdm(val_data_loader, desc="Validation", leave=False):
            images = images.to(device, non_blocking=True)

            tokens = tokenizer(
                captions,
                padding=True,
                truncation=True,
                max_length=32,
                return_tensors="pt"
            )
            token_ids = tokens['input_ids'].to(device)
            padding_mask = tokens['attention_mask'].to(device)

            target_ids = token_ids[:, 1:].contiguous()

            # No token dropping at validation time.
            with torch.cuda.amp.autocast(enabled=use_amp):
                pred = caption_model(images, token_ids[:, :-1], padding_mask=padding_mask[:, :-1])
                loss = loss_fn(pred.transpose(1, 2), target_ids)

            epoch_val_loss += loss.item()
            val_batches += 1

    avg_val_loss = epoch_val_loss / val_batches
    val_loss_logger.append(avg_val_loss)

    print(f"Epoch {epoch+1}/{nepochs}")
    print(f"Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
    print(f"Learning Rate: {scheduler.get_last_lr()[0]:.2e}")

    # Checkpoint only when the validation loss improves.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save({
            'epoch': epoch + 1,
            'train_data_logger': training_loss_logger,
            'val_data_logger': val_loss_logger,
            'model_state_dict': caption_model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'best_val_loss': best_val_loss,
        }, checkpoint_path)
        print(f" Saved new best model with val loss: {avg_val_loss:.4f}")

In [None]:

# Side-by-side loss curves: per-iteration training loss on the left,
# per-epoch validation loss on the right.
fig, (train_ax, val_ax) = plt.subplots(1, 2, figsize=(15, 5))

train_ax.plot(training_loss_logger, alpha=0.7)
train_ax.set_title("Training Loss")
train_ax.set_xlabel("Iteration")
train_ax.set_ylabel("Loss")
train_ax.grid(True, alpha=0.3)

val_ax.plot(val_loss_logger, 'r-', linewidth=2)
val_ax.set_title("Validation Loss")
val_ax.set_xlabel("Epoch")
val_ax.set_ylabel("Loss")
val_ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()


In [None]:


def generate_caption(model, image_tensor, tokenizer, max_length=30, temperature=0.7, top_k=50):
    """Generate a caption for one image by autoregressive sampling.

    The image is encoded once; the decoder is then run repeatedly on the
    growing token sequence, starting from the tokenizer's CLS token, until the
    SEP token is produced or ``max_length`` tokens have been generated.

    Args:
        model: Module exposing ``encoder(image)`` and ``decoder(tokens, memory)``.
            It is switched to eval mode and left in that mode.
        image_tensor: Preprocessed image batch of size 1.
        tokenizer: Object providing ``cls_token_id``, ``sep_token_id`` and
            ``decode(ids, skip_special_tokens=True)``.
        max_length: Maximum number of tokens to generate.
        temperature: Softmax temperature. Values <= 0 select greedy (argmax)
            decoding instead of sampling.
        top_k: If > 0, sample only among the ``top_k`` highest-scoring tokens;
            values larger than the vocabulary are clamped to its size.

    Returns:
        The decoded caption string.
    """
    model.eval()

    # Run on the device that holds the model's weights rather than relying on
    # a notebook-level global, so the function works for any model placement.
    try:
        run_device = next(model.parameters()).device
    except StopIteration:
        run_device = image_tensor.device
    use_amp = run_device.type == "cuda"

    with torch.no_grad():

        with torch.cuda.amp.autocast(enabled=use_amp):
            image_embedding = model.encoder(image_tensor.to(run_device))

        generated = [tokenizer.cls_token_id]

        for _ in range(max_length):
            input_tokens = torch.tensor([generated], device=run_device)

            with torch.cuda.amp.autocast(enabled=use_amp):
                outputs = model.decoder(input_tokens, image_embedding)

            # Logits of the last position, promoted to float32 so that the
            # temperature scaling and softmax are not done in half precision.
            next_token_logits = outputs[0, -1, :].float()

            if temperature <= 0:
                # Zero temperature is the greedy limit; this also avoids
                # dividing by zero.
                next_token = next_token_logits.argmax(dim=-1).item()
            else:
                next_token_logits = next_token_logits / temperature

                if top_k > 0:
                    # torch.topk raises if k exceeds the number of entries.
                    k = min(top_k, next_token_logits.size(-1))
                    kth_best = torch.topk(next_token_logits, k)[0][..., -1, None]
                    next_token_logits = next_token_logits.masked_fill(
                        next_token_logits < kth_best, -float('inf')
                    )

                next_token_probs = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(next_token_probs, num_samples=1).item()

            generated.append(next_token)

            if next_token == tokenizer.sep_token_id:
                break

        caption = tokenizer.decode(generated, skip_special_tokens=True)
        return caption


# Print a banner announcing the validation-image test output.
print("\n" + "="*60)
print("TESTING ON VALIDATION IMAGES")
print("="*60)

In [None]:


# Qualitative check: caption the first five validation images and compare
# each generated caption with its reference caption.
test_indices = [0, 1, 2, 3, 4]

for idx in test_indices:
    # val_dataset.data holds (file name, caption) pairs.
    img_path, true_caption = val_dataset.data[idx]

    print(f"\n Image {idx+1}: {img_path}")
    print(f"True caption: {true_caption}")


    # Load the raw image and apply the deterministic validation transform;
    # unsqueeze(0) adds the batch dimension.
    img = Image.open(f"{val_dataset.img_folder}/{img_path}").convert("RGB")
    img_tensor = val_transform(img).unsqueeze(0)


    generated_caption = generate_caption(caption_model, img_tensor, tokenizer, temperature=0.7)
    print(f"Generated: {generated_caption}")

    # Rough similarity score: the fraction of distinct whitespace-separated,
    # lower-cased reference words that also occur in the generated caption.
    true_words = set(true_caption.lower().split())
    gen_words = set(generated_caption.lower().split())
    common_words = true_words.intersection(gen_words)
    similarity = len(common_words) / max(len(true_words), 1)  # max() guards an empty reference
    print(f" Word overlap: {similarity:.2f}")

    # Show the un-normalised image with both captions in the title.
    plt.figure(figsize=(6, 6))
    plt.imshow(img)
    plt.title(f"True: {true_caption}\nGenerated: {generated_caption}", fontsize=10)
    plt.axis('off')
    plt.tight_layout()
    plt.show()

print("\n Training and evaluation completed!")

In [None]:
# import torch
# from torch.utils.data import Dataset, DataLoader
# from PIL import Image
# import torchvision.transforms as transforms
# import os
# from torch import nn
# import numpy as np


In [None]:
# image_size=224

In [None]:
# train_transform = transforms.Compose([transforms.Resize(image_size),
#                                       transforms.RandomCrop(image_size),
#                                       transforms.AutoAugment(),
#                                       transforms.ToTensor(),
#                                       transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                                                            std=[0.229, 0.224, 0.225])])

In [None]:
# import os
# from PIL import Image
# from torch.utils.data import Dataset

# class ImageCaptionDataset(Dataset):
#     def __init__(self, img_folder, caption_file, transform=None):
#         super().__init__()
#         self.img_folder = img_folder
#         self.caption_file = caption_file
#         self.transform = transform

#         # Load all captions
#         with open(self.caption_file, 'r', encoding='utf-8') as f:
#             lines = f.readlines()

#         # Store captions in a dictionary for quick lookup
#         caption_dict = {}
#         for line in lines:
#             parts = line.strip().split(',', 1)
#             if len(parts) == 2:
#                 file_name, caption = parts
#                 file_name = file_name.strip()
#                 caption = caption.strip()
#                 caption_dict[file_name] = caption

#         # Get all image filenames
#         images = [f for f in os.listdir(self.img_folder)
#                   if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

#         self.data = []
#         missing = 0

#         # Match images with their captions
#         for f_name in images:
#             # Some CSVs might not include file extensions
#             f_base = os.path.splitext(f_name)[0]

#             if f_name in caption_dict:
#                 self.data.append((f_name, caption_dict[f_name]))
#             elif f_base in caption_dict:
#                 self.data.append((f_name, caption_dict[f_base]))
#             else:
#                 missing += 1

#         print(f"✅ Loaded {len(self.data)} image-caption pairs. ❌ Missing {missing} captions.")

#     def __len__(self):
#         return len(self.data)

#     def __getitem__(self, idx):
#         img_name, caption = self.data[idx]
#         img_path = os.path.join(self.img_folder, img_name)

#         img = Image.open(img_path).convert("RGB")

#         if self.transform:
#             img = self.transform(img)

#         return img, caption


In [None]:
# dataset=ImageCaptionDataset("/content/drive/MyDrive/images","/content/drive/MyDrive/captions.txt",train_transform)

In [None]:
# len(dataset)

In [None]:
# images=[f for f in os.listdir("/content/drive/MyDrive/NewdataCaptions/images")]
# print(len(images))

In [None]:
# from matplotlib import pyplot as plt
# import matplotlib.image as image

In [None]:
# img_path,caption=dataset.data[140]
# print(caption)

In [None]:
# img_to_display = image.imread(f"{dataset.img_folder}/{img_path}")
# plt.imshow(img_to_display)
# plt.show()

In [None]:
# train_data_loader=DataLoader(dataset,batch_size=16,shuffle=True,num_workers=1)

In [None]:
# len(train_data_loader)

In [None]:
# test_img,captions=next(iter(train_data_loader))
# print(len(test_img),len(captions))
# print(test_img.shape[1])

In [None]:
# from transformers import AutoTokenizer

In [None]:
# from transformers import AutoTokenizer
# from torch.utils.data import DataLoader

# tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# images, captions = next(iter(train_data_loader))

# tokens = tokenizer(
#     captions,
#     padding='max_length',
#     truncation=True,
#     max_length=32,
#     return_tensors='pt'
# )



In [None]:
# print("Image batch shape:", images.shape)
# print("Token IDs shape:", tokens['input_ids'].shape)
# print("Sample tokens:", captions[1])
# print("Token IDs:", tokens['input_ids'][1])
# print(tokenizer.decode(tokens['input_ids'][1]))

In [None]:
# class TokenDrop(nn.Module):

#     def __init__(self, prob=0.1, blank_token=1, eos_token=102):
#         self.prob = prob
#         self.eos_token = eos_token
#         self.blank_token = blank_token

#     def __call__(self, sample):
#         mask = torch.bernoulli(self.prob * torch.ones_like(sample)).long()

#         can_drop = (~(sample == self.eos_token)).long()
#         mask = mask * can_drop

#         mask[:, 0] = torch.zeros_like(mask[:, 0]).long()

#         replace_with = (self.blank_token * torch.ones_like(sample)).long()

#         sample_out = (1 - mask) * sample + mask * replace_with

#         return sample_out

In [None]:
# def extract_patches(image_tensor, patch_size=16):
#     bs, c, h, w = image_tensor.size()

#     unfold = torch.nn.Unfold(kernel_size=patch_size, stride=patch_size)

#     unfolded = unfold(image_tensor)

#     unfolded = unfolded.transpose(1, 2).reshape(bs, -1, c * patch_size * patch_size)

#     return unfolded

In [None]:
# class SinusoidalPosEmb(nn.Module):
#     def __init__(self, dim):
#         super().__init__()
#         self.dim = dim

#     def forward(self, x):
#         device = x.device
#         half_dim = self.dim // 2
#         emb = math.log(10000) / (half_dim - 1)
#         emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
#         emb = x[:, None] * emb[None, :]
#         emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
#         return emb



In [None]:

# class Decoder(nn.Module):
#     def __init__(self, num_emb, hidden_size=128, num_layers=3, num_heads=4):
#         super(Decoder, self).__init__()

#         self.embedding = nn.Embedding(num_emb, hidden_size)

#         self.embedding.weight.data = 0.001 * self.embedding.weight.data


#         self.pos_emb = SinusoidalPosEmb(hidden_size)

#         decoder_layer = nn.TransformerDecoderLayer(d_model=hidden_size, nhead=num_heads,
#                                                    dim_feedforward=hidden_size * 4, dropout=0.0,
#                                                    batch_first=True)

#         self.decoder_layers = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)


#         self.fc_out = nn.Linear(hidden_size, num_emb)

#     def forward(self, input_seq, encoder_output, input_padding_mask=None,
#                 encoder_padding_mask=None):

#         input_embs = self.embedding(input_seq)
#         bs, l, h = input_embs.shape


#         seq_indx = torch.arange(l, device=input_seq.device)
#         pos_emb = self.pos_emb(seq_indx).reshape(1, l, h).expand(bs, l, h)
#         embs = input_embs + pos_emb
#         causal_mask = torch.triu(torch.ones(l, l, device=input_seq.device), 1).bool()

#         output = self.decoder_layers(tgt=embs, memory=encoder_output, tgt_mask=causal_mask,
#                                      tgt_key_padding_mask=input_padding_mask,
#                                      memory_key_padding_mask=encoder_padding_mask)

#         return self.fc_out(output)


In [None]:

# class VisionEncoder(nn.Module):
#     def __init__(self, image_size, channels_in, patch_size=16, hidden_size=128, num_layers=3, num_heads=4):
#         super(VisionEncoder, self).__init__()

#         self.patch_size = patch_size
#         self.fc_in = nn.Linear(channels_in * patch_size * patch_size, hidden_size)

#         seq_length = (image_size // patch_size) ** 2
#         self.pos_embedding = nn.Parameter(torch.empty(1, seq_length, hidden_size).normal_(std=0.02))


#         encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads,
#                                                    dim_feedforward=hidden_size * 4, dropout=0.0,
#                                                    batch_first=True)

#         self.encoder_layers = nn.TransformerEncoder(encoder_layer, num_layers)

#     def forward(self, image):
#         bs = image.shape[0]

#         patch_seq = extract_patches(image, patch_size=self.patch_size)
#         patch_emb = self.fc_in(patch_seq)


#         embs = patch_emb + self.pos_embedding


#         output = self.encoder_layers(embs)

#         return output


In [None]:

# class VisionEncoderDecoder(nn.Module):
#     def __init__(self, image_size, channels_in, num_emb, patch_size=16,
#                  hidden_size=128, num_layers=(3, 3), num_heads=4):
#         super(VisionEncoderDecoder, self).__init__()


#         self.encoder = VisionEncoder(image_size=image_size, channels_in=channels_in, patch_size=patch_size,
#                                hidden_size=hidden_size, num_layers=num_layers[0], num_heads=num_heads)

#         self.decoder = Decoder(num_emb=num_emb, hidden_size=hidden_size,
#                                num_layers=num_layers[1], num_heads=num_heads)

#     def forward(self, input_image, target_seq, padding_mask):

#         bool_padding_mask = padding_mask == 0


#         encoded_seq = self.encoder(image=input_image)

#         decoded_seq = self.decoder(input_seq=target_seq,
#                                    encoder_output=encoded_seq,
#                                    input_padding_mask=bool_padding_mask)
#         return decoded_seq

In [None]:
# from torch import optim
# device = torch.device(0 if torch.cuda.is_available() else 'cpu')

# hidden_size = 192

# num_layers = (6, 6)

# num_heads = 8

# patch_size = 8
# learning_rate=1e-4
# image_size=225
# nepochs=28


# caption_model = VisionEncoderDecoder(image_size=image_size, channels_in=test_img.shape[1],
#                                      num_emb=tokenizer.vocab_size, patch_size=patch_size,
#                                      num_layers=num_layers,hidden_size=hidden_size,
#                                      num_heads=num_heads).to(device)

# optimizer = optim.Adam(caption_model.parameters(), lr=learning_rate)

# scaler = torch.cuda.amp.GradScaler()


# loss_fn = nn.CrossEntropyLoss(reduction="none")

# td = TokenDrop(0.36)


# training_loss_logger = []
# eval_loss_logger = []
# start_epoch = 0

In [None]:

# num_model_params = 0
# for param in caption_model.parameters():
#     num_model_params += param.flatten().shape[0]

# print("-This Model Has %d (Approximately %d Million) Parameters!" % (num_model_params, num_model_params//1e6))

In [None]:
# import os
# print(os.listdir())


In [None]:
# import math
# from tqdm.notebook import trange,tqdm
# for epoch in trange(start_epoch, nepochs, leave=False, desc="Epoch"):
#     caption_model.train()
#     for images, captions in tqdm(train_data_loader, desc="Training", leave=False):

#         images = images.to(device)

#         tokens = tokenizer(captions, padding=True, truncation=True, return_tensors="pt")
#         token_ids = tokens['input_ids'].to(device)
#         padding_mask = tokens['attention_mask'].to(device)
#         bs = token_ids.shape[0]

#         target_ids = torch.cat((token_ids[:, 1:],
#                                 torch.zeros(bs, 1, device=device).long()), 1)

#         tokens_in = td(token_ids)
#         with torch.cuda.amp.autocast():

#             pred = caption_model(images, tokens_in, padding_mask=padding_mask)


#         loss_mask = (~(target_ids == 0)).float()
#         loss = (loss_fn(pred.transpose(1, 2), target_ids) * loss_mask).sum()/loss_mask.sum()


#         optimizer.zero_grad()
#         scaler.scale(loss).backward()
#         scaler.step(optimizer)
#         scaler.update()

#         training_loss_logger.append(loss.item())


#     caption_model.eval()
#     with torch.no_grad():

#         for i, (images, captions) in enumerate(tqdm(train_data_loader, desc="Eval", leave=False)):
#           if i==50:
#             break
#           else:

#             images = images.to(device)


#             tokens = tokenizer(captions, padding=True, truncation=True, return_tensors="pt")
#             token_ids = tokens['input_ids'].to(device)
#             padding_mask = tokens['attention_mask'].to(device)
#             bs = token_ids.shape[0]


#             target_ids = torch.cat((token_ids[:, 1:],
#                                     torch.zeros(bs, 1, device=device).long()), 1)

#             with torch.cuda.amp.autocast():

#                 pred = caption_model(images, token_ids, padding_mask=padding_mask)


#             loss_mask = (~(target_ids == 0)).float()
#             loss = (loss_fn(pred.transpose(1, 2), target_ids) * loss_mask).sum()/loss_mask.sum()


#             eval_loss_logger.append(loss.item())

#     torch.save({'epoch': epoch + 1,
#                 'train_data_logger': training_loss_logger,
#                 'eval_data_logger': eval_loss_logger,
#                 'model_state_dict': caption_model.state_dict(),
#                 'optimizer_state_dict': optimizer.state_dict(),
#                  }, "captioning_model.pt")

In [None]:
# cp = torch.load("captioning_model.pt", map_location="cpu")

# caption_model.load_state_dict(cp["model_state_dict"])
# optimizer.load_state_dict(cp["optimizer_state_dict"])
# training_loss_logger = cp["train_data_logger"]
# eval_loss_logger = cp["eval_data_logger"]
# start_epoch = cp["epoch"]

In [None]:
# _ = plt.figure(figsize=(10, 5))
# _ = plt.plot(training_loss_logger[1000:])
# _ = plt.title("Training Loss")

In [None]:
# #%matplotlib inline
# import numpy as np
# import matplotlib.pyplot as plt

# window_size = 512
# plt.figure(figsize=(10, 5))

# train_array = np.array(training_loss_logger)
# eval_array = np.array(eval_loss_logger)

# print("Train logger length:", len(train_array))
# print("Eval logger length:", len(eval_array))

# if len(train_array) > window_size and len(eval_array) > window_size:
#     train_data = np.convolve(train_array, np.ones(window_size)/window_size, mode="valid")
#     eval_data = np.convolve(eval_array, np.ones(window_size)/window_size, mode="valid")

#     plt.plot(np.linspace(0, nepochs, len(train_data)), train_data, label="Train Loss")
#     plt.plot(np.linspace(0, nepochs, len(eval_data)), eval_data, label="Eval Loss")
#     plt.title("Train/Eval Loss")
#     plt.xlabel("Epochs")
#     plt.ylabel("Loss")
#     plt.legend()
#     plt.show()
# else:
#     print(" Not enough data points for smoothing — try smaller window_size.")


In [None]:

# dataiter = next(iter(train_data_loader))
# test_images, test_captions = dataiter

In [None]:
# img_path,caption=dataset.data[600]
# index =3
# test_image = img_path # Reverted to img_path
# # test_image = img_tensor # Commenting out this line

In [None]:
# import torchvision
# from PIL import Image
# import matplotlib.pyplot as plt

# plt.figure(figsize = (3,3))

# # Load the image from the file path
# img = Image.open(f"{dataset.img_folder}/{test_image}").convert("RGB")

# # Apply the same transformations as the training data
# if dataset.transform:
#     img_tensor = dataset.transform(img)
# else:
#     img_tensor = transforms.ToTensor()(img)

# # Add a batch dimension to the tensor
# img_tensor = img_tensor.unsqueeze(0)

# out = torchvision.utils.make_grid(img_tensor, 1, normalize=True)
# _ = plt.imshow(out.numpy().transpose((1, 2, 0)))
# # The variable test_captions is a tuple of captions for a batch of images.
# # The variable `index` was used previously to select a caption from a batch, but
# # the current test_image is a single image loaded from a file path.
# # The caption for this specific image (img_path) is stored in the `caption` variable
# # in cell ASnr0oVOCmcD.
# print(caption)

In [None]:
# # Add the Start-Of-Sentence token to the prompt to signal the network to start generating the caption
# sos_token = 101 * torch.ones(1, 1).long()


# # Set the temperature for sampling during generation
# temp = 0.5

In [None]:
# import torch.nn.functional as F
# from torch.distributions import Categorical

In [None]:
# log_tokens = [sos_token]
# caption_model.eval()

# with torch.no_grad():
#     # Encode the input image
#     with torch.cuda.amp.autocast():
#         # Forward pass
#         image_embedding = caption_model.encoder(img_tensor.to(device)) # Changed from images to test_image

#     # Generate the answer tokens
#     for i in range(30):
#         input_tokens = torch.cat(log_tokens, 1)

#         # Decode the input tokens into the next predicted tokens
#         data_pred = caption_model.decoder(input_tokens.to(device), image_embedding)

#         # Sample from the distribution of predicted probabilities
#         next_tokens = torch.argmax(data_pred[:, -1], dim=-1).reshape(1, 1)


#         # Append the next predicted token to the sequence
#         log_tokens.append(next_tokens.cpu())

#         # Break the loop if the End-Of-Caption token is predicted
#         if next_tokens.item() == 102:
#             break

In [None]:
# # Convert the list of token indices to a tensor
# pred_text = torch.cat(log_tokens, 1)

# # Convert the token indices to their corresponding strings using the vocabulary
# pred_text = tokenizer.decode(pred_text[0], skip_special_tokens=True)
# print(pred_text)


# # # Join the token strings to form the predicted text
# # pred_text = "".join(pred_text_strings)

In [None]:
# # Lets visualise an entire batch of images!
# plt.figure(figsize = (3, 3))
# out = torchvision.utils.make_grid(test_image, 1, normalize=True)
# _ = plt.imshow(out.numpy().transpose((1, 2, 0)))

# # Print the predicted text
# print(pred_text)