In [34]:
##Author: Simona
#References
#https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html
#https://www.analyticsvidhya.com/blog/2020/01/first-text-classification-in-pytorch/
#https://towardsdatascience.com/media/935b97e7b4c541849529cf3b40e4e5ac
#https://machinelearningmastery.com/use-different-batch-sizes-training-predicting-python-keras/

import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.rnn as rnn_utils
from numpy import load
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader



# Select the compute device once: the GPU when torch can see one, the CPU otherwise.
# The `device` variable is used by later cells to place the model and the data.
is_cuda = torch.cuda.is_available()
device = torch.device("cuda" if is_cuda else "cpu")
print("GPU is available" if is_cuda else "GPU not available, CPU used")

# List of all x and y files in the database.
# os.listdir() returns the entries in arbitrary order, so they are sorted first:
# otherwise the [:200] selection below would pick a different subset on every machine.
path = sorted(os.listdir("../../MachineLearning SourceCode/Dataset-vectorized/itasserSx9"))
x_List = []
y_List = []
xyfileTuple = []

for names in path[:200]:                   ####choose here the files to process
    if names.endswith("-x-.npy"):
        x_List.append(names)
    elif names.endswith("-y-.npy"):
        # only genuine label files; stray entries (hidden files, sub-folders) are ignored
        y_List.append(names)

# Index the y files by their common stem so that every x file is matched with a single
# dictionary lookup instead of a scan over all y files.
y_by_stem = {}
for yfile in y_List:
    y_by_stem.setdefault(yfile.replace('-y-.npy', ''), yfile)

# x files without a matching y file are skipped
for xfile in x_List:
    yfile = y_by_stem.get(xfile.replace('-x-.npy', ''))
    if yfile is not None:
        xyfileTuple.append((xfile, yfile))
            
## Define dataset x_data, y_data
## lists of N_batches elements of shape (N_sequence, features)
data_dir = "../../MachineLearning SourceCode/Dataset-vectorized/itasserSx9"
x_data = []           # x as list of input tensors, one per file
y_data = []           # y as list of output tensors, one per file

# Only matched (x, y) pairs can be loaded, so the number of batches is the number of
# pairs and not the number of x files (an unpaired x file would otherwise cause an
# IndexError on xyfileTuple).
N_batches = len(xyfileTuple)
for xfile, yfile in xyfileTuple:
    # os.path.join inserts the path separator between the directory and the file name
    x_i = load(os.path.join(data_dir, xfile))
    y_i = load(os.path.join(data_dir, yfile))
    x_data.append(torch.from_numpy(x_i).type(torch.FloatTensor))
    y_data.append(torch.from_numpy(y_i).type(torch.FloatTensor))
    
# Zero-pad every sequence up to the length of the longest one, so that all x (and all y)
# arrays can be stacked into a single tensor of shape (N_batches, longest_sequence, features).
lengths = list(map(len, x_data))
longest_sequence = max(lengths)
batch_size = len(x_data)
D_in, D_out = x_data[0].shape[1], y_data[0].shape[1]
padded_x = torch.zeros((N_batches, longest_sequence, D_in))
padded_y = torch.zeros((N_batches, longest_sequence, D_out))

# write each real sequence at the start of its row; the remainder of the row stays zero
for row, (n_rows, seq_x, seq_y) in enumerate(zip(lengths, x_data, y_data)):
    padded_x[row, :n_rows, :] = seq_x[:n_rows, :]
    padded_y[row, :n_rows, :] = seq_y[:n_rows, :]
    
def makedirs(dirname):
    """Create the directory `dirname` (including missing parents) if it does not exist yet.

    `exist_ok=True` makes the call idempotent without a separate existence check, so
    there is no window between "check" and "create" in which another process could
    create the directory and make the call fail.

    Raises:
        FileExistsError: if `dirname` exists but is not a directory.
    """
    os.makedirs(dirname, exist_ok=True)
makedirs('./Basic_LSTM_outputs')

GPU not available, CPU used


In [None]:
## x and y are tensors of shape (N_batch, N_sequence, D_in) and (N_batch, N_sequence, D_out)

# Split the first 100 sequences into training and validation sets.
# train_test_split shuffles the rows by default; a fixed random_state makes the split
# (and therefore the reported losses and accuracies) reproducible from run to run.
split_seed = 42
x_train, x_valid, y_train, y_valid = train_test_split(
    padded_x[:100, :, :], padded_y[:100, :, :], test_size=0.2, random_state=split_seed
)

## Set parameters
# Data params
N_batch_train, N_seq, D_in = x_train.shape
N_batch_valid = x_valid.shape[0]
D_out = 4 # number of classes, corresponding to the first 4 columns of the y_data tensors
          # (the last 2 columns contain angle values)

# Network params
hidden_dim = 50
num_layers = 4
learning_rate = 1e-3
dtype = torch.float

In [None]:
# Model for the classification of secondary structures into the types a, u, t, b
# Here we define the LSTM model as a class
class LSTMstructures(nn.Module):
    """Per-position sequence classifier: a stacked LSTM followed by a linear layer.

    Args:
        input_size: number of features per sequence position.
        hidden_dim: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        output_size: number of classes scored for every position.
    """

    def __init__(self, input_size, hidden_dim, num_layers, output_size):
        super().__init__()

        # Keep the hyper-parameters on the instance
        self.input_size = input_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        # LSTM layer (default layout: sequence, batch, features)
        self.lstm = nn.LSTM(input_size, hidden_dim, num_layers)
        # Output layer mapping each hidden state to the class scores
        self.linear = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """Score every position of a single sequence.

        Args:
            x: tensor whose first dimension is the sequence length; it is reshaped to
                (len(x), 1, -1), i.e. one sequence forming a batch of size 1.

        Returns:
            Tensor of shape (len(x), output_size) holding the unnormalised class scores
            (logits). No softmax is applied here: torch.nn.CrossEntropyLoss expects
            logits and normalises them itself, so a softmax in the model would be
            applied twice and flatten the gradients. The argmax over dim 1 is the same
            for logits and probabilities; use F.softmax(scores, dim=1) when actual
            probabilities are needed.
        """
        seq_len = len(x)
        lstm_out, _ = self.lstm(x.view(seq_len, 1, -1))
        return self.linear(lstm_out.view(seq_len, -1))
    

# Build the classifier together with its loss and its optimiser, then place the model
# on the selected device (left as the last expression so the cell shows the model summary).
model = LSTMstructures(input_size=D_in, hidden_dim=hidden_dim, num_layers=num_layers, output_size=4)
criterion = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
model.to(device)

In [None]:
# Train model
model.to(device)
# Tensor.to() returns a new tensor instead of moving the data in place, so the results
# have to be kept; otherwise the data stays on the CPU while the model sits on the GPU.
x_train, y_train = x_train.to(device), y_train.to(device)
x_valid, y_valid = x_valid.to(device), y_valid.to(device)

# Per-epoch loss histories; they are extended on demand (see the top of the loop)
train_loss = torch.zeros((1000))
valid_loss = torch.zeros((1000))
best_val_loss = float('inf')   # any first validation loss counts as an improvement
best_t = 0
patient = 0                    # epochs elapsed since the last improvement
t = 0

# NOTE(review): the targets are torch.max(y, 1)[1] over full padded sequences, so rows
# that are pure zero padding also receive a class index and contribute to the loss.
# Consider masking the padded positions - confirm the intended behaviour.

# loop to learn the neural network
while True:

    # Early stopping only triggers after 600 epochs without improvement, so the run can
    # exceed the initial history size: grow the buffers instead of failing on index t.
    if t >= train_loss.shape[0]:
        train_loss = torch.cat((train_loss, torch.zeros(1000)))
        valid_loss = torch.cat((valid_loss, torch.zeros(1000)))

    model.train()
    sum_loss = 0.0
    for i in range(N_batch_train):
        x = x_train[i, :, :]     # one padded sequence of inputs
        y = y_train[i, :, :4]    # first 4 columns of y_train: the classification targets

        model.zero_grad()
        tag_scores = model(x)
        loss = criterion(tag_scores, torch.max(y, 1)[1])
        loss.backward()
        optimiser.step()
        # .item() detaches the value; summing the loss tensors themselves would keep
        # the autograd graph of every sequence alive for the whole epoch
        sum_loss += loss.item()

    train_loss[t] = sum_loss / N_batch_train

    if t % 100 == 0:
        print("epoch ", t, "Loss: ", sum_loss)

    ### calculate the validation loss every epoch
    ## use this for parameter tuning and early stopping
    model.eval()
    sum_valid_loss = 0.0
    with torch.no_grad():
        for i in range(N_batch_valid):
            x = x_valid[i, :, :]
            y = y_valid[i, :, :4]
            tag_scores_valid = model(x)
            sum_valid_loss += criterion(tag_scores_valid, torch.max(y, 1)[1]).item()
    valid_loss[t] = sum_valid_loss / N_batch_valid

    if sum_valid_loss <= best_val_loss:   # here we impose early stopping
        best_val_loss = sum_valid_loss
        best_t = t
        patient = 0
        torch.save(model, './Basic_LSTM_outputs/mytraining_ss.pt')
    else:
        patient += 1
    if patient > 600:
        break

    t += 1
print(best_t)

# Plot the training and validation loss curves for the epochs before the best one
# and save the figure to the output folder
import matplotlib.pyplot as plt
plt.cla()
axes = plt.gca()
axes.set_xlabel('epochs')
axes.set_ylabel('loss')
epochs = np.arange(best_t)
for history, line_style, label in ((train_loss, 'b-', 'train_loss'),
                                   (valid_loss, 'g-', 'valid_loss')):
    axes.plot(epochs, history.detach().numpy()[:best_t], line_style, label=label)
axes.legend()
fig = plt.gcf()
fig.set_size_inches(4, 4)
plt.savefig('./Basic_LSTM_outputs/loss', dpi=400, bbox_inches='tight')
plt.draw()

In [None]:
# accuracy of training and validation sets

#define metric
def binary_accuracy(preds, y):
    """Fraction of rows whose highest-scoring column is the same in `preds` and `y`.

    Args:
        preds: 2-D tensor of predicted scores, one row per position.
        y: 2-D tensor of target scores with one row per position.

    Returns:
        0-dim float tensor: number of matching rows divided by len(y).
    """
    predicted = torch.argmax(preds, dim=1)
    expected = torch.argmax(y, dim=1)
    n_hits = torch.eq(predicted, expected).sum().float()
    return n_hits / float(len(y))

# Testing and accuracy score

def _unpadded_length(x, y):
    """Return the number of leading rows up to and including the last non-zero row.

    train_test_split shuffles the sequences, so the lengths recorded in file order no
    longer line up with the rows of x_train / x_valid. The length is therefore
    recovered from the padded tensors themselves.
    NOTE(review): assumes a real position never has an all-zero row in BOTH x and y -
    confirm against the dataset.
    """
    occupied = (x.abs().sum(dim=1) > 0) | (y.abs().sum(dim=1) > 0)
    positions = torch.nonzero(occupied)
    if positions.numel() == 0:
        return 0
    return int(positions.max().item()) + 1


def _split_accuracy(x_split, y_split):
    """Accuracy of `model` on every sequence of a split, ignoring the padded tail."""
    scores = torch.zeros((len(x_split)))
    with torch.no_grad():
        for ii in range(len(x_split)):
            x = x_split[ii, :, :].to(device)
            y_full = y_split[ii, :, :].to(device)
            true_lenght = _unpadded_length(x, y_full)

            tag_scores = model(x)[:true_lenght, :]
            y = y_full[:true_lenght, :4]       # first 4 columns: the class targets
            scores[ii] = binary_accuracy(tag_scores, y).item()
    return scores


# True sequence lengths in file order (unshuffled); only valid for data indexed in that order
lenghts = [len(y_i) for y_i in y_data]
#model = torch.load('./Basic_LSTM_outputs/mytraining_ss.pt')
model.eval()

train_acc = _split_accuracy(x_train, y_train)
valid_acc = _split_accuracy(x_valid, y_valid)

print(torch.mean(train_acc), 'train_acc')
print(torch.mean(valid_acc), 'valid_acc')

In [None]:
## This is the case with early stopping included
## Test is performed on the next (up to) 20 sequences after the first 100

x_test, y_test = padded_x[100:, :, :], padded_y[100:, :, :]
# Fewer than 20 sequences may be left after the first 100; never index past the end
n_test = min(20, len(x_test))
test_acc = torch.zeros((n_test))
model.eval()
with torch.no_grad():
    for ii in range(n_test):
        x = x_test[ii, :, :].to(device)
        y = y_test[ii, :, :4].to(device)
        tag_scores = model(x)
        # padded_x keeps the file order, so sequence 100+ii has length lenghts[100+ii]
        true_lenght = lenghts[100+ii]

        tag_scores = tag_scores[:true_lenght, :]
        y = y[:true_lenght, :]
        test_acc[ii] = binary_accuracy(tag_scores, y).item()

if n_test == 0:
    print('no test sequences available (fewer than 101 sequences loaded)')
else:
    # report the accuracy of the test set itself (not of the validation set)
    print(torch.mean(test_acc), 'test_acc')

In [None]:
# Here we define the RNN model as a class
class RNNstructures(nn.Module):
    """Per-position sequence classifier: a stacked Elman RNN followed by a linear layer.

    Args:
        input_size: number of features per sequence position.
        hidden_dim: size of the RNN hidden state.
        num_layers: number of stacked RNN layers.
        output_size: number of classes scored for every position.
    """

    def __init__(self, input_size, hidden_dim, num_layers, output_size):
        super().__init__()

        # Keep the hyper-parameters on the instance
        self.input_size = input_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        # RNN layer (default layout: sequence, batch, features)
        self.rnn = nn.RNN(input_size, hidden_dim, num_layers)
        # Output layer mapping each hidden state to the class scores
        self.linear = nn.Linear(hidden_dim, output_size)

    def forward(self, x):
        """Score every position of a single sequence.

        Args:
            x: tensor whose first dimension is the sequence length; it is reshaped to
                (len(x), 1, -1), i.e. one sequence forming a batch of size 1.

        Returns:
            Tensor of shape (len(x), output_size) holding the unnormalised class scores
            (logits). No softmax is applied here: torch.nn.CrossEntropyLoss expects
            logits and normalises them itself. Use F.softmax(scores, dim=1) when actual
            probabilities are needed; the argmax over dim 1 is unaffected.
        """
        seq_len = len(x)
        rnn_out, _ = self.rnn(x.view(seq_len, 1, -1))
        return self.linear(rnn_out.view(seq_len, -1))

In [None]:
# Here we define the GRU model as a class
class GRUstructures(nn.Module):
    """Per-position sequence classifier: a stacked GRU followed by a linear layer.

    Note that the constructor takes `output_size` BEFORE `num_layers`, unlike the
    LSTM and RNN models of this notebook; pass them by keyword to avoid mix-ups.

    Args:
        input_size: number of features per sequence position.
        hidden_dim: size of the GRU hidden state.
        output_size: number of classes scored for every position.
        num_layers: number of stacked GRU layers.
    """

    def __init__(self, input_size, hidden_dim, output_size, num_layers):
        super().__init__()

        self.input_size = input_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        # GRU layer; batch_first=True means inputs are laid out as (batch, sequence, features)
        self.gru = nn.GRU(input_size, hidden_dim, num_layers, batch_first=True)
        # Output layer mapping each hidden state to the class scores
        self.linear = nn.Linear(hidden_dim, output_size)


    def forward(self, x):
        """Score every position of a single sequence.

        Args:
            x: tensor whose first dimension is the sequence length; it is reshaped to
                (1, len(x), -1), i.e. one sequence forming a batch of size 1.

        Returns:
            Tensor of shape (len(x), output_size) holding the unnormalised class scores
            (logits). No softmax is applied here: torch.nn.CrossEntropyLoss expects
            logits and normalises them itself. Use F.softmax(scores, dim=1) when actual
            probabilities are needed; the argmax over dim 1 is unaffected.
        """
        seq_len = len(x)
        # Because the layer is batch_first, the sequence has to sit on dim 1. Placing it
        # on dim 0 would turn every position into an independent length-1 sequence and
        # the hidden state would never be carried from one position to the next.
        gru_out, _ = self.gru(x.reshape(1, seq_len, -1))
        return self.linear(gru_out.reshape(seq_len, -1))