In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from functools import partial
import matplotlib.pyplot as plt
import numpy as np
import random
import os

# ==== Mandatory Code (DO NOT CHANGE) ====
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        if dimer == "CG":
            cgs += 1
    return cgs

alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(5))}
int2dna = {i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

# ==== Device Setup ====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ==== Attention Class ====
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim * 2 + hidden_dim * 2, hidden_dim)  # [h;e] -> hidden_dim
        self.v = nn.Linear(hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        seq_len = encoder_outputs.size(0)
        batch_size = encoder_outputs.size(1)
        hidden = hidden.unsqueeze(0).repeat(seq_len, 1, 1)  # [seq_len, batch_size, hidden_dim * 2]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  # [seq_len, batch_size, hidden_dim]
        attention = self.v(energy).squeeze(2)  # [seq_len, batch_size]
        return F.softmax(attention, dim=0)  # [seq_len, batch_size]

# ==== CpGPredictor Class ====
class CpGPredictor(nn.Module):
    def __init__(self, input_dim=5, embedding_dim=64, hidden_dim=256, num_layers=2, dropout=0.33262129231366233):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx=0)
        self.layernorm = nn.LayerNorm(embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0, bidirectional=True)
        self.attention = Attention(hidden_dim)
        self.classifier = nn.Linear(hidden_dim * 2, 1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        embedded = self.layernorm(embedded)
        packed_embedded = torch.nn.utils.rnn.pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, (hidden, _) = self.lstm(packed_embedded)
        lstm_out, _ = torch.nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        
        hidden = torch.cat((hidden[-2], hidden[-1]), dim=-1)  # Concatenate forward and backward hidden states
        attn_weights = self.attention(hidden, lstm_out.transpose(0, 1))
        attn_weights = attn_weights.transpose(0, 1).unsqueeze(1)
        context = torch.bmm(attn_weights, lstm_out)
        
        out = self.dropout(context.squeeze(1))
        prediction = self.classifier(out).squeeze()
        return context.squeeze(1), prediction, attn_weights.squeeze(1)

# ==== Prediction Function ====
def predict_cpg(model, sequence, return_attention=False):
    """
    Predict CG count for a given DNA sequence and optionally return attention weights.
    
    Args:
        model (nn.Module): Trained CpGPredictor model.
        sequence (str): DNA sequence as a string (e.g., "ATGCGCGTANCGCGAT").
        return_attention (bool): Whether to return attention weights.
    
    Returns:
        float or tuple: Predicted CG count, and optionally attention weights.
    """
    model.eval()
    int_seq = list(dnaseq_to_intseq(sequence.upper()))
    x = torch.tensor([int_seq], dtype=torch.long).to(device)
    lengths = torch.tensor([len(int_seq)], dtype=torch.long).to(device)
    
    with torch.no_grad():
        _, pred, attn_weights = model(x, lengths)
    pred = pred.item()
    if return_attention:
        return pred, attn_weights.squeeze().cpu().numpy()
    return pred

# ==== Testing Function ====
def test_model(model_path, test_sequences):
    """
    Test the trained model on new unseen sequences.
    
    Args:
        model_path (str): Path to the saved model weights (e.g., "bilstm_bahdanau_final.pt").
        test_sequences (list): List of DNA sequences as strings to test.
    """
    # Initialize model with the same architecture used during training
    model = CpGPredictor(
        input_dim=5, 
        embedding_dim=64, 
        hidden_dim=256, 
        num_layers=2, 
        dropout=0.3).to(device)
    
    '''
    Best_hyperparameters: {
    'embedding_dim': 64, 
    'hidden_dim': 256, 
    'num_layers': 2, 
    'dropout': 0.33262129231366233, 
    'lr': 0.0008572034020671933, 
    'weight_decay': 3.4321573869941195e-06, 
    'batch_size': 32}
    '''
    
    # Load the trained weights
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file {model_path} not found. Please train the model first.")
    model.load_state_dict(torch.load(model_path, map_location=device))
    print(f"Loaded model from {model_path}")

    # Test each sequence
    for seq in test_sequences:
        actual_cg = count_cpgs(seq)
        pred_cg, attn_weights = predict_cpg(model, seq, return_attention=True)
        
        print(f"\nSequence: {seq}")
        print(f"Actual CG Count: {actual_cg}")
        print(f"Predicted CG Count: {pred_cg:.2f}")
        print(f"Absolute Error: {abs(actual_cg - pred_cg):.2f}")

        # Visualize attention weights
        plt.figure(figsize=(12, 4))
        plt.bar(range(len(attn_weights)), attn_weights)
        plt.title(f"Attention Weights for Sequence: {seq[:20]}... (Actual: {actual_cg}, Predicted: {pred_cg:.2f})")
        plt.xlabel("Position")
        plt.ylabel("Attention Weight")
        plt.savefig(f"attention_{seq[:10]}.png")
        plt.close()


In [11]:
  # Example prediction and attention visualization
test_seq = "ATGCGCGTANCGCCGNCCGGCGCGTANCTACGGCGCGTANCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCGTANNCCGCGTANCAT"
pred, attn = predict_cpg(model, test_seq, return_attention=True)
actual = count_cpgs(test_seq)
logging.info(f"Prediction for {test_seq}: {pred:.2f}, Actual: {actual}")

INFO:root:Prediction for ATGCGCGTANCGCCGNCCGGCGCGTANCTACGGCGCGTANCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCGTANNCCGCGTANCAT: 0.01, Actual: 26


In [12]:

print(os.getcwd())


a:\WORKING STREMLIT PROJECTS\CpG_Predictor_App\test


In [13]:
import os

root_dir = r'c:\Users\anilj.ANIL_JOSEPH\OneDrive\Desktop\final_dna_testing'
os.chdir(root_dir)

print(os.getcwd())  

c:\Users\anilj.ANIL_JOSEPH\OneDrive\Desktop\final_dna_testing


In [14]:
import os

model_path = './models/bilstm_bahdanau_final.pt'
if not os.path.exists(model_path):
    print(f"Model file {model_path} not found.")
else:
    test_model(model_path, test_sequences)


Loaded model from ./models/bilstm_bahdanau_final.pt


TypeError: 'function' object is not iterable

In [None]:
# ==== Main Execution ====
if __name__ == "__main__":
    # Example unseen test sequences (you can modify these)
    test_sequences = [
        "ATGCGCGTANCGCGAT",           # Short sequence
        "CGCGATCGCGATCGCGATCGCGAT",   # High CG density
        "ATGC" * 50,                    # Repeated pattern, length 200
        "N" * 100 + "CG" * 50 + "AT" * 50,  # Mixed pattern, length 200
    ]

    # Path to the trained model
    model_path = r"./models/bilstm_bahdanau_final.pt"

    # Run the test
    try:
        test_model(model_path, test_sequences)
    except Exception as e:
        print(f"Error during testing: {e}")

Loaded model from ./models/bilstm_bahdanau_final.pt

Sequence: ATGCGCGTANCGCGAT
Actual CG Count: 4
Predicted CG Count: 23.74
Absolute Error: 19.74

Sequence: CGCGATCGCGATCGCGATCGCGAT
Actual CG Count: 8
Predicted CG Count: 24.74
Absolute Error: 16.74

Sequence: ATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGC
Actual CG Count: 0
Predicted CG Count: 0.07
Absolute Error: 0.07

Sequence: NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGCGATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATATAT
Actual CG Count: 50
Predicted CG Count: 14.39
Absolute Error: 35.61


In [None]:
import logging
logging.basicConfig(level=logging.INFO)

# ==== Load Trained Model ====
model = CpGPredictor(
    input_dim=5, 
    embedding_dim=64, 
    hidden_dim=256, 
    num_layers=2, 
    dropout=0.33262129231366233).to(device)

model_path = r"./models/bilstm_bahdanau_final.pt"  # Update this if your model filename is different

# Load the trained model weights
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Model file {model_path} not found.")
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
logging.info("Model loaded successfully.")

# ==== Example Sequence ====
test_seq = "ATGCGCGTANCGGGCGCCCGCGTANCATCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTANCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCGTANNCCGCGTANCATCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCCCGNCCGGCGCGTANCTACGGCGCCCGCGTANCATCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTANCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCGTANNCCGCGTANCATCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGC"

# ==== Prediction ====
pred, attn = predict_cpg(model, test_seq, return_attention=True)
actual = count_cpgs(test_seq)

# ==== Logging and Visualization ====
logging.info(f"Sequence: {test_seq}")
logging.info(f"Actual CpG count: {actual}")
logging.info(f"Predicted CpG count: {pred:.2f}")
logging.info(f"Absolute Error: {abs(pred - actual):.2f}")

# ==== Attention Visualization ====
plt.figure(figsize=(14, 4))
plt.bar(range(len(attn)), attn)
plt.title(f"Attention Weights for Input Sequence\nActual: {actual}, Predicted: {pred:.2f}")
plt.xlabel("Nucleotide Position")
plt.ylabel("Attention Weight")
plt.tight_layout()
plt.savefig("attention_sample.png")
plt.show()


FileNotFoundError: Model file ./models/bilstm_bahdanau_final.pt not found.

In [None]:
import os
import logging
import torch
import matplotlib.pyplot as plt

def test_sample_sequence(
    sequence: str,
    model_path: str = r"./models/bilstm_bahdanau_final.pt" ,
    visualize_attention: bool = True
):
    """
    Loads the trained model, predicts CpG count for a given sequence, and optionally visualizes attention weights.

    Args:
        sequence (str): DNA sequence to test.
        model_path (str): Path to the saved model weights.
        visualize_attention (bool): Whether to display attention plot.
    """
    logging.basicConfig(level=logging.INFO)

    # Model hyperparameters - must match training time
    model = CpGPredictor(
        input_dim=5,
        embedding_dim=64,
        hidden_dim=256,
        num_layers=2,
        dropout=0.33262129231366233
    ).to(device)

    # Load model weights
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file '{model_path}' not found.")
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    logging.info("✅ Model loaded successfully.")

    # Predict
    pred, attn = predict_cpg(model, sequence, return_attention=True)
    actual = count_cpgs(sequence)

    # Logging
    logging.info(f"📌 Sequence: {sequence}")
    logging.info(f"✅ Actual CpG count: {actual}")
    logging.info(f"🧠 Predicted CpG count: {pred:.2f}")
    logging.info(f"❗ Absolute Error: {abs(pred - actual):.2f}")

    # Visualization
    if visualize_attention:
        plt.figure(figsize=(14, 4))
        plt.bar(range(len(attn)), attn)
        plt.title(f"Attention Weights for Input Sequence\nActual: {actual}, Predicted: {pred:.2f}")
        plt.xlabel("Nucleotide Position")
        plt.ylabel("Attention Weight")
        plt.tight_layout()
        plt.savefig("attention_sample.png")
        plt.show()


Perfect idea! Let’s extend the function so it can:

✅ Handle multiple sequences

✅ Save predictions and attention plot to files

✅ Write a dedicated test log to ./logs/testing/ directory

In [None]:
import os
import logging
import torch
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime

def test_sequences(
    sequences,
    model_path=r"./models/bilstm_bahdanau_final.pt" ,
    output_dir="./logs/testing/testing_image",
    log_dir="./logs/testing/testing_logs",
    save_attention=True,
    save_csv=True
):
    """
    Predicts CpG counts for a list of sequences using a pre-trained model and logs results.

    Args:
        sequences (list[str]): List of DNA sequences.
        model_path (str): Path to the saved model.
        output_dir (str): Directory to save attention plots.
        log_dir (str): Directory to save log file.
        save_attention (bool): Whether to save attention bar plots.
        save_csv (bool): Whether to save results in CSV format.
    """

    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(log_dir, exist_ok=True)

    # Setup timestamped logging
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"test_log_{timestamp}.log")
    logging.basicConfig(
        filename=log_file,
        filemode="w",
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )

    # Load the model
    model = CpGPredictor(
        input_dim=5,
        embedding_dim=64,
        hidden_dim=256,
        num_layers=2,
        dropout=0.33262129231366233
    ).to(device)

    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file '{model_path}' not found.")
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    logging.info("✅ Model loaded successfully.")

    # For CSV logging
    results = []

    for idx, seq in enumerate(sequences, 1):
        pred, attn = predict_cpg(model, seq, return_attention=True)
        actual = count_cpgs(seq)
        error = abs(pred - actual)

        logging.info(f"\nSequence {idx}:")
        logging.info(f"📌 {seq}")
        logging.info(f"✅ Actual CpG: {actual}, 🧠 Predicted CpG: {pred:.2f}, ❗ Error: {error:.2f}")

        results.append({
            "Sequence_ID": f"Seq_{idx}",
            "Sequence": seq,
            "Actual_CpG": actual,
            "Predicted_CpG": round(pred, 2),
            "Error": round(error, 2)
        })

        if save_attention:
            plt.figure(figsize=(14, 4))
            plt.bar(range(len(attn)), attn)
            plt.title(f"Attention - Seq {idx}\nActual: {actual}, Predicted: {pred:.2f}")
            plt.xlabel("Nucleotide Position")
            plt.ylabel("Attention Weight")
            plt.tight_layout()
            plot_path = os.path.join(output_dir, f"attention_seq_{idx}.png")
            plt.savefig(plot_path)
            plt.close()
            logging.info(f"🖼️ Attention plot saved to: {plot_path}")

    # Save results to CSV
    if save_csv:
        df = pd.DataFrame(results)
        csv_path = os.path.join(output_dir, f"prediction_results_{timestamp}.csv")
        df.to_csv(csv_path, index=False)
        logging.info(f"📝 Predictions saved to: {csv_path}")

    print(f"✅ Testing complete. Logs: {log_file}, Results: {csv_path if save_csv else 'Not saved'}")
    

In [None]:
test_seqs = [
    "ATGCGCGTANCGCCGNCCGGCGCGTANCTACGGCGCGTANCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCGTANNCCGCGTANCAT",
    "CGTANCGCGCGTANCGCCGNCGTACGCGTANCTACGGCGCGTANCCGCGTANCGCCGCGCGCGTAGCGTANCGCGCGTANCTACGGCGCGTANCAT",
]

test_sequences(test_seqs)

# ==== Attention Visualization ====
plt.figure(figsize=(14, 4))
plt.bar(range(len(attn)), attn)
plt.title(f"Attention Weights for Input Sequence\nActual: {actual}, Predicted: {pred:.2f}")
plt.xlabel("Nucleotide Position")
plt.ylabel("Attention Weight")
plt.tight_layout()
plt.savefig("attention_sample.png")
plt.show()


FileNotFoundError: Model file './models/bilstm_bahdanau_final.pt' not found.

In [None]:
import os
import logging
import torch
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime

def test_sequences(
    sequences,
    model_path=r"./models/bilstm_bahdanau_final.pt",
    output_dir="./logs/testing/testing_image",
    log_dir="./logs/testing/testing_logs",
    save_attention=True,
    save_csv=True
):
    """
    Predicts CpG counts for a list of sequences using a pre-trained model and logs results.

    Args:
        sequences (list[str]): List of DNA sequences.
        model_path (str): Path to the saved model.
        output_dir (str): Directory to save attention plots.
        log_dir (str): Directory to save log file.
        save_attention (bool): Whether to save attention bar plots.
        save_csv (bool): Whether to save results in CSV format.
    """

    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(log_dir, exist_ok=True)

    # Setup timestamped logging
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"test_log_{timestamp}.log")
    logging.basicConfig(
        filename=log_file,
        filemode="w",
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )

    # Load the model
    model = CpGPredictor(
        input_dim=5,
        embedding_dim=64,
        hidden_dim=256,
        num_layers=2,
        dropout=0.33262129231366233
    ).to(device)

    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file '{model_path}' not found.")
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    logging.info("✅ Model loaded successfully.")

    results = []

    # For storing attention info for later visualization
    all_attentions = []

    for idx, seq in enumerate(sequences, 1):
        pred, attn = predict_cpg(model, seq, return_attention=True)
        actual = count_cpgs(seq)
        error = abs(pred - actual)

        logging.info(f"\nSequence {idx}:")
        logging.info(f"📌 {seq}")
        logging.info(f"✅ Actual CpG: {actual}, 🧠 Predicted CpG: {pred:.2f}, ❗ Error: {error:.2f}")

        results.append({
            "Sequence_ID": f"Seq_{idx}",
            "Sequence": seq,
            "Actual_CpG": actual,
            "Predicted_CpG": round(pred, 2),
            "Error": round(error, 2)
        })

        # Save attention plot
        if save_attention:
            plt.figure(figsize=(14, 4))
            plt.bar(range(len(attn)), attn)
            plt.title(f"Attention - Seq {idx}\nActual: {actual}, Predicted: {pred:.2f}")
            plt.xlabel("Nucleotide Position")
            plt.ylabel("Attention Weight")
            plt.tight_layout()
            plot_path = os.path.join(output_dir, f"attention_seq_{idx}.png")
            plt.savefig(plot_path)
            plt.close()
            logging.info(f"🖼️ Attention plot saved to: {plot_path}")

        # Save for inline visualization
        all_attentions.append((idx, seq, actual, pred, attn))

    # Save results to CSV
    if save_csv:
        df = pd.DataFrame(results)
        csv_path = os.path.join(output_dir, f"prediction_results_{timestamp}.csv")
        df.to_csv(csv_path, index=False)
        logging.info(f"📝 Predictions saved to: {csv_path}")
    else:
        csv_path = None

    print(f"✅ Testing complete.\n Logs saved to: {log_file}\n📄 CSV saved to: {csv_path if save_csv else 'Not saved'}")

    # === Inline attention visualization ===
    for idx, seq, actual, pred, attn in all_attentions:
        print(f"\n📍 Sequence {idx}")
        print(f"Actual CpG: {actual}, Predicted CpG: {pred:.2f}")
        print(f"Sequence: {seq}")

        plt.figure(figsize=(14, 4))
        plt.bar(range(len(attn)), attn)
        plt.title(f"Attention Weights - Seq {idx}")
        plt.xlabel("Nucleotide Position")
        plt.ylabel("Attention Weight")
        plt.tight_layout()
        plt.show()


In [None]:
test_seqs = [
    "ATGCGCGTANCGCCGNCCGGCGCGTANCTACGGCGCGTANCCGCGTANCGCCGNCCGGCGCGTANCTANCGCGGCGCGTAGCGTANCCGCGTANNCCGCGTANCAT",
    "CGTANCGCGCGTANCGCCGNCGTACGCGTANCTACGGCGCGTANCCGCGTANCGCCGCGCGCGTAGCGTANCGCGCGTANCTACGGCGCGTANCAT",
]

test_sequences(test_seqs)


TypeError: __init__() got an unexpected keyword argument 'embedding_dim'