In [72]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torchviz import make_dot
#from torchsummary import summary

import tensorboard

import numpy as np 
import pandas as pd 
import random
from Bio import SeqIO

from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')


>dataloading

In [73]:
# reading all records
records = list(SeqIO.parse("/home/hachem/Documents/mutagan/data/ncbi_dataset/data/protein.faa", "fasta"))

# one hot encoding
vocab = set()
for record in records:
    vocab.update(str(record.seq))
    
vocab.add("<pad>"), vocab.add("<sos>"), vocab.add("<eos>")
to_ix = {char: i for i, char in enumerate(vocab)}
inv_to_ix = {v: k for k, v in to_ix.items()}

FileNotFoundError: [Errno 2] No such file or directory: '/home/hachem/Documents/mutagan/data/ncbi_dataset/data/protein.faa'

In [None]:
class BiologicalSequenceDataset:
    def __init__(self, records):
        self.records = records

    def __len__(self):
        return len(self.records)

    def __getitem__(self, i):
        seq = self.records[i].seq
        return torch.tensor([to_ix[residue] for residue in seq])

def collate_fn(batch):
    return torch.nn.utils.rnn.pad_sequence(
        batch,
        batch_first=True,
        padding_value=to_ix["<pad>"]
    )

>model

In [74]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [75]:
class Encoder(nn.Module):
    
    def __init__(self, input_size, embedding_size, hidden_size, num_layers):
        super(Encoder,self).__init__()
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.embedding = nn.Embedding(num_embeddings=self.input_size, embedding_dim=self.embedding_size)
        self.rnn = nn.LSTM(input_size = embedding_size, hidden_size= self.hidden_size, num_layers=self.num_layers, bidirectional=True, batch_first=True)


    def forward(self, x):
        #x shape : (batch_size, seq_len)
        #embedding shape : (batch_size, seq_len, embedding_size)

        embedding = self.embedding(x)
        output, (hidden, cell) = self.rnn(embedding)

        return hidden, cell 


class Decoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers):
        super(Decoder,self).__init__()
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size 

        self.embedding = nn.Embedding(num_embeddings=self.input_size, embedding_dim=self.embedding_size)
        self.rnn = nn.LSTM(input_size = embedding_size, hidden_size= self.hidden_size, num_layers=self.num_layers, bidirectional=False, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, x, hidden, cell):
        embedding = self.embedding(x)
        output, (hidden_, cell_) = self.rnn(embedding, (hidden, cell))
        logits = self.fc(output)

        pred = nn.functional.softmax(logits)

        return pred, hidden_, cell_
        

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, teacher_forcing_ratio=0.7):
        super(Seq2Seq,self).__init__()
        self.encoder = encoder
        self.decoder = decoder 
        self.teacher_forcing_ratio = teacher_forcing_ratio

    def forward(self, parent_batch, child_batch):
        batch_size = parent_batch.shape[0]
        child_len = child_batch.shape[-1]

        hidden, state = self.encoder(parent_batch)

        hidden_ = hidden.view(batch_size, self.encoder.hidden_size*2).unsqueeze(0)
        state_ = state.view(batch_size, self.encoder.hidden_size*2).unsqueeze(0)

        #add noise to the state 
        outputs = torch.zeros((batch_size, 1, self.decoder.output_size)).to(device)
        x = to_ix["<sos>"]*torch.ones((batch_size,1)).int().to(device)

        for t in range(50): #range(child_len):
            #output shape (batch_size, 1, vocab size)
            #outputs shape ((batch_size, seq_len, vocab size))
            output, hidden_, state_ = self.decoder(x, hidden_, state_)
            outputs = torch.cat((outputs, output), dim=1)
            
            #teacher forcing 
            x = child_batch[:,t].unsqueeze(-1).int() if random.random() < self.teacher_forcing_ratio else output.argmax(-1)

        return outputs

    def get_encoder(self):
        return self.encoder.detach()

    def set_teacher_forcing_ratio(self, new_ratio):
        self.teacher_forcing_ratio = new_ratio



class Discriminator(nn.Module):
    def __init__(self, encoder):
        super(Discriminator, self).__init__()
        self.encoder = encoder
        self.hidden_size = self.encoder.hidden_size
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.BatchNorm1d(2*self.hidden_size),
            nn.Linear(2*self.hidden_size, 128),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.2),
            nn.BatchNorm1d(128),
            nn.Linear(128,64),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(64),
            nn.Linear(64,1),
            nn.Sigmoid()
            )
        
    def forward(self, x):
        hidden, state = self.encoder(x)
        x = state.view(state.shape[0], self.encoder.hidden_size*2)
        x = self.classifier(x)

        return x


>training 

In [76]:
#mutaGAN 
vocab_size = 24

#encoder
encoder_emb_size = 250
encoder_hidden_size = 256
encoder_num_layers = 1 #128 

#decoder
decoder_emb_size = 250
decoder_hidden_size = 256
decoder_num_layers = 1 #128 

#MLE training
MLE_num_epochs = 72
MLE_batch_size = 2 #16
MLE_learning_rate = 0.01

#GAN training
GAN_num_epochs = 350
GAN_batch_size = 16 
GAN_learning_rate = 1e-3

In [77]:
training_data = torch.utils.data.DataLoader(
    BiologicalSequenceDataset(records),
    batch_size=MLE_batch_size,
    collate_fn=collate_fn,
)

In [78]:
batch = iter(training_data).next() #test
enc = Encoder(input_size=24, embedding_size=250, hidden_size=256, num_layers=1)
dec = Decoder(input_size=24, embedding_size=250, hidden_size=256*2, output_size=24, num_layers=1)
seq2seq = Seq2Seq(enc,dec)

In [79]:
d = Discriminator(enc)

>MLE training

In [80]:
criterion = nn.CrossEntropyLoss(ignore_index=to_ix["<pad>"])
optimizer = Adam(seq2seq.parameters(), lr=MLE_learning_rate)

In [83]:
for epoch in range(MLE_num_epochs):
    for parent, child in zip(training_data, training_data):

        parent = parent.to(device)
        child = child.to(device)

        #feed forward 
        output = seq2seq(parent, child)
        loss = criterion(output, child[:,:output.shape[-1]])

        #backpropagate
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print("loss",loss.item())

loss 3.9593498706817627
loss 3.7702770233154297
loss 3.7621991634368896
loss 3.8032398223876953
loss 3.68393611907959
loss 3.689722776412964
loss 3.7920544147491455


KeyboardInterrupt: 

In [None]:
# outs = seq2seq(batch,batch)
# outs.argmax(-1).shape

# for seq in outs.argmax(-1): 
#     print([inv_to_ix[t.item()] for t in seq])

> GAN training 