## Test the dataset module

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Read the pipe-separated caption table and hold out 2% of its rows for testing.
df = pd.read_csv("../../datasets/results.csv", sep="|")
train_df, test_df = train_test_split(df, random_state=42, test_size=0.02)

# Path of the image folder that belongs to the caption table.
image_dir = "../../datasets/flickr30k_images"

In [None]:
def _my_train_and_test_dataloader(_datasets_dir, batch_size, testing_in_local=False):
    """
    Build the train and test dataloaders from a datasets directory.

    Args:
        _datasets_dir: Directory that contains ``results.csv`` (pipe-delimited
            captions) and the ``flickr30k_images`` folder.
        batch_size: Batch size forwarded to ``get_train_test_dataloader``.
        testing_in_local: When truthy, no sampler is created (``train_sampler``
            is ``None``). Otherwise a ``DistributedSampler`` is built from
            ``dist.get_world_size()`` and ``dist.get_rank()``.

    Returns:
        tuple: ``(train_loader, test_loader, train_df)``. The training
        DataFrame is returned because it is required to get the vocabulary.
    """
    caption_csv_file = f"{_datasets_dir}/results.csv"
    image_dir = f"{_datasets_dir}/flickr30k_images"

    # The caption file is pipe-delimited; the other cells of this notebook read
    # the same file with delimiter="|", so do the same here.
    df = pd.read_csv(caption_csv_file, delimiter="|")
    _train_df, _test_df = train_test_split(df, test_size=0.02, random_state=42)

    train_dataset, test_dataset = get_train_test_dataset(_train_df, _test_df, image_dir)

    if testing_in_local:
        # Local run: leave sampling to the dataloader helper.
        train_sampler = None
    else:
        # SageMaker data parallel: set num_replicas and rank in DistributedSampler
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank()
        )

    train_loader, test_loader = get_train_test_dataloader(
        train_dataset=train_dataset,
        test_dataset=test_dataset,
        batch_size=batch_size,
        train_sampler=train_sampler,
    )

    # train_df is required to get the vocabulary
    return train_loader, test_loader, _train_df

In [None]:
from datasets import get_train_test_dataloader
from training_script import _my_train_and_test_dataloader

batch_size = 1
data_dir = "../../datasets/"

# NOTE(review): this call receives the DataFrames directly, whereas the
# `_my_train_and_test_dataloader` cell in this notebook passes dataset objects
# built by `get_train_test_dataset` — confirm which one the helper expects.
train_loader, test_loader = get_train_test_dataloader(train_df, test_df, batch_size=4)

# Nothing in the visible notebook initialises a distributed process group, so
# ask for the local path instead of the default DistributedSampler branch.
# Note that this call rebinds `train_df`.
train_dataloader, test_dataloader, train_df = _my_train_and_test_dataloader(
    data_dir, batch_size, testing_in_local=True
)

In [None]:
# `len(train_loader)` reports the number of batches directly; wrapping the
# loader in `list(...)` would load every batch just to count them.
train_loader.batch_size, len(train_loader)

In [None]:
# Pull a single batch from the training loader.
train_first_data = next(iter(train_loader))

# Show the shape of the batch's first element next to its second element
# (presumably images and captions — confirm against the dataset class).
train_first_data[0].shape, train_first_data[1]

In [None]:
import matplotlib.pyplot as plt

# Move the first axis of the first sample to the end before plotting
# (assumes C x H x W -> H x W x C — TODO confirm).
plt.imshow(train_first_data[0][0].permute(1, 2, 0))

---

## Test the vision-transformer module

In [None]:
import torch
from vision_transformer_encoder import ViTEncoder

# Seed PyTorch's default RNG so the random tensors created in later cells are
# reproducible across Restart & Run All.
RANDOM_SEED = 42
torch.manual_seed(RANDOM_SEED)

BATCH_SIZE = 10
EPOCHS = 20

LEARNING_RATE = 1e-3
PATCH_SIZE = 16
IMG_SIZE = 224
IN_CHANNELS = 3
NUM_HEADS = 8
DROPOUT = 0.001
ADAM_WEIGHT_DECAY = 0
ADAM_BETAS = (0.9, 0.999)
ACTIVATION = "gelu"
NUM_ENCODERS = 4
# Derived sizes: one embedding per flattened patch, and the patch count of a
# square image split into square patches.
EMBED_DIM = (PATCH_SIZE**2) * IN_CHANNELS  # 768
NUM_PATCHES = (IMG_SIZE // PATCH_SIZE) ** 2  # 196


# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Instantiate the ViT encoder from the constants above, then place it on `device`.
model = ViTEncoder(
    image_size=IMG_SIZE,
    patch_size=PATCH_SIZE,
    in_channels=IN_CHANNELS,
    num_patches=NUM_PATCHES,
    embed_dim=EMBED_DIM,
    num_heads=NUM_HEADS,
    num_encoders=NUM_ENCODERS,
    activation=ACTIVATION,
    dropout=DROPOUT,
)
model = model.to(device)

In [None]:
# Build the dummy batch from the same constants the model was configured with,
# so the input cannot drift from the model if IN_CHANNELS / IMG_SIZE change.
random_image = torch.randn(BATCH_SIZE, IN_CHANNELS, IMG_SIZE, IMG_SIZE).clip(0, 1).to(device)
print(f"{BATCH_SIZE=}")
print(f"{random_image.shape=}")

print(model(random_image).shape)  # BATCH_SIZE X (NUM_PATCHES+1) X EMBED_DIM

In [None]:
import matplotlib.pyplot as plt

# Plot the first random image: move the channel axis last, then bring the
# tensor back to the CPU and detach it before handing it to matplotlib.
plt.imshow(random_image[0].permute(1, 2, 0).cpu().detach())

----

## Test the vocabulary module

In [None]:
import pandas as pd
from caption_vocab import MyVocab

# Re-read the pipe-separated caption table and build the vocabulary from its
# " comment" column (the column name carries a leading space).
df = pd.read_csv("../../datasets/results.csv", sep="|")
my_vocab = MyVocab(column_name=" comment", df=df)

In [None]:
import numpy as np

# Tokenise a single caption passed as a plain string, with max_len=20.
token_arr = my_vocab.get_token_index_from_sentence(
    "Several men in hard hats are operating a giant pulley system .",
    max_len=20,
)

# Overall shape, plus the first row from index 10 onwards.
np.shape(token_arr), token_arr[0][10:]

In [None]:
# Same caption as in the previous cell, this time wrapped in a one-element
# list. This rebinds `token_arr`.
token_arr = my_vocab.get_token_index_from_sentence(
    ["Several men in hard hats are operating a giant pulley system ."], max_len=20
)

np.shape(token_arr), token_arr[0][10:]  # after 10th token

In [None]:
# Display the vocabulary's PAD / BOS / EOS / UNK index attributes.
my_vocab.PAD_IDX, my_vocab.BOS_IDX, my_vocab.EOS_IDX, my_vocab.UNK_IDX

In [None]:
# Size reported by the vocabulary object (reused later as TGT_VOCAB_SIZE).
len(my_vocab)

In [None]:
# Map the first row of token indices back through the vocabulary
# (presumably yielding the caption text — confirm against MyVocab).
my_vocab.get_sentence_from_indices(token_arr[0].tolist())

In [None]:
# Same call with the whole (nested) index list instead of a single row.
my_vocab.get_sentence_from_indices(token_arr.tolist())

In [None]:
# save vocab

import torch

# Write the whole vocabulary object to "my_vocab.pt" in the working directory.
# torch.save pickles arbitrary Python objects, so the MyVocab class must be
# importable wherever this file is loaded again.
torch.save(my_vocab, "my_vocab.pt")

In [None]:
# The file holds a pickled MyVocab object rather than plain tensors, so the
# tensor-only loading mode (the default in recent PyTorch releases) rejects it.
# SECURITY: full unpickling can execute arbitrary code — only load files that
# were produced locally by this notebook.
nasreen_vocab = torch.load("my_vocab.pt", weights_only=False)

In [None]:
# Repeat the decode with the reloaded vocabulary to check the save/load round trip.
nasreen_vocab.get_sentence_from_indices(token_arr.tolist())

---

## Test the `Caption Generator Decoder` module

In [None]:
import torch

# Random stand-in for the encoder output: BATCH_SIZE X (NUM_PATCHES+1) X EMBED_DIM
dummy_encoder_output = torch.randn((4, 197, 256))

In [None]:
# Four captions of different lengths, tokenised together with max_len=20.
my_dummy_captions = [
    "Several men in hard hats are operating a giant pulley system .",
    "operating a giant pulley system .",
    "in hard hats are operating .",
    "Several men in ",
]
meow_meow = my_vocab.get_token_index_from_sentence(my_dummy_captions, max_len=20)
meow_meow

In [None]:
# Build the two masks later passed to the decoder, using MyVocab's helpers.
my_padding_mask = my_vocab.create_padding_mask(meow_meow)
# 20 is the same max_len that was used when tokenising `meow_meow`.
my_subsequent_mask = my_vocab.create_square_subsequent_mask(20) # max_len

In [None]:
# Inspect the padding mask built from `meow_meow`.
my_padding_mask

In [None]:
# Inspect the square subsequent mask created with size 20.
my_subsequent_mask

In [None]:
from caption_generator_decoder import ImageCaptionDecoder

# Decoder hyperparameters.
# NOTE: EMBED_DIM, NUM_HEADS, NUM_ENCODERS, DROPOUT, ACTIVATION and `device`
# were already assigned in the vision-transformer section; this cell rebinds
# them, so re-running earlier cells afterwards picks up these values.
TGT_VOCAB_SIZE = len(my_vocab)
EMBED_DIM = 256
NUM_HEADS = 8
NUM_ENCODERS = 6  # passed as `num_decoder_layers` when the decoder is built
DROPOUT = 0.1
ACTIVATION = "gelu"
TGT_MAX_LEN = 20  # same value as the max_len used for tokenising and masking

device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Instantiate the caption decoder from the constants above, then place it on `device`.
my_img_caption_decoder = ImageCaptionDecoder(
    tgt_vocab_size=TGT_VOCAB_SIZE,
    tgt_max_len=TGT_MAX_LEN,
    emb_size=EMBED_DIM,
    nhead=NUM_HEADS,
    num_decoder_layers=NUM_ENCODERS,
    activation=ACTIVATION,
    dropout=DROPOUT,
)
my_img_caption_decoder = my_img_caption_decoder.to(device)

In [None]:
# The decoder was moved to `device`, so the encoder memory (created on the CPU)
# has to be on the same device for the forward pass to work on CUDA.
# NOTE(review): `meow_meow` and both masks come from MyVocab; if they are
# created on the CPU they need the same treatment — confirm.
final_output = my_img_caption_decoder(
    trg=meow_meow,
    memory=dummy_encoder_output.to(device),
    tgt_mask=my_subsequent_mask,
    tgt_key_padding_mask=my_padding_mask,
)

In [None]:
# Expected layout per the author's note: BATCH_SIZE X TGT_MAX_LEN X TGT_VOCAB_SIZE,
# e.g. (4, 20, 37); the last dimension depends on len(my_vocab).
final_output.shape

In [None]:
# Shapes of the two decoder inputs, for comparison with the output shape above.
meow_meow.shape, dummy_encoder_output.shape

---

## Test the `inference_encoder_decoder_model` function

In [None]:
from inference_script import inference_encoder_decoder_model

# Call the inference helper from `inference_script` with no arguments and keep
# whatever it returns.
my_inference_output = inference_encoder_decoder_model()

---

## Test the loss function

In [None]:
import torch.nn as nn

# Negative log-likelihood loss that skips padded target positions.
# NOTE(review): NLLLoss expects log-probabilities as input — confirm that the
# decoder ends with a log-softmax.
loss_fn = nn.NLLLoss(ignore_index=my_vocab.PAD_IDX)

In [None]:
# Random integer targets in [0, TGT_VOCAB_SIZE) with shape (4, 20), placed on
# `device`. The sizes are hard-coded to match the four dummy captions and
# max_len=20 used above.
dummy_real_output = torch.randint(0, TGT_VOCAB_SIZE, (4, 20)).to(device)
dummy_real_output.shape, final_output.shape, dummy_real_output

In [None]:
# Swap the last two axes of the decoder output before computing the loss:
# NLLLoss wants the class dimension second, i.e. (batch, vocab, seq) against
# targets of shape (batch, seq).
loss = loss_fn(final_output.permute(0, 2, 1), dummy_real_output)

In [None]:
# Display the loss value computed in the previous cell.
loss

---

## Test the trained model

In [None]:
from inference_script import model_fn, predict_fn, output_fn

# Directory handed to `model_fn` (relative to the notebook's working directory).
MODEL_PATH = "./model/"

# Load the trained model through the inference script's `model_fn` entry point.
my_trained_model = model_fn(MODEL_PATH)

In [None]:
import torch
import matplotlib.pyplot as plt

# Random 3 x 224 x 224 float32 tensor, shown as an image with the first axis
# moved last. The next cell rebinds `dummy_input` to a version with a leading
# batch dimension.
dummy_input = torch.rand(size=(3, 224, 224), dtype=torch.float32).clip(min=0, max=1)
plt.imshow(dummy_input.permute(1, 2, 0))

In [None]:
import torch
import matplotlib.pyplot as plt

# Same random image as the previous cell but with a leading batch dimension of
# 1; only the first (and only) item is plotted.
dummy_input = torch.rand(size=(1, 3, 224, 224), dtype=torch.float32).clip(min=0, max=1)
plt.imshow(dummy_input[0].permute(1, 2, 0))

In [None]:
# NOTE(review): `enc_model` is not defined in any visible cell of this notebook,
# so this cell fails on a fresh kernel — bind it explicitly (e.g. to the encoder
# of the loaded model) before this point. TODO confirm the intended object.
enc_output = enc_model(dummy_input)
enc_output.shape

In [None]:
from torchvision.io import read_image

# Read a sample image from the assets folder with torchvision's `read_image`.
image_path = "./assets/1000268201.jpg"
my_alpha_img = read_image(image_path)
# Show the tensor's shape, plot it with the first axis moved last, and show its type.
my_alpha_img.shape, plt.imshow(my_alpha_img.permute(1,2,0)), type(my_alpha_img)

In [None]:
from torchvision.transforms import v2

# Preprocessing applied before inference: resize to 224 x 224 (antialiased),
# then convert to float32 with value scaling enabled.
test_transform = v2.Compose(
    [
        v2.Resize(size=(224, 224), antialias=True),
        v2.ToDtype(torch.float32, scale=True),
    ]
)

# Apply the transform to the image read in the previous cell.
new_girl_image = test_transform(my_alpha_img)

# Alternative display order kept from an earlier experiment:
# new_girl_image.shape, plt.imshow(new_girl_image.permute(1,2,0))
plt.imshow(new_girl_image.permute(1, 2, 0)), new_girl_image.shape

In [None]:
# Run the inference script's `predict_fn` on the transformed image with the
# loaded model; no context object is supplied.
my_nasreen_output = predict_fn(new_girl_image, model=my_trained_model, context=None)

In [None]:
# Display the value returned by `predict_fn`.
my_nasreen_output