# **CpG Site Detection Using LSTM**


**Overview**

This notebook implements an LSTM-based model to detect and count CpG sites in DNA sequences. The model processes DNA sequences (strings of N, A, C, G, T) and predicts the number of CG dimers present.


**Prerequisites**

PyTorch
NumPy
Basic understanding of DNA sequences and CpG sites

**Contents**

Data Preparation & Processing
Model Implementation
Training Pipeline
Evaluation & Testing
Results Analysis

In [2]:
!pip install torch numpy streamlit


Collecting streamlit
  Downloading streamlit-1.41.1-py2.py3-none-any.whl.metadata (8.5 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.41.1-py2.py3-none-any.whl (9.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.1/9.1 MB[0m [31m61.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m88.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[

In [3]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from typing import Sequence
from functools import partial
import numpy as np
import random


THOUGHT PROCESS 1: Understanding the Problem
- We need to count CpG sites (where C is followed by G) in DNA sequences
- This is a sequence analysis problem where context matters
- LSTM is suitable because:
  1. It can capture sequential patterns
  2. It has memory to remember previous nucleotides
  3. It can handle variable-length sequences

In [4]:
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

THOUGHT PROCESS 2: Data Representation
- DNA sequences use NACGT alphabet
- Need to convert between:
  * DNA sequences (strings)
  * Integer encodings (for model input)
- One-hot encoding will be used for model input

In [21]:
def dnaseq_to_intseq(sequence):
    """Convert a DNA sequence to integer encoding, handling invalid characters"""
    return [dna2int.get(base, 0) for base in sequence]  # Default to 'N' -> 0


THOUGHT PROCESS 3: Data Generation
- Need synthetic data for training
- Should generate:
  * Random DNA sequences
  * Corresponding CpG counts
- Must ensure balanced representation of CpG sites

In [22]:
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    """Generate random DNA sequences"""
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

def count_cpgs(seq: str) -> int:
    """Count CpG sites in sequence"""
    cgs = 0
    for i in range(0, len(seq) - 1):
        if seq[i:i+2] == "CG":
            cgs += 1
    return cgs

def prepare_data(num_samples=100):
    """Prepare training data with labels"""
    X_dna_seqs = list(rand_sequence(num_samples))
    temp = [''.join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs]
    y_dna_seqs = [count_cpgs(seq) for seq in temp]
    return X_dna_seqs, y_dna_seqs

THOUGHT PROCESS 4: Dataset Organization
- Need custom Dataset class for PyTorch
- Must handle:
  * Batch processing
  * Tensor conversion
  * Length tracking

In [23]:
class DNADataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx]), torch.tensor(self.labels[idx], dtype=torch.float32)


THOUGHT PROCESS 5: Model Architecture
- LSTM-based architecture choices:
  * Bidirectional: No (not needed for simple counting)
  * Layers: 2 (balance between complexity and efficiency)
  * Hidden size: 128 (sufficient for pattern recognition)
  * Dropout: 0.2 (prevent overfitting)

In [24]:
class CpGPredictor(nn.Module):
    def __init__(self, input_size=5, hidden_size=128, num_layers=2):
        super(CpGPredictor, self).__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.2
        )
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        x = torch.nn.functional.one_hot(x, num_classes=5).float()
        lstm_out, _ = self.lstm(x)
        final_hidden = lstm_out[:, -1, :]
        return self.classifier(final_hidden).squeeze()

THOUGHT PROCESS 6: Training Configuration
- Hyperparameters chosen:
  * Batch size: 32 (standard choice)
  * Learning rate: 0.001 (typical for Adam)
  * Epochs: 10 (sufficient for convergence)
  * Loss: MSE (regression problem)

In [25]:
def train_model(model, train_loader, criterion, optimizer, epochs, device):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}")


THOUGHT PROCESS 7: Training Implementation
Now let's put it all together and train the moDEL

In [34]:
def main():
    # Configuration
    BATCH_SIZE = 32
    LEARNING_RATE = 0.001
    EPOCHS = 10
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Prepare data
    print("Preparing data...")
    train_x, train_y = prepare_data(2048)  # 2048 training samples
    test_x, test_y = prepare_data(512)     # 512 test samples

    # Create datasets and loaders
    train_dataset = DNADataset(train_x, train_y)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

    # Initialize model and training components
    print("Initializing model...")
    model = CpGPredictor().to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Train
    print("Starting training...")
    train_model(model, train_loader, criterion, optimizer, EPOCHS, device)

    # Save model
    torch.save(model.state_dict(), 'cpg_model.pth')
    print("Training completed and model saved!")

if __name__ == "__main__":
    main()




Preparing data...
Initializing model...
Starting training...
Epoch 1/10, Loss: 10.3543
Epoch 2/10, Loss: 5.0045
Epoch 3/10, Loss: 5.0137
Epoch 4/10, Loss: 4.9410
Epoch 5/10, Loss: 5.0291
Epoch 6/10, Loss: 4.9241
Epoch 7/10, Loss: 4.9731
Epoch 8/10, Loss: 4.8853
Epoch 9/10, Loss: 5.0450
Epoch 10/10, Loss: 4.9507
Training completed and model saved!


THOUGHT PROCESS 8: Testing the Model
Let's create a simple function to test our model on new sequenc

In [35]:

def predict_cpgs(model, sequence):
    """Predict CpG count for a given sequence"""
    # Ensure the sequence contains only valid characters (NACGT)
    valid_chars = set('NACGT')
    if not all(char in valid_chars for char in sequence):
        raise ValueError("Sequence contains invalid characters. Only 'N', 'A', 'C', 'G', 'T' are allowed.")

    int_seq = list(dnaseq_to_intseq(sequence))  # Convert sequence to integer indices
    input_tensor = torch.tensor([int_seq]).to(device)

    model.eval()
    with torch.no_grad():
        prediction = model(input_tensor)

    return prediction.item()


In [37]:
# Loading the model
model = CpGPredictor().to(device)
model.load_state_dict(torch.load('/content/cpg_model.pth'))
model.eval()  # Set the model to evaluation mode

# Now you can use the model for predictions
test_sequence = "NCACANNTNCGGAGGCGNA"
predicted = predict_cpgs(model, test_sequence)
print(f"Predicted CpGs: {predicted:.2f}")


Predicted CpGs: 5.18


  model.load_state_dict(torch.load('/content/cpg_model.pth'))
