In [None]:
# IMPORTANT: SOME KAGGLE DATA SOURCES ARE PRIVATE
# RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES.
import kagglehub
kagglehub.login()


In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

khadiza13_new_dataset_path = kagglehub.dataset_download('khadiza13/new-dataset')

print('Data source import complete.')


In [None]:
import os
# Disable tokenizers parallelism
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

In [None]:
!pip install datasets evaluate transformers[sentencepiece]
!pip install -q transformers datasets

In [None]:
import numpy as np
import random
import os
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from transformers import AutoTokenizer, DataCollatorWithPadding, AutoModel,  AutoConfig
from collections import defaultdict
import torch
from transformers import ViTImageProcessor,ViTModel, ViTConfig
import warnings
warnings.filterwarnings('ignore')
from transformers import get_scheduler
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset,IterableDataset,DataLoader
from tqdm.auto import tqdm
from collections import defaultdict
from torch.optim import AdamW
import torchvision
from torchvision.transforms import Compose, RandomResizedCrop, RandomHorizontalFlip, Resize, CenterCrop, ToTensor, Normalize
from PIL import Image
from datasets import load_dataset
from torch.cuda import amp
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score,f1_score
import gc
gc.enable()

In [None]:
def set_random_seed(random_seed):
    """Seed every random number generator used by the notebook.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    PyTorch (CPU and all CUDA devices), exports ``PYTHONHASHSEED`` and puts
    cuDNN into deterministic mode.

    Args:
        random_seed: Integer seed shared by all generators.
    """
    random.seed(random_seed)
    np.random.seed(random_seed)
    # Only affects interpreters started after this point (e.g. worker processes).
    os.environ["PYTHONHASHSEED"] = str(random_seed)

    torch.manual_seed(random_seed)
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)

    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick a different kernel on every run; pin it off
    # so that the deterministic flag above is not undermined.
    torch.backends.cudnn.benchmark = False

In [None]:
df = pd.read_csv("/kaggle/input/new-dataset/new_ds/dataset.csv")
df.head()

In [None]:
# Check the shape of the dataset
print("Shape of the dataset:", df.shape)

In [None]:
# Check for null values
print("\nNull values in the dataset:")
print(df.isnull().sum())

# Check for duplicate values
print("\nDuplicate values in the dataset:")
print(df.duplicated().sum())

In [None]:
# Number of samples per class
class_distribution = df['Label'].value_counts()
print("\nClass distribution:")
print(class_distribution)

# Report every class together with its sample count; the counts are read
# straight from the value_counts() result instead of re-filtering the frame.
for class_name, n_samples in class_distribution.items():
    print(f"\nClass '{class_name}' contains {n_samples} samples.")

In [None]:
print(df.columns.tolist())


In [None]:
df.columns = ['image_name', 'Text', 'label']
df.head()

In [None]:
img_folder = '/kaggle/input/new-dataset/new_ds/images'

# File names actually present in the image folder
available_images = set(os.listdir(img_folder))

# File names referenced by the DataFrame. The columns were renamed to
# ['image_name', 'Text', 'label'] above, so the image column must be read
# through its new name (the old header no longer exists and raises KeyError).
referenced_images = set(df['image_name'].dropna().astype(str))

# Referenced files that do not exist on disk, in a stable order
missing_images = sorted(referenced_images - available_images)

# Full paths of the missing files
missing_image_paths = [os.path.join(img_folder, img) for img in missing_images]

print("Missing image files:")
for path in missing_image_paths:
    print(path)

In [None]:
with pd.option_context('display.max_colwidth', 0):
    display(df.sample(n=5))

In [None]:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
df['label'] = le.fit_transform(df['label'])
df.sample(10)

In [None]:
# checkpoint = "csebuetnlp/banglabert" "sagorsarker/bangla-bert-base" "csebuetnlp/banglabert_large"
class Config:
    """Hyper-parameters and checkpoint names shared by the whole notebook."""

    # Reproducibility / hardware
    SEED = 42
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

    # Pretrained checkpoints (text encoder and image encoder)
    MODEL_PATH = "csebuetnlp/banglabert"
    Image_Model = "google/vit-base-patch16-224-in21k"

    # Data
    MAX_LEN = 100
    NUM_CLASSES = 4

    # Optimisation
    NUM_EPOCHS = 10
    BATCH_SIZE = 32
    LR = 2e-5
    DROPOUT = 0.3


config = Config()

In [None]:
tokenizer = AutoTokenizer.from_pretrained(config.MODEL_PATH)

In [None]:
# Tokenized length of every text; str() guards against non-string cells and
# truncation caps each length at 512 tokens.
token_counts = [
    len(tokenizer.encode(str(text), max_length=512, truncation=True))
    for text in df["Text"]
]

sns.histplot(token_counts)

In [None]:
X = df[['image_name','Text']]

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, df['label'].values, test_size=0.3, stratify=df['label'].values, random_state=42)

In [None]:
X_test, X_val, y_test, y_val = train_test_split(X_test, y_test, test_size=0.5, stratify=y_test, random_state=42)

In [None]:
# Re-attach the labels to each feature split so every split is a
# self-contained frame with the columns image_name / Text / label.
feature_columns = ['image_name', 'Text']
train_df = X_train[feature_columns].assign(label=y_train)
val_df = X_val[feature_columns].assign(label=y_val)
test_df = X_test[feature_columns].assign(label=y_test)

In [None]:
# Positional access: after the shuffled split the original row label 0 is not
# guaranteed to be part of train_df, so `[0]` could raise KeyError.
train_df['Text'].iloc[0]

In [None]:
# Positional access: after the shuffled split the original row label 0 is not
# guaranteed to be part of train_df, so `[0]` could raise KeyError.
train_df['image_name'].iloc[0]

In [None]:
class MultimodalDataset(Dataset):
    """Paired image/text classification dataset backed by a DataFrame.

    Every row of ``df`` provides an image file name (``image_name``), a text
    (``Text``) and an integer class id (``label``).

    Args:
        df: DataFrame with the columns ``image_name``, ``Text`` and ``label``.
        tokenizer: Tokenizer called on the text of each sample.
        processor: Image processor exposing ``image_mean``, ``image_std`` and
            ``size["height"]``; used to configure the torchvision transforms.
        is_train: Use the random crop/flip pipeline when True, the
            resize + center-crop pipeline otherwise.
        image_dir: Directory that contains the image files.
    """

    def __init__(self, df, tokenizer, processor, is_train=True,
                 image_dir="/kaggle/input/new-dataset/new_ds/images"):
        super(MultimodalDataset, self).__init__()
        self.df = df
        self.tokenizer = tokenizer
        self.processor = processor
        self.label = torch.tensor(df.label.values, dtype=torch.long)
        self.is_train = is_train
        self.max_length = config.MAX_LEN
        self.image_dir = image_dir

        # Image transforms
        self.image_mean = processor.image_mean
        self.image_std = processor.image_std
        self.size = processor.size["height"]

        self.train_transforms = Compose([
            RandomResizedCrop(self.size),
            RandomHorizontalFlip(),
            ToTensor(),
            self.normalize_image,
        ])
        self.val_transforms = Compose([
            Resize(self.size),
            CenterCrop(self.size),
            ToTensor(),
            self.normalize_image,
        ])

    def normalize_image(self, image):
        """Normalize a channel-first image tensor with the processor statistics.

        A 4-channel input keeps its last channel untouched; any channel count
        other than 3 or 4 raises ``ValueError``.
        """
        normalize = Normalize(mean=self.image_mean, std=self.image_std)
        if image.shape[0] == 3:  # RGB image
            return normalize(image)
        elif image.shape[0] == 4:  # RGBA image
            # Normalize only the RGB channels, then add the alpha channel back
            normalized_rgb = normalize(image[:3])
            return torch.cat([normalized_rgb, image[3].unsqueeze(0)], dim=0)
        else:
            raise ValueError(f"Unexpected number of channels: {image.shape[0]}")

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        """Return the tokenized text, transformed image and label of one row."""
        # Read the row from *this* dataset's frame. The previous code indexed
        # the notebook-global `df`, which paired the text and label of a split
        # row with the image of an unrelated row of the full dataset.
        row = self.df.iloc[index]

        # Process image; the context manager closes the file handle once the
        # pixels have been copied into the converted image.
        image_path = os.path.join(self.image_dir, str(row['image_name']))
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.is_train:
            image_inputs = self.train_transforms(image)
        else:
            image_inputs = self.val_transforms(image)

        # Padding is left to the collator, which pads per batch; padding a
        # single sequence to its own length would be a no-op anyway.
        text_inputs = self.tokenizer(
            str(row['Text']),
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )

        return {
            'input_ids': text_inputs['input_ids'].squeeze(0),
            'attention_mask': text_inputs['attention_mask'].squeeze(0),
            'pixel_values': image_inputs,
            'label': self.label[index]
        }

In [None]:
class MultimodalDataCollator:
    """Collate per-sample dicts into one batch.

    The text fields are handed to a ``DataCollatorWithPadding``; the
    ``pixel_values`` and ``label`` tensors are stacked along a new first axis.
    """

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.text_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    def __call__(self, examples):
        text_features, images, targets = [], [], []
        for example in examples:
            images.append(example['pixel_values'])
            targets.append(example['label'])
            # Everything that is neither image nor label goes to the text collator
            text_features.append({
                name: value
                for name, value in example.items()
                if name != 'pixel_values' and name != 'label'
            })

        batch = self.text_collator(text_features)
        batch['pixel_values'] = torch.stack(images)
        batch['label'] = torch.stack(targets)
        return batch

In [None]:
processor = ViTImageProcessor.from_pretrained(config.Image_Model)

# Create datasets
train_dataset = MultimodalDataset(train_df, tokenizer, processor, is_train=True)

data_collator = MultimodalDataCollator(tokenizer)

train_loader = DataLoader(train_dataset, batch_size=3, collate_fn=data_collator, num_workers=4)

In [None]:
class ClassifierModel(nn.Module):
    """Text encoder loaded from ``config.MODEL_PATH``.

    ``forward`` returns, for every sequence, the vector at position 0 of the
    encoder's last hidden state.
    """

    def __init__(self):
        super().__init__()
        self.model = AutoModel.from_pretrained(
            config.MODEL_PATH,
            config=AutoConfig.from_pretrained(config.MODEL_PATH),
        )

    def forward(self, input_ids, attention_mask):
        encoded = self.model(input_ids=input_ids, attention_mask=attention_mask)
        # Element 0 of the encoder output is the last hidden state; keep only
        # the first position of each sequence.
        return encoded[0][:, 0]

In [None]:
class VisionClassifierModel(nn.Module):
    """ViT image encoder loaded from ``config.Image_Model``.

    ``forward`` returns, for every image, the vector at position 0 of the
    encoder's last hidden state. ``model_config`` is kept as an attribute so
    other modules can read the encoder's hidden size.
    """

    def __init__(self):
        super().__init__()
        vit_config = ViTConfig.from_pretrained(config.Image_Model)
        self.model_config = vit_config
        self.model = ViTModel.from_pretrained(config.Image_Model, config=vit_config)

    def forward(self, pixel_values):
        encoded = self.model(pixel_values=pixel_values)
        return encoded.last_hidden_state[:, 0]

In [None]:
# Pull a single batch from the loader for the smoke tests below
batch = next(iter(train_loader))

In [None]:
text_model = ClassifierModel()
output = text_model(input_ids = batch['input_ids'], attention_mask = batch['attention_mask'])

In [None]:
output.shape

In [None]:
image_model = VisionClassifierModel()
outputs = image_model(pixel_values=batch['pixel_values'])

In [None]:
outputs.shape

In [None]:
class FusionModel(nn.Module):
    """Late-fusion classifier over an image encoder and a text encoder.

    The two encoders' feature vectors are concatenated and passed through a
    two-layer MLP head that outputs one logit per class.

    Args:
        image_model: Module called as ``image_model(pixel_values)``; must
            expose ``model_config.hidden_size``.
        classifier_model: Module called as
            ``classifier_model(input_ids, attention_mask)``; must expose
            ``model.config.hidden_size``.
        num_classes: Number of output logits.
        hidden_dim: Width of the hidden layer of the fusion head.
        dropout: Dropout probability applied after the hidden layer.
    """

    def __init__(self, image_model, classifier_model, num_classes,
                 hidden_dim=1024, dropout=0.4):
        super(FusionModel, self).__init__()
        self.image_model = image_model
        self.classifier_model = classifier_model

        # Input width of the head = image feature size + text feature size
        fused_dim = (
            image_model.model_config.hidden_size
            + classifier_model.model.config.hidden_size
        )
        self.fusion_layer = nn.Sequential(
            nn.Linear(fused_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, pixel_values, text_input_ids, text_attention_mask):
        """Return the class logits for a batch of image/text pairs."""
        image_output = self.image_model(pixel_values)
        text_features = self.classifier_model(text_input_ids, text_attention_mask)

        combined_features = torch.cat((image_output, text_features), dim=1)
        return self.fusion_layer(combined_features)

In [None]:
image_model = VisionClassifierModel()
classifier_model = ClassifierModel()

In [None]:
pixel_values = batch['pixel_values']
text_input_ids = batch['input_ids']
text_attention_mask = batch['attention_mask']

In [None]:
target = batch['label']

In [None]:
target

In [None]:
fusion_model = FusionModel(image_model, classifier_model, num_classes=config.NUM_CLASSES)
output = fusion_model(pixel_values = pixel_values, text_input_ids = text_input_ids, text_attention_mask = text_attention_mask )

In [None]:
output

In [None]:
pred = torch.argmax(output, dim = -1)
pred

In [None]:
y_true = target.detach().numpy()
pred = output.detach().numpy()
y_pred = np.argmax(pred, axis = -1)
f1 = f1_score(y_true, y_pred, average='weighted')
f1

In [None]:
loss_fct = nn.CrossEntropyLoss()
loss = loss_fct(output,target)
print(loss)

In [None]:
class Engine:
    """Training / evaluation driver for the fusion model.

    Args:
        model: Module called as ``model(pixel_values=..., text_input_ids=...,
            text_attention_mask=...)`` and returning class logits.
        optimizer: Optimizer that updates the model's parameters.
        scheduler: Optional learning-rate scheduler, stepped once per batch.
    """

    def __init__(self, model, optimizer,scheduler = None):
        self.model = model
        self.optimizer = optimizer
        self.scheduler = scheduler

    @staticmethod
    def loss_fn(target, pred):
        """Return the cross-entropy loss of logits ``pred`` against class ids ``target``."""
        return nn.CrossEntropyLoss()(pred, target)

    @staticmethod
    def compute_metrics(labels, pred):
        """Return the weighted F1 score of ``argmax(pred)`` against ``labels``."""
        y_true = labels.cpu().numpy()
        y_pred = np.argmax(pred.cpu().numpy(), axis = -1)
        return f1_score(y_true, y_pred, average='weighted')

    @staticmethod
    def _to_device(data):
        """Move the tensors of one collated batch to ``config.DEVICE``."""
        return (
            data["input_ids"].to(config.DEVICE),
            data["attention_mask"].to(config.DEVICE),
            data['pixel_values'].to(config.DEVICE),
            data["label"].to(config.DEVICE),
        )

    def train(self, train_dataloader, scaler,num_training_steps):
        """Run one pass over ``train_dataloader`` with mixed precision.

        Args:
            train_dataloader: Iterable of collated batches (must support ``len``).
            scaler: Gradient scaler providing ``scale`` / ``step`` / ``update``.
            num_training_steps: Upper bound on the number of batches processed
                by this call; values larger than the loader length simply
                mean "the whole loader".

        Returns:
            Mean training loss over the batches that were actually processed
            (0.0 when no batch was processed).
        """
        self.model.train()
        # Size the progress bar to the work this call really does. Passing the
        # total step count of the whole run used to produce a bar that never
        # completed within an epoch.
        steps_to_run = min(len(train_dataloader), num_training_steps)
        if steps_to_run <= 0:
            return 0.0

        total_training_loss = 0.0
        steps_done = 0
        progress_bar = tqdm(total=steps_to_run, desc="Training")
        for step, data in enumerate(train_dataloader):
            # Check the cap before doing the work so that exactly
            # `steps_to_run` batches are processed.
            if step >= steps_to_run:
                break
            input_ids, attention_mask, pixel_inputs, target = self._to_device(data)
            self.optimizer.zero_grad()
            with amp.autocast():
                pred = self.model(pixel_values = pixel_inputs, text_input_ids = input_ids, text_attention_mask = attention_mask)
                loss = self.loss_fn(target, pred)

            batch_loss = loss.item()
            total_training_loss += batch_loss
            scaler.scale(loss).backward()
            scaler.step(self.optimizer)
            scaler.update()

            if self.scheduler:
                self.scheduler.step()
            steps_done += 1
            progress_bar.update(1)
            progress_bar.set_postfix({'loss': f'{batch_loss:.4f}'})
        progress_bar.close()

        # Average over the batches actually seen, not the loader length
        return total_training_loss / steps_done

    def evaluate(self, eval_dataloader):
        """Score the model on ``eval_dataloader`` without gradient tracking.

        Returns:
            Tuple ``(weighted_f1, mean_loss)``.

        Raises:
            ValueError: If ``eval_dataloader`` yields no batches.
        """
        n_batches = len(eval_dataloader)
        if n_batches == 0:
            raise ValueError("eval_dataloader is empty; nothing to evaluate")

        self.model.eval()
        val_loss = 0
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for data in tqdm(eval_dataloader):
                input_ids, attention_mask, pixel_inputs, target = self._to_device(data)
                pred = self.model(pixel_values=pixel_inputs, text_input_ids=input_ids, text_attention_mask=attention_mask)
                val_loss += self.loss_fn(target, pred).item()
                # Keep the accumulated outputs on the CPU so that they do not
                # pile up in accelerator memory for the whole evaluation.
                all_preds.append(pred.detach().cpu())
                all_labels.append(target.detach().cpu())

        val_loss /= n_batches
        all_preds = torch.cat(all_preds, dim=0)
        all_labels = torch.cat(all_labels, dim=0)
        score = self.compute_metrics(all_labels, all_preds)
        return score, val_loss

In [None]:
history = defaultdict(list)
def run_training(save_model = False):
    """Train the fusion model and return the best validation F1 score.

    Trains on ``train_df`` and selects the best epoch on ``val_df``; the test
    split is deliberately not touched here so that the final evaluation stays
    unbiased. Per-epoch losses and scores are appended to the module-level
    ``history`` dict.

    Args:
        save_model: When True, the weights of the best epoch are written to
            ``Multimodal_model.bin`` in the current working directory.

    Returns:
        Best weighted validation F1 score, rounded to 5 decimals.
    """
    train_dataset = MultimodalDataset(train_df, tokenizer, processor, is_train=True)
    # Model selection / early stopping must use the validation split. Using
    # the test split here would leak test data into the choice of checkpoint.
    val_dataset = MultimodalDataset(val_df, tokenizer, processor, is_train=False)
    data_collator = MultimodalDataCollator(tokenizer)

    # Reshuffle the training samples every epoch
    train_loader = DataLoader(train_dataset, batch_size=config.BATCH_SIZE, shuffle=True, collate_fn=data_collator, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=config.BATCH_SIZE, collate_fn=data_collator, num_workers=4)

    set_random_seed(config.SEED)
    image_model = VisionClassifierModel()
    classifier_model = ClassifierModel()
    model = FusionModel(image_model, classifier_model, num_classes=config.NUM_CLASSES)
    model.to(config.DEVICE)

    optimizer = torch.optim.AdamW(model.parameters(), lr= config.LR, weight_decay=0.01)
    steps_per_epoch = len(train_loader)
    # The scheduler spans the whole run, hence epochs * steps per epoch
    num_training_steps = config.NUM_EPOCHS * steps_per_epoch

    scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps,
    )
    eng = Engine(model, optimizer, scheduler)
    scaler = amp.GradScaler()
    early_stopping_iter = 3
    early_stopping_counter = 0
    best_score = 0.0
    for epoch in range(config.NUM_EPOCHS):
        # One call trains one epoch, so its step budget is one epoch's worth
        train_loss = eng.train(train_loader, scaler, steps_per_epoch)
        val_score, val_loss = eng.evaluate(val_loader)

        print(f" Epoch: {epoch + 1} | Training_loss: {round(train_loss,4)} | Val_loss: {round(val_loss,4)} | Val_f1: {round(val_score,4)}")
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_f1'].append(val_score)
        if val_score > best_score:
            best_score = val_score
            early_stopping_counter = 0

            if save_model:
                torch.save(model.state_dict(), "Multimodal_model.bin")
        else:
            early_stopping_counter += 1

        # Stop after `early_stopping_iter` consecutive epochs without
        # improvement (a strict `>` waited one epoch longer than configured).
        if early_stopping_counter >= early_stopping_iter:
            break

    del model
    gc.collect()

    return round(best_score,5)

In [None]:
score =run_training(save_model = True)
print(f"Best f1 Score: {score}")

### Evaluation

In [None]:
model = FusionModel(image_model, classifier_model, num_classes=config.NUM_CLASSES)
model.to(config.DEVICE)

In [None]:
# Same relative path that run_training() writes to, so the cell also works
# when the working directory is not /kaggle/working.
model_path = "Multimodal_model.bin"
# map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine
model.load_state_dict(torch.load(model_path, map_location=config.DEVICE))

In [None]:
test_dataset = MultimodalDataset(test_df, tokenizer, processor, is_train=False)
data_collator = MultimodalDataCollator(tokenizer)
test_loader = DataLoader(test_dataset, batch_size=config.BATCH_SIZE, collate_fn=data_collator, num_workers=4)

In [None]:
model.eval()
predictions = []
# Inference only: disable gradient tracking for the whole loop
with torch.no_grad():
    for batch in test_loader:
        batch = {name: tensor.to(config.DEVICE) for name, tensor in batch.items()}
        outputs = model(
            pixel_values=batch['pixel_values'],
            text_input_ids=batch['input_ids'],
            text_attention_mask=batch['attention_mask'],
        )

        # outputs shape is [batch_size, num_classes]; keep the top class id
        pred = torch.argmax(outputs, dim=-1)
        predictions += pred.cpu().numpy().tolist()

# Ensure we only keep predictions for actual samples
actual_samples = len(test_loader.dataset)
predictions = predictions[:actual_samples]


In [None]:
y_true = test_df.label.values
y_pred = predictions

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
print(confusion_matrix(y_true, y_pred))

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score
print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))
print(f1_score(y_true, y_pred, average='weighted'))
print(accuracy_score(y_true, y_pred))

In [None]:
from mlxtend.plotting import plot_confusion_matrix
from torchmetrics import ConfusionMatrix

# Use dedicated names for the tensors and the result. Rebinding `y_pred`,
# `y_true` or `confusion_matrix` would clobber the arrays and the sklearn
# function used by the cells above and break them on a re-run.
y_pred_tensor = torch.as_tensor(y_pred)
y_true_tensor = torch.as_tensor(y_true)

cmat = ConfusionMatrix(task="multiclass", num_classes=config.NUM_CLASSES)
conf_mat = cmat(y_pred_tensor, y_true_tensor)

fig, ax = plot_confusion_matrix(conf_mat=conf_mat.cpu().numpy(),
                                class_names=list(range(config.NUM_CLASSES)),
                                show_normed=True,
                                colorbar=True)

# Rotate the x-axis labels
plt.xticks(rotation=45, ha="right", rotation_mode="anchor")

# Show the plot
plt.show()