<a href="https://colab.research.google.com/github/prakashaditya369/mutlimodal-models/blob/main/Facebook_Hateful_Memes_%5BModel_2%5D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import skimage.transform
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms.functional import to_tensor
import matplotlib.pyplot as plt
%matplotlib inline

# Pick the compute device once: CUDA when PyTorch reports a usable GPU,
# otherwise the CPU. Later cells move tensors and modules onto `device`.
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
  device = torch.device("cuda")
else:
  device = torch.device("cpu")
# device = 'cpu'
print("Using device : ", device)

In [None]:
# Read the three JSON-lines annotation files (one JSON record per line)
# into DataFrames, in the order train / dev / test.
train_data, dev_data, test_data = [
    pd.read_json(jsonl_path, lines=True)
    for jsonl_path in (
        "/content/drive/My Drive/Facebook Hateful Memes/train.jsonl",
        "/content/drive/My Drive/Facebook Hateful Memes/dev.jsonl",
        "/content/drive/My Drive/Facebook Hateful Memes/test.jsonl",
    )
]

In [None]:
# Report the number of rows (examples) in each split.
print("Train Size:",len(train_data))
print("Test Size:",len(test_data))
print("Dev Size:",len(dev_data))

In [None]:
def progress_batch(i,Len, width=30):
  """Redraw a one-line text progress bar for batch `i` (0-based) out of `Len`.

  The bar is `width` characters wide, filled with '*' in proportion to
  (i+1)/Len, and followed by the counter "<i+1>/<Len>". The line starts
  with a carriage return and has no trailing newline, so successive calls
  overwrite each other in the cell output.
  """
  done = i + 1
  filled = int(width * done * 100 / Len) // 100
  bar = '*' * filled + ' ' * (width - filled)
  print('\r[' + bar + ']' + str(done) + '/' + str(Len), end='', flush=True)
def progress(percent=0, width=30):
  """Redraw a one-line text progress bar for a completion percentage.

  `percent` is on a 0-100 scale. The bar is `width` characters wide, filled
  with '#', and followed by the percentage printed with two decimals. The
  line starts with a carriage return and has no trailing newline.
  """
  filled = int(width * percent) // 100
  bar = '#' * filled + ' ' * (width - filled)
  print('\r[' + bar + ']' + f' {percent:.2f}%', end='', flush=True)

# Text Preprocessing

## Working With Embedding

In [None]:
import re
def change_begari_things(sentences):
  """Expand common English contractions and remove leftover apostrophes.

  Each sentence is processed in three steps:
    1. every contraction in the table below is replaced by its expansion
       (plain substring replacement, applied in table order);
    2. an apostrophe followed by one lowercase letter and a space
       (for example the "'s " in "john's friend") becomes a single space;
    3. any remaining apostrophe becomes a space.

  The table keys are lowercase and matching is case-sensitive.

  Args:
    sentences: iterable of strings.

  Returns:
    A new list with the cleaned sentences, in input order.
  """
  contractions = {
      "i'm": "i am",
      "don't": "do not",
      "you're": "you are",
      "it's": "it is",
      "can't": "can not",
      "that's": "that is",
      "doesn't": "does not",
      "i'll": "i will",
      "didn't": "did not",
      "he's":"he is",
      "what's": "what is",
      "there's": "there is",
      "isn't": "is not",
      "she's": "she is",
      "let's": "let us",
      "i've": "i have",
      "they're": "they are",
      "we're": "we are",
      "ain't": "am not",
      "you've": "you have",
      "aren't": "are not",
      "you'll": "you will",
      "here's": "here is",
      "haven't": "have not",
      "i'd": "i had",
      "they'll": "they will",
      "won't": "will not",
      "who's": "who is",
      "where's": "where is",
      "couldn't": "could not",
      "shouldn't": "should not",
      "wasn't": "was not",
      "we'll": "we will",
      "idk": "i do not know",
      "y'all": "you all",
      "wife's": "wife is",
      "hasn't": "has not",
      "she'll": "she will",
      "we've": "we have",
      "they've":"they have",
      "wouldn't": "would not",
      "name's": "name is",
      "why's": "why is",
      "that'd": "that would",
      "lyin'": "lying",
      "weren't": "were not"
  }

  def _clean(sentence):
    # Order matters: e.g. "he's" is expanded before "she's" is looked up,
    # which already turns "she's" into "she is".
    for short_form, long_form in contractions.items():
      sentence = sentence.replace(short_form, long_form)
    sentence = re.sub(r"'[a-z] ", ' ', sentence)
    return re.sub(r"'", ' ', sentence)

  return [_clean(sentence) for sentence in sentences]

In [None]:
# Load the pre-trained GloVe vectors (glove.6B.<EMBED_DIM>d.txt) into memory.
# Each non-empty line of the file is "<word> <v1> <v2> ... <vN>".
glove_path = "/content/drive/My Drive/Facebook Hateful Memes"
vectors = []     # vectors[i] is the embedding of words[i]
words = []       # words in file order
word2idx = {}    # word -> position in `words` / `vectors`
idx = 0
EMBED_DIM = 100  # must match the dimensionality encoded in the file name
Glove_PATH = glove_path+"/glove.6B."+str(EMBED_DIM)+"d.txt"
with open(Glove_PATH, 'rb') as f:
    for l in f:
        line = l.decode().split()
        if not line:
            # Skip blank lines instead of failing on line[0].
            continue
        word = line[0]
        words.append(word)
        word2idx[word] = idx
        idx += 1
        # `np.float` was only an alias for the builtin float (float64) and was
        # removed in NumPy 1.24; name the dtype explicitly.
        vect = np.array(line[1:]).astype(np.float64)
        vectors.append(vect)
# word -> vector lookup used when building the embedding matrix.
glove = {w: vectors[word2idx[w]] for w in words}
print("GLove Loaded from",Glove_PATH)

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# Fit the Keras tokenizer on the cleaned training captions only; words it has
# not seen are mapped to the 'unk' out-of-vocabulary token.
texts = train_data['text']
sentences = change_begari_things(texts.tolist())
tokenizer  = Tokenizer(oov_token='unk')
tokenizer.fit_on_texts(sentences)
word_index = tokenizer.word_index
# Number of entries in word_index. A later cell reassigns VOCAB_SIZE to
# len(word_index)+1.
VOCAB_SIZE = len(word_index)
print("Vocab Size: ",VOCAB_SIZE)

In [None]:
def get_embedding(glove,word_index):
  """Build an nn.Embedding initialised from GloVe for the tokenizer vocabulary.

  Row `word_index[word]` of the weight matrix is the GloVe vector of `word`
  when the word is in `glove`; otherwise it is drawn from a normal
  distribution with standard deviation 0.6. Rows that no word maps to stay
  all-zero (the matrix has len(word_index)+1 rows, presumably so that index 0
  can be used for padding - TODO confirm against the tokenizer).

  The layer is created with `nn.Embedding.from_pretrained` using its default
  arguments and placed on the global `device`.

  Args:
    glove: mapping word -> 1-D vector of length EMBED_DIM.
    word_index: mapping word -> integer row index.

  Returns:
    (embedding, matrix_len, emb_dim): the embedding layer, its number of
    rows, and the embedding dimensionality (the global EMBED_DIM).
  """
  matrix_len = len(word_index)+1
  emb_dim = EMBED_DIM
  weights_matrix = np.zeros((matrix_len,emb_dim))
  for word, row in word_index.items():
    try:
      weights_matrix[row] = glove[word]
    except KeyError:
      # Only a missing word is expected here; anything else (e.g. a vector of
      # the wrong length) should surface instead of being silently replaced.
      weights_matrix[row] = np.random.normal(scale = 0.6,size = (emb_dim,))
  weights_matrix_tensor = torch.FloatTensor(weights_matrix).to(device)
  embedding = nn.Embedding.from_pretrained(weights_matrix_tensor).to(device)
  return embedding,matrix_len,emb_dim

In [None]:
# Build a standalone embedding layer for the fitted vocabulary; also records
# the number of rows (vocab_size) and the vector width (emb_dim).
embedding,vocab_size,emb_dim = get_embedding(glove,word_index)

In [None]:
# Encode the cleaned training sentences as integer id sequences and pad or
# truncate every sequence to 32 tokens.
sequences = tokenizer.texts_to_sequences(sentences)
padded = pad_sequences(sequences,maxlen = 32)
# `padded` is a 2-D NumPy integer array, so convert it directly. The previous
# to_tensor(...).squeeze(0) round-trip went through torchvision's image helper,
# which adds a channel axis and would rescale uint8 input by 1/255.
text_tensor = torch.from_numpy(padded).long().to(device)

In [None]:
def get_text_tensor(data, tokenizer = tokenizer, max_len = 32):
  """Turn the 'text' column of `data` into a padded LongTensor of token ids.

  The captions are cleaned with `change_begari_things`, encoded with
  `tokenizer.texts_to_sequences`, and padded/truncated to `max_len` with
  `pad_sequences`.

  Args:
    data: object with a 'text' column/key holding the captions.
    tokenizer: fitted tokenizer; defaults to the notebook-level `tokenizer`
      as it existed when this function was defined.
    max_len: sequence length after padding (default 32, the value that was
      previously hard-coded).

  Returns:
    torch.LongTensor of shape (number of captions, max_len), on the CPU.
  """
  texts = data['text']
  sentences = change_begari_things(list(texts))
  sequences = tokenizer.texts_to_sequences(sentences)
  padded = pad_sequences(sequences,maxlen = max_len)
  # Convert the integer array directly; torchvision's to_tensor is an image
  # helper (adds a channel axis, rescales uint8 input) and is not meant for ids.
  text_tensor = torch.from_numpy(padded).long()
  return text_tensor

# Image Feature Processing

In [None]:
def get_image_tensor(PATH,for_conv = False):
  """Load pre-extracted image features from `PATH` into one float tensor.

  Args:
    PATH: path to a .npy file holding one feature array per image.
    for_conv: when True the result has shape (N, 512, 7, 7); when False each
      feature map is flattened first and the result has shape (N, 512, 49).

  Returns:
    torch.Tensor with one entry per image in the file.

  NOTE(review): np.load(..., allow_pickle=True) unpickles the file contents;
  only use it on feature dumps from a trusted source.
  """
  image_feature = np.load(PATH,allow_pickle=True)
  num_images = image_feature.shape[0]
  out_shape = (num_images,512,7,7) if for_conv else (num_images,512,49)
  final_image_tensor = torch.empty(*out_shape)
  for i,image in enumerate(image_feature):
    if not for_conv:
      # Merge the first two axes and put the last axis first.
      # Assumes each entry is laid out as (7, 7, 512) - TODO confirm.
      rows, cols, channels = image.shape[0], image.shape[1], image.shape[2]
      image = image.reshape(rows*cols,channels).T
    final_image_tensor[i] = to_tensor(image)
  return final_image_tensor

In [None]:
class TextImageLabels(Dataset):
  """Dataset that zips aligned text, image and label containers.

  Item `idx` is the tuple (text[idx], image[idx], labels[idx]); the dataset
  length is the length of the label container.
  """
  def __init__(self,text_tensor,image_tensor,label_tensor):
    # The three containers are stored as given; no copy or validation is done.
    self.text, self.image, self.labels = text_tensor, image_tensor, label_tensor

  def __len__(self):
    return len(self.labels)

  def __getitem__(self, idx):
    sample = (self.text[idx], self.image[idx], self.labels[idx])
    return sample

In [None]:
def get_dataset(PATH,data,tokenizer = tokenizer,for_conv = False):
  """Assemble a TextImageLabels dataset for one split.

  Args:
    PATH: path to the split's image-feature .npy file.
    data: frame with 'text' and 'label' columns for the same split.
    tokenizer: fitted tokenizer used to encode the captions.
    for_conv: forwarded to `get_image_tensor`.

  Returns:
    TextImageLabels over (text ids, image features, labels). Labels are
    reshaped to one column with as many rows as there are images.

  The type and shape of each tensor are printed as they are built.
  """
  text_tensor = get_text_tensor(data,tokenizer)
  print("Text Tensor:",type(text_tensor),text_tensor.shape)
  image_tensor = get_image_tensor(PATH,for_conv)
  print("Image Tensor:",type(image_tensor),image_tensor.shape)
  # The view fails if the label count differs from the image count.
  num_examples = image_tensor.size(0)
  label_tensor = torch.Tensor(data['label']).view(num_examples,1)
  print("Label Tensor:", type(label_tensor),label_tensor.shape)
  return TextImageLabels(text_tensor,image_tensor,label_tensor)

In [None]:
# Locations of the pre-extracted image features for each split.
PATH_train = "/content/drive/My Drive/Facebook Hateful Memes/train_channel_features.npy"
PATH_dev = "/content/drive/My Drive/Facebook Hateful Memes/dev_channel_features.npy"
PATH_test = "/content/drive/My Drive/Facebook Hateful Memes/test_channel_features.npy"

# Train and dev datasets, with image features in the (N, 512, 7, 7) layout.
train_dataset = get_dataset(PATH_train,train_data,tokenizer,for_conv = True)
dev_dataset = get_dataset(PATH_dev,dev_data,tokenizer,for_conv = True)
# dev_dataset = get_dataset(PATH_test,test_data,tokenizer)

# Model

In [None]:
# Number of rows in the embedding matrix: one per tokenizer entry plus one
# extra row (same size as matrix_len in get_embedding).
VOCAB_SIZE = len(word_index)+1
# NOTE(review): HIDDEN_SIZE is only printed in the cells visible here.
HIDDEN_SIZE = 64
print(VOCAB_SIZE,HIDDEN_SIZE,EMBED_DIM)

In [None]:
class imageModel(nn.Module):
  """Image branch: one 3x3 convolution over 512-channel feature maps.

  The convolution uses padding 1, so the spatial size is preserved; the
  output is then flattened over its spatial axes.
  """
  def __init__(self,out_channel = 50):
    super().__init__()
    self.conv1 = nn.Conv2d(512,out_channel,kernel_size=3,padding=1)

  def forward(self,x):
    """Map (batch, 512, H, W) features to (batch, out_channel, H*W)."""
    feature_maps = self.conv1(x)
    batch, channels = feature_maps.size(0), feature_maps.size(1)
    return feature_maps.view(batch,channels,-1)

In [None]:
class textModel(nn.Module):
  """Text branch: pretrained embedding lookup, linear layer, tanh, dropout.

  Args:
    mlp_size: output width of the linear layer applied to each token.
    d_x: stored on the instance; only referenced by the disabled LSTM.
    dropout: dropout probability applied after the tanh.
  """
  def __init__(self,mlp_size = 64, d_x = 128,dropout = 0.3):
    super(textModel,self).__init__()
    self.mlp_size = mlp_size
    self.d_x = d_x
    # Embedding built from the notebook-level GloVe table and vocabulary.
    self.embed,self.vocab_size,self.embed_dim = get_embedding(glove,word_index)
    self.mlp = nn.Linear(self.embed_dim,self.mlp_size)
    # Use the constructor argument. The rate used to be hard-coded to 0.4,
    # which silently ignored `dropout` (and Model's `text_dropout`).
    self.dropout = nn.Dropout(p = dropout)
    # self.LSTM = nn.LSTM(self.mlp_size,self.d_x,batch_first = True)

  def forward(self,x,hidden = None):
    """Return per-token features for a batch of token-id sequences.

    `hidden` is accepted but not used.
    """
    embedded = self.embed(x)
    output = self.mlp(embedded)
    output = torch.tanh(output)
    output = self.dropout(output)
    # output,_ = self.LSTM(output)
    return output

In [None]:
class GatedDotProduct(nn.Module):
  """Scaled dot-product attention weights with learned per-position gates.

  A linear layer maps the element-wise product Q*K to two sigmoid gates per
  position; one scales the queries and the other the keys before the usual
  softmax(QK^T / sqrt(d)).

  Args:
    d: feature width of K and Q (also used for the sqrt(d) scaling).
    d_g: stored on the instance; not used by the current forward pass.
  """
  def __init__(self,d, d_g = 128):
    super(GatedDotProduct,self).__init__()
    self.d = d
    self.d_g = d_g
    # self.fcq = nn.Linear(d,d_g)
    # self.fck = nn.Linear(d,d_g)
    self.fcg = nn.Linear(d,2)

  def forward(self,K,Q):
    """Return attention weights of shape (batch, n, n) for (batch, n, d) inputs."""
    # k_output = self.fck(K)
    # q_output = self.fcq(Q)
    gates = torch.sigmoid(self.fcg(Q*K))
    M_q = gates[:,:,0].unsqueeze(2)   # gate applied to the queries
    M_k = gates[:,:,1].unsqueeze(2)   # gate applied to the keys
    # Scale by a plain Python float. The previous version allocated a tensor
    # on the global `device` on every call, which tied the module to that
    # global and broke when the inputs lived on another device.
    scale = self.d ** 0.5
    scores = torch.matmul(Q*M_q,torch.transpose(K*M_k, 1, 2))/scale
    return F.softmax(scores,-1)

In [None]:
class GSA(nn.Module):
  """Gated self-attention: project z to V, K and Q, then attend over V.

  Args:
    d: feature width of the input and of all three projections.
    d_g: forwarded to GatedDotProduct.
  """
  def __init__(self,d,d_g=128):
    super().__init__()
    self.fcv = nn.Linear(d,d)
    self.fck = nn.Linear(d,d)
    self.fcq = nn.Linear(d,d)
    self.gdp = GatedDotProduct(d,d_g)

  def forward(self,z):
    """Return the attention-weighted values for input `z`."""
    values = self.fcv(z)
    keys = self.fck(z)
    queries = self.fcq(z)
    attention = self.gdp(keys,queries)
    # Named `attended` rather than `F` so the torch.nn.functional alias is
    # not shadowed inside this method.
    attended = torch.matmul(attention,values)
    return attended


In [None]:
class UA(torch.nn.Module):
  """Attention unit: gated self-attention, residual add, batch norm, dropout.

  Args:
    d: feature width handed to GSA.
    d_g: gate width handed to GSA.
    num_features: size of axis 1 of the input, normalised by BatchNorm1d.
  """
  def __init__(self,d,d_g,num_features):
    super().__init__()
    self.gsaunit = GSA(d,d_g)
    self.batchnorm1 = nn.BatchNorm1d(num_features)
    self.dropout = nn.Dropout(p=0.4)
    # A position-wise feed-forward sub-layer (ffn1/ffn2 + batchnorm2) was
    # tried here and is currently disabled.

  def forward(self,z):
    """Return dropout(batchnorm(GSA(z) + z))."""
    residual = self.gsaunit(z) + z
    normalized = self.batchnorm1(residual)
    return self.dropout(normalized)


In [None]:
class Categorize(nn.Module):
  """Classification head: two parallel 1-D convolutions, max-pool, sigmoid.

  Args:
    d: length of the last input axis; also the max-pooling window, so the
      pooled output has length 1 when the input's last axis has length d.
    num_features: size of input axis 1 (the Conv1d input channels).
    c_d: accepted for compatibility; not used by the current layers.
  """
  def __init__(self,d,num_features = 32+49,c_d = 64):
    super().__init__()
    # Layers are created in this order on purpose: changing the order would
    # change the random initialisation obtained from a given seed.
    self.fc = nn.Linear(2*32,1)
    self.conv2 = nn.Conv1d(num_features,32,3,padding = 1)
    self.conv3 = nn.Conv1d(num_features,32,5,padding = 2)
    self.maxpooling = nn.MaxPool1d(d)
    self.dropout = nn.Dropout(p=0.3)  # created but not applied in forward
    # Earlier fully-connected and LSTM variants of this head were removed.

  def forward(self,x):
    """Return a (batch, 1) tensor of sigmoid outputs for `x`.

    `x` is expected to be (batch, num_features, d).
    """
    narrow = F.relu(self.conv2(x))   # kernel size 3
    wide = F.relu(self.conv3(x))     # kernel size 5
    stacked = torch.cat((narrow,wide),1)
    pooled = self.maxpooling(stacked).squeeze(2)
    return torch.sigmoid(self.fc(pooled))

In [None]:
class Model(nn.Module):
  """Full classifier: text branch + image branch -> attention unit -> head.

  Args:
    d: feature width shared by both branches after projection.
    d_g: gate width forwarded to the attention unit.
    output_channels: number of channels produced by the image convolution.
    num_features: size of axis 1 after concatenating text and image features.
    c_d: forwarded to Categorize.
    mlp_size, d_x, text_dropout: forwarded to textModel.
    d_y: input width of the image projection `fcy`.
    L: stored on the instance; not used by the current forward pass.
  """
  def __init__(self,d,d_g,output_channels,num_features,c_d,mlp_size = 64,d_x = 128,d_y = 512,text_dropout = 0.1,L=1):
    super().__init__()
    # Sub-modules are created in the original order so that seeded
    # initialisation and saved checkpoints stay compatible.
    self.text_model = textModel(mlp_size, d_x,text_dropout)
    self.image_model = imageModel(output_channels)
    self.fcx = nn.Linear(d_x,d)   # kept for checkpoint compatibility; unused in forward
    self.fcy = nn.Linear(d_y,d)
    self.L = L
    self.batchnorm1 = nn.BatchNorm1d(32)
    self.batchnorm2 = nn.BatchNorm1d(output_channels)
    self.UA_model1 = UA(d,d_g,num_features).to(device)
    # Additional stacked UA blocks (UA_model2..7) are currently disabled.
    self.categorize = Categorize(d,num_features,c_d)

  def forward(self,text,image):
    """Return the head's output for a batch of (token ids, image features)."""
    text_features = self.text_model(text)
    image_features = self.fcy(self.image_model(image))
    text_features = self.batchnorm1(text_features)
    image_features = self.batchnorm2(image_features)
    # Stack text rows on top of image rows along axis 1.
    joint = torch.cat((text_features,image_features),1).to(device)
    joint = self.UA_model1(joint)
    return self.categorize(joint)

In [None]:
# Channels produced by the image convolution.
output_channels = 100
# Rows of the joint sequence: 32 text rows (Model.batchnorm1 is built for 32)
# plus one row per image channel.
num_features = 32+output_channels
# d_y = 49 is the input width of the image projection (fcy).
model = Model(d = 64,d_g = 32,output_channels=output_channels,num_features = num_features,c_d = 16,L=2,d_y = 49).to(device)

In [None]:
# Display the module tree of the assembled model.
model

# Optimizer and Loss Function

In [None]:
import torch.optim as optim
# Binary cross-entropy on the model's sigmoid outputs.
criterion = nn.BCELoss()
# NOTE(review): train() below creates its own optimizer, so this global
# optimizer does not appear to be used by the cells visible here.
optimizer = optim.Adam(model.parameters())

# Training Part

## Training and Evaluation Function

In [None]:
def train(model,dataset,epochs,batch_size=10,criterion = nn.BCELoss(),optimizer = "Adam"):
  """Train `model` on `dataset` and print the mean loss after every epoch.

  Args:
    model: module called as model(text, image).
    dataset: dataset yielding (text, image, label) triples.
    epochs: number of passes over the dataset.
    batch_size: mini-batch size for the shuffled DataLoader.
    criterion: loss applied to (outputs, label). It is expected to return the
      mean over the batch (the default for nn.BCELoss), because the reported
      epoch loss is a per-sample average.
    optimizer: one of "Adam", "SGD" (lr=0.001) or "AdaDelta". A fresh
      optimizer is created on every call.

  Returns:
    None. If `optimizer` is not a recognised name a hint is printed and the
    function returns without training.
  """
  optimizer_factories = {
      "Adam": lambda params: optim.Adam(params),
      "SGD": lambda params: optim.SGD(params,lr = 0.001),
      "AdaDelta": lambda params: optim.Adadelta(params),
  }
  if optimizer not in optimizer_factories:
    print("Choose from Adam,SGD,AdaDelta")
    return
  optimizer = optimizer_factories[optimizer](model.parameters())
  # The `criterion` argument is used as passed; it was previously overwritten
  # with a new nn.BCELoss().
  train_dl = DataLoader(dataset,batch_size = batch_size,shuffle = True)
  length = len(train_dl)
  data_length = len(dataset)
  # Make sure dropout and batch norm are in training mode, whatever mode the
  # model was left in by earlier cells.
  model.train()
  for epoch in range(epochs):
    running_loss = 0.0
    print("Epoch: {}".format(epoch+1))
    for i, data in enumerate(train_dl):
      progress_batch(i,length)
      x,image,label = data[0].to(device),data[1].to(device),data[2].to(device)
      optimizer.zero_grad()
      outputs = model(x,image)
      loss = criterion(outputs,label)
      loss.backward()
      optimizer.step()
      # Weight the batch mean by the batch size so that dividing by the
      # dataset size below gives a true per-sample mean. The old code divided
      # a sum of batch means by the sample count, under-reporting the loss by
      # roughly a factor of batch_size.
      running_loss+=loss.item()*label.size(0)
    running_loss/=max(data_length,1)
    print("   Loss:",f' {running_loss:.5f}')
  print("Finished Training")

In [None]:
def evaluate(model,dataset,criterion = nn.BCELoss()):
  """Print accuracy (threshold 0.5) and mean loss of `model` on `dataset`.

  Args:
    model: module called as model(text, image), returning probabilities of
      shape (batch, 1).
    dataset: dataset yielding (text, image, label) triples, labels (batch, 1).
    criterion: loss applied to (outputs, label); expected to return the batch
      mean (the default for nn.BCELoss).

  Returns:
    None; the metrics are printed.
  """
  correct = 0
  total = len(dataset)
  BATCH_SIZE = 1000
  total_loss = 0
  val_dl = DataLoader(dataset,batch_size = BATCH_SIZE)
  # Evaluate in eval mode so dropout is disabled and batch norm uses its
  # running statistics; previously this ran in whatever mode the model was in.
  # The prior mode is restored afterwards so training can simply continue.
  was_training = model.training
  model.eval()
  try:
    with torch.no_grad():
      for data in val_dl:
        x,image,label = data[0].to(device),data[1].to(device),data[2].to(device)
        outputs = model(x,image)
        loss = criterion(outputs,label)
        # Weight the batch mean by the batch size (see the division below).
        total_loss+=loss.item()*label.size(0)
        outputs = outputs.detach().squeeze(1)
        label = label.squeeze(1)
        predicted = (outputs>0.5).float()
        correct+=torch.sum(predicted==label).item()
  finally:
    model.train(was_training)
  # Per-sample mean loss; the old code divided a sum of batch means by the
  # number of samples.
  total_loss/=total
  accuracy = correct/total
  print("Accuracy: {} || Loss: {:.5f}".format(accuracy,total_loss))


## Saving and Loading Function

In [None]:
from datetime import datetime
def save(model,name = None):
  """Serialise the whole `model` object with torch.save under the Model folder.

  Args:
    model: object to save.
    name: file name without extension; when None, the current date and time
      formatted as "%d-%m-%Y:%H:%M:%S" is used instead.

  The final path is printed after saving.
  """
  timestamp = datetime.now().strftime("%d-%m-%Y:%H:%M:%S")
  stem = timestamp if name is None else name
  PATH = "/content/drive/My Drive/Facebook Hateful Memes/Model/" + stem + ".pth"
  torch.save(model, PATH)
  print("Successfully Saved at",PATH)

## Using Cross-Validation

In [None]:
from sklearn.model_selection import KFold
# 10-fold split over the training examples.
kfold = KFold(n_splits=10)

# Tensors for the train and dev splits.
train_text_tensor = get_text_tensor(train_data,tokenizer)
train_image_tensor = get_image_tensor(PATH_train,for_conv = True)
train_label_tensor = torch.Tensor(train_data['label']).view(train_image_tensor.size(0),1)
dev_text_tensor = get_text_tensor(dev_data,tokenizer)
dev_image_tensor = get_image_tensor(PATH_dev, for_conv=True)
dev_label_tensor = torch.Tensor(dev_data['label']).view(dev_image_tensor.size(0),1)

# Train and dev stacked together (train rows first); only their shapes are
# reported below.
text_tensor = torch.cat((train_text_tensor,dev_text_tensor),0)
image_tensor = torch.cat((train_image_tensor,dev_image_tensor),0)
label_tensor = torch.cat((train_label_tensor,dev_label_tensor),0)
print("Text Tensor:",text_tensor.shape)
print("IMage Tensor:",image_tensor.shape)
print("Label Tensor:",label_tensor.shape)

print("Start Training:")
# NOTE(review): the same `model` keeps training across folds, so from the
# second fold on the held-out examples have already been seen in training;
# the per-fold accuracy is not an unbiased cross-validation estimate.
for fold, (train_index, test_index) in enumerate(kfold.split(train_text_tensor)):
  print("Fold #:",fold+1)
  text_train_fold = train_text_tensor[train_index]
  image_train_fold = train_image_tensor[train_index]
  label_train_fold = train_label_tensor[train_index]
  # The fold indices come from the train tensors, so the held-out rows are
  # taken from them as well. The stacked tensors hold the same rows at these
  # positions because the train rows come first.
  text_test_fold = train_text_tensor[test_index]
  image_test_fold = train_image_tensor[test_index]
  label_test_fold = train_label_tensor[test_index]
  train_fold_dataset = torch.utils.data.TensorDataset(text_train_fold,image_train_fold,label_train_fold)
  test_fold_dataset = torch.utils.data.TensorDataset(text_test_fold,image_test_fold,label_test_fold)
  train(model,train_fold_dataset,5,batch_size=100,criterion = nn.BCELoss(),optimizer = "Adam")
  # First line printed: metrics on this fold's held-out part.
  evaluate(model,test_fold_dataset)
  print("Validation Accuracy:")
  evaluate(model,dev_dataset)

In [None]:
# Report the metrics of the trained model on the full train and dev datasets.
print("Training Accuracy:")
evaluate(model,train_dataset)
print("Validation Accuracy:")
evaluate(model,dev_dataset)

In [None]:
# Persist the trained model under a descriptive run name
# (save() appends the ".pth" extension).
save(model,"withpreembed100reg-512x49-acc0.953valacc0.514")

In [None]:
# Reload a previously saved model and check it on the training set.
# map_location moves the stored tensors onto the current `device`, so a
# checkpoint written on a GPU runtime also loads on a CPU-only runtime.
# NOTE(review): the file is a fully pickled model. Recent PyTorch releases
# default torch.load to weights_only=True, which rejects such files; pass
# weights_only=False there, and only for checkpoints you trust.
model_check = torch.load("/content/drive/My Drive/Facebook Hateful Memes/Model/512x49-acc0.99valacc0.516.pth", map_location=device)
evaluate(model_check,train_dataset)

## Testing Part

In [None]:
# Build the inputs for the unlabeled test split.
text_tensor = get_text_tensor(test_data)
ids = test_data['id']
print("Text Tensor:",type(text_tensor),text_tensor.shape)
# get_image_tensor has no `alternate` parameter (the old call raised a
# TypeError). The model's first image layer is a Conv2d, and the train/dev
# features were loaded with for_conv=True, so use the same layout here.
image_tensor = get_image_tensor(PATH_test,for_conv = True)
print(image_tensor.shape)

In [None]:
# Score the whole test split in one forward pass.
# Switch to eval mode first: otherwise dropout stays active and batch norm
# uses batch statistics, which makes the predictions noisy and non-repeatable.
model.eval()
with torch.no_grad():
  x = text_tensor.to(device)
  image = image_tensor.to(device)
  outputs = model(x,image)
  outputs = outputs.detach()
  output = outputs.squeeze(1)          # (N, 1) -> (N,)
  predicted = (output>0.5).int()       # hard label at the 0.5 threshold
final_output = np.array(output.cpu())
final_predicted = np.array(predicted.cpu())
print(ids.shape,final_output.shape,final_predicted.shape)

In [None]:
# Assemble the submission table: example id, predicted probability and the
# thresholded 0/1 label.
data = {'id':list(ids),'proba':list(final_output),'label':list(final_predicted)}
df = pd.DataFrame(data)

In [None]:
# Preview the first rows of the submission table.
df.head()

In [None]:
# Write the submission table to Drive as CSV (without the index column).
# NOTE(review): `now` and `dt_string` are computed but not used in this cell;
# the file name comes from `name` only.
now = datetime.now()
dt_string = now.strftime("%d-%m-%Y:%H:%M:%S")
name = "withpreembed100reg-512x49-acc0.953valacc0.514"
PATH = "/content/drive/My Drive/Facebook Hateful Memes/SubmissionFile/"+name+".csv"
df.to_csv(PATH,index=False)
print("saved successfully at",PATH)