In [None]:
# Install the Hugging Face `transformers` package, which supplies the BERT tokenizer and model used below.
!pip install transformers

In [None]:
import pickle
import transformers
import json
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import gensim
import gensim.downloader as api
import torchtext
import torch
from torch import nn

In [None]:
# Root directory of the dataset files. File names are appended with plain string
# concatenation below, so the trailing slash is required.
path = '../input/minivqaiust/'

In [None]:
# NOTE(security): pickle.load can execute arbitrary code while deserialising;
# only load this file when it comes from a trusted source.
with open(path + "image_features.pickle", 'rb') as f:
    img = pickle.load(f)  # looked up by image-id string further down the notebook

# Read the JSON with an explicit encoding so the result does not depend on the
# platform's default locale encoding.
with open(path + "image_question.json", encoding="utf-8") as json_file:
    img_q = json.load(json_file)  # iterated later as image id -> list of question entries

In [None]:
# Each split's CSV is read in turn; the question ids (and the labels, where the
# split is read with them) are pulled out as plain Python lists.
df = pd.read_csv(path + "train.csv")
q_train_idx, label_train = list(df['question_id']), list(df['label'])

df = pd.read_csv(path + "val.csv")
q_val_idx, label_val = list(df['question_id']), list(df['label'])

# Only the question ids are taken from the test CSV.
df = pd.read_csv(path + "test.csv")
q_test_idx = list(df['question_id'])

In [None]:
# Containers for the training split; they are filled in the following cell.
questions_train, image_features_train = [], []

# Flatten the per-image question lists into a single lookup table keyed by
# element 0 of each entry (used as the question id below), keeping element 1
# (the question text) together with the id of the image it belongs to.
all_qs = {
    ques[0]: {'question': ques[1], 'image_id': str(idx)}
    for idx, imq in img_q.items()
    for ques in imq
}
all_qs[131087000]

In [None]:
# Rebuild the training lists from scratch every time this cell runs, so that
# re-executing the cell cannot append the same samples a second time.
questions_train = []
image_features_train = []

for idx in q_train_idx:
    sample = all_qs[idx]
    questions_train.append(sample['question'])
    # Image features are looked up through the image id stored with the question.
    image_features_train.append(img[sample['image_id']])

In [None]:
# Validation split: question text and matching image features, in the order of q_val_idx.
questions_val, image_features_val = [], []

for idx in q_val_idx:
    entry = all_qs[idx]
    questions_val.append(entry['question'])
    image_features_val.append(img[entry['image_id']])

In [None]:
# Test split: question text and matching image features, in the order of q_test_idx.
questions_test, image_features_test = [], []

for idx in q_test_idx:
    entry = all_qs[idx]
    questions_test.append(entry['question'])
    image_features_test.append(img[entry['image_id']])

In [None]:
# The tokenizer and the encoder are both loaded from the same pretrained checkpoint name.
tokenizer, bertmodel = (
    transformers.BertTokenizer.from_pretrained('bert-base-uncased'),
    transformers.BertModel.from_pretrained('bert-base-uncased'),
)

In [None]:
class BERTClassification(torch.nn.Module):
    """Sentence encoder: mean-pools the encoder's last hidden state over real tokens.

    Padding positions are masked out both inside the encoder (through the
    attention mask) and in the pooling step, so the embedding of a question
    does not depend on how much padding its batch happened to need.

    Args:
        bert: Encoder module to wrap. It is called as
            ``bert(input_ids, attention_mask=mask)`` and must return a mapping
            with a ``'last_hidden_state'`` entry. Defaults to the notebook-level
            ``bertmodel``.
        pad_token_id: Token id that marks padding. Defaults to 0, the value
            ``pad_bert`` pads with.
    """

    def __init__(self, bert=None, pad_token_id=0):
        super(BERTClassification, self).__init__()
        self.bert = bertmodel if bert is None else bert
        self.pad_token_id = pad_token_id

    def forward(self, text, attention_mask=None):
        """Encode a batch of padded token-id rows.

        Args:
            text: Integer tensor of token ids, one padded row per sentence.
            attention_mask: Optional mask with the same leading two dimensions
                as ``text`` (non-zero = real token). When omitted it is derived
                from ``text != pad_token_id``.

        Returns:
            One pooled vector per row: the mean of the last hidden state over
            the unmasked positions.
        """
        if attention_mask is None:
            attention_mask = text != self.pad_token_id
        attention_mask = attention_mask.to(device=text.device, dtype=torch.long)

        hidden_state = self.bert(text, attention_mask=attention_mask)['last_hidden_state']

        # Masked mean: zero out padded positions and divide by the number of
        # real tokens (clamped so an all-padding row cannot divide by zero).
        weights = attention_mask.unsqueeze(-1).to(hidden_state.dtype)
        token_counts = weights.sum(dim=1).clamp(min=1)
        result = (hidden_state * weights).sum(dim=1) / token_counts
        return result

In [None]:
# Question encoder used below to turn padded token ids into one feature vector per question.
tmodel = BERTClassification()

In [None]:
def pad_bert(b):
    """Tokenise every sentence in ``b`` and right-pad the id rows to a common length.

    Each sentence is encoded with the notebook-level ``tokenizer``; shorter rows
    are filled with 0 on the right up to the length of the longest row.

    Args:
        b: Iterable of sentences accepted by ``tokenizer.encode``.

    Returns:
        A tensor with one row of token ids per sentence.
    """
    token_rows = [tokenizer.encode(sentence) for sentence in b]
    # Longest sequence in this batch decides the padded width.
    longest = max(len(row) for row in token_rows)

    padded_rows = []
    for row in token_rows:
        missing = longest - len(row)
        padded_rows.append(
            torch.nn.functional.pad(torch.tensor(row), (0, missing), mode='constant', value=0)
        )
    return torch.stack(padded_rows)

# Tokenise and pad the train and validation questions; each split is padded to
# the length of its own longest question.
padded_train, padded_val = pad_bert(questions_train), pad_bert(questions_val)

In [None]:
# Tokenise and pad the test questions (padded to the longest question in the test split).
padded_test = pad_bert(questions_test)

In [None]:
# Encode the questions in fixed-size chunks instead of one giant forward pass:
# pushing a whole split through BERT at once can exhaust memory. Every row is
# already padded to the split-wide length, so chunking does not change what the
# encoder sees for any individual question.
encode_batch_size = 256
tmodel.eval()  # feature extraction only: keep dropout disabled
with torch.no_grad():
    question_features_train = torch.cat(
        [tmodel(chunk) for chunk in torch.split(padded_train, encode_batch_size)]
    )
    question_features_val = torch.cat(
        [tmodel(chunk) for chunk in torch.split(padded_val, encode_batch_size)]
    )

In [None]:
# Encode the test questions in fixed-size chunks instead of one forward pass
# over the whole split, which can exhaust memory. Rows are already padded to a
# common length, so chunking does not change the input of any single question.
encode_batch_size = 256
tmodel.eval()  # feature extraction only: keep dropout disabled
with torch.no_grad():
    question_features_test = torch.cat(
        [tmodel(chunk) for chunk in torch.split(padded_test, encode_batch_size)]
    )

In [None]:
# Stack the per-sample image features through numpy before handing them to
# torch: building a tensor directly from a Python list of arrays converts
# element by element (assumes the entries are array-like of equal shape — TODO
# confirm). float32 is forced because the features are concatenated with the
# float32 question features and fed to float32 layers.
train_dataset = torch.utils.data.TensorDataset(
    question_features_train,
    torch.as_tensor(np.asarray(image_features_train), dtype=torch.float32),
    torch.tensor(label_train),
)
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

In [None]:
# Stack the per-sample image features through numpy first (assumes array-like
# entries of equal shape — TODO confirm) and force float32 so they match the
# question features they are concatenated with.
val_dataset = torch.utils.data.TensorDataset(
    question_features_val,
    torch.as_tensor(np.asarray(image_features_val), dtype=torch.float32),
    torch.tensor(label_val),
)
# Validation needs no shuffling; a fixed order keeps evaluation runs comparable.
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
class VQA2(nn.Module):
    """Classifier over a question embedding concatenated with image features.

    The 768-d question embedding is projected to 512 dimensions, concatenated
    with the image features and passed through a small MLP with batch
    normalisation.

    Args:
        features_size: Width of the concatenated vector, i.e. 512 (projected
            question) plus the width of the image features.
        num_classes: Number of output classes. Defaults to 10.
    """

    def __init__(self, features_size, num_classes=10):
        # Plain super(): super(type(self), self) recurses forever as soon as
        # this class is subclassed.
        super().__init__()
        # Text feature projection.
        self.bert_out = torch.nn.Linear(768, 512)
        self.linear = nn.Sequential(
            nn.BatchNorm1d(features_size),
            nn.Linear(features_size, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, num_classes),
            nn.BatchNorm1d(num_classes),
        )

    def forward(self, text, image):
        """Return unnormalised class scores (logits) for each sample.

        Args:
            text: Question embeddings, 768 values per sample.
            image: Image features, ``features_size - 512`` values per sample.

        Returns:
            Tensor with ``num_classes`` logits per sample. No softmax is applied:
            ``nn.CrossEntropyLoss`` applies log-softmax itself, so feeding it
            probabilities would squash the gradients. Apply
            ``softmax(dim=1)`` to the result when probabilities are needed.
        """
        text_f = self.bert_out(text)
        feature = torch.cat([text_f, image], dim=1)
        logits = self.linear(feature)
        return logits

In [None]:
# features_size is the width of the concatenated [projected question, image] vector.
# The question projection contributes 512, so 1024 presumes 512-d image features — TODO confirm.
vqa2 = VQA2(1024)

In [None]:
learning_rate = 0.08  # step size handed to the SGD optimizer below
epochs = 10  # number of train + validation passes run by the training loop
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(vqa2.parameters(), lr=learning_rate)  # plain SGD over every parameter of vqa2

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Args:
        dataloader: Yields ``(text, image, y)`` batches.
        model: Module called as ``model(text, image)``.
        loss_fn: Callable mapping ``(pred, y)`` to a scalar loss tensor.
        optimizer: Optimizer over the model's parameters.

    Prints the loss of every 100th batch together with the number of samples
    seen so far.
    """
    size = len(dataloader.dataset)
    # Make sure layers such as BatchNorm are in training mode, even if an
    # evaluation step left the model in eval mode.
    model.train()
    for batch, (text, image, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(text, image)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss_value, current = loss.item(), batch * len(text)
            print(f"loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")


In [None]:
def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for text, image, y in dataloader:
            pred = model(text, image)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            
    test_loss /= size
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
for t in range(epochs):
    header = f"Epoch {t+1}\n-------------------------------"
    print(header)
    # One pass over the training data, then an evaluation on the validation split.
    train_loop(train_dataloader, vqa2, loss_fn, optimizer)
    test_loop(val_dataloader, vqa2, loss_fn)
print("Done!")