Load required packages

In [0]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import pickle
import cv2
import zipfile
import os

Load the zipped data file of the specified type.

In [0]:
dataset = "BERT"

In [0]:
if not(os.path.exists(dataset)):
  with zipfile.ZipFile(dataset + ".zip", 'r') as zip_ref:
    zip_ref.extractall("")

In [0]:
# Load data
with open(dataset + "/X.pkl", "rb") as f:
  X = pickle.load(f)

with open(dataset + "/y.pkl", "rb") as f:
  y = pickle.load(f)

X = X.to('cuda').float()
y = y.to('cuda').float()
y = (y> 10).float()

In [0]:
indices = np.arange(0, X.shape[0])
np.random.shuffle(indices)
train_indices = indices[:(X.shape[0] - 1000)]
test_indices = indices[(X.shape[0] - 1000):]

y_train = y[train_indices]
y_test = y[test_indices]
X_train = X[train_indices, :, :]
X_test = X[test_indices, :, :]

Define an RNN architecture for modelling.

In [0]:
# Define the network as a class
class RNNNetwork(torch.nn.Module):

    def __init__(self, input_size):
        super(RNNNetwork, self).__init__()
        
        #  Recurrent (sequence) layer
        self.lstm = torch.nn.LSTM(input_size, 100)

        # Two linear layers
        self.linear1 = torch.nn.Linear(100, 1)

        # Dropout of 50% of values
        self.dropout = torch.nn.Dropout(0.8)

        # Sigmoid activation
        self.sigmoid = torch.nn.Sigmoid()

    # Performs the forward pass
    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.linear1(out)
        out = self.sigmoid(out)
        return out[len(x)-1]

In [0]:
# Declare a model and prepare for training
rnn_model = RNNNetwork(X_train.shape[2])
rnn_model.to('cuda')
rnn_model.train(True)
rnn_model.float()

# Use the mse loss for a criterion
criterion = torch.nn.MSELoss()

# Define the adam optimizer on the discriminator network
rnn_optimizer = torch.optim.Adam(rnn_model.parameters(), lr=0.000001,
                                 betas=(0.9, 0.9999))


Train the model:

In [0]:
# Save the losses
losses_train = []
losses_test = []
epochs = 80
batch_size=1

# Use 50 epochs
for i in range(0, epochs+1):
    
    # Every tenth iteration
    if (i % 10 == 0):

      # Track the epoch count
      print(i, " of ", epochs)

      # Get loss on the training data
      epoch_losses_train = []
      for j in range(0, int(X_train.shape[0]/batch_size)):
      
          # Zero the  gradient
          rnn_optimizer.zero_grad()

          # Select inputs
          overall_loss = 0
          for k in range(batch_size):
            train_input = X_train[(j+k):(j+k+1),:, :]
            train_input = X_train[(j+k):(j+k+1),:((np.where(train_input.sum(axis=2).cpu().numpy() != -300))[1]).max(), :]
            train_input = train_input.reshape(train_input.shape[1], 1, train_input.shape[2])

            # Skip this input if needed
            if train_input.shape[0] == 0:
              continue

            # Get outputs and truth
            train_output = rnn_model(train_input)
            true_output = y_train[(j+k):(j+k+1)]
            
            # Calculate the loss and update the model
            loss = criterion(train_output, true_output)
            overall_loss = overall_loss + loss

          overall_loss.backward()
          rnn_optimizer.step()

          # Save the loss for this training iteration
          epoch_losses_train.append(overall_loss)

      # Set training to false
      rnn_model.train(False)

      # Get loss on the testing data
      epoch_losses_test = []
      for j in range(0, int(X_test.shape[0])):

          # Select inputs
          train_input = X_test[j:(j+1),:, :]
          train_input = X_test[(j):(j+1),:((np.where(train_input.sum(axis=2).cpu().numpy() != -300))[1]).max(), :]
          train_input = train_input.reshape(train_input.shape[1], 1, train_input.shape[2])

          # Sip if neceeded
          if train_input.shape[0] == 0:
            continue

          # Get outputs and truth
          train_output = rnn_model(train_input)
          true_output = y_test[j:(j+1)]
          
          # Calculate the save the loss, but don't update the model
          loss = criterion(train_output, true_output)
          epoch_losses_test.append(loss)

      # Save the losses
      losses_train.append((sum(epoch_losses_train) / len(epoch_losses_train)).sqrt())
      losses_test.append((sum(epoch_losses_test) / len(epoch_losses_test)).sqrt())

      # Allow model to train on the next iterations
      rnn_model.train(True)
    
    else:

      # Go across the entire dataset
      for j in range(0, int(X_train.shape[0]/batch_size)):
      
          # Zero the  gradient
          rnn_optimizer.zero_grad()

          # Select inputs
          overall_loss = 0
          for k in range(0, batch_size):
            train_input = X_train[(j+k):(j+k+1),:, :]
            train_input = X_train[(j+k):(j+k+1),:((np.where(train_input.sum(axis=2).cpu().numpy() != -300))[1]).max(), :]
            train_input = train_input.reshape(train_input.shape[1], 1, train_input.shape[2])

            # Skip this input if needed
            if train_input.shape[0] == 0:
              continue

            # Get outputs and truth
            train_output = rnn_model(train_input)
            true_output = y_train[(j+k):(j+k+1)]
            
            # Calculate the loss and update the model
            loss = criterion(train_output, true_output)
            overall_loss = overall_loss + loss

          overall_loss.backward()
          rnn_optimizer.step()


0  of  80


  return F.mse_loss(input, target, reduction=self.reduction)


10  of  80
20  of  80
30  of  80
40  of  80
50  of  80
60  of  80
70  of  80
80  of  80


Evaluate the model on both training and testing data:

In [0]:
# Turn training off
rnn_model.train(False)

# Track accuracies
accuracy_train = []
for j in range(0, int(X_train.shape[0])):

    # Select inputs
    train_input = X_train[j:(j+1),:, :]
    train_input = X_train[(j):(j+1),:((np.where(train_input.sum(axis=2).cpu().numpy() != -300))[1]).max(), :]
    train_input = train_input.reshape(train_input.shape[1], 1, train_input.shape[2])

    # Skip if 0 length
    if train_input.shape[0] == 0:
      continue

    # Calculate hte model output
    train_output = rnn_model(train_input)
    true_output = y_train[j:(j+1)]

    # Check if it was correct
    correct = (train_output > 0.5) == true_output
    accuracy_train.append(correct.float())

# Do same on testing data
accuracy_test = []
for j in range(0, int(X_test.shape[0])):

    # Select inputs
    train_input = X_test[j:(j+1),:, :]
    train_input = X_test[(j):(j+1),:((np.where(train_input.sum(axis=2).cpu().numpy() != -300))[1]).max(), :]
    train_input = train_input.reshape(train_input.shape[1], 1, train_input.shape[2])

    if train_input.shape[0] == 0:
      continue

    train_output = rnn_model(train_input)
    true_output = y_train[j:(j+1)]

    correct = (train_output > 0.5) == true_output
    accuracy_test.append(correct.float())

# Calculate overall accuracies
print((sum(accuracy_train)[0][0]) / len(accuracy_train))
print((sum(accuracy_test)[0][0]) / len(accuracy_test))

tensor(0.8281, device='cuda:0')
tensor(0.8340, device='cuda:0')


Get values needed for confusion matrices

In [0]:
accuracy_train_0 = accuracy_train[(y_train == 0).cpu().numpy()]
accuracy_train_1 = accuracy_train[(y_train == 1).cpu().numpy()]

print(sum(accuracy_train_0))
print(sum(accuracy_train_1))

print(len(accuracy_train_0))
print(len(accuracy_train_1))


accuracy_test_0 = accuracy_test[(y_test == 0).cpu().numpy()]
accuracy_test_1 = accuracy_test[(y_test == 1).cpu().numpy()]

print(sum(accuracy_test_0))
print(sum(accuracy_test_1))

print(len(accuracy_test_0))
print(len(accuracy_test_1))


2677.0
550.0
3233
664
694.0
140.0
831
169
