# Import Data

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')
# Shell escape: extract the uw-cs480-fall20.zip archive from the mounted drive.
# NOTE(review): the path is relative, so this assumes the working directory is
# /content — confirm.
!unzip 'drive/MyDrive/Colab Notebooks/uw-cs480-fall20.zip'

## Import models

In [None]:
# Upload files from the local machine through Colab's upload helper.
# NOTE(review): presumably these are the *.pt checkpoints loaded further down
# (cnn-model.pt, rnn-model.pt, linear-model.pt) — confirm.
from google.colab import files
uploaded = files.upload()
# IPython magic: list the working directory to check what is present.
%ls

In [2]:
import pandas as pd
import numpy as np
import math
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm
# PyTorch libraries and modules
import torch
import torch.nn as nn
import torchvision
from torch.autograd import Variable
from torch.nn import Linear, ReLU, CrossEntropyLoss, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout


from torch.optim import Adam, SGD
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import time

from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay


# Create ensemble model

In [None]:
class MyEnsemble(nn.Module):
    """Ensemble that fuses three sub-models through a small fully connected head.

    The outputs of the image, text and feature sub-models are concatenated
    along dim 1 and passed through three linear layers (81 -> 200 -> 100 -> 27),
    with a ReLU applied to the input of every layer.

    NOTE(review): the head expects 81 concatenated features, which presumably
    means each sub-model emits 27 values per sample — confirm against the
    sub-model definitions.

    Args:
        _cnn: callable taking ``image``.
        _rnn: callable taking ``(description, l)``.
        _nn: callable taking ``features``.
    """

    def __init__(self, _cnn, _rnn, _nn):
        super().__init__()
        # Attribute names are part of the checkpoint format (state_dict keys),
        # so they are kept exactly as they were.
        self.cnn, self.rnn, self.nn = _cnn, _rnn, _nn
        # Layers are created in this order on purpose: it fixes both the
        # parameter order and the order in which random init values are drawn.
        self.classifier2 = nn.Linear(81, 200)
        self.classifier1 = nn.Linear(200, 100)
        self.classifier = nn.Linear(100, 27)

    def forward(self, image, description, l, features):
        """Return the head's output for one batch of the three input modalities."""
        branch_outputs = [
            self.cnn(image),
            self.rnn(description, l),
            self.nn(features),
        ]
        hidden = torch.cat(branch_outputs, dim=1)
        # Each stage applies ReLU to its input before the linear projection;
        # the final projection's output is returned without an activation.
        for layer in (self.classifier2, self.classifier1, self.classifier):
            hidden = layer(F.relu(hidden))
        return hidden

In [None]:
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps into whole minutes and seconds.

    Args:
        start_time: timestamp in seconds at the start of the interval.
        end_time: timestamp in seconds at the end of the interval.

    Returns:
        Tuple ``(minutes, seconds)`` of ints; both are truncated with ``int``.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    # Whatever is left after removing the whole minutes, truncated to an int.
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

def calculate_accuracy(y_pred, y):
    """Return the fraction of rows whose arg-max along dim 1 equals the label.

    Args:
        y_pred: tensor of per-class scores; the arg-max is taken over dim 1.
        y: tensor of labels, reshaped to match the arg-max result.

    Returns:
        A float tensor: number of matches divided by ``y.shape[0]``.
    """
    predicted = y_pred.argmax(dim=1, keepdim=True)
    matches = predicted.eq(y.view_as(predicted))
    return matches.sum().float() / y.shape[0]

def train(model, iterator, optimizer, criterion, device):
    """Run one training epoch and return the mean per-batch loss and accuracy.

    Args:
        model: module called as ``model(image, description, l, features)``.
        iterator: yields ``(image, (description, l), features, y)`` batches.
        optimizer: optimizer whose gradients are reset and stepped every batch.
        criterion: loss called as ``criterion(y_pred, y)``.
        device: device the batch tensors are moved to before the forward pass.

    Returns:
        Tuple ``(mean_loss, mean_accuracy)`` averaged over ``len(iterator)``.
    """
    epoch_loss = 0
    epoch_acc = 0

    model.train()

    for (image, (description, l), features, y) in iterator:
        # Prepare the batch exactly like evaluate_nn does, so training and
        # evaluation feed the model tensors on the same device with the same
        # dtypes. Previously `device` was accepted but never used here.
        description = description.to(device, dtype=torch.long)
        # The lengths are only cast, not moved: evaluate_nn deliberately keeps
        # them off `device` as well.
        l = l.long()
        image = image.to(device)
        features = features.to(device)
        y = y.to(device, dtype=torch.int64)

        optimizer.zero_grad()

        y_pred = model(image, description, l, features)
        loss = criterion(y_pred, y)
        acc = calculate_accuracy(y_pred, y)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

def evaluate_nn(model, iterator, criterion, device):
    """Evaluate the model on every batch without tracking gradients.

    Args:
        model: module called as ``model(image, description, l, features)``.
        iterator: yields ``(image, (description, l), features, y)`` batches.
        criterion: loss called as ``criterion(y_pred, y)``.
        device: device the batch tensors are moved to before the forward pass.

    Returns:
        Tuple ``(mean_loss, mean_accuracy)`` averaged over ``len(iterator)``.
    """
    total_loss, total_acc = 0, 0

    model.eval()

    with torch.no_grad():
        for batch in iterator:
            images, (tokens, lengths), feats, targets = batch

            tokens = tokens.to(device, dtype=torch.long)
            # Lengths are cast to long but intentionally not moved to `device`.
            lengths = lengths.long()
            images = images.to(device)
            feats = feats.to(device)
            targets = targets.to(device, dtype=torch.int64)

            outputs = model(images, tokens, lengths, feats)

            batch_loss = criterion(outputs, targets)
            batch_acc = calculate_accuracy(outputs, targets)

            total_loss += batch_loss.item()
            total_acc += batch_acc.item()

    return total_loss / len(iterator), total_acc / len(iterator)

In [None]:
# Loss shared by training and evaluation. The name was misspelled as
# `riterion`, which made the `.cuda()` call below fail with a NameError.
criterion = CrossEntropyLoss()
criterion = criterion.cuda()

# Rebuild the three sub-models and restore their saved weights.
cnn_model = CNN()
cnn_model.load_state_dict(torch.load('cnn-model.pt'))

rnn_model = RNN()
rnn_model.load_state_dict(torch.load('rnn-model.pt'))

nn_model = NN(4, 15, 27)
nn_model.load_state_dict(torch.load('linear-model.pt'))

# Wrap the sub-models in the ensemble head.
# NOTE(review): the ensemble is never moved to a device in this cell, while
# evaluate_nn moves every batch to `device`. If `device` is a GPU the model
# must be moved there too (before the optimizer is built) — confirm.
ensamble_model = MyEnsemble(cnn_model, rnn_model, nn_model)

# The optimizer covers every parameter of the ensemble, sub-models included.
optimizer = Adam(ensamble_model.parameters(), lr=0.0001)

In [None]:
# Resume from the saved ensemble checkpoint and measure its starting point.
# The ensemble is referred to by the name it was created under
# (`ensamble_model`), which is also the object the optimizer was built over.
# It is scored with evaluate_nn on valid_iterator, since that function unpacks
# the four-part batches the ensemble's forward pass needs.
ensamble_model.load_state_dict(torch.load('output/combine-model.pt'))
test_loss, test_acc = evaluate_nn(ensamble_model, valid_iterator, criterion, device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

# Per-epoch history.
train_accuracy_list = []
train_loss_list = []
valid_acc_list = []
valid_loss_list = []
# Only overwrite the checkpoint when an epoch beats the loaded model.
best_valid_loss = test_loss

for epoch in range(EPOCHS):

    start_time = time.monotonic()

    # train
    train_loss, train_acc = train(ensamble_model, train_iterator, optimizer, criterion, device)

    # valid
    valid_loss, valid_acc = evaluate_nn(ensamble_model, valid_iterator, criterion, device)

    # save best model
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(ensamble_model.state_dict(), 'output/combine-model.pt')

    # Track the accuracy
    train_accuracy_list.append(train_acc)
    train_loss_list.append(train_loss)
    valid_acc_list.append(valid_acc)
    valid_loss_list.append(valid_loss)

    # Report progress; start_time was captured above but previously unused.
    epoch_mins, epoch_secs = epoch_time(start_time, time.monotonic())
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

# Output prediction