In [1]:
# Fetch the seminar repository; later cells read its dataset files and save models under /content/seminar-dlmb-rnn
!git clone https://github.com/dev-meesjakob/seminar-dlmb-rnn.git

Cloning into 'seminar-dlmb-rnn'...
remote: Enumerating objects: 186, done.[K
remote: Counting objects: 100% (186/186), done.[K
remote: Compressing objects: 100% (114/114), done.[K
remote: Total 186 (delta 72), reused 173 (delta 62), pack-reused 0[K
Receiving objects: 100% (186/186), 10.51 MiB | 15.13 MiB/s, done.
Resolving deltas: 100% (72/72), done.


In [2]:
!mkdir -p /content/seminar-dlmb-rnn/models # create the directory to save models in; -p keeps the cell re-runnable when it already exists

In [3]:
import pandas as pd

# Protein sequences: one per line, with surrounding whitespace/newlines stripped
with open("/content/seminar-dlmb-rnn/dataset/data.txt", "r") as f:
    sequences = [line.strip() for line in f]

# Labels: one integer per line (assumed to line up row-by-row with data.txt)
with open("/content/seminar-dlmb-rnn/dataset/labels.txt", "r") as f:
    labels = [int(line.strip()) for line in f]

# Pair every sequence with its label in a single DataFrame
df = pd.DataFrame({'sequence': sequences, 'label': labels})

df.head()

Unnamed: 0,sequence,label
0,ANDENYALAA,0
1,MPKTKPKVKNHKRNKTEPSPKQP,0
2,LKHFEDWSTAMLTA,0
3,MLISAYPKVSLGMVKLVLMVDLSAPKRLGG,0
4,RNAHNFPLDLAAIEAPSTNG,0


In [4]:
def feature_label_split(df, target_col):
  """Split a DataFrame into its feature columns and its target column.

  Args:
    df: source DataFrame (not modified).
    target_col: name of the column holding the target.

  Returns:
    (X, y): X is df without target_col; y is a one-column DataFrame
    containing only target_col.
  """
  # Double brackets keep the target as a DataFrame rather than a Series
  target = df.loc[:, [target_col]]
  features = df.drop(columns=[target_col])
  return features, target

def train_val_test_split(df, target_col, test_ratio):
    """Split df into train / validation / test parts without shuffling.

    The test part takes `test_ratio` of all rows. The validation part is then
    carved out of the remainder with a rescaled ratio so that it also ends up
    holding `test_ratio` of the original rows.

    Returns:
        X_train, X_validation, X_test, y_train, y_validation, y_test
    """
    # Share of the post-test remainder that equals test_ratio of the full data
    val_share = test_ratio / (1 - test_ratio)

    features, target = feature_label_split(df, target_col)

    # First cut: hold out the test rows (order preserved, shuffle=False)
    X_rest, X_test, y_rest, y_test = train_test_split(
        features, target, test_size=test_ratio, shuffle=False
    )
    # Second cut: divide the remainder into train and validation
    X_train, X_validation, y_train, y_validation = train_test_split(
        X_rest, y_rest, test_size=val_share, shuffle=False
    )

    return X_train, X_validation, X_test, y_train, y_validation, y_test

#X_train, X_validation, X_test, y_train, y_validation, y_test = train_val_test_split(df, 'label', 0.25)

In [5]:
#train = torch.utils.data.TensorDataset(torch.Tensor(X_train),torch.Tensor(y_train))
#validation = torch.utils.data.TensorDataset(torch.Tensor(X_validation),torch.Tensor(y_validation))
#test = torch.utils.data.TensorDataset(torch.Tensor(X_test),torch.Tensor(y_test))

In [6]:
from sklearn.model_selection import train_test_split

SPLIT_SEED = 42  # fixed seed so the train/val/test partition is identical after a kernel restart

# 80/20 train/test split; shuffle=True is important here since the raw data is ordered negative to positive
df_train, df_test = train_test_split(df, train_size=0.8, shuffle=True, random_state=SPLIT_SEED)
# 25% of the remaining 80% becomes the validation set, i.e. 60/20/20 overall
df_train, df_val = train_test_split(df_train, test_size=0.25, shuffle=True, random_state=SPLIT_SEED)

# Give every split a fresh 0..n-1 index so rows can be addressed by position.
# Rebinding (instead of inplace=True) avoids mutating the frames returned by train_test_split.
df_train = df_train.reset_index(drop=True)
df_test = df_test.reset_index(drop=True)
df_val = df_val.reset_index(drop=True)

In [7]:
# Alphabet of one-letter amino-acid codes used for encoding;
# 'X' stands for an unknown acid and '0' is the symbol used for padding.
all_acids = list('ARNDCQEGHILKMFPOSUTWYVBZXJ0')
n_acids = len(all_acids)

In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn.preprocessing import LabelEncoder

# Map every symbol of the alphabet to an integer class id
le = LabelEncoder()
le.fit(all_acids)
targets = le.transform(all_acids)

# Row i of one_hot_acids is the one-hot vector for all_acids[i];
# this lets us encode every acid as a one hot vector of length n_acids
all_acids_tensor = torch.Tensor(targets)
class_ids = all_acids_tensor.long()
one_hot_acids = F.one_hot(class_ids)

def acid_to_tensor(acid):
  """Return the one-hot encoding of a single acid as a float tensor of shape (1, n_acids).

  Raises ValueError (from list.index) when `acid` is not in all_acids.
  """
  position = all_acids.index(acid)
  # .float() yields a fresh float32 copy; reshape enforces the (1, n_acids) layout
  return one_hot_acids[position].float().reshape(1, n_acids)

def protein_to_tensor(protein):
  """Encode a protein as a float tensor of shape (len(protein), 1, n_acids).

  Each position along the first axis holds the one-hot vector of the acid
  at that position in `protein`.
  """
  encoded = torch.zeros(len(protein), 1, n_acids)
  for position, residue in enumerate(protein):
    encoded[position, 0] = acid_to_tensor(residue)[0]
  return encoded

In [9]:
from torch.utils.data import Dataset, DataLoader

class ToxinDatasetReader(Dataset):
  """Dataset view over a DataFrame with a `sequence` and a `label` column.

  Each item is a tuple (acids, length, label):
    acids  -- the sequence split into a fresh list of single characters
    length -- number of characters in the sequence
    label  -- the value stored in the `label` column for that row
  """

  def __init__(self, df: pd.DataFrame):
    self.df = df

  def __len__(self) -> int:
    return len(self.df)

  def __getitem__(self, idx: int):
    # Positional lookup: works for any DataFrame index (e.g. a shuffled split
    # whose index was not reset) and raises IndexError past the end, which is
    # what Python's sequence-iteration protocol expects.
    row = self.df.iloc[idx]
    input_protein = list(row["sequence"])
    len_protein = len(input_protein)
    labels = row["label"]
    return input_protein, len_protein, labels

# Wrap each split in a Dataset so it can be handed to a DataLoader
train_dat, test_dat, val_dat = (
    ToxinDatasetReader(split) for split in (df_train, df_test, df_val)
)

In [10]:
def collate_batch_rnn(data):
  """Collate (acids, length, label) samples into padded batch tensors.

  Args:
    data: list of tuples (list of acid characters, sequence length, label).

  Returns:
    features: float tensor (batch, max_len, n_acids); positions past a
              sequence's end hold the one-hot encoding of the padding symbol "0".
    lengths:  long tensor (batch,) with the unpadded sequence lengths.
    labels:   long tensor (batch,) with the labels.

  The input samples are left untouched (no in-place padding of the lists).
  """
  proteins, lengths, labels = zip(*data)
  max_len = max(lengths)

  features = torch.zeros(len(data), max_len, n_acids)
  # Pre-fill every position with the padding encoding once, then overwrite
  # the real positions; this replaces extending each sample list with "0"s.
  features[:] = acid_to_tensor("0")[0]

  for i, protein in enumerate(proteins):
    for j, acid in enumerate(protein):
      features[i, j] = acid_to_tensor(acid)[0]

  labels = torch.tensor(labels)
  lengths = torch.tensor(lengths)

  return features.float(), lengths.long(), labels.long()

In [11]:
batch_size = 32 # this should be tweaked and tested

# Only the training loader drops the incomplete final batch. The evaluation loaders keep it,
# otherwise up to batch_size-1 test/validation samples would silently never be scored.
train_dat_loader = DataLoader(train_dat, batch_size=batch_size, collate_fn=collate_batch_rnn, drop_last=True)
test_dat_loader = DataLoader(test_dat, batch_size=batch_size, collate_fn=collate_batch_rnn, drop_last=False)
val_dat_loader = DataLoader(val_dat, batch_size=batch_size, collate_fn=collate_batch_rnn, drop_last=False)

# test the dataloader
# proteins_tensor has dimension <batch_size x max_length x n_acids>: 32 proteins, all padded to the length of the longest in the batch
proteins_tensor, lengths, labels = next(iter(train_dat_loader))

print(labels.size(0)) # number of samples in the batch (should equal batch_size)

32


In [12]:
# Prefer the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"
print(f"Using '{device}' device")

Using 'cpu' device


In [51]:
class RNN(nn.Module):
  """Single-layer RNN classifier over padded batches of variable-length sequences.

  The sequence is run through nn.RNN, the hidden state after each sequence's
  LAST REAL time step is fed to a linear layer, and a sigmoid squashes the
  result, so every output lies in (0, 1).
  """

  def __init__(self, input_size, hidden_size, output_size):
    super(RNN, self).__init__()

    self.hidden_size = hidden_size

    # Hand-rolled layers, superseded by nn.RNN below. Only the legacy
    # _process_packed_input helper still refers to the first two.
    #self.input2hidden = nn.Linear(input_size, hidden_size, bias=False)
    #self.hidden2hidden = nn.Linear(hidden_size, hidden_size)
    #self.hidden2output = nn.Linear(hidden_size, output_size)
    self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
    self.fc = nn.Linear(hidden_size, output_size)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x, lengths, hidden_state=None):
    """Classify a padded batch.

    Args:
      x: float tensor (batch, max_len, input_size), padded along dim 1.
      lengths: unpadded length of every sequence in the batch, shape (batch,).
      hidden_state: optional initial hidden state (1, batch, hidden_size),
        in the same batch order as x; zeros when omitted.

    Returns:
      output: tensor (batch, output_size) of sigmoid activations, in the
        same batch order as x.
      hidden_state: final hidden state (1, batch, hidden_size), also in the
        batch order of x.
    """
    # Follow the module's own parameters instead of a notebook-level global
    model_device = self.fc.weight.device

    if hidden_state is None:
      hidden_state = self.init_hidden(x.size(0))

    # Tensor.to() is not in-place, so its result has to be kept
    hidden_state = hidden_state.to(model_device)
    x = x.to(model_device)

    # enforce_sorted=False lets PyTorch sort by length internally and restore
    # the caller's batch order for the returned hidden state; lengths must
    # live on the CPU for packing.
    packed_input = nn.utils.rnn.pack_padded_sequence(
        x, torch.as_tensor(lengths).cpu(), batch_first=True, enforce_sorted=False
    )

    _, hidden_state = self.rnn(packed_input, hidden_state)

    # For packed input the final hidden state is taken at each sequence's last
    # valid step. Indexing the padded output at position -1 would instead read
    # zero padding for every sequence shorter than the batch maximum.
    output = self.fc(hidden_state[-1])
    output = self.sigmoid(output)

    return output, hidden_state

  def _process_packed_input(self, packed_input, hidden_state):
    """Legacy manual recurrence over a PackedSequence; not called by forward().

    NOTE: this uses self.input2hidden and self.hidden2hidden, which are
    commented out in __init__, so it cannot run as-is.
    """
    outputs = []
    batch_sizes = packed_input.batch_sizes

    input_offset = 0
    last_batch_size = batch_sizes[0]
    hiddens = hidden_state[:last_batch_size]

    for batch_size in batch_sizes:
      current_input = packed_input.data[input_offset:input_offset + batch_size]
      current_hidden = hidden_state[:batch_size]

      if batch_size != last_batch_size:
        hiddens = hidden_state[:batch_size]
        last_batch_size = batch_size

      x = self.input2hidden(current_input)
      hiddens = self.hidden2hidden(hiddens)
      hiddens = torch.tanh(x + hiddens)

      outputs.append(hiddens)
      input_offset += batch_size

    packed_output = torch.cat(outputs, dim=0)
    packed_output = nn.utils.rnn.PackedSequence(packed_output, batch_sizes)

    return packed_output, hidden_state

  def init_hidden(self, batch_size=1):
    """Return an all-zero initial hidden state of shape (1, batch_size, hidden_size)."""
    return torch.zeros(
        1, batch_size, self.hidden_size,
        requires_grad=False, device=self.fc.weight.device
    )

In [55]:
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt

class Optimization:
  """Training / evaluation helper around a model, a loss function and an optimizer.

  The model is expected to be callable as `model(batch, lengths)` and to
  return a tuple whose first element is the prediction tensor.
  Per-epoch mean losses are collected in `train_losses` and `val_losses`.
  """

  def __init__(self, model, loss_fn, optimizer):
    self.model = model
    self.loss_fn = loss_fn
    self.optimizer = optimizer
    self.train_losses = []
    self.val_losses = []

  def _model_device(self):
    """Device of the model's parameters (CPU for a model without parameters)."""
    first_param = next(self.model.parameters(), None)
    return first_param.device if first_param is not None else torch.device("cpu")

  def _batch_loss(self, batch, lengths, labels):
    """Run the model on one batch and return the loss tensor."""
    target_device = self._model_device()
    batch = batch.to(target_device)
    # The loss is computed against a float column vector of shape (batch, 1)
    labels = labels.to(target_device).view(-1, 1).float()
    outputs, _ = self.model(batch, lengths)
    return self.loss_fn(outputs, labels)

  def train_step(self, batch, lengths, labels):
    """Do one optimisation step on a batch and return its loss as a float."""
    self.model.train()

    # Clear gradients before the backward pass so stale gradients from
    # anything that ran earlier can never leak into this step.
    self.optimizer.zero_grad()
    loss = self._batch_loss(batch, lengths, labels)
    loss.backward()
    self.optimizer.step()

    return loss.item()

  def train(self, train_loader, val_loader, batch_size=32, n_epochs=50, n_features=1,
            model_dir='/content/seminar-dlmb-rnn/models'):
    """Train for n_epochs, validating after each epoch, then save the state dict.

    Args:
      train_loader / val_loader: iterables yielding (batch, lengths, labels).
      batch_size, n_features: unused; kept so existing calls keep working.
      n_epochs: number of passes over train_loader.
      model_dir: directory the checkpoint is written to (created if missing).

    The checkpoint is written once, after the last epoch, as
    `model_<timestamp>` inside model_dir.
    """
    import os

    # Fail early (before hours of training) if the target directory is unusable
    os.makedirs(model_dir, exist_ok=True)
    # "model_" prefix instead of the module repr: the repr is multi-line and
    # would never match a `model_*` lookup. Dashes keep the name portable.
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    model_path = os.path.join(model_dir, f'model_{timestamp}')

    for epoch in range(1, n_epochs + 1):
      # training set
      batch_losses = [
          self.train_step(batch, lengths, labels)
          for batch, lengths, labels in train_loader
      ]
      training_loss = np.mean(batch_losses)
      self.train_losses.append(training_loss)

      # validation set
      self.model.eval()
      with torch.no_grad():
        batch_val_losses = [
            self._batch_loss(batch, lengths, labels).item()
            for batch, lengths, labels in val_loader
        ]
      validation_loss = np.mean(batch_val_losses)
      self.val_losses.append(validation_loss)

      if epoch <= 10 or epoch % 50 == 0:
        print(f"[{epoch}/{n_epochs}] Training loss: {training_loss:.4f}\t Validation loss: {validation_loss:.4f}")

    torch.save(self.model.state_dict(), model_path)

  def evaluate(self, test_loader, batch_size=1, n_features=1):
    """Run the model over test_loader without gradient tracking.

    Returns:
      (predictions, values): two lists with one numpy array per batch, the
      model outputs and the corresponding labels.
      `batch_size` and `n_features` are unused and kept for compatibility.
    """
    target_device = self._model_device()
    predictions = []
    values = []

    # test set
    self.model.eval()
    with torch.no_grad():
      for batch, lengths, labels in test_loader:
        batch = batch.to(target_device)

        # The model returns (output, hidden_state); only the output is a prediction
        outputs, _ = self.model(batch, lengths)
        # .cpu() is required before .numpy() when the tensor lives on a GPU
        predictions.append(outputs.detach().cpu().numpy())
        values.append(labels.detach().cpu().numpy())

    return predictions, values

  def plot_losses(self):
    """Plot the recorded training and validation loss curves."""
    plt.plot(self.train_losses, label="Training loss")
    plt.plot(self.val_losses, label="Validation loss")
    plt.legend()
    plt.title("Losses")
    plt.show()
    plt.close()

In [56]:
import torch.optim as optim
import glob
import os

# Hyperparameters
input_size = n_acids      # one input feature per symbol of the one-hot alphabet
output_size = 1           # single sigmoid output, matching BCELoss
hidden_size = 64
n_epochs = 10
learning_rate = 0.001
loss_fn = nn.BCELoss()

model_params = {'input_size': input_size,
                'hidden_size' : hidden_size,
                'output_size' : output_size
                }

model = RNN(**model_params)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Define the directory containing the models
model_directory = '/content/seminar-dlmb-rnn/models/'

# Use glob to find files starting with "model_"
model_files = glob.glob(os.path.join(model_directory, 'model_*'))

# Check if any model files were found
if model_files:
    # Find the latest file based on modification time
    latest_model_file = max(model_files, key=os.path.getmtime)
    model_path = latest_model_file
    print(f'Latest model file found: {model_path}')
    # map_location lets a checkpoint written on a GPU runtime load on a CPU-only one (and vice versa)
    model.load_state_dict(torch.load(model_path, map_location=device))
else:
    print('No model files found starting with "model_"')

opt = Optimization(model=model, loss_fn=loss_fn, optimizer=optimizer)
opt.train(train_dat_loader, val_dat_loader, n_epochs=n_epochs, n_features=input_size)
opt.plot_losses()

No model files found starting with "model_"


KeyboardInterrupt: 