# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [5]:
from typing import Sequence 
from functools import partial
import random # to generate random numbers
import torch # to build the model
import numpy as np # to perform calculations

In [6]:
# DO NOT CHANGE HERE

def set_seed(seed=42):
    """
    Ensures reproducibility across runs, all the randoms generated will have the same seed
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    """
    Generates random integer-encoded DNA sequences of fixed length (default 128). 
    Each int from 0 to 4 corresponds to one of: 'N', 'A', 'C', 'G', 'T'.
    Example: [0, 2, 3, 1, 4, 3] might represent "NCGA TG"
    """
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    """
    Counts how many "CG" pairs are in the DNA string. This is my regression label (target output). It is done using 
    programmable logic, which I will need to do using deep learning.
    """
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers   
"""
Allows converting between integer-sequence form and string-sequence form.
"""
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [7]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    """
    Create dataset by:
        Generating X (random DNA int sequences)
        Converting to string DNA (temp)
        Getting labels y using count_cpgs(temp)
    """
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X 
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    ## Step 2: Convert each int sequence back to string using int2dna
    temp = [''.join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train]

    # Step 3: Count CG pairs in each DNA string to form target labels
    y_dna_seqs = [count_cpgs(seq) for seq in temp]
    
    return X_dna_seqs_train, y_dna_seqs
    
train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

In [9]:
from torch.utils.data import DataLoader, TensorDataset

# Model and training hyperparameters
LSTM_HIDDEN = 64
LSTM_LAYER = 2
batch_size = 64
learning_rate = 0.001
epoch_num = 10

In [10]:
# Convert data to PyTorch tensors
train_tensor_x = torch.tensor(train_x, dtype=torch.long)
train_tensor_y = torch.tensor(train_y, dtype=torch.float32)

test_tensor_x = torch.tensor(test_x, dtype=torch.long)
test_tensor_y = torch.tensor(test_y, dtype=torch.float32)

In [11]:
# Create DataLoaders for batching
train_data_loader = DataLoader(TensorDataset(train_tensor_x, train_tensor_y), batch_size=batch_size, shuffle=True)
test_data_loader = DataLoader(TensorDataset(test_tensor_x, test_tensor_y), batch_size=batch_size, shuffle=False)

In [14]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        
        super(CpGPredictor, self).__init__()
        # Convert int (0–4) to 16-dim embedding vector
        self.embedding = torch.nn.Embedding(num_embeddings=5, embedding_dim=16)
        # LSTM for sequence modeling
        self.lstm = torch.nn.LSTM(input_size=16, hidden_size=LSTM_HIDDEN, num_layers=LSTM_LAYER, batch_first=True)
        # Final linear layer to output 1 regression value
        self.classifier = torch.nn.Linear(LSTM_HIDDEN, 1)

    def forward(self, x):
        x = self.embedding(x)                   # [batch, seq_len, embed_dim]
        lstm_out, _ = self.lstm(x)              # [batch, seq_len, hidden_size]
        last_hidden = lstm_out[:, -1, :]        # Take the last time step
        logits = self.classifier(last_hidden)   # Output: [batch, 1]
        return logits.squeeze(1)                # Remove extra dimension

In [15]:
# init model / loss function / optimizer etc.
"""
Initialize the model, loss, and optimizer.
"""
model = CpGPredictor()
loss_fn = torch.nn.MSELoss()                            # Good for regression
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [16]:
# training (you can modify the code below)
t_loss = 0.0

"""
Train the model across epochs.
    I need to complete:
        Fetch inputs, targets from batch
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward(); optimizer.step(); optimizer.zero_grad()
"""
for epoch in range(epoch_num):
    t_loss = 0.0
    for batch_x, batch_y in train_data_loader:
        optimizer.zero_grad()                  # Reset gradients
        preds = model(batch_x)                 # Forward pass
        loss = loss_fn(preds, batch_y)         # Compute loss
        loss.backward()                        # Backpropagate
        optimizer.step()                       # Update weights
        t_loss += loss.item()
    print(f"Epoch {epoch+1}/{epoch_num} - Training Loss: {t_loss:.4f}")

Epoch 1/10 - Training Loss: 575.4057
Epoch 2/10 - Training Loss: 136.6816
Epoch 3/10 - Training Loss: 134.1474
Epoch 4/10 - Training Loss: 134.2021
Epoch 5/10 - Training Loss: 134.4268
Epoch 6/10 - Training Loss: 134.2547
Epoch 7/10 - Training Loss: 134.1814
Epoch 8/10 - Training Loss: 134.2760
Epoch 9/10 - Training Loss: 134.2533
Epoch 10/10 - Training Loss: 134.2186


In [17]:
# eval (you can modify the code below)
"""
Run inference on test data.
I need to complete:
    Predict model(x)
    Append predictions and ground truths to res_pred, res_gs|
    Evaluate with RMSE or MAE
"""
model.eval()  # Set model to evaluation mode

res_gs = []    # Ground truth labels
res_pred = []  # Model predictions

with torch.no_grad():  # Disable gradient computation
    for batch_x, batch_y in test_data_loader:
        preds = model(batch_x)  # Get model predictions
        res_pred.extend(preds.tolist())  # Convert predictions to list and store
        res_gs.extend(batch_y.tolist())  # Convert true labels to list and store

# Simple evaluation metric: RMSE (root mean squared error)
from sklearn.metrics import mean_squared_error
import math

rmse = math.sqrt(mean_squared_error(res_gs, res_pred))
print(f"Test RMSE: {rmse:.4f}")


Test RMSE: 2.0391


In [None]:
# TODO complete evaluation of the model

# Part 2: what if the DNA sequences are not the same length

In [19]:
# hint we will need following imports
"""
These are utility functions from PyTorch used to:
    pad_sequence: pad sequences in a batch to same length.
    pack_padded_sequence: pack padded sequences for efficient LSTM processing.
    pad_packed_sequence: unpack after LSTM if needed.
"""
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [20]:
# DO NOT CHANGE HERE
"""
Ensure reproducibility of random number generation.
"""
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    """
    Generates sequences of random lengths between lb and ub.
    Each element is a random integer in [1, 5] mapping to 'N', 'A', 'C', 'G', 'T'.
    Note:
    This time, 0 is reserved for "pad" token (see later), so we start from 1.
    """
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    """
    Counts the number of "CG" substrings in a DNA string.
    No change from Part 1 — it still works on a full DNA string.
    """
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers   
"""
DNA characters ('NACGT') are now encoded as integers [1, 2, 3, 4, 5].
Padding token is assigned to 0 ("pad").
These mappings let you convert between encoded int lists and DNA strings.
"""
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [22]:
# TODO complete the task based on the change
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # TODO prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    
    """
    Generate X = variable-length DNA sequences (int-encoded)
    Convert them to string format
    Use count_cpgs() to generate target labels
    """
    #step 1 Generate variable-length int sequences
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    
    # Step 2: Convert each int sequence to DNA string
    temp = [''.join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train]
    
    # Step 3: Use count_cpgs to generate targets
    y_dna_seqs = [count_cpgs(seq) for seq in temp]
    
    return X_dna_seqs_train, y_dna_seqs

"""
Creates training and testing datasets of variable lengths between 64 and 128 bases.
"""
    
min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [23]:
class MyDataset(torch.utils.data.Dataset):
    """
    Defines a PyTorch Dataset to wrap your (X, y) data:
    Returns (LongTensor(sequence), label) per sample
    Needed for PyTorch’s DataLoader
    """
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)

    


In [25]:
# this will be a collate_fn for dataloader to pad sequence  
class PadSequence:
    """
    When using variable-length sequences, PyTorch needs a custom collate_fn in the DataLoader to:
    Pad all sequences in a batch to the length of the longest one
    Return padded_sequences, sequence_lengths, and labels
    """
    #TODO
    def __call__(self, batch):
        # Split batch into sequences and labels
        sequences, labels = zip(*batch)

        # Store original lengths (needed for packing)
        lengths = torch.tensor([len(seq) for seq in sequences])

        # Pad all sequences to max length in the batch
        padded_seqs = pad_sequence(sequences, batch_first=True, padding_value=0)

        # Convert labels to tensor
        labels = torch.tensor(labels, dtype=torch.float32)

        return padded_seqs, lengths, labels

In [26]:
# BUILD DATA LOADERS WITH PAD SEQUENCE
pad_sequence_fn = PadSequence()

train_dataset = MyDataset(train_x, train_y)
test_dataset = MyDataset(test_x, test_y)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=pad_sequence_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False, collate_fn=pad_sequence_fn)


In [27]:
# DEFINE LSTM MODEL (WITH PACKED INPUT SUPPORT)
class CpGPredictor(torch.nn.Module):
    def __init__(self):
        super(CpGPredictor, self).__init__()
        self.embedding = torch.nn.Embedding(num_embeddings=6, embedding_dim=16, padding_idx=0)
        self.lstm = torch.nn.LSTM(input_size=16, hidden_size=64, num_layers=2, batch_first=True)
        self.classifier = torch.nn.Linear(64, 1)

    def forward(self, x, lengths):
        # Step 1: Embed
        embedded = self.embedding(x)

        # Step 2: Pack the padded batch
        packed = pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)

        # Step 3: LSTM
        packed_output, (hn, cn) = self.lstm(packed)

        # Step 4: Use the last hidden state of the last layer
        final_hidden = hn[-1]

        # Step 5: Predict CpG count
        output = self.classifier(final_hidden)

        return output.squeeze(1)


In [29]:
# INSTANTIATE MODEL, LOSS, OPTIMIZER
model = CpGPredictor()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [30]:
# TRAIN THE MODEL
for epoch in range(10):
    model.train()
    total_loss = 0.0

    for batch_x, lengths, batch_y in train_loader:
        optimizer.zero_grad()
        preds = model(batch_x, lengths)
        loss = loss_fn(preds, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch+1}: Training Loss = {total_loss:.4f}")


Epoch 1: Training Loss = 286.4987
Epoch 2: Training Loss = 117.3417
Epoch 3: Training Loss = 117.6175
Epoch 4: Training Loss = 117.7837
Epoch 5: Training Loss = 117.2906
Epoch 6: Training Loss = 117.5815
Epoch 7: Training Loss = 117.2316
Epoch 8: Training Loss = 117.2386
Epoch 9: Training Loss = 117.2249
Epoch 10: Training Loss = 117.3609


In [31]:
# EVALUATE THE MODEL
model.eval()
res_gs = []
res_pred = []

with torch.no_grad():
    for batch_x, lengths, batch_y in test_loader:
        preds = model(batch_x, lengths)
        res_pred.extend(preds.tolist())
        res_gs.extend(batch_y.tolist())

from sklearn.metrics import mean_squared_error
import math

rmse = math.sqrt(mean_squared_error(res_gs, res_pred))
print(f"Test RMSE: {rmse:.4f}")


Test RMSE: 1.9889


In [32]:
# TODO complete the rest

In [35]:
# Saving the model
torch.save(model.state_dict(), "cpg_model.pt")
