In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!nvidia-smi

# Setup and Config

In [None]:
# Requirements
!pip install nltk
!pip install transformers
!pip install shutup
!pip install pytorch-lightning
!pip install torchMetrics
!pip install langdetect

In [None]:
import numpy as np
import pandas as pd
import re
import nltk
import torch
import torch.nn as nn
import pytorch_lightning as pl
import seaborn as sns
import matplotlib.pyplot as plt
import pickle
from torch.utils.data import Dataset, DataLoader, random_split
from pylab import rcParams
from tqdm.auto import tqdm
from transformers import BertTokenizer, AdamW, get_linear_schedule_with_warmup
#from torchmetrics.functional import f1, accuracy
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, multilabel_confusion_matrix
from matplotlib import rc

RANDOM_SEED = 42

np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
pl.seed_everything(RANDOM_SEED)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

# Text

In [None]:
nltk.download('punkt') 

In [None]:
import pandas as pd
train_df = pd.read_csv('/content/drive/MyDrive/Datasets/tweets.txt', sep='\t', header=0)
test_df = pd.read_csv('/content/drive/MyDrive/Datasets/tweets-test.txt', sep='\t', header=0)

In [None]:
# df_train, df_val = train_test_split(df, test_size=0.15)

In [None]:
train_df.head()

In [None]:
test_df.head()

In [None]:
train_df.label.value_counts().plot(kind='bar')
plt.tight_layout()

In [None]:
train_df.loc[(train_df.label == 'humor'),'label'] = 'fake'

In [None]:
train_df.label.value_counts().plot(kind='bar')
plt.tight_layout()

In [None]:
test_df.label.value_counts().plot(kind='bar')
plt.tight_layout()

In [None]:
from langdetect import detect
def lang_detect(text):
    """Return the language code reported by `detect` for `text`.

    If `detect` raises for any reason, the sentinel string "unknown" is
    returned instead, so the function can be mapped over a whole column
    without one bad row aborting the run.
    """
    try:
        return detect(text)
    except Exception:
        # `Exception` rather than a bare `except:` so that KeyboardInterrupt /
        # SystemExit still propagate and a long-running cell can be interrupted.
        return "unknown"
training_languages_count = train_df['tweetText'].apply(lambda x: lang_detect(x))
print('languages of training dataset:\n', training_languages_count.value_counts())

# Plot a bar chart of the frequencies
training_languages_count.value_counts().plot(kind='bar')
plt.tight_layout()

Preprocessing the text

In [None]:
train_df.drop(labels=['tweetId','userId', 'username', 'timestamp'], axis = 'columns', inplace = True)
test_df.drop(labels=['tweetId','userId', 'username', 'timestamp'], axis = 'columns', inplace = True)
train_df.rename(columns = {"imageId(s)":"images"}, inplace = True)
test_df.rename(columns = {"imageId(s)":"images"}, inplace = True)

In [None]:
# Only still photos are used for now; rows whose `images` value names a video
# are removed from the test set (GIFs and videos are left for later work).
test_df.drop(test_df[test_df.images =="syrian_boy_video"].index, inplace = True)
test_df.drop(test_df[test_df.images =="varoufakis_video"].index, inplace = True)
# NOTE(review): the literal below ends with a space; confirm it matches the raw
# value in the data, otherwise this comparison selects nothing and the drop is a no-op.
test_df.drop(test_df[test_df.images =="eclipse_video_01 "].index, inplace = True)


# Minor fixes to the training data: the referenced image files did not exist in
# the dataset, so two ids are renamed and the remaining rows are dropped.
# The DataFrame index is not reset afterwards, so index labels have gaps.
train_df.replace({'images': {'sandyB_real_4': 'sandyB_real_04'}}, inplace = True)
train_df.replace({'images': {'sandyB_real_6': 'sandyB_real_60'}}, inplace = True)
train_df.drop(train_df[train_df.images =="boston_fake_35"].index, inplace = True)
train_df.drop(train_df[train_df.images =="boston_fake_10"].index, inplace = True)
train_df.drop(train_df[train_df.images =="boston_real_05"].index, inplace = True)
train_df.drop(train_df[train_df.images =="sochi_fake_1fake"].index, inplace = True)

In [None]:
"""
from nltk.corpus.reader.twitter import TweetTokenizer
def preprocess_text(tweet):
    tweet = re.sub(r'&\S+', '', tweet) # remove '&amp'
    tweet = tweet.replace("\\n",'') # remove end of line signs '\n'
    tweet = re.sub(r'[^\w\s]','',tweet) # remove non word characters
    tweet = re.sub(r'@\w*', "", tweet) # remove usernames
    tweet = tweet.lower() #convert to lower case
    tweet = re.sub(r'[0-9]','',tweet) #remove numbers
    tweet = nltk.TweetTokenizer(tweet)
    return tweet
"""

In [None]:
"""
#train_df.apply(preprocess_text(), )
train_df.tweetText = np.array([preprocess_text(text) for text in train_df.tweetText])
test_df.tweetText = np.array([preprocess_text(text) for text in test_df.tweetText])
"""

In [None]:
def preprocess_text(dataset, stopwords=None):
    r"""Clean the ``tweetText`` column of ``dataset`` in place.

    The following steps are applied, in order, to every tweet:
      1. remove ``@mentions``;
      2. remove the HTML entity ``&amp;`` and literal backslash-n sequences;
      3. remove URLs (anything starting with ``http``);
      4. remove fragments that start with a backslash followed by ``/``;
      5. drop whitespace-separated tokens that are stopwords or one of a few
         stand-alone punctuation tokens, re-joining the rest with single spaces.

    Args:
        dataset: DataFrame with a string ``tweetText`` column; it is modified
            in place.
        stopwords: optional iterable of stopwords to remove. When omitted, the
            NLTK stopword corpus (all languages) is downloaded and used.

    Returns:
        None.
    """
    # Order matters: each pattern runs on the output of the previous one.
    noise_patterns = (r'@\w*', r'&amp;|\\n', r'http\S+', r'\\\/\S+')
    cleaned = dataset['tweetText']
    for pattern in noise_patterns:
        cleaned = cleaned.apply(lambda text: re.sub(pattern, '', text))

    if stopwords is None:
        nltk.download('stopwords')
        stopwords = nltk.corpus.stopwords.words()

    # A set gives O(1) membership tests; the original list made every token
    # lookup a linear scan over the whole multi-language stopword list.
    ignored_tokens = set(stopwords)
    ignored_tokens.update([':', ';', '[', ']', '"', "'", '(', ')', '.', '?', '#', '@', '...', '¿'])

    # Write back to 'tweetText'. The filtered text used to be stored in a
    # misspelled 'tweetTweet' column, so stopword removal never reached the
    # column that is tokenized downstream.
    dataset['tweetText'] = cleaned.apply(
        lambda text: ' '.join(word for word in text.split() if word not in ignored_tokens)
    )

In [None]:
preprocess_text(train_df)
preprocess_text(test_df)

In [None]:
train_df.tweetText[0]

In [None]:
train_df.shape

In [None]:
token_lens = []

tokenizer = BertTokenizer.from_pretrained('jirmauritz/bert-multilingual-emoji')
def calculate_seq_length(df):
    """Record the encoded length of every entry in ``df.tweetText``.

    Each value is converted with ``str`` and encoded with the module-level
    ``tokenizer``; the resulting lengths are appended, in row order, to the
    module-level ``token_lens`` list. Nothing is returned.
    """
    token_lens.extend(len(tokenizer.encode(str(tweet))) for tweet in df.tweetText)

# calculate_seq_length(df)

# sns.histplot(token_lens)
# plt.xlabel('Sequence Length')

In [None]:
num_chars = train_df['tweetText'].apply(lambda x: len(x))
num_chars

In [None]:
# Length of the longest training tweet, measured in characters
# (`num_chars` holds len() of every `tweetText` value).
# NOTE(review): this value is later passed as the tokenizer's `max_length`,
# which counts tokens rather than characters — confirm that padding every
# sample to this length is intended and that it stays within the model's
# maximum sequence length.
MAX_LEN = max(num_chars)
MAX_LEN

In [None]:
num_chars_test = test_df['tweetText'].apply(lambda x: len(x))
MAX_LEN_TEST = max(num_chars_test)

In [None]:
# LABEL_COLUMNS = df.columns.tolist()[1:-1]
# LABEL_COLUMNS

In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit the encoder on the training labels only and reuse that mapping for the
# test labels, so a class name maps to the same integer id in both splits.
# (Re-fitting on the test set can silently assign different ids.)
le = LabelEncoder()
train_df['label'] = le.fit_transform(train_df['label'])
# 'humor' was merged into 'fake' for the training labels earlier in the
# notebook; apply the same merge here so `transform` only sees known classes.
test_df['label'] = le.transform(test_df['label'].replace({'humor': 'fake'}))

# VisualBERT

The visual embeddings were generated with Detectron2 and are loaded below from pickle files.

Importing the training set visual embeddings

In [None]:
with open('/content/drive/MyDrive/VisualBERT-for-Memes-Classification/Data/visual_embeds_train.pkl', 'rb') as f:
  visual_embeds_train = pickle.load(f)

In [None]:
print(len(visual_embeds_train))
visual_embeds_train[0].shape

Importing the test set visual embeddings

In [None]:
with open('/content/drive/MyDrive/VisualBERT-for-Memes-Classification/Data/visual_embeds_test.pkl', 'rb') as f:
  visual_embeds_test = pickle.load(f)

In [None]:
print(len(visual_embeds_test))
visual_embeds_test[0].shape

In [None]:
from transformers import VisualBertModel, VisualBertConfig

configuration = VisualBertConfig.from_pretrained('uclanlp/visualbert-vqa', visual_embedding_dim=1024)
model = VisualBertModel(configuration)
model = model.to(device)

tokenizer = BertTokenizer.from_pretrained('jirmauritz/bert-multilingual-emoji')

### Batch Example

In [None]:
visual_embeds_train[:1]
index = 2

In [None]:
# tokens = tokenizer(df_train.text[index], padding='max_length', max_length=64)

# Tokenize a single training tweet the same way the Dataset classes do.
# `.iloc` (positional) is used because rows were dropped from `train_df`
# without resetting the index, while `visual_embeds_train` is a plain list
# that is indexed by position.
tokens = tokenizer.encode_plus(
      train_df.tweetText.iloc[index],
      add_special_tokens=True,
      max_length= MAX_LEN,
      return_token_type_ids=False,
      padding="max_length",
      truncation=True,
      return_attention_mask=True,
      return_tensors='pt',
)

# With return_tensors='pt' the values are already tensors; re-wrapping them in
# torch.tensor() only copies them and triggers a UserWarning.
input_ids = tokens["input_ids"].to(device).flatten()
attention_mask = tokens["attention_mask"].to(device).flatten()

# Training sample: one attention-mask entry per visual region (all dims of the
# embedding except the last).
visual_embeds = visual_embeds_train[index].to(device)
visual_attention_mask = torch.ones(visual_embeds.shape[:-1], dtype=torch.float).to(device)

# Test sample: kept under its own name so it no longer overwrites the training
# sample inspected in the next cell.
visual_embeds_test_sample = visual_embeds_test[index].to(device)
visual_attention_mask_test = torch.ones(visual_embeds_test_sample.shape[:-1], dtype=torch.float).to(device)

In [None]:
print(input_ids.shape)
print(attention_mask.shape)
print(visual_embeds.shape)
print(visual_attention_mask.shape)

# Dataset
We’ll wrap the tokenization process in a PyTorch Dataset, along with converting the labels to tensors:

In [None]:
# LABEL_COLUMNS = df.columns.tolist()[1:-1]
# LABEL_COLUMNS

In [None]:
# Move every pre-computed visual embedding onto `device`, replacing the list
# entries in place (train list first, then test list).
# NOTE(review): these lists are later indexed inside Dataset.__getitem__ and
# the samples are served through DataLoaders with num_workers > 0; CUDA tensors
# and worker processes generally do not mix well — consider keeping the
# embeddings on the CPU and letting the trainer move each batch. Confirm.
for i in range(len(visual_embeds_train)):
  visual_embeds_train[i] = visual_embeds_train[i].to(device)

for i in range(len(visual_embeds_test)):
  visual_embeds_test[i] = visual_embeds_test[i].to(device)

Training Dataset

In [None]:
class FNDataset(Dataset):
  """Pairs each tweet of a DataFrame with its pre-computed visual embedding.

  Args:
    data: DataFrame with a `tweetText` column and a numeric `label` column.
    tokenizer: callable tokenizer (e.g. a `BertTokenizer`) that returns a
      mapping with `input_ids` and `attention_mask` tensors.
    max_len: length every text is padded / truncated to, in tokens.
    visual_embeds: sequence of tensors, one per row of `data`, in row order.
      The last dimension is the feature dimension; the leading dimensions
      index the visual regions.
  """

  def __init__(self, data: pd.DataFrame, tokenizer: "BertTokenizer", max_len: int, visual_embeds):
    self.tokenizer = tokenizer
    self.data = data
    self.max_len = max_len
    self.visual_embeds = visual_embeds

  def __len__(self):
    return len(self.data)

  def __getitem__(self, index):
    """Return the model inputs and label for the row at position `index`."""
    data_row = self.data.iloc[index]

    # Calling the tokenizer directly is the current API; `encode_plus` is its
    # deprecated single-text alias. `str()` guards against non-string cells.
    tokens = self.tokenizer(
      str(data_row.tweetText),
      add_special_tokens=True,
      max_length=self.max_len,
      return_token_type_ids=False,
      padding="max_length",
      truncation=True,
      return_attention_mask=True,
      return_tensors='pt',
    )

    # The tokenizer already returns tensors; flatten drops the batch dimension.
    input_ids = tokens["input_ids"].flatten()
    attention_mask = tokens["attention_mask"].flatten()

    # Samples are returned on the CPU (as TestDataset does) so they can pass
    # through DataLoader workers; the batch is moved to the device later.
    visual_embedding = self.visual_embeds[index].cpu()
    # Shape both masks from THIS sample's embedding. The type ids used to be
    # shaped from an unrelated notebook-level variable named `visual_embeds`.
    region_shape = visual_embedding.shape[:-1]
    visual_attention_mask = torch.ones(region_shape, dtype=torch.float)
    visual_token_type_ids = torch.ones(region_shape, dtype=torch.long)

    return dict(
      input_ids=input_ids,
      attention_mask=attention_mask,
      visual_embedding=visual_embedding,
      visual_attention_mask=visual_attention_mask,
      visual_token_type_ids=visual_token_type_ids,
      labels=torch.tensor(data_row.label).float()
    )

In [None]:
dataset = FNDataset(
  train_df,
  tokenizer,
  MAX_LEN,
  visual_embeds_train
)

In [None]:
train_dataset, val_dataset = train_test_split(dataset, test_size=0.10)

Test Dataset

In [None]:
class TestDataset(Dataset):
  """Test-split counterpart of FNDataset: tweet text plus visual embedding.

  Args:
    data: DataFrame with a `tweetText` column and a numeric `label` column.
    tokenizer: callable tokenizer (e.g. a `BertTokenizer`) that returns a
      mapping with `input_ids` and `attention_mask` tensors.
    max_len: length every text is padded / truncated to, in tokens.
    visual_embeds: sequence of tensors, one per row of `data`, in row order.
      The last dimension is the feature dimension; the leading dimensions
      index the visual regions.
  """

  def __init__(self, data: pd.DataFrame, tokenizer: "BertTokenizer", max_len: int, visual_embeds):
    self.tokenizer = tokenizer
    self.data = data
    self.max_len = max_len
    self.visual_embeds = visual_embeds

  def __len__(self):
    return len(self.data)

  def __getitem__(self, index):
    """Return the model inputs and label for the row at position `index`."""
    data_row = self.data.iloc[index]

    # Calling the tokenizer directly is the current API; `encode_plus` is its
    # deprecated single-text alias. `str()` guards against non-string cells.
    tokens = self.tokenizer(
      str(data_row.tweetText),
      add_special_tokens=True,
      max_length=self.max_len,
      return_token_type_ids=False,
      padding="max_length",
      truncation=True,
      return_attention_mask=True,
      return_tensors='pt',
    )

    # The tokenizer already returns tensors; flatten drops the batch dimension.
    input_ids = tokens["input_ids"].flatten()
    attention_mask = tokens["attention_mask"].flatten()

    visual_embedding = self.visual_embeds[index].to('cpu')
    # Shape both masks from THIS sample's embedding. The type ids used to be
    # shaped from an unrelated notebook-level variable named `visual_embeds`.
    region_shape = visual_embedding.shape[:-1]
    visual_attention_mask = torch.ones(region_shape, dtype=torch.float)
    visual_token_type_ids = torch.ones(region_shape, dtype=torch.long)

    return dict(
      input_ids=input_ids,
      attention_mask=attention_mask,
      visual_embedding=visual_embedding,
      visual_attention_mask=visual_attention_mask,
      visual_token_type_ids=visual_token_type_ids,
      labels=torch.tensor(data_row.label).float()
    )

In [None]:
test_dataset = TestDataset(
    test_df,
    tokenizer,
    MAX_LEN_TEST,
    visual_embeds_test
)

In [None]:
temp = DataLoader(
    train_dataset,
    batch_size = 32,
    shuffle=True,
    num_workers=2
    )

In [None]:
print(len(train_dataset))
print(len(val_dataset))

# Data Module

In [None]:
class FNDataModule(pl.LightningDataModule):
  """Lightning data module that serves already-built train/val/test datasets.

  Args:
    train_dataset: dataset used by `train_dataloader` (shuffled).
    val_dataset: dataset used by `val_dataloader`.
    test_dataset: dataset used by `test_dataloader`.
    tokenizer: stored on the instance; not used by the loaders themselves.
    batch_size: batch size shared by all three loaders.
    max_len: stored on the instance; not used by the loaders themselves.
    num_workers: number of DataLoader worker processes for every loader
      (defaults to 3, the value that was previously hard-coded).
  """

  def __init__(self, train_dataset, val_dataset, test_dataset, tokenizer, batch_size=32, max_len=64, num_workers=3):
    super().__init__()
    self.batch_size = batch_size
    self.train_dataset = train_dataset
    self.val_dataset = val_dataset
    self.test_dataset = test_dataset
    self.tokenizer = tokenizer
    self.max_len = max_len
    self.num_workers = num_workers

  def _build_loader(self, dataset, shuffle=False):
    """Create a DataLoader with the module's shared batch size and worker count."""
    return DataLoader(
        dataset,
        batch_size = self.batch_size,
        shuffle=shuffle,
        num_workers=self.num_workers
    )

  def train_dataloader(self):
    # Only the training loader shuffles.
    return self._build_loader(self.train_dataset, shuffle=True)

  def val_dataloader(self):
    return self._build_loader(self.val_dataset)

  def test_dataloader(self):
    return self._build_loader(self.test_dataset)

# Model

In [None]:
class FNClassifier(pl.LightningModule):
  """VisualBERT encoder followed by a linear classification head.

  Args:
    n_classes: number of output classes of the linear head.
    n_training_steps: total number of optimizer steps, used as the end point
      of the linear learning-rate decay. When None, no scheduler is created.
    n_warmup_steps: number of warm-up steps for the scheduler; None means 0.
  """

  def __init__(self, n_classes, n_training_steps=None, n_warmup_steps=None):
    super().__init__()
    self.configuration = VisualBertConfig.from_pretrained('uclanlp/visualbert-vqa-coco-pre', visual_embedding_dim=1024)
    # Build the encoder from the module's own configuration. It used to read a
    # notebook-level `configuration` variable, so the class only worked if an
    # unrelated earlier cell had been run.
    # NOTE(review): constructing the model from a config does not load
    # pretrained weights — confirm that training from scratch is intended.
    self.model = VisualBertModel(self.configuration)
    self.n_training_steps = n_training_steps
    self.n_warmup_steps = n_warmup_steps
    self.criterion = nn.CrossEntropyLoss()
    # Kept for parity with the original module; currently not applied in forward().
    self.dropout = nn.Dropout(0.2)
    self.classifier = nn.Linear(self.model.config.hidden_size, n_classes)

  def forward(self, input_ids, attention_mask, visual_embeds, visual_attention_mask, visual_token_type_ids, labels=None):
    """Run the encoder and the classification head.

    Returns:
      A `(loss, logits)` tuple. `loss` is the cross-entropy against `labels`,
      or the integer 0 when `labels` is None.
    """
    encoded = self.model(input_ids=input_ids,
                         attention_mask=attention_mask,
                         visual_embeds=visual_embeds,
                         visual_attention_mask=visual_attention_mask,
                         visual_token_type_ids=visual_token_type_ids)

    logits = self.classifier(encoded.pooler_output)

    loss = 0
    if labels is not None:
      loss = self.criterion(logits, labels)

    return loss, logits

  def _shared_step(self, batch):
    """Move one batch to the module's device and run a labelled forward pass."""
    # `self.device` follows wherever the trainer placed the module, unlike a
    # notebook-level `device` variable that may disagree with it.
    target = self.device
    # `.long()` keeps the tensor on its device; `.type(torch.LongTensor)`
    # forced a round-trip through the CPU.
    labels = batch['labels'].to(target).long()
    loss, outputs = self(
      batch['input_ids'].to(target),
      batch['attention_mask'].to(target),
      batch['visual_embedding'].to(target),
      batch['visual_attention_mask'].to(target),
      batch['visual_token_type_ids'].to(target),
      labels,
    )
    return loss, outputs, labels

  def training_step(self, batch, batch_idx):
    loss, outputs, labels = self._shared_step(batch)
    self.log('train_loss', loss, prog_bar=True, logger=True)

    return {"loss":loss, 'predictions':outputs, 'labels':labels}

  def validation_step(self, batch, batch_idx):
    loss, _, _ = self._shared_step(batch)
    self.log('val_loss', loss, prog_bar=True, logger=True)

    return loss

  def configure_optimizers(self):
    """Return AdamW, plus a per-step linear warm-up/decay schedule when configured."""
    optimizer = AdamW(self.parameters(), lr=3e-5)

    # Without a known step count the schedule cannot be built (the constructor
    # defaults are None); fall back to a constant learning rate.
    if self.n_training_steps is None:
      return optimizer

    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=self.n_warmup_steps or 0,
        num_training_steps=self.n_training_steps
    )

    return dict(
        optimizer=optimizer,
        lr_scheduler=dict(
            scheduler=scheduler,
            interval='step'
        )
    )

In [None]:
n_classes = 2
N_EPOCHS = 2
BATCH_SIZE = 40

In [None]:
# Optimizer steps per epoch, derived from the actual size of the training
# split rather than a hard-coded sample count. Ceil division: the loaders do
# not drop the last, partially filled batch.
steps_per_epoch = -(-len(train_dataset) // BATCH_SIZE)
# Total steps over the whole run; this is where the linear LR decay ends.
total_training_steps = steps_per_epoch * N_EPOCHS

In [None]:
warmup_steps = total_training_steps // 10
warmup_steps, total_training_steps

In [None]:
# Instantiate the classifier. This rebinds `model`, which an earlier cell bound
# to a bare VisualBertModel. `total_training_steps` is handed to the LR scheduler.
# NOTE(review): `warmup_steps` is computed in the previous cell but 0 is passed
# here, so the schedule has no warm-up phase — confirm this is intentional.
model = FNClassifier(
  n_classes=n_classes,
  n_training_steps=total_training_steps,
  n_warmup_steps=0,
)

# Training

In [None]:
checkpoint_callback = ModelCheckpoint(
  dirpath="/content/drive/MyDrive/VisualBERT-for-Memes-Classification/checkpoints",
  filename="best-checkpoint",
  save_top_k=1,
  verbose=True,
  monitor="val_loss",
  mode="min",
)

In [None]:
logger = TensorBoardLogger("lightning_logs", name="fnd")

In [None]:
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=3)

In [None]:
len(dataset)

In [None]:
from sklearn.model_selection import KFold
import sklearn

# Build the training and test datasets from the cleaned frames and their
# pre-computed visual embeddings.
dataset = FNDataset(
  train_df,
  tokenizer,
  MAX_LEN,
  visual_embeds_train
)

test_dataset = TestDataset(
    test_df,
    tokenizer,
    MAX_LEN_TEST,
    visual_embeds_test
)

# Seeded 90/10 split so the validation set is the same on every run.
train_dataset, val_dataset = train_test_split(dataset, test_size=0.1, random_state=RANDOM_SEED, shuffle=True)

data_module = FNDataModule(
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    test_dataset = test_dataset,
    tokenizer=tokenizer,
    batch_size=BATCH_SIZE,
)

# Debugging aid for CUDA errors (uncomment if needed):
# import os; os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Train for N_EPOCHS, the same value the LR schedule was sized with
# (total_training_steps = steps_per_epoch * N_EPOCHS). A larger hard-coded
# max_epochs lets the linear schedule reach a learning rate of 0 after
# N_EPOCHS epochs, so the remaining epochs would train nothing.
trainer = pl.Trainer(
    logger=logger,
    callbacks=[early_stopping_callback, checkpoint_callback],
    max_epochs=N_EPOCHS
)

trainer.fit(model, data_module)

# n = 5
# kf = KFold(n_splits=n, random_state=RANDOM_SEED, shuffle=True)

# dataset = np.array(dataset)

# for train_index, val_index in kf.split(dataset):
#   train_dataset = dataset[train_index]
#   val_dataset = dataset[val_index]

#   data_module = MemotionDataModule(
#     train_dataset=train_dataset,
#     val_dataset=val_dataset,
#     tokenizer=tokenizer,
#     batch_size=BATCH_SIZE,
#     max_len=MAX_LEN
#   )

#   trainer = pl.Trainer(
#     logger=logger,
#     callbacks=[early_stopping_callback, checkpoint_callback],
#     max_epochs=N_EPOCHS,
#     gpus=1,
#     progress_bar_refresh_rate=10
#   )

#   trainer.fit(model, data_module)

# Evaluation

In [None]:
"""
trained_model = FNClassifier.load_from_checkpoint(
  '/content/checkpoints/best-checkpoint-v2.ckpt',
  n_classes=2
)
trained_model.eval()
trained_model.freeze()
"""

In [None]:
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
trained_model = trained_model.to(device)

predictions = []
labels = []
for item in tqdm(test_dataset):
  _, prediction = trained_model(
    item["input_ids"].unsqueeze(dim=0).to(device),
    item["attention_mask"].unsqueeze(dim=0).to(device),
    item["visual_embedding"].unsqueeze(dim=0).to(device),
    item['visual_attention_mask'].unsqueeze(dim=0).to(device),
    item['visual_token_type_ids'].unsqueeze(dim=0).to(device)
  )
  predictions.append(prediction.flatten())
  labels.append(item["labels"].int())
  
predictions = torch.stack(predictions).detach().cpu()
labels = torch.stack(labels).detach().cpu()

"""

In [None]:
"""
from torch.nn.functional import softmax

_, preds = torch.max(torch.tensor(predictions), dim=1)
"""

In [None]:
#preds[17]

In [None]:
"""
from sklearn.metrics import f1_score

print("F1 macro:   {}".format(round(f1_score(labels, preds , average="macro"), 3)))
print("F1 micro:   {}".format(round(f1_score(labels, preds > 0.5, average="micro"), 3)))
"""

In [None]:
#accuracy(predictions, labels)

In [None]:
"""
from sklearn.metrics import confusion_matrix

fig, ax = plt.subplots(figsize=(10,10))
sns.heatmap(confusion_matrix(labels ,preds), annot=True, cbar_kws={'shrink':1}, square=True, ax=ax)
plt.tight_layout()
ax.set_aspect('equal')
plt.show()
plt.clf()
"""