In [None]:
import os
import cv2
import torch
import pandas as pd
import numpy as np
import albumentations as A
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPProcessor, CLIPModel, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

  check_for_updates()


In [None]:
# Read the train / validation annotation tables (one row per meme image).
train_dataset, val_dataset = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/content/drive/MyDrive/memotion_dataset_7k/train.csv',
        '/content/drive/MyDrive/memotion_dataset_7k/val.csv',
    )
)

In [None]:
# Sanity check: (rows, columns) of the training and validation frames.
train_dataset.shape, val_dataset.shape

((5593, 9), (1399, 9))

In [None]:
# Preview the first rows of the training frame to inspect the available columns.
train_dataset.head()

Unnamed: 0.1,Unnamed: 0,image_name,text_ocr,text_corrected,humour,sarcasm,offensive,motivational,overall_sentiment
0,2421,image_2422.jpg,I WILL FIND YOU AND WHEN I DO.. YOU WILL TAKE ...,I WILL FIND YOU AND WHEN I DO.. YOU WILL TAKE ...,funny,general,not_offensive,not_motivational,positive
1,2476,image_2477.jpg,Hot Chocolate Ons USED her Coffee More ui Ten ...,Hot Chocolate Ons USED her Coffee More ui Ten ...,funny,general,not_offensive,not_motivational,neutral
2,5026,image_5027.jpg,Are you gonna eat that whole pie? Maybe. Why n...,Are you gonna eat that whole pie? Maybe. Why n...,funny,general,slight,not_motivational,positive
3,6437,image_6438.jpg,HAPPY FRIDAY! WAIT,HAPPY FRIDAY! WAIT SORRY ITS MONDAY.,funny,general,slight,motivational,very_positive
4,784,image_785.png,CHUCK NORRIS DOESN'T NEED A GPS HE DECIDES WHE...,CHUCK NORRIS DOESN'T NEED A GPS HE DECIDES WHE...,very_funny,general,not_offensive,not_motivational,neutral


In [None]:
# Distribution of the `humour` annotation in the training split.
train_dataset['humour'].value_counts()

Unnamed: 0_level_0,count
humour,Unnamed: 1_level_1
funny,1959
very_funny,1790
not_funny,1337
hilarious,507


In [None]:
# Collapse the four-level `humour` annotation into a binary target:
# any degree of funniness -> 0, `not_funny` -> 1.
#
# An explicit element-wise mapping is used instead of `Series.replace`:
# replacing strings with ints makes `replace` implicitly downcast the column,
# and the recorded output of this cell shows a warning raised on that call.
# Values that are not keys of the mapping pass through unchanged, so re-running
# the cell on an already-binarised column is a no-op (as it was with `replace`).
humour_to_label = {
    'funny': 0,
    'very_funny': 0,
    'not_funny': 1,
    'hilarious': 0,
}
train_dataset['humour'] = train_dataset['humour'].map(
    lambda level: humour_to_label.get(level, level)
)

  train_dataset['humour'] = train_dataset['humour'].replace({


In [None]:
# Distribution of `humour` in the training split (re-checked after the remapping cell above).
train_dataset['humour'].value_counts()

Unnamed: 0_level_0,count
humour,Unnamed: 1_level_1
0,4256
1,1337


In [None]:
# Distribution of the `offensive` annotation in the validation split.
# NOTE(review): the neighbouring cells inspect and remap `humour`; confirm that
# `offensive` (rather than `humour`) is really the column meant to be checked here.
val_dataset['offensive'].value_counts()

Unnamed: 0_level_0,count
offensive,Unnamed: 1_level_1
slight,542
not_offensive,531
very_offensive,280
hateful_offensive,46


In [None]:
# Collapse the four-level `humour` annotation of the validation split into the
# binary target: any degree of funniness -> 0, `not_funny` -> 1.
#
# An explicit element-wise mapping is used instead of `Series.replace`:
# replacing strings with ints makes `replace` implicitly downcast the column,
# and the recorded output of this cell shows a warning raised on that call.
# Values that are not keys of the mapping pass through unchanged, so re-running
# the cell on an already-binarised column is a no-op (as it was with `replace`).
humour_to_label = {
    'funny': 0,
    'very_funny': 0,
    'not_funny': 1,
    'hilarious': 0,
}
val_dataset['humour'] = val_dataset['humour'].map(
    lambda level: humour_to_label.get(level, level)
)

  val_dataset['humour'] = val_dataset['humour'].replace({


In [None]:
# Distribution of `humour` in the validation split (re-checked after the remapping cell above).
val_dataset['humour'].value_counts()

Unnamed: 0_level_0,count
humour,Unnamed: 1_level_1
0,1085
1,314


In [None]:
!pip install text_hammer

In [None]:
import text_hammer as th

In [None]:
%%time

# `tqdm._tqdm_notebook` is a deprecated private shim; the public module is
# `tqdm.notebook`.
from tqdm.notebook import tqdm_notebook
tqdm_notebook.pandas()  # registers `progress_apply` on pandas objects

def text_preprocessing(df, col_name):
  """Normalise the text in ``df[col_name]`` and return ``df``.

  Every value is lower-cased, then passed through the ``text_hammer``
  cleaners in this order: contraction expansion, e-mail removal, HTML tag
  removal, special-character removal, accented-character removal.

  Missing values (NaN / None) become the empty string instead of the
  literal text ``'nan'`` that ``str(float('nan'))`` would produce.

  Args:
    df: DataFrame holding the text column. The column is overwritten in
      place, and the same frame is returned.
    col_name: Name of the column to clean.

  Returns:
    ``df`` with ``df[col_name]`` replaced by the cleaned text.
  """
  def _clean(value):
    # Keep "no text" distinguishable from real text.
    if pd.isna(value):
      return ''
    text = str(value).lower()
    text = th.cont_exp(text)  # you're -> you are; we'll be -> we will be
    text = th.remove_emails(text)
    text = th.remove_html_tags(text)
    text = th.remove_special_chars(text)
    text = th.remove_accented_chars(text)
    return text

  # All cleaners are element-wise, so one pass over the column gives the same
  # result as applying them one column-pass at a time.
  df[col_name] = df[col_name].progress_apply(_clean)

  return df

CPU times: user 850 µs, sys: 0 ns, total: 850 µs
Wall time: 1.88 ms


In [None]:
# Clean the OCR caption column of both splits (training first, then validation).
train_dataset, val_dataset = (
    text_preprocessing(frame, 'text_ocr')
    for frame in (train_dataset, val_dataset)
)

  0%|          | 0/5593 [00:00<?, ?it/s]

  0%|          | 0/5593 [00:00<?, ?it/s]

  0%|          | 0/5593 [00:00<?, ?it/s]

  0%|          | 0/5593 [00:00<?, ?it/s]

  0%|          | 0/5593 [00:00<?, ?it/s]

  0%|          | 0/5593 [00:00<?, ?it/s]

  0%|          | 0/1399 [00:00<?, ?it/s]

  0%|          | 0/1399 [00:00<?, ?it/s]

  0%|          | 0/1399 [00:00<?, ?it/s]

  0%|          | 0/1399 [00:00<?, ?it/s]

  0%|          | 0/1399 [00:00<?, ?it/s]

  0%|          | 0/1399 [00:00<?, ?it/s]

In [None]:
class MemeDataset(Dataset):
    """Dataset pairing each meme image with its tokenised caption and label.

    Args:
        images: Sequence of image file names, resolved relative to ``image_dir``.
        captions: Sequence of captions aligned with ``images``.
        sentiments: Sequence of integer class labels aligned with ``images``.
        tokenizer: Callable invoked as ``tokenizer([text], return_tensors="pt", ...)``
            whose result exposes ``'input_ids'`` and ``'attention_mask'``.
        image_transforms: Callable invoked as ``image_transforms(image=array)``
            returning a mapping with an ``'image'`` entry (channel-last array).
        image_dir: Directory that contains the image files.
    """

    # Text substituted for a missing or empty caption.
    EMPTY_CAPTION = "empty caption"

    def __init__(self, images, captions, sentiments, tokenizer, image_transforms, image_dir):
        self.images = images
        self.captions = captions
        self.sentiments = sentiments
        self.tokenizer = tokenizer
        self.image_transforms = image_transforms
        self.image_dir = image_dir

    def __len__(self):
        """Number of samples (one per image name)."""
        return len(self.images)

    @staticmethod
    def _normalize_caption(caption):
        """Coerce ``caption`` to a non-empty string.

        Lists/tuples of tokens are joined with spaces. This check must run
        before the generic ``str()`` coercion, otherwise a list would be
        turned into its ``repr``. ``None`` and NaN are treated as missing,
        and missing or empty captions are replaced by ``EMPTY_CAPTION``.
        """
        if isinstance(caption, (list, tuple)):
            caption = ' '.join(map(str, caption))
        elif caption is None or (isinstance(caption, float) and caption != caption):
            # `x != x` is true only for NaN.
            caption = ''
        elif not isinstance(caption, str):
            caption = str(caption)

        return caption if caption else MemeDataset.EMPTY_CAPTION

    def __getitem__(self, idx):
        """Return the sample at ``idx``.

        Returns:
            dict with keys ``'image'`` (float tensor, channel-first),
            ``'input_ids'`` and ``'attention_mask'`` (tokenizer output with
            the leading batch axis removed) and ``'sentiment'`` (long tensor).
        """
        image_name = self.images[idx]
        image_path = os.path.join(self.image_dir, image_name)
        sentiment = self.sentiments[idx]

        # Load and preprocess image; `cv2.imread` returns None for unreadable
        # files, in which case a blank image is used instead.
        image = cv2.imread(image_path)
        if image is None:
            image = np.zeros((224, 224, 3), dtype=np.uint8)

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.image_transforms(image=image)['image']
        # Channel-last (H, W, C) -> channel-first (C, H, W).
        image = torch.tensor(image).permute(2, 0, 1).float()

        caption = self._normalize_caption(self.captions[idx])

        # Encode caption, padded/truncated to a fixed length of 77 tokens.
        encoded_caption = self.tokenizer(
            [caption],
            return_tensors="pt",
            padding='max_length',
            truncation=True,
            max_length=77,
        )
        # Drop only the leading batch axis added by wrapping the caption in a list.
        input_ids = encoded_caption['input_ids'].squeeze(0)
        attention_mask = encoded_caption['attention_mask'].squeeze(0)

        # Class-index targets are stored as int64 regardless of the label's source type.
        sentiment_class = torch.tensor(sentiment, dtype=torch.long)

        return {'image': image, 'input_ids': input_ids, 'attention_mask': attention_mask, 'sentiment': sentiment_class}

class SpatialAttention(nn.Module):
    """Produces a single-channel attention map in (0, 1) from a 4-D feature map.

    The input is reduced over its channel axis with both a mean and a max,
    the two reductions are stacked and passed through a 7x7 convolution
    (padding 3, so the spatial size is preserved) followed by a sigmoid.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=7, padding=3)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map ``x`` of shape (N, C, H, W) to attention weights of shape (N, 1, H, W)."""
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        pooled = torch.cat((channel_mean, channel_max), dim=1)
        return self.sigmoid(self.conv1(pooled))

# Channel attention: per-channel gates computed from spatially pooled features
class ChannelAttention(nn.Module):
    """Produces one attention weight in (0, 1) per channel of a 4-D feature map.

    Args:
        in_planes: Number of input channels.
        ratio: Reduction factor of the bottleneck (hidden width is
            ``in_planes // ratio``).
    """

    def __init__(self, in_planes, ratio=8):
        super().__init__()
        hidden_planes = in_planes // ratio
        self.fc1 = nn.Conv2d(in_planes, hidden_planes, kernel_size=1, padding=0)
        self.fc2 = nn.Conv2d(hidden_planes, in_planes, kernel_size=1, padding=0)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map ``x`` of shape (N, C, H, W) to attention weights of shape (N, C, 1, 1)."""
        # Spatial average and spatial maximum (height first, then width).
        spatial_mean = x.mean(dim=(2, 3), keepdim=True)
        spatial_max = x.max(dim=2, keepdim=True).values.max(dim=3, keepdim=True).values

        # Both pooled descriptors go through the same two-layer bottleneck.
        mean_branch = self.fc2(self.relu(self.fc1(spatial_mean)))
        max_branch = self.fc2(self.relu(self.fc1(spatial_max)))
        return self.sigmoid(mean_branch + max_branch)


# Define custom model with CLIP and dense layers
class CustomCLIPModel(nn.Module):
    """CLIP backbone followed by attention gating and a dense two-class head.

    Image and text embeddings are taken from ``clip_model``; the image
    embedding is gated by ``SpatialAttention`` and ``ChannelAttention``, both
    embeddings are L2-normalised, concatenated and classified by a four-layer
    MLP (Linear -> BatchNorm -> GELU -> Dropout, three times, then Linear).

    Args:
        clip_model: Object exposing ``get_text_features(input_ids=...,
            attention_mask=...)`` and ``get_image_features(pixel_values=...)``.
            If it has ``config.projection_dim``, that value is used as the
            embedding width; otherwise 512 is assumed.
    """

    # Embedding width assumed when the backbone does not expose `config.projection_dim`.
    DEFAULT_FEATURE_DIM = 512

    def __init__(self, clip_model):
        super().__init__()
        self.clip_model = clip_model

        # Width of the CLIP embeddings, read from the backbone's config instead
        # of being hard-coded so that other CLIP variants also work.
        backbone_config = getattr(clip_model, 'config', None)
        self.num_channels = getattr(backbone_config, 'projection_dim', None) or self.DEFAULT_FEATURE_DIM

        # Attention modules
        self.spatial_attention = SpatialAttention()
        self.channel_attention = ChannelAttention(in_planes=self.num_channels)

        # The head consumes [text ; image], i.e. two embeddings of equal width.
        self.fc1 = nn.Linear(2 * self.num_channels, 512)
        self.dropout1 = nn.Dropout(0.4)
        self.bn1 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.dropout2 = nn.Dropout(0.3)
        self.bn2 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, 128)
        self.dropout3 = nn.Dropout(0.4)
        self.bn3 = nn.BatchNorm1d(128)
        self.fc4 = nn.Linear(128, 2)
        self.gelu = nn.GELU()

    def forward(self, image, input_ids, attention_mask):
        """Classify a batch of (image, caption) pairs.

        Returns:
            Tuple ``(logits, image_features, text_features)``: the two-class
            logits plus the L2-normalised image and text embeddings.
        """
        text_features = self.clip_model.get_text_features(input_ids=input_ids, attention_mask=attention_mask)
        image_features = self.clip_model.get_image_features(pixel_values=image)

        # The attention modules expect (batch, channels, height, width). A 2-D
        # embedding is expanded to a 1x1 "feature map", so in that case both
        # attention modules act on a single spatial position.
        if image_features.dim() == 2:
            image_features = image_features.unsqueeze(-1).unsqueeze(-1)

        # Apply spatial and channel attention
        spatial_att = self.spatial_attention(image_features)
        channel_att = self.channel_attention(image_features)

        image_features = image_features * spatial_att * channel_att

        # Collapse the spatial axes back to a (batch, channels) embedding.
        image_features = F.adaptive_avg_pool2d(image_features, (1, 1)).squeeze(-1).squeeze(-1)

        # Normalize features
        image_features = F.normalize(image_features, dim=1)
        text_features = F.normalize(text_features, dim=1)

        # Concatenate the text and image features
        combined_features = torch.cat((text_features, image_features), dim=1)

        x = self.fc1(combined_features)
        x = self.bn1(x)
        x = self.gelu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = self.gelu(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        x = self.bn3(x)
        x = self.gelu(x)
        x = self.dropout3(x)
        logits = self.fc4(x)

        return logits, image_features, text_features

class CustomLoss(nn.Module):
    """Training criterion; currently a thin wrapper around ``nn.CrossEntropyLoss``."""

    def __init__(self):
        super().__init__()
        self.cross_entropy_loss = nn.CrossEntropyLoss()

    def forward(self, logits, labels):
        """Return the cross-entropy between ``logits`` and the class-index ``labels``."""
        return self.cross_entropy_loss(logits, labels)


def train_epoch(model, train_loader, optimizer, device, criterion):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Module called as ``model(images, input_ids, attention_mask)``
            and returning a tuple whose first element is the logits.
        train_loader: Iterable of batches (dicts with keys ``'image'``,
            ``'input_ids'``, ``'attention_mask'``, ``'sentiment'``).
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        criterion: Callable ``criterion(logits, labels)`` returning the loss.

    Returns:
        Tuple ``(loss_meter, accuracy)``: the ``AvgMeter`` that accumulated the
        sample-weighted batch losses, and the fraction of correct predictions.
    """
    model.train()
    running_loss = AvgMeter()
    progress = tqdm(train_loader, total=len(train_loader))
    n_correct, n_seen = 0, 0

    for batch in progress:
        images, input_ids, attention_mask, sentiments = (
            batch[key].to(device)
            for key in ('image', 'input_ids', 'attention_mask', 'sentiment')
        )

        optimizer.zero_grad()
        logits, _image_features, _text_features = model(images, input_ids, attention_mask)
        loss = criterion(logits, sentiments)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so the meter tracks a per-sample mean.
        running_loss.update(loss.item(), images.size(0))

        n_correct += (logits.argmax(dim=1) == sentiments).sum().item()
        n_seen += sentiments.size(0)

        progress.set_postfix(train_loss=running_loss.avg, lr=get_lr(optimizer))

    return running_loss, n_correct / n_seen


def evaluate(model, data_loader, device, criterion):
    """Score ``model`` on ``data_loader`` without updating any weights.

    Args:
        model: Module called as ``model(images, input_ids, attention_mask)``
            and returning a tuple whose first element is the logits.
        data_loader: Iterable of batches (dicts with keys ``'image'``,
            ``'input_ids'``, ``'attention_mask'``, ``'sentiment'``).
        device: Device the batch tensors are moved to.
        criterion: Callable ``criterion(logits, labels)`` returning the loss.

    Returns:
        Tuple ``(predictions, true_labels, mean_loss, accuracy)`` where the
        first two are flat lists of per-sample class indices.
    """
    model.eval()
    running_loss = AvgMeter()
    predictions, true_labels = [], []
    n_correct = n_seen = 0

    with torch.no_grad():
        for batch in data_loader:
            images, input_ids, attention_mask, sentiments = (
                batch[key].to(device)
                for key in ('image', 'input_ids', 'attention_mask', 'sentiment')
            )

            logits, _image_features, _text_features = model(images, input_ids, attention_mask)
            running_loss.update(criterion(logits, sentiments).item(), len(images))

            preds = logits.argmax(dim=1)
            n_correct += (preds == sentiments).sum().item()
            n_seen += sentiments.size(0)

            predictions.extend(preds.cpu().numpy())
            true_labels.extend(sentiments.cpu().numpy())

    return predictions, true_labels, running_loss.avg, n_correct / n_seen

def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns ``None`` when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    return None if first_group is None else first_group['lr']

class AvgMeter:
    """Tracks the most recent value and the weighted running mean of a metric.

    Attributes (all reset to 0 by ``reset``):
        val: Last value passed to ``update``.
        sum: Weighted sum of all values seen.
        count: Total weight seen.
        avg: ``sum / count``.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all accumulated statistics."""
        for field in ('val', 'avg', 'sum', 'count'):
            setattr(self, field, 0)

    def update(self, val, n=1):
        """Record ``val`` observed with weight ``n`` (e.g. a batch mean and the batch size)."""
        self.val = val
        self.sum, self.count = self.sum + val * n, self.count + n
        self.avg = self.sum / self.count


# Resolve the device once instead of re-evaluating the expression at every use.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Per-channel statistics of the OpenAI CLIP image preprocessing. The pretrained
# backbone expects inputs normalised with these values rather than with the
# ImageNet statistics.
CLIP_MEAN = (0.48145466, 0.4578275, 0.40821073)
CLIP_STD = (0.26862954, 0.26130258, 0.27577711)

train_image_transforms = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5),
    A.CoarseDropout(p=0.3),  # Added augmentation
    A.Normalize(mean=CLIP_MEAN, std=CLIP_STD)
])

# Validation uses the deterministic part of the pipeline only.
val_image_transforms = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=CLIP_MEAN, std=CLIP_STD)
])

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

model = CustomCLIPModel(clip_model)
model = model.to(device)


train_image_dir = '/content/drive/MyDrive/memotion_dataset_7k/training_images'
val_image_dir = '/content/drive/MyDrive/memotion_dataset_7k/validation_images'

# NOTE(review): `train_dataset` / `val_dataset` are rebound here from the
# DataFrames to `MemeDataset` objects, so this cell cannot be re-run without
# first re-running the cells that build the DataFrames. The names are kept
# because later cells may rely on them.
train_dataset = MemeDataset(
    images=train_dataset['image_name'].tolist(),
    captions=train_dataset['text_ocr'].tolist(),
    sentiments=train_dataset['humour'].tolist(),
    tokenizer=processor.tokenizer,
    image_transforms=train_image_transforms,
    image_dir=train_image_dir
)

val_dataset = MemeDataset(
    images=val_dataset['image_name'].tolist(),
    captions=val_dataset['text_ocr'].tolist(),
    sentiments=val_dataset['humour'].tolist(),
    tokenizer=processor.tokenizer,
    image_transforms=val_image_transforms,
    image_dir=val_image_dir
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)


optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, weight_decay = 1e-3)
num_epochs = 10

# `scheduler.step()` is called once per epoch in the loop below, so the decay
# horizon is expressed in epochs. With a horizon of
# `len(train_loader) * num_epochs` and one step per epoch, the learning rate
# would stay practically constant for the whole run.
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=num_epochs)

# Loss functions
criterion = CustomLoss()

BEST_MODEL_PATH = 'best_model.pth'

# Per-epoch history, kept for later inspection / plotting.
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

best_val_accuracy = 0

# Training loop
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    train_loss, train_accuracy = train_epoch(model, train_loader, optimizer, device, criterion)
    val_predictions, val_true_labels, val_loss, val_accuracy = evaluate(model, val_loader, device, criterion)

    # Checkpoint whenever validation accuracy improves on the best seen so far.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), BEST_MODEL_PATH)

    # `train_epoch` returns the loss meter itself, `evaluate` the averaged loss.
    train_losses.append(train_loss.avg)
    val_losses.append(val_loss)
    train_accuracies.append(train_accuracy)
    val_accuracies.append(val_accuracy)

    print(f"Train Loss: {train_loss.avg}, Train Accuracy: {train_accuracy}")
    print(f"Val Loss: {val_loss}, Val Accuracy: {val_accuracy}")

    scheduler.step()

# Restore the best checkpoint. `map_location` keeps the load working when the
# current device differs from the one the checkpoint was saved on;
# `weights_only=True` restricts unpickling to tensor data, which is all a
# state dict contains.
model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=device, weights_only=True))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/862k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.22M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.19k [00:00<?, ?B/s]



pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

Epoch 1/10


100%|██████████| 175/175 [16:17<00:00,  5.59s/it, lr=1e-5, train_loss=0.764]


Train Loss: 0.7637615577918396, Train Accuracy: 0.47219738959413554
Val Loss: 0.7150564925256502, Val Accuracy: 0.413867047891351
Epoch 2/10


100%|██████████| 175/175 [01:59<00:00,  1.47it/s, lr=9.99e-6, train_loss=0.718]


Train Loss: 0.718358369413267, Train Accuracy: 0.5219023779724656
Val Loss: 0.6608338641898814, Val Accuracy: 0.7026447462473195
Epoch 3/10


100%|██████████| 175/175 [01:58<00:00,  1.47it/s, lr=9.99e-6, train_loss=0.681]


Train Loss: 0.681447799082585, Train Accuracy: 0.583050241373145
Val Loss: 0.6585735215980552, Val Accuracy: 0.6311651179413867
Epoch 4/10


100%|██████████| 175/175 [01:55<00:00,  1.52it/s, lr=9.98e-6, train_loss=0.646]


Train Loss: 0.6456572905691711, Train Accuracy: 0.6386554621848739
Val Loss: 0.638286846420269, Val Accuracy: 0.6747676912080057
Epoch 5/10


100%|██████████| 175/175 [01:57<00:00,  1.49it/s, lr=9.98e-6, train_loss=0.589]


Train Loss: 0.5888022656797105, Train Accuracy: 0.7057035580189522
Val Loss: 0.8990530275804303, Val Accuracy: 0.36168691922802
Epoch 6/10


100%|██████████| 175/175 [01:56<00:00,  1.50it/s, lr=9.97e-6, train_loss=0.488]


Train Loss: 0.48769828485884137, Train Accuracy: 0.8026104058644734
Val Loss: 0.6836321459999248, Val Accuracy: 0.6490350250178699
Epoch 7/10


100%|██████████| 175/175 [01:54<00:00,  1.53it/s, lr=9.97e-6, train_loss=0.351]


Train Loss: 0.3508601684583527, Train Accuracy: 0.8973717146433041
Val Loss: 0.7711547406508805, Val Accuracy: 0.6468906361686919
Epoch 8/10


100%|██████████| 175/175 [02:00<00:00,  1.45it/s, lr=9.96e-6, train_loss=0.26]


Train Loss: 0.26013511869021994, Train Accuracy: 0.9454675487216163
Val Loss: 0.6591864376940669, Val Accuracy: 0.7105075053609721
Epoch 9/10


100%|██████████| 175/175 [02:00<00:00,  1.45it/s, lr=9.95e-6, train_loss=0.212]


Train Loss: 0.21238325995035254, Train Accuracy: 0.9610227069551225
Val Loss: 0.6898896645613446, Val Accuracy: 0.7183702644746247
Epoch 10/10


100%|██████████| 175/175 [01:58<00:00,  1.48it/s, lr=9.95e-6, train_loss=0.179]


Train Loss: 0.1786695264137305, Train Accuracy: 0.9704988378330055
Val Loss: 0.9387472922500327, Val Accuracy: 0.6418870621872766


  model.load_state_dict(torch.load(BEST_MODEL_PATH))


<All keys matched successfully>

In [None]:
from sklearn.metrics import f1_score, classification_report

# Score the currently loaded weights on the validation split.
eval_device = 'cuda' if torch.cuda.is_available() else 'cpu'
val_predictions, val_true_labels, val_loss, val_accuracy = evaluate(
    model, val_loader, eval_device, criterion
)

# Weighted F1 follows the class frequencies; macro F1 treats both classes equally.
f1_weighted, f1_macro = (
    f1_score(val_true_labels, val_predictions, average=averaging)
    for averaging in ('weighted', 'macro')
)
class_report = classification_report(val_true_labels, val_predictions)

print(f'Best Validation Accuracy: {best_val_accuracy}')
print(f'F1 Weighted: {f1_weighted:.4f}')
print(f'F1 Macro: {f1_macro:.4f}')
print(f'Classification Report:\n{class_report}')

Best Validation Accuracy: 0.7183702644746247
F1 Weighted: 0.6846
F1 Macro: 0.5062
Classification Report:
              precision    recall  f1-score   support

           0       0.78      0.89      0.83      1085
           1       0.26      0.14      0.18       314

    accuracy                           0.72      1399
   macro avg       0.52      0.51      0.51      1399
weighted avg       0.66      0.72      0.68      1399

