In [37]:
import torch
import torch.nn as nn
import torch.utils.data as data
import os
import numpy as np
import json

In [38]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [39]:
def expandData(weekly_songs, encoded):
    """Replace every index pair in ``encoded`` by the entry it points at.

    Args:
        weekly_songs: Two-level indexable container, looked up as
            ``weekly_songs[a][b]``.
        encoded: Sequence of sequences of codes; ``code[0]`` and ``code[1]``
            of each code are the two lookup indices.

    Returns:
        A list of lists with the same nesting as ``encoded`` in which each
        code has been replaced by ``weekly_songs[code[0]][code[1]]``.
    """
    return [
        [weekly_songs[code[0]][code[1]] for code in sequence]
        for sequence in encoded
    ]

In [40]:
class JsonDataset(data.Dataset):
    """Map-style dataset backed by a single JSON file held fully in memory.

    The file must contain an object with two keys:

    * ``'weekly_songs'`` - the lookup table handed to ``expandData``;
    * ``'data'`` - the list of samples, each indexed as ``sample[0]``
      (encoded sequences), ``sample[1]`` (list of numbers) and ``sample[2]``
      (a single integer, presumably the class label - confirm against the
      code that writes these files).
    """

    def __init__(self, data_path):
        """Read and parse the JSON file at ``data_path``.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the content is not valid JSON.
            KeyError: if ``'weekly_songs'`` or ``'data'`` is missing.
        """
        # The context manager guarantees the handle is closed even when
        # parsing fails, which a manual open()/close() pair does not.
        with open(data_path, 'r') as f:
            d = json.load(f)
        self.weekly_songs = d['weekly_songs']
        self.data = d['data']

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return sample ``index`` as ``(sequences, features, target)``.

        Returns:
            A 3-tuple of: the nested list produced by ``expandData`` for
            ``sample[0]``; ``sample[1]`` as a ``FloatTensor``; and
            ``sample[2]`` wrapped in a length-1 ``LongTensor``.
        """
        sample = self.data[index]
        return (
            expandData(self.weekly_songs, sample[0]),
            torch.FloatTensor(sample[1]),
            torch.LongTensor([sample[2]]),
        )

In [41]:
# One JsonDataset per split; each constructor call reads its JSON file from disk.
train_data = JsonDataset(data_path='/kaggle/input/cpsc490/combined-train.json')
validation_data = JsonDataset(data_path='/kaggle/input/cpsc490/combined-validation.json')
test_data = JsonDataset(data_path='/kaggle/input/cpsc490/combined-test.json')

In [42]:
# --- Fixed hyperparameters ---
x_seq_size = 10        # handed to RNN, which uses it as both LSTM input and hidden width
rnn_num_layers = 2     # number of stacked LSTM layers

fc_hidden_size = 200   # width of every hidden Linear layer in Naive
fc_num_layers = 5      # number of Linear+Tanh+Dropout blocks in Naive

batch_size = 1

# --- Sizes read off the first training sample (sequences, features, target) ---
x_seq_num = len(train_data[0][0])     # number of sequences in the sample
x_size = train_data[0][1].size(0)     # length of the sample's feature tensor

In [43]:
# Every split uses the same loader settings (shuffling included).
params = dict(batch_size=batch_size, shuffle=True, num_workers=1, pin_memory=True)
train_loader, validation_loader, test_loader = (
    data.DataLoader(split, **params)
    for split in (train_data, validation_data, test_data)
)

In [44]:
class RNN(nn.Module):
    """Stacked LSTM whose hidden width equals its input width.

    ``forward`` returns only the top layer's output at the last time step.
    """

    def __init__(self, x_seq_size, rnn_num_layers):
        """
        Args:
            x_seq_size: Feature width of each time step; also used as the
                LSTM hidden size.
            rnn_num_layers: Number of stacked LSTM layers.
        """
        super().__init__()

        self.rnn_num_layers = rnn_num_layers

        # The attribute name `rnn` is part of the state-dict keys
        # (e.g. 'rnn.weight_ih_l0'), so it must not be renamed.
        self.rnn = nn.LSTM(x_seq_size, x_seq_size, rnn_num_layers,
                           batch_first=True)

    def forward(self, x_seq):
        """Run the LSTM from a zero initial state.

        Args:
            x_seq: Tensor of shape ``(batch, seq_len, x_seq_size)``.

        Returns:
            Tensor of shape ``(batch, x_seq_size)``: the output at the final
            time step.
        """
        # Allocate the initial states directly on the input's device and in
        # its dtype, so the module works wherever it (and its input) lives
        # instead of depending on a notebook-level `device` variable.
        state_shape = (self.rnn_num_layers, x_seq.shape[0], self.rnn.hidden_size)
        h0 = x_seq.new_zeros(state_shape)
        c0 = x_seq.new_zeros(state_shape)

        out, _ = self.rnn(x_seq, (h0, c0))
        return out[:, -1, :]

In [45]:
class Naive(nn.Module):
    """Fully connected classifier with Tanh activations and dropout.

    The network is ``Linear -> Tanh -> Dropout(0.5)`` repeated, followed by a
    final ``Linear`` that produces two outputs.
    """

    def __init__(self, x_size, fc_hidden_size, fc_num_layers):
        """
        Args:
            x_size: Width of the input vector.
            fc_hidden_size: Width of every hidden layer.
            fc_num_layers: Number of Linear+Tanh+Dropout blocks; at least one
                block is always built.
        """
        super().__init__()

        dropout_p = 0.5

        # Widths at the boundaries of consecutive blocks: the first block maps
        # x_size -> hidden, every further block maps hidden -> hidden.
        widths = [x_size, fc_hidden_size] + [fc_hidden_size] * (fc_num_layers - 1)

        layers = []
        for n_in, n_out in zip(widths[:-1], widths[1:]):
            layers += [nn.Linear(n_in, n_out), nn.Tanh(), nn.Dropout(dropout_p)]
        layers.append(nn.Linear(fc_hidden_size, 2))

        # Positions inside the Sequential define the state-dict keys
        # ('fc.0.weight', 'fc.3.weight', ...), so the layer order matters.
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the two-column output of the network for ``x``."""
        return self.fc(x)

In [46]:
class Combined(nn.Module):
    """Pre-trained ``RNN`` whose per-sequence scores feed a pre-trained ``Naive``.

    For each sequence ``s`` of a sample, the RNN's final-step output is
    compared with the sample's vector ``x`` through the Gaussian-style kernel
    ``exp(-25 * ||rnn(s) - x||_2 ** 2)``. An empty sequence scores 0. The row
    of scores (one column per sequence) is then classified by ``Naive``.
    """

    def __init__(self, x_seq_size, rnn_num_layers, x_size, fc_hidden_size, fc_num_layers):
        """Build both sub-models and load their saved weights.

        Args:
            x_seq_size: Input/hidden width of the RNN.
            rnn_num_layers: Number of stacked LSTM layers in the RNN.
            x_size: Not used; kept so existing callers keep working.
            fc_hidden_size: Hidden width of the ``Naive`` classifier.
            fc_num_layers: Number of hidden blocks of the ``Naive`` classifier.
        """
        super().__init__()

        # NOTE: torch.load unpickles the file - only load trusted checkpoints.
        # map_location='cpu' lets a checkpoint saved on a GPU be restored on a
        # CPU-only machine; the caller moves the model to its device later.
        self.rnn = RNN(x_seq_size, rnn_num_layers)
        self.rnn.load_state_dict(
            torch.load('/kaggle/input/cpsc490/best_rnn_2.pth', map_location='cpu'))

        naive_state = torch.load('/kaggle/input/cpsc490/best_naive_9.pth',
                                 map_location='cpu')
        # The classifier consumes one score per sequence. Read that width from
        # the first Linear layer of the checkpoint rather than from a
        # notebook-level variable, so it always matches the saved weights.
        naive_in_features = naive_state['fc.0.weight'].shape[1]
        self.naive = Naive(naive_in_features, fc_hidden_size, fc_num_layers)
        self.naive.load_state_dict(naive_state)

    def forward(self, x_seq, x):
        """Score every sequence of one sample against ``x`` and classify.

        Args:
            x_seq: Non-empty iterable of sequences belonging to ONE sample.
                Each sequence is either empty or a list of equal-length
                numeric rows (one row per time step).
            x: Tensor that broadcasts against the RNN output of shape
                ``(1, x_seq_size)``; presumably 1-D of that width - TODO
                confirm. Its device is used for all intermediate tensors.

        Returns:
            The ``Naive`` output for the single row of scores, i.e. a tensor
            of shape ``(1, 2)``.

        Raises:
            ValueError: if ``x_seq`` contains no sequences.
        """
        scores = []
        for s in x_seq:
            if len(s) == 0:
                # No history for this slot: use a score of exactly 0.
                score = torch.zeros(1, dtype=torch.float32, device=x.device)
            else:
                steps = torch.tensor(s, dtype=torch.float32, device=x.device)
                last_output = self.rnn(steps.unsqueeze(0))  # add the batch axis
                distance = torch.norm(last_output - x, 2)
                score = torch.exp(-25 * torch.square(distance)).reshape(1)
            scores.append(score)

        if not scores:
            raise ValueError('x_seq must contain at least one sequence')

        # Concatenate once instead of growing a tensor inside the loop.
        features = torch.cat(scores).reshape(1, -1)
        return self.naive(features)

In [47]:
# Build the combined model (the constructor loads both checkpoints), then
# move its parameters onto the selected device.
model = Combined(
    x_seq_size=x_seq_size,
    rnn_num_layers=rnn_num_layers,
    x_size=x_size,
    fc_hidden_size=fc_hidden_size,
    fc_num_layers=fc_num_layers,
)
model = model.to(device)

In [48]:
# Test
# NOTE(review): `d` concatenates the train, validation and test splits, so the
# "Test accuracy" printed below is measured over all three - confirm intended.
d = train_data + validation_data + test_data

model.eval()  # disable dropout for evaluation
n_correct = 0
total = 0

with torch.no_grad():
    for i in range(len(d)):
        x_seq, x, targets = d[i]
        x = x.to(device)
        targets = targets.flatten().to(device)

        outputs = model(x_seq, x)
        # Index of the larger of the two outputs is the predicted class.
        predicted = outputs.max(dim=1)[1]

        total += 1
        n_correct += int((predicted == targets).sum())

acc = float(100 * n_correct / total)
print('Test accuracy: {}%'.format(acc))

Test accuracy: 66.0%
