# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [1]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
import random

In [2]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers   
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [3]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X 
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    temp = [intseq_to_dnaseq(seq) for seq in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs
    #step3
    y_dna_seqs = [count_cpgs(seq) for seq in X_dna_seqs_train] # use count_cpgs here to generate labels with temp generated in step2
    
    return X_dna_seqs_train, y_dna_seqs
    
train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)




In [4]:
# some config
LSTM_HIDDEN = 16
LSTM_LAYER = 2
batch_size = 32
learning_rate = 0.001
epoch_num = 10

In [5]:
from torch.utils.data import DataLoader, TensorDataset

train_x_tensor = torch.tensor(train_x)
test_x_tensor = torch.tensor(test_x)
train_y_tensor = torch.tensor(train_y, dtype=torch.float32)  # Convert labels to floats
test_y_tensor = torch.tensor(test_y, dtype=torch.float32)  # Convert labels to floats

# create data loader
train_data_loader =  DataLoader(
    dataset=torch.utils.data.TensorDataset(train_x_tensor, train_y_tensor),
    batch_size=batch_size,
    shuffle=True,
)

test_Data_loader = DataLoader(
    dataset=torch.utils.data.TensorDataset(test_x_tensor, test_y_tensor),
    batch_size=batch_size,
    shuffle=False,  # No shuffling needed for testing
)

In [6]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, input_size, hidden_size):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.classifier = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # TODO complete forward function
        embeddings = nn.functional.one_hot(x, num_classes=5).float()
        lstm_out, _ = self.lstm(embeddings)
        output = lstm_out[:, -1, :]
        
        #Predict CpG count using the linear layer
        logits = self.classifier(output)
        return logits


In [7]:
# init model / loss function / optimizer etc.
import torch.nn as nn
model = CpGPredictor(5,32)
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [11]:
# training (you can modify the code below)
t_loss = 0.0
model.train()
model.zero_grad()
for epoch in range(epoch_num):
    for data, target in train_data_loader:
        #TODO complete training loop
        optimizer.zero_grad()
        
        # Forward pass
        output = model(data)
        loss = loss_fn(output.squeeze(), target)
        
        t_loss += loss.item()
        loss.backward()

        optimizer.step()

        # Update epoch loss
        t_loss += loss.item()
        
    print(t_loss)
    t_loss = .0
    

0.09571299851813819
0.0010617807572543825
0.00022028727994438668
0.00012347974484328006
9.209139722088366e-05
7.721985838315959e-05
6.787969937249727e-05
5.9024541371854866e-05
5.296557836231841e-05
4.561747141451633e-05


In [8]:
model.eval()  # Set model to evaluation mode

res_gs = []  # Store ground truth CpG counts
res_pred = []  # Store predicted CpG counts

with torch.no_grad():  # Disable gradient calculation for evaluation
    for data, target in test_Data_loader:
        output = model(data)  # Pass data through the model

        # Detach and convert outputs for comparison and storage
        predicted = output.squeeze().detach().cpu().numpy().tolist()
        target = target.detach().cpu().numpy().tolist()

        res_gs.extend(target)  # Append ground truth counts
        res_pred.extend(predicted)  # Append predicted counts

In [13]:
# TODO complete evaluation of the model

import numpy as np


def calculate_mse(y_true, y_pred):
    y_true_np = np.array(y_true)
    y_pred_np = np.array(y_pred)
    return np.mean(np.square(y_true_np - y_pred_np))

def calculate_mae(y_true, y_pred):
    y_true_np = np.array(y_true)
    y_pred_np = np.array(y_pred)
    return np.mean(np.abs(y_true_np - y_pred_np))

def calculate_r_squared(y_true, y_pred):
    y_true_np = np.array(y_true)
    y_pred_np = np.array(y_pred)
    ss_res = np.sum(np.square(y_true_np - y_pred_np))
    ss_tot = np.sum(np.square(y_true_np - np.mean(y_true_np)))
    return 1 - (ss_res / ss_tot)


# Example using  data collected from 'res_gs' and 'res_pred' during inference
mse = calculate_mse(res_gs, res_pred)
mae = calculate_mae(res_gs, res_pred)
r_squared = calculate_r_squared(res_gs, res_pred)

print("Mean Squared Error:", mse)
print("Mean Absolute Error:", mae)
print("R-Squared:", r_squared)

Mean Squared Error: 3.414942978099439e-07
Mean Absolute Error: 0.00048632130346959457
R-Squared: -inf


  return 1 - (ss_res / ss_tot)


# Part 2: what if the DNA sequences are not the same length

In [9]:
# hint we will need following imports
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [10]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers   
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [11]:
# TODO complete the task based on the change

def prepare_data(num_samples=100, min_len=16, max_len=128):
    # Step 1: Generate random DNA sequences
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    
    # Step 2: Pad sequences to a common length
    max_seq_len = max(len(seq) for seq in X_dna_seqs_train)
    padded_seqs = []
    for seq in X_dna_seqs_train:
        padded_seq = seq + [0] * (max_seq_len - len(seq))  # Pad with zeros
        padded_seqs.append(padded_seq)

    # Convert to tensors
    padded_seqs_tensor = torch.tensor(padded_seqs, dtype=torch.long)
    
    # Step 3: Count CpG dinucleotides for target labels
    y_dna_seqs = [count_cpgs(seq) for seq in X_dna_seqs_train]

    # Step 4: Convert integer sequences to tensors (optional)
    if torch.cuda.is_available():
        padded_seqs_tensor = padded_seqs_tensor.cuda()

    return padded_seqs_tensor, y_dna_seqs

# Example usage
min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [12]:
from torch.nn.utils.rnn import pad_sequence
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)

    
# this will be a collate_fn for dataloader to pad sequence  
class PadSequence:
    def __call__(self, batch):

    # Extract sequences and labels from the batch
        sequences, labels = zip(*batch)

    # Pad sequences to a common length (assume padding value 0)
        padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)

    # Convert labels to tensors (if necessary)
        labels = torch.tensor(labels)

        return padded_sequences, labels

In [13]:
train_data = MyDataset(train_x, train_y)
test_data = MyDataset(test_x, test_y)

train_loader = DataLoader(train_data, batch_size=32, collate_fn=PadSequence())
test_loader = DataLoader(test_data, batch_size=32, collate_fn=PadSequence())

In [30]:
# TODO complete the rest


import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

# Import your data preparation functions (replace with actual implementation)
#from DataLoader import prepare_data, MyDataset, PadSequence

# Hyperparameters
input_size = 5  # Number of unique characters in DNA alphabet (N, A, C, G, T)
hidden_size = 64  # Number of hidden units in LSTM
num_layers = 2  # Number of LSTM layers
learning_rate = 0.001  # Learning rate for optimizer
num_epochs = 10  # Number of training epochs

# Model definition
class CpGPredictor(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(CpGPredictor, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)  # Embedding layer
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)  # Output layer for predicting CpG count

    def forward(self, x):
        # Convert DNA sequence to integer encoding (replace with your encoding logic)
        encoded_x = torch.tensor(dnaseq_to_intseq(x), dtype=torch.long)
        def dnaseq_to_intseq(seq):
  # ... conversion logic ...
        return torch.tensor(converted_sequence, dtype=torch.long)
        print(f"Dimension of encoded_x: {len(encoded_x)}")
    if len(encoded_x) == 1:  # Check if only one dimension
        x_tensor = torch.tensor(encoded_x, dtype=torch.long).to(device)
    else:
  # Add batch dimension if necessary
        x_tensor = torch.tensor(encoded_x).unsqueeze(0).to(device)
    
        # Create a tensor from the encoded sequence
        x_tensor = torch.tensor(encoded_x).unsqueeze(0).to(device)  # Add batch dimension

        # Pass through embedding layer
        embeddings = self.embedding(x_tensor)

        # Pass through LSTM layers
        lstm_out, _ = self.lstm(embeddings)
        # Extract the last hidden state (assuming batch-first ordering)
        last_hidden = lstm_out[:, -1, :]

        # Predict CpG count with linear layer
        prediction = self.fc(last_hidden)
        return prediction

# Prepare data
train_x, train_y = prepare_data(2048, 64, 128)
test_x, test_y = prepare_data(512, 64, 128)

# Create datasets and data loaders
train_data = MyDataset(train_x, train_y)
test_data = MyDataset(test_x, test_y)

train_loader = DataLoader(train_data, batch_size=32, collate_fn=PadSequence())
test_loader = DataLoader(test_data, batch_size=32, collate_fn=PadSequence())

# Determine device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize model and optimizer
model = CpGPredictor(input_size, hidden_size, num_layers).to(device)
loss_fn = nn.MSELoss()  # Mean squared error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
for epoch in range(1, num_epochs + 1):
    print("Epoch:", epoch)

    # Training loop
    model.train()
    total_loss = 0.0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_fn(output, target.float())  # Convert target to float for MSE loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

    avg_loss = total_loss / len(train_loader)
    print('Average Training Loss: {:.6f}'.format(avg_loss))

    # Evaluation loop
    model.eval()
    test_loss = 0.0

IndentationError: expected an indented block after function definition on line 30 (1845672750.py, line 32)