# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [1]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
import random

import matplotlib.pyplot as plt
from skorch import NeuralNetClassifier
from sklearn.model_selection import GridSearchCV


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers   
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [3]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X 
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    # use intseq_to_dnaseq here to convert ids back to DNA seqs
    temp = [''.join(intseq_to_dnaseq(el)) for el in X_dna_seqs_train] 
    #step3
    y_dna_seqs = [count_cpgs(el) for el in temp] 
    
    return torch.tensor(X_dna_seqs_train), torch.tensor(y_dna_seqs).unsqueeze(1)
    
train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

In [4]:
# some config
LSTM_HIDDEN = 64
LSTM_LAYER = 25
batch_size = 64
learning_rate = 0.5
dropout_p=0.5
epoch_num = 100
vocab_size = len("NACGT")
EMBED_DIM = 20

In [5]:
# create data loader

train_data_loader = torch.utils.data.DataLoader(list(zip(train_x[:200], train_y[:200])), batch_size, shuffle=True)
test_Data_loader = torch.utils.data.DataLoader(list(zip(test_x, test_y)), batch_size, shuffle=True)

In [6]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, num_layers, hidden_layer_dim, dropout, vocab_size, embed_dim):
        super(CpGPredictor, self).__init__()
        self.vocab_size = vocab_size
        self.dropout_p = dropout
        self.num_layers=num_layers
        self.hidden_layer_dim = hidden_layer_dim
        self.embed_dim=embed_dim
        
        ## Define the model layers
        self.embedding = torch.nn.Embedding(vocab_size, embed_dim)
        self.lstm = torch.nn.LSTM(embed_dim, hidden_size=hidden_layer_dim,
                                  num_layers=num_layers, batch_first=True)
        self.linear = torch.nn.Linear(hidden_layer_dim, 1)
        self.lstm2 = torch.nn.LSTM(hidden_layer_dim, hidden_size=hidden_layer_dim, batch_first=True)
        self.dropout1 = torch.nn.Dropout(dropout_p)
        self.dropout2 = torch.nn.Dropout(0.2)

    def forward(self, x):
        embedded = self.embedding(x)
        yhat, _ = self.lstm(embedded)
        
        ## Without Dropout, the model overfits too soon
        yhat = self.dropout1(yhat)
        yhat, _ = self.lstm2(yhat)
#         yhat = self.dropout2(yhat)
        out = self.linear(yhat[:, -1, :])
        return out


In [7]:
# init model / loss function / optimizer etc.
fixed_len_model = CpGPredictor(LSTM_LAYER, LSTM_HIDDEN, dropout_p, vocab_size, EMBED_DIM)
loss_fn = torch.nn.L1Loss()
optimizer = torch.optim.AdamW(fixed_len_model.parameters(), lr=learning_rate, weight_decay=1e-3)

In [None]:
# training (you can modify the code below)
t_loss = .0
loss_history = []
train_rmse_plot = []
test_rmse_plot = []
train_acc_plot = []
test_acc_plot = []
min_loss = 100
for epoch in range(epoch_num):
    fixed_len_model.train()
    fixed_len_model.zero_grad()
    for X_batch, y_batch in train_data_loader:
        y_pred = fixed_len_model(X_batch)
        loss = loss_fn(y_pred, y_batch)

        optimizer.zero_grad()
        t_loss += loss.item()
        loss.backward()
        optimizer.step()
    loss_history.append(t_loss)
    t_loss = .0

    fixed_len_model.eval()
    with torch.no_grad():
        y_pred = fixed_len_model(train_x)
        train_rmse = np.sqrt(loss_fn(y_pred, train_y))
        train_rmse_plot.append(train_rmse.item())
        train_accuracy = (torch.sum(torch.round(y_pred).int() == train_y))/train_y.size()[0]
        train_acc_plot.append(train_accuracy)
        
        y_pred = fixed_len_model(test_x)
        test_rmse = np.sqrt(loss_fn(y_pred, test_y))
        test_rmse_plot.append(test_rmse.item())
        test_accuracy = (torch.sum(torch.round(y_pred).int() == test_y))/test_y.size()[0]
        test_acc_plot.append(test_accuracy)
    print("Epoch ", epoch, " Loss = ", loss_history[-1])
    if loss_history[-1] < min_loss:
        min_loss = loss_history[-1]
        # Save the model
        torch.save(fixed_len_model.state_dict(), 'models/fixed_len_model_weight.pth')

Epoch  0  Loss =  34.10824251174927
Epoch  1  Loss =  36.50367736816406
Epoch  2  Loss =  18.9738187789917
Epoch  3  Loss =  21.5706684589386
Epoch  4  Loss =  17.118098258972168
Epoch  5  Loss =  15.607149600982666
Epoch  6  Loss =  11.690606474876404
Epoch  7  Loss =  13.3591148853302
Epoch  8  Loss =  9.2801034450531
Epoch  9  Loss =  7.259066820144653
Epoch  10  Loss =  8.05556058883667
Epoch  11  Loss =  7.506715178489685
Epoch  12  Loss =  6.672113656997681
Epoch  13  Loss =  6.09421443939209
Epoch  14  Loss =  6.944698929786682
Epoch  15  Loss =  6.931966543197632
Epoch  16  Loss =  7.463358163833618
Epoch  17  Loss =  6.32133674621582
Epoch  18  Loss =  6.496281027793884
Epoch  19  Loss =  7.822375535964966
Epoch  20  Loss =  7.36993134021759
Epoch  21  Loss =  7.866607069969177
Epoch  22  Loss =  7.457654356956482
Epoch  23  Loss =  6.527387499809265
Epoch  24  Loss =  6.640496611595154
Epoch  25  Loss =  6.559722661972046
Epoch  26  Loss =  8.420514464378357
Epoch  27  Loss =

In [None]:
# plot RMSE
import itertools

colors = ['r', 'g']
cc = itertools.cycle(colors)
plot_lines = []

l1, = plt.plot(train_rmse_plot, '-', color='r')
l2, = plt.plot(test_rmse_plot, '.-', color='g')

plot_lines.append([l1, l2])

legend1 = plt.legend(plot_lines[0], ["Train RMSE plot", "Test RMSE plot"], loc=1)
plt.legend([l[0] for l in plot_lines], range(2), loc=4)
plt.gca().add_artist(legend1)

In [None]:
# plot accuracy

colors = ['r', 'g']
cc = itertools.cycle(colors)
plot_lines = []

l1, = plt.plot(train_acc_plot, '-', color='r')
l2, = plt.plot(test_acc_plot, '--', color='g')

plot_lines.append([l1, l2])

legend1 = plt.legend(plot_lines[0], ["Train accuracy", "Test accuracy"], loc=1)
plt.legend([l[0] for l in plot_lines], range(2), loc=4)
plt.gca().add_artist(legend1)

plt.plot(train_acc_plot, c='r')
plt.plot(test_acc_plot, c='g')
plt.show()

In [None]:
fixed_len_model.eval()

In [None]:
example = list(rand_sequence(1))
print("".join(intseq_to_dnaseq(example[0])))
tester = torch.tensor(example)
prediction = fixed_len_model(tester)
print(prediction)

# Part 2: what if the DNA sequences are not the same length

In [None]:
# hint we will need following imports
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [None]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers   
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [None]:
# TODO complete the task based on the change
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # TODO prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    #step 1
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    #step 2
    temp = [''.join(intseq_to_dnaseq(el)) for el in X_dna_seqs_train if el!=0]
    #step3
    y_dna_seqs = torch.tensor([count_cpgs(el) for el in temp]).unsqueeze(1)
    
    return X_dna_seqs_train, y_dna_seqs
    
    
min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [None]:
def custom_pad(int_list, max_len=128, pad_var=0):
    if (torch.is_tensor(int_list)):
        length=int_list.size()[0]
    else:
        length = len(int_list)

    pad_len = max_len-length
    if pad_len > 0:
        out = int_list+[0]*pad_len
    else:
        out= int_list
    return out

In [None]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)

    
# this will be a collate_fn for dataloader to pad sequence  
class PadSequence:
    def __init__(self, padding_idx):
        self.padding_idx = padding_idx
    
    def __call__(self, batch):
        ##Padding
        sequences = [custom_pad(el, 128, dna2int["pad"]) for el,_ in batch]

        labels = torch.tensor([y for _, y in batch]).unsqueeze(1)
        return torch.tensor(sequences), labels
    

In [None]:
# create data loader

train_data_loader = torch.utils.data.DataLoader(list(zip(train_x, train_y)), batch_size, shuffle=True, 
                                                collate_fn=PadSequence(dna2int["pad"]))
test_Data_loader = torch.utils.data.DataLoader(list(zip(test_x, test_y)), batch_size, shuffle=True, 
                                               collate_fn=PadSequence(dna2int["pad"]))

In [None]:
padded_vocab_size = len(dna2int.keys())

In [None]:
# init model / loss function / optimizer etc.
padded_seq_model = CpGPredictor(LSTM_LAYER, LSTM_HIDDEN, dropout_p, padded_vocab_size, EMBED_DIM)
loss_fn_2 = torch.nn.MSELoss()
optimizer = torch.optim.Adam(padded_seq_model.parameters(), lr=learning_rate)

In [None]:
# Start training
padded_seq_model.train()
padded_seq_model.zero_grad()

In [None]:
t_loss = .0
loss_history = []
min_loss = 100
train_rmse_plot = []
test_rmse_plot = []
train_accuracy_plot = []
test_accuracy_plot = []

for epoch in range(epoch_num):
    for X_batch, y_batch in train_data_loader:
        y_pred = padded_seq_model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        optimizer.zero_grad()
        t_loss += loss.item()
        loss.backward()
        optimizer.step()
    loss_history.append(t_loss)
    print(t_loss)
    t_loss = .0
    
    ## Evaluate
    with torch.no_grad():
        y_pred = padded_seq_model(train_x)
        train_rmse = np.sqrt(loss_fn(y_pred, train_y))
        train_rmse_plot.append(train_rmse.item())
        train_accuracy = (torch.sum(torch.round(y_pred).int() == train_y))/train_y.size()[0]

        y_pred = padded_seq_model(test_x)
        test_rmse = np.sqrt(loss_fn(y_pred, test_y))
        test_rmse_plot.append(test_rmse.item())
        test_accuracy = (torch.sum(torch.round(y_pred).int() == test_y))/test_y.size()[0]
    if min_loss > loss_history[-1]:
        torch.save(padded_seq_model.state_dict(), "models/padded_seq_model_weights.pth")
        min_loss = loss_history[-1]

In [None]:
padded_seq_model.eval()

In [None]:
example = list(rand_sequence(1))
print(" ".join(intseq_to_dnaseq(example[0])))
tester = torch.tensor(example)
print(tester)
prediction = padded_seq_model(tester)
print(prediction)

In [None]:
# plot accuracy

colors = ['r', 'g']
cc = itertools.cycle(colors)
plot_lines = []

l1, = plt.plot(train_acc_plot, '-', color='r')
l2, = plt.plot(test_acc_plot, '--', color='g')

plot_lines.append([l1, l2])

legend1 = plt.legend(plot_lines[0], ["Train accuracy", "Test accuracy"], loc=1)
plt.legend([l[0] for l in plot_lines], range(2), loc=4)
plt.gca().add_artist(legend1)

plt.plot(train_acc_plot, c='r')
plt.plot(test_acc_plot, c='g')
plt.show()