# CpG Detector using LSTM

### 1. Introduction

##### This project focuses on predicting the occurrence of CpG sites in DNA sequences using an LSTM-based deep learning model. The dataset consists of randomly generated DNA sequences, which are converted into numerical representations for training. The model learns patterns in the sequences and predicts the CpG site count.

##### To handle sequences of varying lengths, we implemented padding and packing techniques using PyTorch's pack_padded_sequence. The project includes data preparation, model training, evaluation, and a prediction function to test unseen sequences. This approach demonstrates the application of LSTMs in bioinformatics for sequence-based prediction tasks.


In [None]:
--------------------------------------------------------------------------------------------------------------------------------

In [2]:
#pip install torch

Collecting torchNote: you may need to restart the kernel to use updated packages.
  Downloading torch-2.4.1-cp38-cp38-win_amd64.whl (199.4 MB)
Collecting typing-extensions>=4.8.0
  Using cached typing_extensions-4.12.2-py3-none-any.whl (37 kB)

Installing collected packages: typing-extensions, torch
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 3.7.4.3
    Uninstalling typing-extensions-3.7.4.3:
      Successfully uninstalled typing-extensions-3.7.4.3
Successfully installed torch-2.4.1 typing-extensions-4.12.2


##  2. Importing Required Libraries
##### Loading necessary packages for data handling, model building, and training

In [1]:
from typing import Sequence
from functools import partial
import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch import nn, optim

## 3. Setting Seed for Reproducibility
##### Ensuring consistent results across different runs

In [2]:
# DO NOT CHANGE HERE


def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label 
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers   
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

## 4. Data Preparation and Generation

##### *  Generating synthetic DNA sequences
##### * Converting DNA sequences to integer format
##### * Counting CpG sites as target labels

In [3]:
# Prepare Data 
def prepare_data(num_samples=100):
    X_dna_seqs_train = list(rand_sequence(num_samples)) # generate the dna sequences
    temp = ["".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train] # converting dna to int
    y_dna_seqs = [count_cpgs(seq) for seq in temp] # target
    return X_dna_seqs_train, y_dna_seqs 

train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

## Verifying Generated Data
#### This section prints the samples from the generated training data (train_x ,train_y  , test_x ,train_y) to verify the data generation process.

In [4]:
print(train_x[:3])
print("------------")
print(train_y[:3])

[[2, 2, 1, 1, 1, 1, 1, 1, 0, 4, 1, 2, 0, 3, 1, 4, 0, 2, 1, 0, 2, 3, 3, 1, 2, 2, 1, 3, 4, 4, 3, 2, 3, 2, 0, 2, 4, 2, 3, 4, 4, 1, 3, 3, 4, 1, 2, 1, 1, 4, 2, 2, 2, 3, 2, 4, 2, 3, 1, 4, 3, 4, 1, 4, 1, 1, 2, 1, 0, 3, 3, 3, 0, 3, 0, 1, 1, 3, 3, 2, 1, 3, 2, 2, 1, 2, 4, 1, 4, 1, 3, 1, 4, 0, 4, 4, 0, 2, 4, 4, 2, 1, 2, 1, 1, 4, 0, 1, 1, 0, 3, 4, 4, 4, 0, 3, 2, 0, 4, 0, 1, 2, 0, 3, 2, 2, 4, 4], [0, 3, 4, 2, 0, 0, 2, 2, 1, 3, 4, 4, 4, 3, 1, 1, 2, 0, 3, 4, 1, 1, 2, 3, 1, 2, 3, 4, 1, 2, 1, 4, 2, 3, 1, 2, 3, 0, 2, 3, 3, 0, 0, 0, 4, 0, 2, 1, 3, 3, 4, 3, 2, 0, 2, 4, 3, 1, 1, 0, 0, 2, 1, 1, 2, 2, 4, 3, 2, 0, 2, 1, 0, 2, 3, 3, 4, 4, 3, 0, 2, 4, 2, 1, 0, 2, 1, 4, 1, 0, 1, 1, 0, 3, 2, 0, 2, 2, 1, 0, 1, 1, 4, 3, 1, 4, 3, 4, 0, 4, 0, 0, 4, 1, 4, 2, 4, 3, 2, 1, 4, 2, 2, 0, 2, 0, 0, 1], [0, 0, 4, 2, 0, 2, 3, 1, 4, 2, 0, 0, 0, 2, 3, 3, 3, 3, 2, 1, 1, 3, 2, 0, 4, 0, 4, 2, 2, 2, 1, 1, 2, 3, 1, 3, 3, 2, 1, 3, 4, 4, 3, 1, 3, 3, 1, 0, 4, 0, 3, 4, 1, 1, 0, 0, 4, 1, 2, 0, 3, 4, 4, 0, 1, 3, 0, 4, 0, 1, 2, 4, 1, 0, 4, 3

In [5]:
print(test_x[:3])
print("---------")
print(test_y[:3])

[[3, 3, 3, 0, 2, 3, 2, 2, 2, 0, 2, 4, 2, 4, 4, 1, 3, 3, 3, 3, 3, 1, 1, 0, 0, 2, 1, 4, 4, 4, 0, 3, 1, 2, 4, 3, 0, 3, 4, 0, 2, 3, 4, 0, 4, 3, 2, 1, 1, 1, 4, 1, 2, 4, 3, 0, 1, 0, 0, 4, 3, 2, 2, 3, 4, 3, 4, 1, 1, 4, 4, 1, 4, 0, 0, 2, 3, 0, 4, 1, 2, 4, 3, 4, 4, 0, 0, 3, 2, 0, 2, 2, 1, 2, 0, 3, 2, 2, 2, 1, 3, 0, 1, 3, 0, 4, 3, 1, 3, 0, 0, 0, 3, 0, 4, 0, 1, 3, 2, 2, 1, 1, 2, 2, 1, 0, 3, 3], [3, 3, 2, 4, 3, 2, 3, 2, 0, 2, 1, 4, 2, 0, 2, 0, 2, 1, 0, 3, 4, 3, 0, 4, 4, 3, 2, 1, 4, 1, 3, 2, 2, 2, 1, 4, 4, 0, 4, 1, 3, 2, 2, 0, 3, 3, 0, 2, 3, 0, 0, 0, 2, 4, 4, 2, 0, 2, 4, 1, 2, 1, 0, 2, 2, 0, 2, 1, 3, 3, 1, 3, 2, 2, 2, 0, 2, 3, 4, 4, 2, 1, 1, 1, 3, 1, 0, 2, 1, 3, 0, 0, 0, 3, 1, 4, 4, 1, 1, 4, 3, 3, 0, 4, 1, 3, 4, 3, 0, 4, 0, 2, 3, 0, 3, 2, 2, 1, 0, 2, 0, 4, 3, 3, 1, 1, 1, 3], [0, 3, 0, 4, 4, 3, 4, 0, 4, 4, 0, 4, 0, 2, 2, 2, 3, 0, 3, 2, 1, 1, 4, 3, 4, 2, 3, 3, 1, 3, 1, 2, 0, 4, 4, 0, 4, 1, 2, 0, 1, 2, 4, 3, 2, 4, 3, 1, 3, 1, 2, 3, 0, 2, 2, 3, 2, 0, 1, 4, 1, 0, 0, 2, 3, 4, 3, 4, 3, 2, 4, 4, 3, 2, 4, 1

## 5. LSTM Model Configuration
##### Defining hyperparameters (LSTM layers, hidden units, learning rate, etc.)

In [6]:
LSTM_HIDDEN = 64
LSTM_LAYER = 2
LSTM_BIDIRECTIONAL = True   #  Allows the LSTM model to learn from both past and future contexts, improving performance.
DROPOUT_RATE = 0.3   #  Helps prevent overfitting by randomly deactivating some neurons during training.
EMBEDDING_DIM = 32
batch_size = 32
learning_rate = 0.001
epoch_num = 4
WEIGHT_DECAY = 1e-5  #  Used in the optimizer to apply L2 regularization, which helps in controlling overfitting.
GRAD_CLIP = 5.0  #  Prevents exploding gradients by capping the gradient values.

## 6. Creating PyTorch Datasets and DataLoaders

##### * Implementing a custom PyTorch dataset
##### * Defining data loaders for batch processing

In [7]:
# Dataset and DataLoader
class MyDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels
    
    def __getitem__(self, index):
        return torch.tensor(self.sequences[index], dtype=torch.long), torch.tensor(self.labels[index], dtype=torch.float32)
    
    def __len__(self):
        return len(self.sequences)

train_dataset = MyDataset(train_x, train_y)
test_dataset = MyDataset(test_x, test_y)




train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## 7. LSTM Model Definition: CpG Predictor 

##### * Creating an embedding layer for DNA sequences
##### * Implementing LSTM layers for sequence modeling
##### * Adding a fully connected output layer






In [8]:
# Model Definition
class CpGPredictor(nn.Module):
    def __init__(self):
        super(CpGPredictor, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=5, embedding_dim=EMBEDDING_DIM)
        self.lstm = nn.LSTM(input_size=EMBEDDING_DIM, hidden_size=LSTM_HIDDEN, 
                            num_layers=LSTM_LAYER, batch_first=True, 
                            bidirectional=LSTM_BIDIRECTIONAL, dropout=DROPOUT_RATE)
        self.classifier = nn.Linear(LSTM_HIDDEN * (2 if LSTM_BIDIRECTIONAL else 1), 1)
    
    def forward(self, x):
        x = self.embedding(x)
        lstm_out, _ = self.lstm(x)
        logits = self.classifier(lstm_out[:, -1, :])
        return logits.squeeze()


## 8. Initializing Model, Loss Function, and Optimizer

##### * Setting up the loss function (Mean Squared Error)

##### * Configuring the Adam optimizer

In [9]:
# Initialize Model and Optimizer
model = CpGPredictor()
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=WEIGHT_DECAY)

## 9. Training the LSTM Model
##### * Implementing the training loop

##### * Performing forward and backward propagation

##### *  Updating model weights

In [10]:
# Training Loop
model.train()
for epoch in range(epoch_num):
    t_loss = 0.0
    for batch_x, batch_y in train_data_loader:
        optimizer.zero_grad()
        output = model(batch_x)
        loss = loss_fn(output, batch_y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), GRAD_CLIP)
        optimizer.step()
        t_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {t_loss:.4f}")

Epoch 1, Loss: 612.8225
Epoch 2, Loss: 268.9088
Epoch 3, Loss: 268.6634
Epoch 4, Loss: 264.5030


## 10. LSTM Evaluation
##### * Switching to evaluation mode

##### * Making predictions on test data

##### * Collecting and storing results

In [11]:
# Evaluation
model.eval()
res_gs = []
res_pred = []

with torch.no_grad():
    for batch_x, batch_y in test_data_loader:
        output = model(batch_x)
        res_gs.extend(batch_y.tolist())
        res_pred.extend(output.tolist())

In [12]:
from sklearn.metrics import mean_absolute_error

# Compute Mean Absolute Error (MAE)
mae = mean_absolute_error(res_gs, res_pred)
print(f"Mean Absolute Error (MAE) on test data: {mae:.4f}")

Mean Absolute Error (MAE) on test data: 1.5569


##  11. Predicting CpG Counts for New Sequences
##### Function to convert DNA sequence input into model predictions

##### * Testing with example DNA sequences

In [13]:
# Prediction Feature: Convert DNA sequence to model input and get prediction
def predict_cpg_count(dna_sequence: str):
    int_sequence = [dna2int.get(base, 0) for base in dna_sequence]  # Convert DNA bases to integers
    input_tensor = torch.tensor([int_sequence], dtype=torch.long)   # Convert to tensor
    model.eval()
    
    with torch.no_grad():
        prediction = model(input_tensor)
    
    return prediction.item()


# Example usage
test_sequence = "NCACANNTNCGGA"
predicted_cpg_count = predict_cpg_count(test_sequence)
print(f"Predicted CpG count for '{test_sequence}': {predicted_cpg_count:.2f}")


Predicted CpG count for 'NCACANNTNCGGA': 5.14


# Part 2

# 12. Handling Variable-Length DNA Sequences

### Random Seed and Sequence Processing Functions

In [158]:
# DO NOT CHANGE HERE
random.seed(13)

# Generating Random DNA Sequences
#The rand_sequence_var_len() function generates variable-length DNA sequences with random nucleotide values.
#These sequences are converted into their corresponding numerical representations.


# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int = 16, ub: int = 128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

## 13. Generating Training and Testing Data

In [159]:
# Prepare Data
def prepare_data(num_samples=100, min_len=16, max_len=128):
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    temp = ["".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train]
    y_dna_seqs = [count_cpgs(seq) for seq in temp]
    return X_dna_seqs_train, y_dna_seqs

## 14. Controlled Sequence Lengths

In [160]:
min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [161]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)

## 15. Padding Variable-Length Sequences for LSTM

In [162]:
# Collate function for variable-length sequences
class PadSequence:
    def __call__(self, batch):
        sequences, labels = zip(*batch)
        lengths = torch.tensor([len(seq) for seq in sequences])
        padded_sequences = pad_sequence([torch.tensor(seq, dtype=torch.long) for seq in sequences], batch_first=True, padding_value=0)
        labels = torch.tensor(labels, dtype=torch.float32)
        return padded_sequences, lengths, labels


batch_size = 32

train_dataset = MyDataset(train_x, train_y)
test_dataset = MyDataset(test_x, test_y)

train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=PadSequence())
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=PadSequence())


## 16. LSTM Model for Variable-Length Sequences

In [163]:
# Model Definition
class CpGPredictorVarLen(nn.Module):
    def __init__(self):
        super(CpGPredictorVarLen, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=6, embedding_dim=16)
        self.lstm = nn.LSTM(input_size=16, hidden_size=64, num_layers=3,dropout=0.2, batch_first=True)
        self.classifier = nn.Linear(64, 1)

    def forward(self, x, lengths):
        x = self.embedding(x)
        x_packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        lstm_out, _ = self.lstm(x_packed)
        lstm_out, _ = pad_packed_sequence(lstm_out, batch_first=True)
        logits = self.classifier(lstm_out[:, -1, :])
        return logits.squeeze()

## 17. Model loss function and Optimizer

In [164]:
model = CpGPredictorVarLen()
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()

## 18. Training

In [165]:
# Training Loop
epoch_num = 15
model.train()
for epoch in range(epoch_num):
    t_loss = 0.0
    for batch_x, lengths, batch_y in train_data_loader:
        optimizer.zero_grad()
        output = model(batch_x, lengths)
        loss = loss_fn(output, batch_y)
        loss.backward()
        optimizer.step()
        t_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {t_loss:.4f}")

print("Training and evaluation completed.")


  padded_sequences = pad_sequence([torch.tensor(seq, dtype=torch.long) for seq in sequences], batch_first=True, padding_value=0)


Epoch 1, Loss: 1121.1155
Epoch 2, Loss: 1075.7621
Epoch 3, Loss: 1056.9942
Epoch 4, Loss: 1024.1830
Epoch 5, Loss: 1001.6663
Epoch 6, Loss: 974.3478
Epoch 7, Loss: 939.2596
Epoch 8, Loss: 923.8299
Epoch 9, Loss: 900.7290
Epoch 10, Loss: 870.1682
Epoch 11, Loss: 844.4788
Epoch 12, Loss: 829.3928
Epoch 13, Loss: 804.0160
Epoch 14, Loss: 783.2110
Epoch 15, Loss: 769.9051
Training and evaluation completed.


## 19. Evaluate The Model

In [166]:
from sklearn.metrics import mean_absolute_error, r2_score

def evaluate_model(model, test_loader):
    model.eval()
    predictions, actuals = [], []
    
    with torch.no_grad():
        for batch_x, lengths, batch_y in test_loader:
            output = model(batch_x, lengths)
            predictions.extend(output.cpu().numpy())
            actuals.extend(batch_y.cpu().numpy())

    mae = mean_absolute_error(actuals, predictions)
    r2 = r2_score(actuals, predictions)
    
    print(f"Evaluation Results:\nMAE: {mae:.4f}, R² Score: {r2:.4f}")

# Call evaluation function
evaluate_model(model, test_data_loader)


  padded_sequences = pad_sequence([torch.tensor(seq, dtype=torch.long) for seq in sequences], batch_first=True, padding_value=0)


Evaluation Results:
MAE: 2.8953, R² Score: -2.0702


## 20. Predicting CpG Counts from Variable-Length DNA Sequences

In [167]:
def predict_cpg(model, input_sequence):
    model.eval()
    with torch.no_grad():
        input_tensor = torch.LongTensor(list(dnaseq_to_intseq(input_sequence))).unsqueeze(0)
        length_tensor = torch.tensor([len(input_sequence)])
        prediction = model(input_tensor, length_tensor)
        return prediction.item()

# Example prediction
example_sequence = "NCACCGGA"  # Replace with your sequence
predicted_cpg = predict_cpg(model, example_sequence)
print(f"Predicted CpG count for sequence '{example_sequence}': {predicted_cpg:.4f}")

Predicted CpG count for sequence 'NCACCGGA': 4.8976


## 21. save the trained model
##### The final trained model is saved to a specified directory for future use.

In [168]:
# Save the trained model
model_save_path = r"C:\Users\Nitheesh\OneDrive\Desktop\brototype\CpG Detector\CpG detector model\cpg_predictor.pth"  # Replace with your desired path
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


Model saved to C:\Users\Nitheesh\OneDrive\Desktop\brototype\CpG Detector\CpG detector model\cpg_predictor.pth
