In [1]:
# !pip install tokenizers

## DataLoader

In [2]:
import os
import math

from tokenizers import Tokenizer, models

# Root data directory read by load_sequences(): each sub-directory name is used
# as the class label for every sequence line found in the files inside it.
base_path = 'trainingdata'

In [3]:
def load_sequences(base_path):
    """Collect sequence lines and their class labels from a directory tree.

    The expected layout is ``base_path/<label>/<file>``. Every file inside a
    label folder is read line by line; lines starting with ``'>'`` (headers)
    and blank lines are skipped, and each remaining line becomes one sample
    labelled with the name of its folder.

    Args:
        base_path: Root directory that contains one sub-directory per class.

    Returns:
        Tuple ``(sequences, labels)`` of two equally long lists: the sequence
        strings (surrounding whitespace / line endings removed) and the folder
        name each sequence came from.
    """
    sequences = []
    labels = []
    # os.listdir() returns entries in arbitrary order; sort so repeated runs
    # produce the samples in the same order.
    for folder in sorted(os.listdir(base_path)):
        folder_path = os.path.join(base_path, folder)
        if not os.path.isdir(folder_path):
            continue
        for file_name in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, file_name)
            # Nested directories cannot be opened as files; ignore them.
            if not os.path.isfile(file_path):
                continue
            with open(file_path) as f:
                for line in f:
                    if line.startswith('>'):
                        continue
                    # Drop the trailing newline so it is not fed to the
                    # tokenizer, and skip lines that carry no sequence at all.
                    sequence = line.strip()
                    if not sequence:
                        continue
                    sequences.append(sequence)
                    labels.append(folder)
    return sequences, labels

In [4]:
from torch.utils.data import Dataset, DataLoader
import torch
from torch.nn.utils.rnn import pad_sequence

class SequenceDataset(Dataset):
    """Dataset of raw sequence strings paired with integer-encoded class labels.

    Sequences are tokenized lazily in ``__getitem__`` with a tokenizer loaded
    from ``tokenizer_file``; ``collate_fn`` pads a list of samples into a batch.
    """

    def __init__(self, sequence, labels, tokenizer_file='gene_tokenizer.json'):
        """Store the samples, build the label mapping and load the tokenizer.

        Args:
            sequence: List of str, e.g. ``["ACTG...", "GTCA...", ...]``.
            labels: List of class names, one per entry of ``sequence``.
            tokenizer_file: Path of the serialized tokenizer JSON file.

        Raises:
            ValueError: If ``sequence`` and ``labels`` differ in length.
        """
        if len(sequence) != len(labels):
            raise ValueError(
                f'sequence and labels must have the same length, '
                f'got {len(sequence)} and {len(labels)}'
            )

        self.sequence = sequence
        self.label_dict = self.get_label_dict(labels)
        self.labels = self.encode_labels(labels)

        # from_file is a static constructor, so no placeholder Tokenizer
        # instance has to be created first.
        self.tokenizer = Tokenizer.from_file(tokenizer_file)
        # collate_fn reads the pad id from tokenizer.padding, which is only
        # populated once padding has been enabled.
        self.tokenizer.enable_padding()

    def get_label_dict(self, labels):
        """Map every distinct label to an index.

        The distinct labels are sorted before numbering so the mapping is the
        same on every run (plain ``set`` iteration order is not stable for
        strings across interpreter sessions).

        Args:
            labels: Iterable of mutually comparable, hashable class names.

        Returns:
            Dict ``{label: index}`` with indices ``0 .. n_classes - 1``.
        """
        return {name: index for index, name in enumerate(sorted(set(labels)))}

    def encode_labels(self, labels):
        """Translate class names into their indices from ``self.label_dict``."""
        return [self.label_dict[name] for name in labels]

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.sequence)

    def __getitem__(self, idx):
        """Return ``(token_ids, label_index)`` for sample ``idx``.

        ``token_ids`` is a ``torch.LongTensor`` holding the tokenizer's ids for
        the sequence; ``label_index`` is a plain int.
        """
        seq = self.sequence[idx]
        label = self.labels[idx]
        encoded_seq = self.tokenizer.encode(seq)
        return torch.LongTensor(encoded_seq.ids), label

    def collate_fn(self, batch):
        """Pad a list of samples into one batch.

        Args:
            batch: list of ``(torch.LongTensor, int)`` as produced by
                ``__getitem__``.

        Returns:
            Tuple ``(sequence, labels)``: the id tensors padded on the right
            with the tokenizer's pad id and stacked batch-first, and a
            ``torch.LongTensor`` of the label indices.
        """
        sequences = [item[0] for item in batch]
        labels = [item[1] for item in batch]

        sequence = pad_sequence(
            sequences,
            batch_first=True,
            padding_value=self.tokenizer.padding['pad_id'],
        )
        labels = torch.LongTensor(labels)

        return sequence, labels


## Model

In [7]:
from torch import nn

class RnnModel(nn.Module):
    """Embedding -> LSTM -> linear head producing one vocabulary-sized logit
    vector per input position."""

    def __init__(self, vocab_size, embedding_dim, pad_id, hidden_dim, num_layers):
        """Build the three layers.

        Args:
            vocab_size: Number of token ids; also the size of the output logits.
            embedding_dim: Width of the token embeddings.
            pad_id: Token id passed to the embedding as ``padding_idx``.
            hidden_dim: LSTM hidden size.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=pad_id,
        )
        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.out_layer = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, ids):
        """Map token ids ``[B, L]`` to per-position logits ``[B, L, V]``."""
        token_vectors = self.embedding(ids)           # [B, L, E]
        per_step_states = self.rnn(token_vectors)[0]  # [B, L, H]
        logits = self.out_layer(per_step_states)      # [B, L, V]
        return logits

class RnnModelForClassification(nn.Module):
    """Embedding -> LSTM -> linear head that classifies a whole sequence from
    the LSTM output at its last real (non-padding) token."""

    def __init__(self, vocab_size, embedding_dim, pad_id, hidden_dim, num_layers, output_size):
        """Build the three layers.

        Args:
            vocab_size: Number of token ids.
            embedding_dim: Width of the token embeddings.
            pad_id: Token id passed to the embedding as ``padding_idx``; also
                used in ``forward`` to locate the end of each sequence.
                May be ``None``.
            hidden_dim: LSTM hidden size.
            num_layers: Number of stacked LSTM layers.
            output_size: Number of classes (size of the output logits).
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_id)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.out_layer = nn.Linear(hidden_dim, output_size)

    def forward(self, ids):
        """Map token ids ``[B, L]`` to class logits ``[B, C]``.

        Assumes padding, if any, is appended on the right of each row (as
        ``pad_sequence`` does) and that the pad id is not used for real tokens.
        """
        # ids: [batch size, max sequence length] = [B, L]
        embedded = self.embedding(ids)  # [B, L, E]
        rnn_out, _ = self.rnn(embedded)  # [B, L, H]

        pad_id = self.embedding.padding_idx
        if pad_id is None:
            # No padding id configured: every row is taken at full length.
            hidden_state = rnn_out[:, -1, :]  # [B, H]
        else:
            # Taking position -1 would read the LSTM state *after* it consumed
            # the padding of shorter rows, making a prediction depend on what
            # else is in the batch. Pick each row's last non-pad step instead.
            lengths = ids.ne(pad_id).sum(dim=1).clamp(min=1)  # [B]
            last_index = (lengths - 1).view(-1, 1, 1).expand(-1, 1, rnn_out.size(-1))  # [B, 1, H]
            hidden_state = rnn_out.gather(1, last_index).squeeze(1)  # [B, H]

        return self.out_layer(hidden_state)  # [B, C]


## Training

In [None]:
from tqdm.notebook import tqdm
import numpy as np
from sklearn.metrics import classification_report, accuracy_score

def train(model, dataloader, loss_function, lr, num_epochs):
    """Train ``model`` with Adam and return the training accuracy of each epoch.

    After every epoch the mean batch loss and a scikit-learn classification
    report over that epoch's training predictions are printed.

    Args:
        model: Module called as ``model(x)``; predictions are taken as the
            argmax over the last axis of its output.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        loss_function: Callable ``loss_function(outputs, targets)`` returning a
            scalar tensor.
        lr: Learning rate for Adam.
        num_epochs: Number of passes over ``dataloader``.

    Returns:
        List with one accuracy value per epoch.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # Batches follow the model to whatever device it already lives on, so the
    # loop works on CPU as well as on GPU.
    device = next(model.parameters()).device
    model.train()

    history = []
    for epoch in range(num_epochs):
        pbar = tqdm(dataloader)

        all_preds = []
        all_labels = []
        average_loss = []

        for batch in pbar:
            batch_sequences, y = batch
            x = batch_sequences.to(device)
            y = y.to(device)

            h = model(x)
            j = loss_function(h, y)

            # do gradient descent
            optimizer.zero_grad()  # clear gradients left over from the last step
            j.backward()   # calculate gradients from the current batch
            optimizer.step()  # update the weights using the gradients

            average_loss.append(j.item())
            all_preds.append(h.argmax(-1).detach().cpu())
            all_labels.append(y.cpu())

        if not all_preds:
            raise ValueError('dataloader produced no batches')

        # The metrics expect one flat array per argument, not a list of
        # per-batch tensors (whose last element may also be shorter).
        epoch_preds = torch.cat(all_preds).numpy()
        epoch_labels = torch.cat(all_labels).numpy()

        mean_loss = sum(average_loss) / len(average_loss)
        print(f'epoch {epoch + 1}/{num_epochs} - mean loss: {mean_loss:.4f}')
        print(classification_report(epoch_labels, epoch_preds, digits=4))
        accuracy = accuracy_score(epoch_labels, epoch_preds)

        history.append(accuracy)

    return history



## Run Training

In [None]:
tokenizer_file = 'gene_tokenizer.json'

sequences, labels = load_sequences(base_path)
# Pass the path explicitly so that editing `tokenizer_file` above actually
# changes which tokenizer the dataset loads.
dataset = SequenceDataset(sequences, labels, tokenizer_file=tokenizer_file)

# Optimisation hyper-parameters
lr = 1e-4
batch_size = 2
num_epochs = 50

# Model dimensions; vocabulary size and pad id are taken from the tokenizer
vocab_size = dataset.tokenizer.get_vocab_size()
pad_id = dataset.tokenizer.padding['pad_id']
embedding_dim = 256
hidden_dim = 512
num_layers = 1

In [None]:
# Alternative per-token (language-model) head, kept for reference:
# model = RnnModel(vocab_size, embedding_dim, pad_id, hidden_dim, num_layers)

# One output logit per class present in the data instead of a hard-coded 2.
num_classes = len(dataset.label_dict)
model = RnnModelForClassification(vocab_size, embedding_dim, pad_id, hidden_dim, num_layers, num_classes)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=dataset.collate_fn)
# Targets are class indices, not token ids: ignore_index=pad_id would silently
# exclude every sample whose class index happens to equal the pad id.
loss_function = nn.CrossEntropyLoss()

In [None]:
# Run the training loop; the result holds one training-accuracy value per epoch.
acc_history = train(
    model=model,
    dataloader=dataloader,
    loss_function=loss_function,
    lr=lr,
    num_epochs=num_epochs,
)

In [None]:
# Show the per-epoch accuracy values returned by train().
acc_history

In [None]:
import matplotlib.pyplot as plt

# Training accuracy per epoch (x: epoch index, y: accuracy).
plt.plot(acc_history)