In [None]:
# Colab only: mount Google Drive at /content/drive. The cells below read their
# .npy files from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import sys
import os
import torch
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import DataLoader, Dataset
from torch.optim import Adam
import torch.nn as nn
from torch.optim.lr_scheduler import LambdaLR
from torch.nn import functional as F

In [None]:
# Base learning rate constant (the optimizer in the training cell below sets
# its own lr and does not read this value).
base_lr = 1e-4

# Number of CUDA devices visible to this runtime.
n_gpu = torch.cuda.device_count()

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# DataLoader settings shared by the loaders built further down.
batch_size, num_workers = 32, 2
shuffle_dataset = True

# Fix torch's global RNG so sampling order and weight init are repeatable.
random_seed = 451994
torch.manual_seed(random_seed)

In [None]:
cd /content/drive/MyDrive/multimodal_sentiment_analysis

In [None]:
class MyDataset(Dataset):
    """Index-aligned view over the image, audio, text, target and length arrays.

    All five containers are indexed with the same sample index, so they are
    expected to have the same number of entries along their first axis.
    """

    def __init__(self, imgs, aud, text, target, length):
        """Store the five per-sample containers without copying them.

        Args:
            imgs: image features, indexable by sample.
            aud: audio features, indexable by sample.
            text: text features, indexable by sample.
            target: labels, indexable by sample.
            length: per-sample length information, indexable by sample.
        """
        self.imgs = imgs
        self.aud = aud
        self.text = text
        self.target = target
        self.length = length

    def __getitem__(self, index):
        """Return the sample at `index` as a dict keyed by modality."""
        return {'img': self.imgs[index], 'aud': self.aud[index],
                'text': self.text[index], 'target': self.target[index],
                'length': self.length[index]}

    def __len__(self):
        """Return the number of samples.

        The original returned len(self.data), an attribute that is never set,
        so len(dataset) raised AttributeError. The image container is used as
        the size reference, matching how the notebook sizes its samplers.
        """
        return len(self.imgs)

In [None]:
# Drive folder that holds every split as <split><fold>_<modality>.npy plus
# length_<split><fold>.npy. Kept in one place instead of repeated per np.load.
DATA_ROOT = '/content/drive/MyDrive/multimodal_sentiment_analysis/'


def load_split(split, fold):
    """Load the five arrays of one split of one fold.

    Args:
        split: file-name prefix of the split ('train' or 'val').
        fold: fold number appended to the prefix.

    Returns:
        Tuple (imgs, aud, text, labels, lengths) of numpy arrays, loaded in
        that order.
    """
    stem = DATA_ROOT + split + str(fold)
    imgs, aud, text, labels = (
        np.load(stem + suffix)
        for suffix in ('_image.npy', '_audio.npy', '_text.npy', '_labels.npy')
    )
    lengths = np.load(DATA_ROOT + 'length_' + split + str(fold) + '.npy')
    return imgs, aud, text, labels, lengths


# fold number -> DataLoader
train_data = dict()
val_data = dict()
for fold in range(1):
    train_imgs, train_aud, train_text, train_labels, train_lengths = load_split('train', fold)
    val_imgs, val_aud, val_text, val_labels, val_lengths = load_split('val', fold)

    # The samplers are sized from the image arrays and cover every index, so
    # each epoch visits every sample once in a fresh random order.
    train_size, val_size = train_imgs.shape[0], val_imgs.shape[0]
    train_indices, val_indices = list(range(train_size)), list(range(val_size))
    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)

    train_dataset = MyDataset(train_imgs, train_aud, train_text, train_labels,
                              train_lengths)
    val_dataset = MyDataset(val_imgs, val_aud, val_text, val_labels,
                            val_lengths)

    # shuffle must stay False: DataLoader rejects shuffle=True together with
    # an explicit sampler.
    train_loader = DataLoader(train_dataset,
                              sampler=train_sampler,
                              batch_size=batch_size,
                              pin_memory=True,
                              shuffle=False,
                              drop_last=False,
                              )
    val_loader = DataLoader(val_dataset,
                            sampler=valid_sampler,
                            batch_size=batch_size,
                            num_workers=num_workers,
                            pin_memory=True,
                            shuffle=False,
                            drop_last=False,
                            )
    train_data[fold] = train_loader
    val_data[fold] = val_loader

In [None]:
class Model(nn.Module):
    """GRU encoder, per-direction self-attention, then a per-step linear classifier.

    Args:
        input_dim: feature size of each input step.
        hidden_dim: GRU hidden size per direction.
        output_dim: number of classifier outputs per step.
        dropout: probability for `self.dropout`.
        layers: number of stacked GRU layers.
        bidirectional_flag: whether the GRU runs in both directions.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, dropout, layers, bidirectional_flag):
        super().__init__()
        self.rnn = nn.GRU(input_dim, hidden_dim, num_layers=layers, bidirectional=bidirectional_flag, batch_first=True)
        # The classifier input is one hidden_dim slice per GRU direction. The
        # original unidirectional branch hard-coded 200 outputs and ignored
        # output_dim.
        num_directions = 2 if bidirectional_flag else 1
        self.fc = nn.Linear(num_directions * hidden_dim, output_dim)
        # NOTE(review): this dropout layer is created but never applied in
        # forward(); confirm whether that is intended.
        self.dropout = nn.Dropout(dropout)
        self.num_layers = layers
        self.hidden_dim = hidden_dim
        # Both projections are always created (in the original order) so the
        # state_dict keys and seeded initialisation stay unchanged.
        self.atten_weight_b = nn.Linear(hidden_dim, hidden_dim)
        self.atten_weight_f = nn.Linear(hidden_dim, hidden_dim)
        self.bidirectional_used = bidirectional_flag

    def _self_attend(self, states, projection):
        """Project `states` and re-weight them by their own pairwise dot products."""
        projected = projection(states)
        scores = torch.bmm(projected, projected.permute(0, 2, 1))
        # NOTE(review): softmax is over dim 1 as in the original, while the
        # following bmm sums over dim 2; confirm which axis is intended.
        weights = F.softmax(scores, 1)
        return torch.bmm(weights, projected)

    def attention(self, output):
        """Apply self-attention to each direction of the GRU output.

        Args:
            output: GRU output of shape (batch, seq_len, num_directions * hidden_dim).

        Returns:
            Tensor with the same shape as `output`.
        """
        out_atten_f = self._self_attend(output[:, :, :self.hidden_dim], self.atten_weight_f)
        if not self.bidirectional_used:
            # A unidirectional GRU has no backward half; slicing it gave an
            # empty tensor that crashed the backward projection.
            return out_atten_f
        out_atten_b = self._self_attend(output[:, :, self.hidden_dim:], self.atten_weight_b)
        out_atten = torch.cat((out_atten_f, out_atten_b), dim = -1)
        return out_atten

    def forward(self, x):
        """Encode `x` and classify every step.

        Args:
            x: input of shape (batch, seq_len, input_dim); the GRU is batch_first.

        Returns:
            Tuple (attended features, per-step classifier outputs).
        """
        output, _ = self.rnn(x)
        output = self.attention(output)
        out = self.fc(output)
        return output, out

In [None]:
def compute_accuracy(output, train_dic):
    """Fraction of correctly predicted steps over the valid steps of a batch.

    Args:
        output: per-step class scores of shape (batch, seq_len, num_classes).
        train_dic: batch dict with 'target' (per-step labels) and 'length';
            the sum of train_dic['length'][i] is used as the number of leading
            steps of sample i to score (presumably a 0/1 mask; TODO confirm).

    Returns:
        0-dim float tensor: correct steps / scored steps, or 0 when the batch
        has no steps to score (the original produced NaN or ZeroDivisionError).
    """
    batch_correct = torch.zeros((), device=output.device)
    batch_total = torch.zeros((), device=output.device)
    for i in range(output.shape[0]):
        req_len = torch.sum(train_dic['length'][i]).int()
        pred = torch.argmax(output[i][:req_len, :], dim=1)
        target_required = train_dic['target'][i][:req_len].long()
        batch_correct = batch_correct + (pred == target_required).float().sum()
        batch_total = batch_total + req_len
    if batch_total.item() == 0:
        # Nothing to score: report 0 instead of poisoning running sums with NaN.
        return torch.zeros((), device=output.device)
    return batch_correct / batch_total

In [None]:
def compute_loss(output, train_dic, ignore_index=4):
    """Mean over the batch of each sample's cross-entropy on its valid steps.

    Args:
        output: per-step class scores of shape (batch, seq_len, num_classes).
        train_dic: batch dict with 'target' (per-step labels) and 'length';
            the sum of train_dic['length'][i] is used as the number of leading
            steps of sample i that contribute to the loss.
        ignore_index: label value excluded from the loss (default 4, the value
            the notebook substitutes for -1 labels).

    Returns:
        0-dim tensor: sum of per-sample mean losses divided by the batch size.
    """
    # Built once per call instead of once per sample.
    criterion = nn.CrossEntropyLoss(ignore_index=ignore_index)
    batch_loss = 0.0
    for i in range(output.shape[0]):
        req_len = torch.sum(train_dic['length'][i]).int()
        logits = output[i][:req_len, :]
        # Move targets to wherever the scores live, not to a notebook-level
        # `device` global, so the function works on its own.
        targets = train_dic['target'][i][:req_len].long().to(output.device)
        batch_loss = batch_loss + criterion(logits, targets)
    return batch_loss / output.shape[0]

In [None]:
epochs = 20
# Train one text model per fold and checkpoint the weights of the epoch with
# the lowest summed validation loss.
for fold in range(1):
    # Any real loss beats infinity, so the first epoch is always saved.
    final_val_loss = float('inf')
    train_loader = train_data[fold]
    val_loader = val_data[fold]
    text_model = Model(200, 50, 4, 0.2, 2, True).double()
    text_model.to(device)
    # NOTE(review): lr is hard-coded here; the `base_lr` constant defined in
    # the config cell is not used. Confirm which value is intended.
    optimizer = Adam(text_model.parameters(), lr=0.001)
    for e in range(epochs):
        tot_loss, tot_acc = 0.0, 0.0
        text_model.train()
        for train_dic in train_loader:
            optimizer.zero_grad()
            inp = train_dic['text'].permute(0, 2, 1).double()
            # Call the module, not .forward(), so registered hooks run.
            _, out = text_model(inp.to(device))
            # Remap -1 labels to 4, the index compute_loss ignores.
            train_dic['target'][train_dic['target'] == -1] = 4
            acc = compute_accuracy(out.detach().cpu(), train_dic)
            loss = compute_loss(out, train_dic)
            tot_loss += loss.item()
            tot_acc += acc.item()
            loss.backward()
            optimizer.step()
        text_model.eval()
        val_loss, val_acc = 0.0, 0.0
        # No gradients are needed for validation; skipping the autograd graph
        # saves memory and time.
        with torch.no_grad():
            for val_dic in val_loader:
                inp = val_dic['text'].permute(0, 2, 1).double()
                _, val_out = text_model(inp.to(device))
                val_dic['target'][val_dic['target'] == -1] = 4
                val_acc += compute_accuracy(val_out.cpu(), val_dic).item()
                val_loss += compute_loss(val_out, val_dic).item()
        if val_loss < final_val_loss:
            torch.save({'model_state_dict': text_model.state_dict(),
                        'optimizer_state_dict': optimizer.state_dict(),},
                        'best_model_text' + str(fold) + '.tar')
            final_val_loss = val_loss

        print("Epoch: ", str(e+1),
              "Training Loss: ", str(tot_loss/len(train_loader)),
              "|| Training Accuracy: ", str(tot_acc/len(train_loader)),
              "|| Validation Loss: ", str(val_loss/len(val_loader)),
              "|| Validation Accuracy: ", str(val_acc/len(val_loader)))

In [None]:
# Folder holding the test arrays; named once instead of repeated per np.load.
test_dir = '/content/drive/MyDrive/multimodal_sentiment_analysis/'
test_imgs, test_aud, test_text, test_labels = (
    np.load(test_dir + 'test_' + part + '.npy')
    for part in ('image', 'audio', 'text', 'labels')
)
test_lengths = np.load(test_dir + 'length_test.npy')

In [None]:
# Wrap the five test arrays so one index yields all modalities of a sample.
test_dataset = MyDataset(
    imgs=test_imgs,
    aud=test_aud,
    text=test_text,
    target=test_labels,
    length=test_lengths,
)

In [None]:
# Sample over every test index: each sample is drawn once per pass, in random order.
test_size = len(test_imgs)
indices = [*range(test_size)]
test_sampler = SubsetRandomSampler(indices)

In [None]:
# Batches of the test set, drawn through test_sampler. shuffle stays False
# because an explicit sampler is supplied; the last partial batch is kept.
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    sampler=test_sampler,
    shuffle=False,
    drop_last=False,
    num_workers=num_workers,
    pin_memory=True,
)

In [None]:
# Evaluate each fold's best checkpoint on the test set.
for fold in range(1):

    text_model = Model(200, 50, 4, 0.2, 2, True).double()
    text_model.to(device)

    # map_location lets a checkpoint saved on a GPU runtime load on a
    # CPU-only one as well.
    checkpoint_text = torch.load('best_model_text'+str(fold)+'.tar',
                                 map_location=device)
    text_model.load_state_dict(checkpoint_text['model_state_dict'])
    text_model.eval()

    # The original overwrote `acc` on every batch and printed only the last
    # batch's accuracy. Accumulate and average over batches instead, the same
    # convention the training cell uses for its reported accuracies.
    test_acc = 0.0
    with torch.no_grad():
        for test_dic in test_loader:
            inp = test_dic['text'].permute(0, 2, 1).double()
            out_text, out = text_model(inp.to(device))
            # Remap -1 labels to 4, as done for the train/val batches.
            test_dic['target'][test_dic['target'] == -1] = 4
            test_acc += compute_accuracy(out.cpu(), test_dic).item()
    # max(..., 1) guards against an empty loader.
    acc = test_acc / max(len(test_loader), 1)
    print("Accuracy: ", acc)
