# CLIP - Dataset

## 0. imports

In [1]:
%load_ext jupyter_black

In [2]:
import os
import random

import cv2
import numpy as np
import pandas as pd

import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import transformers
from transformers import AutoTokenizer, AutoModel

from sklearn.model_selection import train_test_split

In [3]:
# Set the TOKENIZERS_PARALLELISM environment variable to "false" for this process.
# NOTE(review): presumably this silences the HuggingFace tokenizers fork warning
# that appears once DataLoader worker processes are spawned — confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

## 1. Dataset - Line by Line

### 1.1 image - transform

In [4]:
def get_transform(img_size: int, max_pixel_value: float = 255.0, stage: str = "train"):
    """Build the albumentations pipeline applied to every image.

    The pipeline has three steps: resize to ``img_size`` x ``img_size``,
    ``A.Normalize`` (library-default statistics, scaled by
    ``max_pixel_value``) and conversion to a tensor with ``ToTensorV2``.

    Args:
        img_size: Target height and width in pixels.
        max_pixel_value: Value passed to ``A.Normalize`` as the maximum
            possible pixel value of the input image.
        stage: Name of the stage the pipeline is built for. Every stage
            currently gets the same steps, so the value does not change the
            result; the argument is kept as the place to add train-only
            augmentation later.

    Returns:
        An ``A.Compose`` object; call it as ``transform(image=img)["image"]``.
    """
    # No stage-specific augmentation exists yet, so a single list of steps
    # serves both the "train" pipeline and every other stage.
    steps = [
        A.Resize(img_size, img_size, always_apply=True),
        A.Normalize(max_pixel_value=max_pixel_value, always_apply=True),
        ToTensorV2(),
    ]
    return A.Compose(steps)

In [5]:
# Pipeline for 224x224 inputs (default stage "train"), used by the
# step-by-step image preprocessing cell further down.
transform = get_transform(img_size=224)

### 1.2 text - tokenizer

In [6]:
# Name of the pretrained checkpoint whose tokenizer encodes the captions.
tokenizer_name = "bert-base-uncased"

# AutoTokenizer resolves the concrete tokenizer class from the checkpoint name.
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

### 1.3 data load

In [7]:
# Root folder of the images and the CSV that holds the captions and labels.
img_dir = "../data/images"
caption_path = "../data/captions.csv"
# encoding_errors="ignore": undecodable bytes are skipped instead of raising.
info_df = pd.read_csv(caption_path, encoding_errors="ignore")

In [8]:
# Each image sits in a sub-folder named after the text before the first "_"
# of its file name, e.g. "bingsu_img_0.png" -> "bingsu/bingsu_img_0.png".
info_df["image_path"] = [
    f"{name.split('_')[0]}/{name}" for name in info_df["image_name"]
]

In [13]:
# Label <-> index lookup tables. Class ids follow the sorted order of the
# distinct label strings, so the numbering is stable for a given label set.
game_list = sorted(set(info_df["label_game"]))
game_to_idx = dict(zip(game_list, range(len(game_list))))
idx_to_game = dict(enumerate(game_list))

genre_list = sorted(set(info_df["label_genre"]))
genre_to_idx = dict(zip(genre_list, range(len(genre_list))))
idx_to_genre = dict(enumerate(genre_list))

In [14]:
# Integer class ids for both label columns, looked up in the tables above.
info_df["game_class"] = [game_to_idx[game] for game in info_df["label_game"]]
info_df["genre_class"] = [genre_to_idx[genre] for genre in info_df["label_genre"]]

In [15]:
# Preview the first rows, including the derived path and class-id columns.
info_df.head()

Unnamed: 0,image_name,caption,label_game,label_genre,image_path,game_class,genre_class
0,bingsu_img_0.png,The image shows a lobby from the popular multi...,among_us,strategy,bingsu/bingsu_img_0.png,0,4
1,bingsu_img_0.png,"The lobby in ""Among Us"" is depicted in the ima...",among_us,strategy,bingsu/bingsu_img_0.png,0,4
2,bingsu_img_0.png,The lobby in the image is from the well-known ...,among_us,strategy,bingsu/bingsu_img_0.png,0,4
3,bingsu_img_0.png,The picture displays a waiting area in the wel...,among_us,strategy,bingsu/bingsu_img_0.png,0,4
4,bingsu_img_0.png,The picture displays a lobby in the well-liked...,among_us,strategy,bingsu/bingsu_img_0.png,0,4


### 1.4 img & text - preprocess

In [16]:
# Draw one random row and pull out each field used by the cells below.
idx = random.choice(range(len(info_df)))
sample = info_df.iloc[idx]  # single positional lookup, reused for every field

# raw text fields
image_name = sample["image_name"]
caption = sample["caption"]
game_name = sample["label_game"]
genre_name = sample["label_genre"]

# integer class ids and the path relative to img_dir
game_label = sample["game_class"]
genre_label = sample["genre_class"]
image_path = sample["image_path"]

In [17]:
# Tokenize every caption in one call; row i of each returned tensor belongs
# to row i of info_df.
max_length = 200
captions = info_df["caption"].tolist()

tokenizer_kwargs = {
    "padding": True,
    "truncation": True,
    "max_length": max_length,
    "return_tensors": "pt",
}
encoded_captions = tokenizer(captions, **tokenizer_kwargs)

In [18]:
# Shape of the token-id tensor: one row per caption, padded to a common length.
encoded_captions["input_ids"].shape

torch.Size([43392, 145])

In [19]:
img_path = os.path.join(img_dir, image_path)

img = cv2.imread(img_path)
# cv2.imread reports failure by returning None rather than raising; without
# this check the next line fails with an error that never mentions the path.
if img is None:
    raise FileNotFoundError(f"Image missing or unreadable: {img_path}")
# OpenCV loads images as BGR; reorder the channels to RGB before transforming.
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

# Resize, normalize and convert to a tensor.
img = transform(image=img)["image"]

In [20]:
# One sample dict: the tokenizer outputs for row `idx`, plus the transformed
# image and the raw caption text.
item = {field: values[idx] for field, values in encoded_captions.items()}
item.update(image=img, caption=caption)

## 2. Dataset - Class

In [44]:
class CLIPDataset(Dataset):
    """Map-style dataset that yields one image/caption pair per dataframe row.

    All captions are tokenized once in ``__init__``; images are read from
    disk lazily in ``__getitem__``.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        img_dir: str,
        tokenizer,
        transform,
        txt_max_length: int = 200,
    ):
        """Store the configuration and pre-tokenize every caption.

        Args:
            df: Table with the columns ``caption``, ``label_game``,
                ``label_genre``, ``game_class``, ``genre_class`` and
                ``image_path``.
            img_dir: Folder that ``image_path`` values are relative to.
            tokenizer: Callable invoked as ``tokenizer(captions, padding=True,
                truncation=True, max_length=..., return_tensors="pt")``; its
                result must offer ``.items()`` with values indexable by row.
            transform: Callable invoked as ``transform(image=img)``, returning
                a mapping with an ``"image"`` entry.
            txt_max_length: Maximum token length passed to the tokenizer.
        """
        self.img_dir = img_dir
        self.tokenizer = tokenizer
        self.transform = transform
        self.txt_max_length = txt_max_length

        # dataframe
        self.data = df
        # encoded_captions: row i of each value belongs to row i of `df`
        captions = self.data["caption"].tolist()
        self.encoded_captions = self.tokenizer(
            captions,
            padding=True,
            truncation=True,
            max_length=self.txt_max_length,
            return_tensors="pt",
        )

    def __len__(self):
        """Return the number of rows (samples) in the dataframe."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample at positional index ``idx`` as a dict.

        The dict holds the tokenizer outputs for that caption plus
        ``image``, ``game_label``, ``genre_label``, ``caption``,
        ``game_name`` and ``genre_name``.

        Raises:
            FileNotFoundError: If the image file is missing or cannot be
                decoded by OpenCV.
        """
        # Materialize the row once instead of once per field.
        row = self.data.iloc[idx]

        # txt prep
        item = {key: values[idx] for key, values in self.encoded_captions.items()}

        # img prep
        img_path = os.path.join(self.img_dir, row["image_path"])

        img = cv2.imread(img_path)
        # cv2.imread returns None on failure instead of raising; fail here
        # with the offending path rather than inside cvtColor.
        if img is None:
            raise FileNotFoundError(f"Image missing or unreadable: {img_path}")
        # OpenCV loads BGR; reorder to RGB before applying the transform.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transform(image=img)["image"]

        item["image"] = img
        item["game_label"] = row["game_class"]
        item["genre_label"] = row["genre_class"]

        item["caption"] = row["caption"]
        item["game_name"] = row["label_game"]
        item["genre_name"] = row["label_genre"]

        return item

## 3. DataModule

In [45]:
class CLIPDataModule:
    """Load the caption CSV, encode labels, split the rows and serve DataLoaders.

    Constructing an instance runs :meth:`setup` immediately, so the CSV is
    read, the tokenizer is loaded and all captions are tokenized up front.
    """

    def __init__(
        self,
        data_path: str,
        img_dir: str,
        tokenizer_name: str,
        img_size: int = 224,
        txt_max_length: int = 200,
        val_size: float = 0.2,
        test_size: float = 0.2,
        batch_size: int = 32,
        num_workers: int = 4,
    ):
        """Store the configuration and build the datasets.

        Args:
            data_path: Path of the caption CSV.
            img_dir: Folder that the derived ``image_path`` values are
                relative to.
            tokenizer_name: Checkpoint name given to
                ``AutoTokenizer.from_pretrained``.
            img_size: Side length images are resized to.
            txt_max_length: Maximum token length for captions.
            val_size: Share of the rows left after the test split that goes
                to the validation set.
            test_size: Share of all rows that goes to the test set.
            batch_size: Batch size of every DataLoader.
            num_workers: Worker process count of every DataLoader.
        """
        self.data_path = data_path
        self.img_dir = img_dir
        self.tokenizer_name = tokenizer_name
        self.img_size = img_size
        self.txt_max_length = txt_max_length
        self.val_size = val_size
        self.test_size = test_size
        self.batch_size = batch_size
        self.num_workers = num_workers

        self.setup()

    def setup(self):
        """Read the CSV, derive columns, split the rows and build the datasets."""
        # load data
        self.df = pd.read_csv(self.data_path, encoding_errors="ignore")
        # Images live in a sub-folder named after the file-name prefix
        # before the first "_".
        self.df["image_path"] = self.df["image_name"].apply(
            lambda name: f"{name.split('_')[0]}/{name}"
        )

        # label_to_index: class ids follow the sorted order of the labels
        game_list = sorted(set(self.df["label_game"]))
        self.game_to_idx = {game: idx for idx, game in enumerate(game_list)}
        self.idx_to_game = dict(enumerate(game_list))

        genre_list = sorted(set(self.df["label_genre"]))
        self.genre_to_idx = {genre: idx for idx, genre in enumerate(genre_list)}
        self.idx_to_genre = dict(enumerate(genre_list))

        self.df["game_class"] = self.df["label_game"].map(self.game_to_idx)
        self.df["genre_class"] = self.df["label_genre"].map(self.genre_to_idx)

        # tokenizer & transform
        self.tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name)
        self.train_transform = get_transform(img_size=self.img_size, stage="train")
        self.test_transform = get_transform(img_size=self.img_size, stage="test")

        # train/val/test split
        # NOTE(review): rows are split independently. If several rows share
        # one image_name (one image, several captions), the same image can
        # land in more than one split — consider a grouped split.
        train_df, test_df = train_test_split(
            self.df, test_size=self.test_size, shuffle=True
        )
        # The validation share comes from `val_size`; previously `test_size`
        # was reused here and `val_size` was silently ignored.
        train_df, val_df = train_test_split(
            train_df, test_size=self.val_size, shuffle=True
        )

        # train/val/test set
        self.trainset = self._make_dataset(train_df, self.train_transform)
        self.valset = self._make_dataset(val_df, self.test_transform)
        self.testset = self._make_dataset(test_df, self.test_transform)

    def _make_dataset(self, df, transform):
        """Wrap one split in a CLIPDataset that shares this module's settings."""
        return CLIPDataset(
            df,
            self.img_dir,
            self.tokenizer,
            transform,
            self.txt_max_length,
        )

    def _make_loader(self, dataset, shuffle):
        """Create a DataLoader with this module's batch size and worker count."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
        )

    def train_dataloader(self):
        """Return a DataLoader over the training set, reshuffled per pass."""
        return self._make_loader(self.trainset, shuffle=True)

    def val_dataloader(self):
        """Return a DataLoader over the validation set in fixed order."""
        # Evaluation data is not shuffled, so batches are repeatable from
        # one run to the next (same policy as the test loader).
        return self._make_loader(self.valset, shuffle=False)

    def test_dataloader(self):
        """Return a DataLoader over the test set in fixed order."""
        return self._make_loader(self.testset, shuffle=False)

In [46]:
# Keyword arguments for the data module, kept in one dict so the whole
# configuration can be inspected or reused.
dm_params = dict(
    data_path="../data/captions.csv",
    img_dir="../data/images/",
    tokenizer_name="bert-base-uncased",
    img_size=224,
    txt_max_length=200,
    val_size=0.2,
    test_size=0.2,
    batch_size=32,
    num_workers=4,
)

# Construction also runs setup(): CSV load, tokenization and the splits.
dm = CLIPDataModule(**dm_params)

In [47]:
# Smoke test: pull the first batch from each loader, in train/val/test order.
train_batch, val_batch, test_batch = (
    next(iter(make_loader()))
    for make_loader in (dm.train_dataloader, dm.val_dataloader, dm.test_dataloader)
)

In [48]:
# Game class ids of the first training batch, collated into one tensor.
train_batch["game_label"]

tensor([4, 4, 7, 2, 5, 4, 4, 6, 6, 6, 6, 0, 7, 0, 8, 1, 7, 3, 1, 7, 1, 4, 1, 1,
        5, 0, 5, 7, 4, 6, 6, 1])