In [None]:
from biodatasets import list_datasets, load_dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from torch.utils.tensorboard import SummaryWriter
import pandas as pd
import numpy as np
from tqdm import tqdm
import math
import time
from sklearn.metrics import confusion_matrix

In [None]:
# Loading data into numpy array
# `load_dataset` is imported from the `biodatasets` package at the top of the notebook.
pathogen = load_dataset("pathogen")

# Later cells index both results with [0] (X[0] is iterated as the sequences,
# y[0] is sliced alongside it as the labels) — presumably one array per
# requested input/target name; confirm against the biodatasets documentation.
X, y = pathogen.to_npy_arrays(input_names=["sequence"], target_names=["class"])

pathogen.display_description()

In [None]:
# Encoding Amino Acids to number
def get_seq_column_map(X):
    """Build a deterministic symbol -> integer index vocabulary.

    Args:
        X: indexable whose first element ``X[0]`` is an iterable of sequences;
           each sequence is itself an iterable of hashable, sortable symbols
           (e.g. a string of amino-acid letters).

    Returns:
        dict mapping every distinct symbol found in ``X[0]`` to an index in
        ``range(len(vocabulary))``. Indices follow the sorted order of the
        symbols, so the mapping is identical across kernel restarts (plain
        ``set`` iteration order is not, because string hashing is randomised
        per process) and a saved model stays compatible with the encoding.
    """
    unique = set()
    for sequence in X[0]:
        unique.update(sequence)

    # Sorting pins the index assigned to each symbol.
    return {symbol: index for index, symbol in enumerate(sorted(unique))}
    
# Build the symbol -> index vocabulary from every sequence in X[0] and show it.
# `pathogen_map` is reused by the dataset class and sizes the model's embedding table.
pathogen_map = get_seq_column_map(X)
print(pathogen_map)

In [None]:
class PathogenDataset(Dataset):
    """Map-style dataset that integer-encodes one sequence per item.

    Args:
        pathogen_map: mapping from a sequence symbol to its integer index.
        data: indexable where ``data[0]`` holds the sequences and ``data[1]``
              the matching labels.
    """

    def __init__(self, pathogen_map, data):
        sequences, labels = data[0], data[1]
        self.pathogen_map = pathogen_map
        self.X = sequences
        self.Y = labels

    def __len__(self):
        # The label container defines how many items the dataset exposes.
        return len(self.Y)

    def __getitem__(self, idx):
        # Translate every symbol of the selected sequence through the vocabulary.
        lookup = self.pathogen_map
        encoded = [lookup[symbol] for symbol in self.X[idx]]
        return torch.as_tensor(encoded), self.Y[idx]

def collate_padd(batch):
    """Collate variable-length samples into one zero-padded batch.

    Args:
        batch: list of items where item[0] is a 1-D tensor and item[1] its label.

    Returns:
        ``((padded, lengths), labels)`` where ``padded`` is a float32 tensor
        padded along dim 1 to the longest sequence, ``lengths`` holds the
        original length of every sequence and ``labels`` is a float32 tensor.
    """
    sequences = [item[0] for item in batch]
    targets = [item[1] for item in batch]

    # Lengths are recorded before padding erases them.
    lengths = torch.as_tensor([len(sequence) for sequence in sequences])
    padded = pad_sequence(sequences, batch_first=True).to(torch.float32)
    labels = torch.as_tensor(targets).to(torch.float32)
    return (padded, lengths), labels
    
# Split ~ 80% 10% 10%
# Boundaries are derived from the dataset size (integer arithmetic, no float
# rounding) so the proportions hold for any number of samples; the previous
# fixed 80000/90000 cut points only matched them for exactly 100000 samples
# and left the test set empty for anything smaller than 90000.
# NOTE(review): the split is positional — if the dataset is ordered by class,
# shuffle X[0]/y[0] together before slicing.
n_samples = len(X[0])
train_end = n_samples * 8 // 10
val_end = n_samples * 9 // 10

training_set = PathogenDataset(pathogen_map, (X[0][:train_end], y[0][:train_end]))
training_loader = DataLoader(training_set, batch_size=4, shuffle=True, collate_fn=collate_padd)

validation_set = PathogenDataset(pathogen_map, (X[0][train_end:val_end], y[0][train_end:val_end]))
validation_loader = DataLoader(validation_set, batch_size=8, collate_fn=collate_padd)

testing_set = PathogenDataset(pathogen_map, (X[0][val_end:], y[0][val_end:]))
testing_loader = DataLoader(testing_set, batch_size=8, collate_fn=collate_padd)

# Sanity check: display one collated training batch.
next(iter(training_loader))

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to batch-first embeddings.

    Args:
        d_model: size of the last (feature) dimension of the input.
        max_len: largest sequence length (dim 1) the module can encode.

    The table is stored as a non-persistent buffer ``pe`` of shape
    ``(1, max_len, d_model)``, so it follows the module across devices but is
    not written to the state dict.
    """

    def __init__(self, d_model, max_len):
        super().__init__()

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # the cosine half is trimmed to fit instead of raising a shape error.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)

        self.register_buffer('pe', pe, persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Raises:
            RuntimeError: if the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            # Same exception type as the broadcast failure it replaces, but
            # with a message that names the actual problem.
            raise RuntimeError(
                f"sequence length {seq_len} exceeds the positional encoding max_len {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len]

In [None]:
class Net(nn.Module):
    """
    Text classifier based on a pytorch TransformerEncoder.

    Token ids are embedded, given sinusoidal position encodings, passed through
    a stack of transformer encoder layers, mean-pooled over the sequence
    dimension and projected to a single logit per sample.

    Args:
        vocab_size: number of distinct token ids (size of the embedding table).
        d_model: embedding / model width; must be divisible by ``nhead``.
        nhead: number of attention heads per encoder layer.
        dim_feedforward: hidden width of each encoder layer's feed-forward block.
        num_layers: number of stacked encoder layers.
        activation: activation used inside the encoder layers.
        dropout: dropout used inside the encoder layers. The dropout in front
            of the classifier is fixed at 0.25.

    Note:
        Padding positions are not masked: they take part in attention and in
        the mean pooling.
    """

    def __init__(
        self,
        vocab_size,
        d_model,
        nhead=8,
        dim_feedforward=512,
        num_layers=6,
        activation="relu",
        dropout=0.1,
    ):

        super().__init__()

        assert d_model % nhead == 0, "nheads must divide evenly into d_model"

        # The embedding width must follow d_model (it was hard-coded to 512,
        # which only worked when d_model happened to be 512).
        self.embed = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=d_model,
        )

        # NOTE(review): max_len is presumably chosen to exceed the longest
        # sequence in the dataset — confirm.
        self.pos_encoder = PositionalEncoding(
            d_model=d_model,
            max_len=11000,
        )

        # batch_first=True: the rest of the model handles tensors as
        # (batch, seq, d_model). With the default (seq-first) layout the
        # encoder would attend across the samples of a batch instead of
        # across the positions of each sequence.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            activation=activation,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
        )

        self.dropout = nn.Dropout(p=0.25)

        self.classifier = nn.Linear(d_model, 1)

        self.d_model = d_model

    def forward(self, x):
        """Return one raw logit per sample.

        Args:
            x: integer tensor of token ids, shape (batch, seq_len).

        Returns:
            Tensor of shape (batch, 1) holding unnormalised logits (no sigmoid
            is applied here).
        """
        # Scale embeddings by sqrt(d_model) before adding the position encodings.
        x = self.embed(x) * math.sqrt(self.d_model)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        # Mean-pool over the sequence dimension.
        x = x.mean(dim=1)

        x = self.dropout(x)

        x = self.classifier(x)

        return x


In [None]:
# Prefer the GPU when one is available; later cells move the model with `.to(device)`.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"device : {device}")
# Only query the GPU name when CUDA is present: torch.cuda.get_device_name()
# raises on CPU-only machines, which would abort a Run All on the CPU fallback.
if device == "cuda":
    print(f"gpu : {torch.cuda.get_device_name()}")

In [None]:
model = Net(
    vocab_size=len(pathogen_map),
    d_model=512,
    nhead=8,
    dim_feedforward=50,
    num_layers=6,
    dropout=0.25
).to(device)

print(model)

writer = SummaryWriter()

# BCEWithLogitsLoss applies the sigmoid internally, so it must be fed the raw
# model outputs. `sigmoid` is only used to turn logits into probabilities for
# the accuracy metrics; it stays a notebook-level name so later cells that
# reference `sigmoid` keep working.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
sigmoid = nn.Sigmoid()

global_step = 0  # monotonically increasing x-axis for the per-batch loss curve
for epoch in range(2):
    tqdm_bar = tqdm(training_loader, desc=f"epoch {epoch}", position=0)

    # Training
    model.train()
    train_correct, train_total = 0, 0
    # sequence_len comes from the collate function; the model does not consume it.
    for (inputs, sequence_len), labels in tqdm_bar:
        # Use the selected device rather than a hard-coded .cuda() so the CPU
        # fallback works; the embedding layer needs integer token ids.
        inputs = inputs.to(device).to(torch.int32)
        labels = labels.to(device)

        optimizer.zero_grad()
        logits = model(inputs).flatten()

        loss = criterion(logits, labels)
        loss.backward()
        # Step once per batch: gradients are zeroed at the top of every
        # iteration, so stepping after the loop would discard all but the
        # last batch's gradient.
        optimizer.step()

        writer.add_scalar('Loss/train', loss.item(), global_step)
        global_step += 1

        # Accumulate accuracy over the whole epoch, not just the last batch.
        with torch.no_grad():
            predicted = torch.round(sigmoid(logits))
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()

    # Training Accuracy
    if train_total:
        writer.add_scalar('accuracy/train', train_correct / train_total, epoch)

    # Validation Accuracy
    model.eval()
    val_correct, val_total = 0, 0
    with torch.no_grad():
        for (inputs, sequence_len), labels in validation_loader:
            inputs = inputs.to(device).to(torch.int32)
            labels = labels.to(device)

            predicted = torch.round(sigmoid(model(inputs).flatten()))

            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()
    if val_total:
        writer.add_scalar('accuracy/validation', val_correct / val_total, epoch)

writer.close()

In [None]:
!tensorboard --logdir=runs

In [None]:
# Persist only the model's state dict (parameters and persistent buffers);
# the evaluation cell rebuilds the architecture and reloads the weights from PATH.
PATH = './pathogen_net_transformer.pth'
torch.save(model.state_dict(), PATH)

In [None]:
# Rebuild the architecture with the same hyper-parameters used for training,
# then restore the saved weights.
model = Net(
    vocab_size=len(pathogen_map),
    d_model=512,
    nhead=8,
    dim_feedforward=50,
    num_layers=6,
    dropout=0.25
).to(device)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load(PATH, map_location=device))
model.eval()


# Testing Accuracy
correct, total = 0, 0
all_predicted, all_y = [], []
with torch.no_grad():
    # sequence_len comes from the collate function; the model does not consume it.
    for (inputs, sequence_len), labels in testing_loader:
        # Follow the selected device instead of a hard-coded .cuda().
        inputs = inputs.to(device).to(torch.int32)
        labels = labels.to(device)

        # torch.sigmoid keeps this cell independent of names created in the
        # training cell; rounding turns the probabilities into 0/1 predictions.
        outputs = torch.sigmoid(model(inputs))
        predicted = torch.round(outputs.flatten())

        all_predicted.extend(predicted.tolist())
        all_y.extend(labels.tolist())

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(confusion_matrix(all_y, all_predicted))
# Guard against an empty test split instead of dividing by zero.
accuracy = correct / total if total else float('nan')
print(f'Accuracy of nn: {accuracy}')