In [2]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import streamlit as st

ModuleNotFoundError: No module named 'torch'

In [None]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's legacy global RNG and PyTorch's
    CPU RNG; when CUDA is available, all CUDA devices are seeded as well.

    Args:
        seed: Integer seed applied to each generator (default 13).
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        # Only touch the CUDA generators when a GPU is actually present.
        torch.cuda.manual_seed_all(seed)

set_seed(13)






In [None]:
# Use this to generate the model inputs (X): random integer-encoded sequences
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]: # type: ignore
    """Lazily yield ``n_seqs`` random integer-encoded sequences.

    Each yielded item is a list of ``seq_len`` integers, every one drawn with
    ``random.randint(0, 4)`` (both bounds inclusive). Values come from the
    global ``random`` state, so seed it beforehand for reproducible output.

    Args:
        n_seqs: Number of sequences to produce.
        seq_len: Length of each sequence (default 128).

    Yields:
        One list of ``seq_len`` integers per sequence.
    """
    yield from (
        [random.randint(0, 4) for _ in range(seq_len)]
        for _ in range(n_seqs)
    )

In [None]:
# Use this to compute the target label (y): the number of CpG dimers in a sequence
def count_cpgs(seq: str) -> int:
    """Count how many times the dimer ``"CG"`` occurs in ``seq``.

    Every adjacent pair of characters is inspected; the comparison is
    case-sensitive, so only upper-case ``"CG"`` is counted.

    Args:
        seq: DNA sequence as a string.

    Returns:
        Number of positions ``i`` where ``seq[i:i+2] == "CG"``.
    """
    return sum(
        1
        for start in range(len(seq) - 1)
        if seq[start:start + 2] == "CG"
    )

In [None]:
# Alphabet helpers   
alphabet = 'NACGT'
# Each letter's integer code is simply its position in `alphabet`.
dna2int = {base: code for code, base in enumerate(alphabet)}
int2dna = {code: base for code, base in enumerate(alphabet)}

# Lazy converters: each returns a `map` iterator over the given sequence.
# Symbols missing from the lookup table come back as None (dict.get default).
intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [None]:
# Prepare data
def prepare_data(num_samples=100):
    """Generate random integer-encoded sequences together with their CpG counts.

    Args:
        num_samples: Number of sequences to generate (default 100).

    Returns:
        A tuple ``(sequences, labels)``: ``sequences`` is a list of integer
        lists from ``rand_sequence``; ``labels[i]`` is the ``count_cpgs``
        result for the DNA-string form of ``sequences[i]``.
    """
    X_dna_seqs_train = list(rand_sequence(num_samples))
    # Decode each integer sequence to its DNA string and count CpGs in one pass.
    y_dna_seqs = [
        count_cpgs("".join(intseq_to_dnaseq(int_seq)))
        for int_seq in X_dna_seqs_train
    ]
    return X_dna_seqs_train, y_dna_seqs

train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

In [None]:
# Config
# Hidden-state width of the LSTM (also the input width of the linear head)
LSTM_HIDDEN = 64
# Number of stacked LSTM layers
LSTM_LAYER = 2
# Samples per mini-batch, used by both the train and test data loaders
batch_size = 32
# Learning rate handed to the Adam optimizer
learning_rate = 0.001
# Number of passes over the training loader
epoch_num = 10

In [None]:
# Dataset class
class MyDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing integer-encoded sequences with their labels.

    Args:
        lists: Indexable collection of integer sequences.
        labels: Indexable collection of labels, aligned with ``lists``.
    """

    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __len__(self):
        """Return the number of stored sequences."""
        return len(self.lists)

    def __getitem__(self, index):
        """Return ``(sequence as LongTensor, label)`` for the given index."""
        sequence = self.lists[index]
        label = self.labels[index]
        return torch.LongTensor(sequence), label
    
def collate_fn(batch):
    """Collate ``(sequence_tensor, label)`` pairs into padded batch tensors.

    Args:
        batch: Iterable of ``(sequence_tensor, label)`` pairs.

    Returns:
        A tuple ``(padded, labels, lengths)``: the sequences right-padded with
        0 to the longest one in the batch (batch dimension first), the labels
        as a tensor, and each sequence's original length as a tensor.
    """
    seqs, targets = zip(*batch)
    # Record the true lengths before padding so the model can pack the batch.
    seq_lengths = torch.tensor(list(map(len, seqs)))
    padded = pad_sequence(seqs, batch_first=True, padding_value=0)
    target_tensor = torch.tensor(targets)
    return padded, target_tensor, seq_lengths

train_dataset = MyDataset(train_x, train_y)
test_dataset = MyDataset(test_x, test_y)

# Both loaders share the same batch size and collate function.
make_loader = partial(torch.utils.data.DataLoader, batch_size=batch_size, collate_fn=collate_fn)
train_data_loader = make_loader(train_dataset)
test_data_loader = make_loader(test_dataset)




In [None]:
# Model
class CpGPredictor(torch.nn.Module):
    """LSTM regressor predicting one scalar (the CpG count) per sequence.

    Pipeline: 5-symbol embedding (dim 8) -> stacked LSTM -> linear head applied
    to the final hidden state of the top LSTM layer.

    Args:
        hidden_size: LSTM hidden width. ``None`` (default) uses the
            notebook-level ``LSTM_HIDDEN``.
        num_layers: Number of stacked LSTM layers. ``None`` (default) uses the
            notebook-level ``LSTM_LAYER``.
    """

    def __init__(self, hidden_size=None, num_layers=None):
        super().__init__()
        # Resolve defaults at call time so the current config values are
        # honoured and the class itself can be defined without them.
        if hidden_size is None:
            hidden_size = LSTM_HIDDEN
        if num_layers is None:
            num_layers = LSTM_LAYER

        # Index 0 is both the 'N' symbol and the padding value used by the
        # collate function; padding_idx=0 pins its embedding to zeros.
        self.embedding = torch.nn.Embedding(5, 8, padding_idx=0)  # Embedding layer
        self.lstm = torch.nn.LSTM(input_size=8, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.classifier = torch.nn.Linear(hidden_size, 1)

    def forward(self, x, lengths):
        """Predict one value per sequence.

        Args:
            x: Integer tensor of padded sequences, batch dimension first.
            lengths: Tensor holding the unpadded length of each sequence.

        Returns:
            1-D tensor with one prediction per sequence. The shape is
            ``(batch,)`` even for a batch of one.
        """
        embedded = self.embedding(x)
        # Packing makes the LSTM stop at each sequence's true length, so the
        # final hidden state is not polluted by padding positions.
        packed = pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)
        # Only the final hidden state is needed; the per-step outputs are discarded.
        _, (hidden, _) = self.lstm(packed)
        logits = self.classifier(hidden[-1])  # Use the last hidden state
        # squeeze(-1) drops only the feature axis. A bare squeeze() would also
        # drop the batch axis for a single-sample batch and return a 0-d tensor.
        return logits.squeeze(-1)

In [None]:
# Initialize model, loss function, optimizer
model = CpGPredictor()
# Mean-squared error averaged over the batch ("mean" is MSELoss's default reduction).
loss_fn = torch.nn.MSELoss(reduction="mean")
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Training loop
# Put the model in training mode before optimising.
model.train()
for epoch in range(epoch_num):
    # Running sum of the per-batch losses seen in this epoch.
    t_loss = 0.0
    for batch in train_data_loader:
        # Each batch is (padded sequences, labels, unpadded lengths).
        x, y, lengths = batch
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()
        outputs = model(x, lengths)
        # Labels are integer counts; cast to float to match the predictions.
        loss = loss_fn(outputs, y.float())
        loss.backward()
        optimizer.step()
        t_loss += loss.item()
    # The reported value is the sum of batch losses for the epoch, not their mean.
    print(f"Epoch {epoch + 1}, Loss: {t_loss:.4f}")

In [None]:
# Evaluation loop
# Switch to inference mode and collect ground truth / predictions for the
# whole test loader. The lists are rebuilt from scratch on every run of this
# cell, so re-running it does not duplicate results.
model.eval()
res_gs = []
res_pred = []

# Gradients are not needed for evaluation.
with torch.no_grad():
    for batch in test_data_loader:
        x, y, lengths = batch
        outputs = model(x, lengths)
        res_gs.extend(y.numpy())
        # Flatten before extending: a model output that has been squeezed to a
        # 0-d tensor (final batch of one sample) cannot be iterated by extend().
        res_pred.extend(outputs.reshape(-1).numpy())

In [None]:
# Streamlit App
st.title("CpG Detector using LSTM")

st.write("This app detects the number of CpGs in DNA sequences using a trained LSTM model.")

sequence_input = st.text_input("Enter a DNA sequence (N, A, C, G, T):")

# Normalise the raw text before encoding it. The lookup table only knows
# upper-case letters, so lower-case bases and whitespace would otherwise be
# silently encoded as 'N'. Whitespace-only input counts as empty.
sequence_clean = "".join(sequence_input.split()).upper() if sequence_input else ""

if sequence_clean:
    # Characters outside the alphabet still fall back to 0, i.e. 'N'.
    sequence_int = [dna2int.get(base, 0) for base in sequence_clean]
    # Wrap in an outer list to form a batch of one sequence.
    sequence_tensor = torch.LongTensor([sequence_int])
    lengths_tensor = torch.tensor([len(sequence_int)])

    model.eval()
    with torch.no_grad():
        prediction = model(sequence_tensor, lengths_tensor).item()

    st.write(f"Predicted number of CpGs: {prediction:.2f}")

# Output results
st.subheader("Evaluation Results")
st.write("Ground Truth (first 10):", res_gs[:10])
st.write("Predictions (first 10):", res_pred[:10])