# lncRNA vs. mRNA classification: preprocessing, training, evaluation and prediction

In [4]:
pip install Biopython

Collecting Biopython
  Downloading biopython-1.85-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading biopython-1.85-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/3.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m [32m3.3/3.3 MB[0m [31m168.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m3.3/3.3 MB[0m [31m90.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Biopython
Successfully installed Biopython-1.85


In [5]:
import numpy as np
from Bio import SeqIO
from sklearn.preprocessing import OneHotEncoder

def encode_sequence(seq, max_len=300):
    """One-hot encode a nucleotide string into a fixed-width ``(4, max_len)`` matrix.

    Rows are A, C, G, T in that order; U is treated as T and case is ignored.
    Only the first ``max_len`` characters are used. Columns for characters
    outside A/C/G/T, and for positions past the end of a short sequence, stay
    all-zero.

    Args:
        seq: Nucleotide string (DNA or RNA alphabet).
        max_len: Number of columns in the returned matrix.

    Returns:
        ``np.float32`` array of shape ``(4, max_len)``.
    """
    row_of = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    encoded = np.zeros((4, max_len), dtype=np.float32)
    window = seq.upper().replace('U', 'T')[:max_len]
    for col, base in enumerate(window):
        row = row_of.get(base)
        if row is not None:
            encoded[row, col] = 1.0
    return encoded

def gc_content(seq):
    """Return the fraction of G and C characters in ``seq`` (case-insensitive).

    The denominator is the full string length, so characters outside A/C/G/T
    still count toward the length. An empty string yields 0.
    """
    if len(seq) == 0:
        return 0
    normalized = seq.upper().replace('U', 'T')
    gc_count = normalized.count('G') + normalized.count('C')
    return gc_count / len(normalized)

def orf_length(seq):
    """Return the length in nucleotides of the longest ORF on the given strand.

    An ORF is an in-frame ``ATG`` followed by the first in-frame stop codon
    (``TAA``, ``TAG`` or ``TGA``); the length includes both the start and the
    stop codon. The three forward reading frames are scanned; the reverse
    strand is not. U is treated as T and case is ignored. A start codon with
    no downstream in-frame stop contributes nothing.

    Each frame is scanned once. The first ``ATG`` after the previous stop
    always yields the longest ORF ending at the next stop, so later in-frame
    ``ATG`` codons before that stop never need a separate search. This gives
    the same result as searching from every ``ATG`` but in linear time.

    Args:
        seq: Nucleotide string.

    Returns:
        Longest ORF length as an int, or 0 when no complete ORF exists.
    """
    start_codons = ('ATG',)
    stop_codons = ('TAA', 'TAG', 'TGA')
    seq = seq.upper().replace('U', 'T')
    longest = 0
    for frame in range(3):
        orf_start = None  # index of the open ORF's ATG in this frame, if any
        for i in range(frame, len(seq) - 2, 3):
            codon = seq[i:i + 3]
            if orf_start is None:
                if codon in start_codons:
                    orf_start = i
            elif codon in stop_codons:
                longest = max(longest, i + 3 - orf_start)
                orf_start = None
    return longest

def load_fasta_encode(file, label, max_len=300):
    """Parse a FASTA file and one-hot encode every record.

    Args:
        file: Path or handle passed to ``SeqIO.parse`` with format ``"fasta"``.
        label: Class label attached to every record in this file.
        max_len: Encoding width forwarded to ``encode_sequence``.

    Returns:
        ``(seqs, labels)``: a list of encoded matrices and a list of ``label``
        repeated once per record.
    """
    encoded = [
        encode_sequence(str(record.seq), max_len)
        for record in SeqIO.parse(file, "fasta")
    ]
    return encoded, [label] * len(encoded)

def extract_features(sequences):
    """Derive four scalar features from each one-hot encoded sequence.

    The base string is rebuilt from the matrix (rows 0-3 read as A, C, G, T),
    so it contains only the positions that were actually encoded: characters
    dropped or truncated during encoding are absent.

    Columns of the returned array, in order:
        0. GC fraction of the rebuilt string (``gc_content``)
        1. longest ORF of the rebuilt string (``orf_length``)
        2. length of the rebuilt string
        3. sum of all matrix entries

    For a strict 0/1 one-hot matrix, columns 2 and 3 hold the same number.
    None of the columns is normalised.

    Args:
        sequences: Iterable of 2-D arrays with at least four rows.

    Returns:
        ``np.float32`` array with one row of four features per sequence.
    """
    alphabet = 'ACGT'
    rows = []
    for onehot in sequences:
        # nonzero() on the transposed matrix walks positions left to right and,
        # within a position, rows in A, C, G, T order.
        _, base_rows = np.nonzero(onehot[:4].T == 1)
        seq_str = ''.join(alphabet[r] for r in base_rows)
        gc = gc_content(seq_str)
        longest_orf = orf_length(seq_str)
        rows.append([gc, longest_orf, len(seq_str), sum(onehot.flatten())])
    return np.array(rows, dtype=np.float32)

if __name__ == "__main__":
    # Preprocessing entry point: encode both FASTA files, derive the scalar
    # features, and write the three arrays that later cells read with np.load().
    import os
    lnc_fasta = "lnc_RNA_data.fa"
    coding_fasta = "coding_data.fa"
    max_len = 300  # encoding width; longer sequences are truncated by encode_sequence

    # Label convention used throughout the notebook: 1 = lncRNA, 0 = coding (mRNA).
    print("üîç Loading lncRNA sequences...")
    lnc_seqs, lnc_labels = load_fasta_encode(lnc_fasta, 1, max_len)
    print("üîç Loading mRNA sequences...")
    coding_seqs, coding_labels = load_fasta_encode(coding_fasta, 0, max_len)

    # lncRNA records come first, then coding records; y follows the same order.
    X_seq = np.array(lnc_seqs + coding_seqs)
    y = np.array(lnc_labels + coding_labels)

    print(f"‚úÖ Loaded {len(y)} sequences")

    # Extract features: GC content, ORF length, sequence length, total bases one-hot count
    X_feat = extract_features(X_seq)

    # Save numpy arrays
    # NOTE(review): "output" is created here, but the three arrays below are
    # written to the current working directory, not into that folder.
    os.makedirs("output", exist_ok=True)
    np.save("X_seq.npy", X_seq)
    np.save("X_feat.npy", X_feat)
    np.save("y.npy", y)

    print("üíæ Saved X_seq.npy, X_feat.npy, y.npy ")


üîç Loading lncRNA sequences...
üîç Loading mRNA sequences...
‚úÖ Loaded 20000 sequences
üíæ Saved X_seq.npy, X_feat.npy, y.npy 


In [6]:
# 03_train_diffusion_model.py (Updated with BiLSTM + Class Weights + Improved Regularization)

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# ========== Device ==========
# Use the GPU when PyTorch reports CUDA as available; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ========== Diffusion Block ==========
class DiffusionBlock(nn.Module):
    """Residual 1-D convolution block computing ``ReLU(Conv1d(x) + x)``.

    Despite the name, no diffusion process is involved: this is a width-3
    convolution with an identity skip connection. Input and output have the
    same number of channels and the same length.
    """

    def __init__(self, channels):
        """Build the block for inputs with ``channels`` feature channels."""
        super().__init__()
        # kernel_size=3 with padding=1 keeps the length unchanged, so the input
        # can be added back element-wise.
        self.conv = nn.Conv1d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=3,
            padding=1,
        )
        self.relu = nn.ReLU()

    def forward(self, x):
        """Convolve ``x``, add ``x`` back, and apply ReLU."""
        return self.relu(self.conv(x) + x)

# ========== Hybrid Diffusion Model ==========
class HybridDiffusionModel(nn.Module):
    """Two-branch binary classifier over a one-hot sequence and 4 scalar features.

    Sequence branch: two Conv1d+ReLU stages (4 -> 64 -> 128 channels), each
    followed by a residual ``DiffusionBlock``, then max pooling that halves the
    length, then a single-layer bidirectional LSTM whose two final hidden
    states are concatenated (128 values). Feature branch: Linear(4 -> 32) +
    ReLU. Both are concatenated and passed through a small MLP with dropout
    that emits 2 logits.
    """

    def __init__(self):
        super().__init__()
        # Layer order fixes the Sequential indices used in the saved state_dict.
        conv_stack = [
            nn.Conv1d(4, 64, 3, padding=1),
            nn.ReLU(),
            DiffusionBlock(64),
            nn.Conv1d(64, 128, 3, padding=1),
            nn.ReLU(),
            DiffusionBlock(128),
            nn.MaxPool1d(2),
        ]
        self.seq_cnn = nn.Sequential(*conv_stack)
        self.bilstm = nn.LSTM(128, 64, batch_first=True, bidirectional=True)
        self.feat_fc = nn.Sequential(nn.Linear(4, 32), nn.ReLU())
        fused_width = 64*2 + 32  # both LSTM directions + projected features
        self.classifier = nn.Sequential(
            nn.Linear(fused_width, 64),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(64, 2),
        )

    def forward(self, x_seq, x_feat):
        """Return logits of shape ``[B, 2]``.

        Args:
            x_seq: One-hot sequences, ``[B, 4, L]``.
            x_feat: Scalar features, ``[B, 4]``.
        """
        conv_out = self.seq_cnn(x_seq)                  # [B, 128, L // 2]
        steps = conv_out.permute(0, 2, 1)               # [B, L // 2, 128]
        _, (h_n, _) = self.bilstm(steps)                # h_n: [2, B, 64]
        # Final hidden state of each direction, joined per sample.
        seq_vec = torch.cat((h_n[-2], h_n[-1]), dim=1)  # [B, 128]
        feat_vec = self.feat_fc(x_feat)                 # [B, 32]
        fused = torch.cat([seq_vec, feat_vec], dim=1)
        return self.classifier(fused)

# ========== Training Function ==========
def train(epochs=50, batch_size=64, lr=0.001, seed=42):
    """Train ``HybridDiffusionModel`` on the saved arrays and save its weights.

    Loads ``X_seq.npy``, ``X_feat.npy`` and ``y.npy`` from the working
    directory, holds out 20% for validation, trains with Adam and a
    class-weighted cross-entropy loss, prints validation metrics after every
    epoch, and writes the final-epoch weights to ``hybrid_diffusion_model.pt``.

    Args:
        epochs: Number of passes over the training split.
        batch_size: Mini-batch size for both data loaders.
        lr: Adam learning rate.
        seed: Seed for the global torch RNG, which drives weight
            initialisation and batch shuffling. These were previously
            unseeded, so repeated runs produced different models. GPU kernels
            may still introduce small run-to-run differences.
    """
    torch.manual_seed(seed)

    X_seq = torch.tensor(np.load("X_seq.npy"), dtype=torch.float32)
    X_feat = torch.tensor(np.load("X_feat.npy"), dtype=torch.float32)
    y = torch.tensor(np.load("y.npy"), dtype=torch.long)

    dataset = TensorDataset(X_seq, X_feat, y)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    # The split keeps its own generator with a fixed seed of 42, independent of
    # `seed`, because the evaluation cells rebuild the validation set this way.
    train_set, val_set = random_split(dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size)

    model = HybridDiffusionModel().to(device)

    # Inverse-frequency class weights, rescaled so the two weights sum to 2.
    # Counts are taken over all labels (training and validation together).
    class_counts = torch.bincount(y)
    weights = 1.0 / class_counts.float()
    weights = weights / weights.sum() * 2
    criterion = nn.CrossEntropyLoss(weight=weights.to(device))

    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(1, epochs+1):
        model.train()
        total_loss = 0
        for x_seq_batch, x_feat_batch, label_batch in train_loader:
            x_seq_batch = x_seq_batch.to(device)
            x_feat_batch = x_feat_batch.to(device)
            label_batch = label_batch.to(device)

            optimizer.zero_grad()
            outputs = model(x_seq_batch, x_feat_batch)
            loss = criterion(outputs, label_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Mean of the per-batch losses for this epoch.
        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch:02d} | Train Loss: {avg_loss:.4f}")

        # Validation: dropout disabled, no gradients; labels stay on the CPU.
        model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for x_seq_batch, x_feat_batch, label_batch in val_loader:
                x_seq_batch = x_seq_batch.to(device)
                x_feat_batch = x_feat_batch.to(device)
                outputs = model(x_seq_batch, x_feat_batch)
                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_labels.extend(label_batch.numpy())

        # target_names follow label order: 0 = mRNA, 1 = lncRNA.
        print(classification_report(val_labels, val_preds, target_names=["mRNA", "lncRNA"]))
        print("Confusion Matrix:")
        print(confusion_matrix(val_labels, val_preds))
        print(f"Accuracy: {accuracy_score(val_labels, val_preds):.4f}\n")

    # Saves the weights as they are after the last epoch.
    torch.save(model.state_dict(), "hybrid_diffusion_model.pt")
    print("‚úÖ Model saved to hybrid_diffusion_model.pt")

# Entry point: start training when this cell/script is executed directly.
if __name__ == "__main__":
    train()


Epoch 01 | Train Loss: 1.1041
              precision    recall  f1-score   support

        mRNA       0.62      0.99      0.76      2005
      lncRNA       0.96      0.40      0.57      1995

    accuracy                           0.69      4000
   macro avg       0.79      0.69      0.66      4000
weighted avg       0.79      0.69      0.66      4000

Confusion Matrix:
[[1976   29]
 [1198  797]]
Accuracy: 0.6933

Epoch 02 | Train Loss: 0.5285
              precision    recall  f1-score   support

        mRNA       0.62      0.99      0.76      2005
      lncRNA       0.97      0.40      0.56      1995

    accuracy                           0.69      4000
   macro avg       0.80      0.69      0.66      4000
weighted avg       0.80      0.69      0.66      4000

Confusion Matrix:
[[1984   21]
 [1202  793]]
Accuracy: 0.6943

Epoch 03 | Train Loss: 0.5104
              precision    recall  f1-score   support

        mRNA       0.74      0.71      0.73      2005
      lncRNA       0.

In [14]:
# 04_evaluate_diffusion_model.py

import numpy as np
import torch
from torch.utils.data import TensorDataset, random_split, DataLoader
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from train_diffusion_model import HybridDiffusionModel  # assuming your training script named this way

# Use the GPU when PyTorch reports CUDA as available; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def evaluate():
    """Score the saved model on the held-out 20% validation split.

    Loads ``X_seq.npy``, ``X_feat.npy`` and ``y.npy``, rebuilds the 80/20
    ``random_split`` with a generator seeded to 42 (the same call the training
    function makes), loads ``hybrid_diffusion_model.pt`` and prints a
    classification report, confusion matrix and accuracy.

    NOTE(review): in the recorded run this cell stopped at the
    ``train_diffusion_model`` import with ``ModuleNotFoundError``, so this
    definition only takes effect once that module exists.
    """
    # Load data
    X_seq = torch.tensor(np.load("X_seq.npy"), dtype=torch.float32)
    X_feat = torch.tensor(np.load("X_feat.npy"), dtype=torch.float32)
    y = torch.tensor(np.load("y.npy"), dtype=torch.long)

    dataset = TensorDataset(X_seq, X_feat, y)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    _, val_set = random_split(dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))

    val_loader = DataLoader(val_set, batch_size=64)

    # Load model. weights_only=True restricts unpickling to tensors and plain
    # containers, which is all a state_dict holds, so a tampered checkpoint
    # cannot run arbitrary code on load.
    model = HybridDiffusionModel().to(device)
    state_dict = torch.load("hybrid_diffusion_model.pt", map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for x_seq_batch, x_feat_batch, label_batch in val_loader:
            x_seq_batch = x_seq_batch.to(device)
            x_feat_batch = x_feat_batch.to(device)
            outputs = model(x_seq_batch, x_feat_batch)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(label_batch.numpy())

    # target_names follow label order: 0 = mRNA, 1 = lncRNA.
    print("Classification Report:")
    print(classification_report(all_labels, all_preds, target_names=["mRNA", "lncRNA"]))
    print("Confusion Matrix:")
    print(confusion_matrix(all_labels, all_preds))
    print(f"Accuracy: {accuracy_score(all_labels, all_preds):.4f}")

# Entry point: run the evaluation when this cell/script is executed directly.
if __name__ == "__main__":
    evaluate()


ModuleNotFoundError: No module named 'train_diffusion_model'

In [15]:
import numpy as np
import torch
from torch.utils.data import TensorDataset, random_split, DataLoader
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# Use the GPU when PyTorch reports CUDA as available; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ======= Paste model class definitions here =======

class DiffusionBlock(torch.nn.Module):
    """Residual 1-D convolution block computing ``ReLU(Conv1d(x) + x)``.

    Despite the name, no diffusion process is involved: this is a width-3
    convolution with an identity skip connection. Input and output have the
    same number of channels and the same length.
    """

    def __init__(self, channels):
        """Build the block for inputs with ``channels`` feature channels."""
        super().__init__()
        # kernel_size=3 with padding=1 keeps the length unchanged, so the input
        # can be added back element-wise.
        self.conv = torch.nn.Conv1d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=3,
            padding=1,
        )
        self.relu = torch.nn.ReLU()

    def forward(self, x):
        """Convolve ``x``, add ``x`` back, and apply ReLU."""
        return self.relu(self.conv(x) + x)

class HybridDiffusionModel(torch.nn.Module):
    """Two-branch binary classifier over a one-hot sequence and 4 scalar features.

    The architecture must match the training cell's definition layer for
    layer, because the saved ``state_dict`` is keyed by these attribute names
    and Sequential indices.

    Sequence branch: two Conv1d+ReLU stages (4 -> 64 -> 128 channels), each
    followed by a residual ``DiffusionBlock``, then max pooling that halves the
    length, then a single-layer bidirectional LSTM whose two final hidden
    states are concatenated. Feature branch: Linear(4 -> 32) + ReLU. Both are
    concatenated and passed through a small MLP with dropout that emits 2 logits.
    """

    def __init__(self):
        super().__init__()
        conv_stack = [
            torch.nn.Conv1d(4, 64, 3, padding=1),
            torch.nn.ReLU(),
            DiffusionBlock(64),
            torch.nn.Conv1d(64, 128, 3, padding=1),
            torch.nn.ReLU(),
            DiffusionBlock(128),
            torch.nn.MaxPool1d(2),
        ]
        self.seq_cnn = torch.nn.Sequential(*conv_stack)
        self.bilstm = torch.nn.LSTM(128, 64, batch_first=True, bidirectional=True)
        self.feat_fc = torch.nn.Sequential(torch.nn.Linear(4, 32), torch.nn.ReLU())
        fused_width = 64*2 + 32  # both LSTM directions + projected features
        self.classifier = torch.nn.Sequential(
            torch.nn.Linear(fused_width, 64),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.4),
            torch.nn.Linear(64, 2),
        )

    def forward(self, x_seq, x_feat):
        """Return logits of shape ``[B, 2]`` for ``x_seq`` ``[B, 4, L]`` and ``x_feat`` ``[B, 4]``."""
        conv_out = self.seq_cnn(x_seq)
        steps = conv_out.permute(0, 2, 1)  # channels-last for the batch_first LSTM
        _, (h_n, _) = self.bilstm(steps)
        # Final hidden state of each direction, joined per sample.
        seq_vec = torch.cat((h_n[-2], h_n[-1]), dim=1)
        feat_vec = self.feat_fc(x_feat)
        fused = torch.cat([seq_vec, feat_vec], dim=1)
        return self.classifier(fused)

# ======= Evaluation function =======

def evaluate():
    """Score the saved model on the held-out 20% validation split.

    Loads ``X_seq.npy``, ``X_feat.npy`` and ``y.npy``, rebuilds the 80/20
    ``random_split`` with a generator seeded to 42 (the same call the training
    function makes), loads ``hybrid_diffusion_model.pt`` and prints a
    classification report, confusion matrix and accuracy.
    """
    # Load data
    X_seq = torch.tensor(np.load("X_seq.npy"), dtype=torch.float32)
    X_feat = torch.tensor(np.load("X_feat.npy"), dtype=torch.float32)
    y = torch.tensor(np.load("y.npy"), dtype=torch.long)

    dataset = TensorDataset(X_seq, X_feat, y)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    _, val_set = random_split(dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))

    val_loader = DataLoader(val_set, batch_size=64)

    # Load model weights. weights_only=True restricts unpickling to tensors and
    # plain containers, which is all a state_dict holds, so a tampered
    # checkpoint cannot run arbitrary code on load.
    model = HybridDiffusionModel().to(device)
    state_dict = torch.load("hybrid_diffusion_model.pt", map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for x_seq_batch, x_feat_batch, label_batch in val_loader:
            x_seq_batch = x_seq_batch.to(device)
            x_feat_batch = x_feat_batch.to(device)
            outputs = model(x_seq_batch, x_feat_batch)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(label_batch.numpy())

    # target_names follow label order: 0 = mRNA, 1 = lncRNA.
    print("Classification Report:")
    print(classification_report(all_labels, all_preds, target_names=["mRNA", "lncRNA"]))
    print("Confusion Matrix:")
    print(confusion_matrix(all_labels, all_preds))
    print(f"Accuracy: {accuracy_score(all_labels, all_preds):.4f}")

# Entry point: run the evaluation when this cell/script is executed directly.
if __name__ == "__main__":
    evaluate()


  model.load_state_dict(torch.load("hybrid_diffusion_model.pt", map_location=device))


Classification Report:
              precision    recall  f1-score   support

        mRNA       0.92      0.92      0.92      2005
      lncRNA       0.92      0.92      0.92      1995

    accuracy                           0.92      4000
   macro avg       0.92      0.92      0.92      4000
weighted avg       0.92      0.92      0.92      4000

Confusion Matrix:
[[1840  165]
 [ 164 1831]]
Accuracy: 0.9177


In [18]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from Bio import SeqIO
import os

# ========== Device ==========
# Use the GPU when PyTorch reports CUDA as available; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ========== Diffusion Block ==========
class DiffusionBlock(nn.Module):
    """Residual 1-D convolution block computing ``ReLU(Conv1d(x) + x)``.

    Despite the name, no diffusion process is involved: this is a width-3
    convolution with an identity skip connection. Input and output have the
    same number of channels and the same length.
    """

    def __init__(self, channels):
        """Build the block for inputs with ``channels`` feature channels."""
        super().__init__()
        # kernel_size=3 with padding=1 keeps the length unchanged, so the input
        # can be added back element-wise.
        self.conv = nn.Conv1d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=3,
            padding=1,
        )
        self.relu = nn.ReLU()

    def forward(self, x):
        """Convolve ``x``, add ``x`` back, and apply ReLU."""
        return self.relu(self.conv(x) + x)

# ========== Hybrid Diffusion Model ==========
class HybridDiffusionModel(nn.Module):
    """Two-branch binary classifier over a one-hot sequence and 4 scalar features.

    The architecture must match the training cell's definition layer for
    layer, because the saved ``state_dict`` is keyed by these attribute names
    and Sequential indices.

    Sequence branch: two Conv1d+ReLU stages (4 -> 64 -> 128 channels), each
    followed by a residual ``DiffusionBlock``, then max pooling that halves the
    length, then a single-layer bidirectional LSTM whose two final hidden
    states are concatenated. Feature branch: Linear(4 -> 32) + ReLU. Both are
    concatenated and passed through a small MLP with dropout that emits 2 logits.
    """

    def __init__(self):
        super().__init__()
        conv_stack = [
            nn.Conv1d(4, 64, 3, padding=1),
            nn.ReLU(),
            DiffusionBlock(64),
            nn.Conv1d(64, 128, 3, padding=1),
            nn.ReLU(),
            DiffusionBlock(128),
            nn.MaxPool1d(2),
        ]
        self.seq_cnn = nn.Sequential(*conv_stack)
        self.bilstm = nn.LSTM(128, 64, batch_first=True, bidirectional=True)
        self.feat_fc = nn.Sequential(nn.Linear(4, 32), nn.ReLU())
        fused_width = 64*2 + 32  # both LSTM directions + projected features
        self.classifier = nn.Sequential(
            nn.Linear(fused_width, 64),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(64, 2),
        )

    def forward(self, x_seq, x_feat):
        """Return logits of shape ``[B, 2]`` for ``x_seq`` ``[B, 4, L]`` and ``x_feat`` ``[B, 4]``."""
        conv_out = self.seq_cnn(x_seq)
        steps = conv_out.permute(0, 2, 1)  # channels-last for the batch_first LSTM
        _, (h_n, _) = self.bilstm(steps)
        # Final hidden state of each direction, joined per sample.
        seq_vec = torch.cat((h_n[-2], h_n[-1]), dim=1)
        feat_vec = self.feat_fc(x_feat)
        fused = torch.cat([seq_vec, feat_vec], dim=1)
        return self.classifier(fused)

# ========== Feature Extraction ==========
def one_hot_encode(seq, max_len=300):
    """One-hot encode a nucleotide string into a ``(4, max_len)`` float32 matrix.

    Rows are A, C, G and U/T (row 3). Case is ignored and T is rewritten to U
    before lookup. Only the first ``max_len`` characters are used; characters
    outside the alphabet and positions beyond a short sequence stay all-zero.

    Args:
        seq: Nucleotide string (DNA or RNA alphabet).
        max_len: Number of columns in the returned matrix.
    """
    channel = {'A':0, 'C':1, 'G':2, 'U':3, 'T':3}
    encoded = np.zeros((4, max_len), dtype=np.float32)
    rna = seq.upper().replace('T', 'U')
    for pos, nt in enumerate(rna[:max_len]):
        if nt in channel:
            encoded[channel[nt], pos] = 1.0
    return encoded

def gc_content(seq):
    """Return the fraction of G and C characters in ``seq`` (case-insensitive).

    The denominator is the full string length, so characters outside the
    nucleotide alphabet still count toward the length. An empty string yields 0.
    """
    bases = seq.upper()
    if not bases:
        return 0
    return (bases.count('G') + bases.count('C')) / len(bases)

def orf_length(seq):
    """Return the longest ORF as a fraction of the sequence length.

    An ORF is an in-frame ``AUG`` followed by the first in-frame stop codon
    (``UAA``, ``UAG`` or ``UGA``); its length includes both the start and the
    stop codon. The three forward reading frames are scanned; the reverse
    strand is not. T is treated as U and case is ignored. A start codon with
    no downstream in-frame stop contributes nothing.

    Each frame is scanned once. The first ``AUG`` after the previous stop
    always yields the longest ORF ending at the next stop, so later in-frame
    ``AUG`` codons before that stop never need a separate search. This gives
    the same result as searching from every ``AUG`` but in linear time.

    Args:
        seq: Nucleotide string.

    Returns:
        ``longest_orf_nt / len(seq)``, or 0 for an empty sequence.
    """
    seq = seq.upper().replace('T', 'U')
    start = 'AUG'
    stops = ('UAA', 'UAG', 'UGA')
    longest = 0
    for frame in range(3):
        orf_start = None  # index of the open ORF's AUG in this frame, if any
        for i in range(frame, len(seq) - 2, 3):
            codon = seq[i:i + 3]
            if orf_start is None:
                if codon == start:
                    orf_start = i
            elif codon in stops:
                longest = max(longest, i + 3 - orf_start)
                orf_start = None
    return longest / len(seq) if len(seq) > 0 else 0

def feature_extraction(seq):
    """Return four descriptive features for a raw nucleotide string.

    Order: GC fraction, longest-ORF fraction (``orf_length``), length divided
    by 300, and ``1 - GC fraction``. All are computed on the full, untruncated
    string.

    NOTE(review): this layout differs from the columns built by
    ``extract_features()`` in the preprocessing cell (GC fraction, raw ORF
    length, base count, one-hot sum). Confirm which layout the saved model
    expects before feeding these values to it.

    Returns:
        ``np.float32`` array of shape ``(4,)``.
    """
    gc_frac = gc_content(seq)
    orf_frac = orf_length(seq)
    norm_len = len(seq) / 300.0  # normalize
    return np.array([gc_frac, orf_frac, norm_len, 1 - gc_frac], dtype=np.float32)

# ========== Prediction ==========
def _training_layout_features(seq, max_len=300):
    """Rebuild the 4-column feature row in the layout the model was trained on.

    The preprocessing cell derives ``X_feat`` from the one-hot matrix, i.e.
    from the first ``max_len`` characters with everything outside A/C/G/T(U)
    removed. It stores, unnormalised, ``[GC fraction, longest ORF in
    nucleotides, base count, one-hot sum]``; for a one-hot matrix the last two
    are the same number. This helper reproduces that from a raw sequence.
    """
    window = seq.upper().replace('T', 'U')[:max_len]
    clean = ''.join(nt for nt in window if nt in ('A', 'C', 'G', 'U'))
    n_bases = len(clean)
    # orf_length() in this cell returns a fraction of the sequence length;
    # scale back to nucleotides (round() removes floating-point residue).
    orf_nt = round(orf_length(clean) * n_bases)
    return np.array([gc_content(clean), orf_nt, n_bases, n_bases], dtype=np.float32)


def predict(fasta_path):
    """Classify every record in a FASTA file as lncRNA or mRNA and save CSVs.

    Writes ``output/predictions.csv`` (ID, predicted class, lncRNA
    probability) and ``output/features.csv`` (descriptive features from
    ``feature_extraction``).

    The model receives features in the same layout as the training
    ``X_feat``. Previously it was fed the normalised descriptive features
    (GC, ORF fraction, length/300, AT), whose meaning and scale differ from
    the training columns. The descriptive features are still written to
    ``features.csv`` unchanged.

    Args:
        fasta_path: Path to the FASTA file to classify.
    """
    model = HybridDiffusionModel().to(device)
    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state_dict holds.
    state_dict = torch.load("hybrid_diffusion_model.pt", map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()

    seq_data, model_feats, features, ids = [], [], [], []

    for record in SeqIO.parse(fasta_path, "fasta"):
        seq = str(record.seq)
        ids.append(record.id)
        seq_data.append(one_hot_encode(seq))
        model_feats.append(_training_layout_features(seq))  # model input
        features.append(feature_extraction(seq))            # report only

    # Keep the full arrays on the CPU and move one batch at a time, so large
    # FASTA files do not have to fit in GPU memory at once.
    X_seq = torch.tensor(np.array(seq_data), dtype=torch.float32)
    X_feat = torch.tensor(np.array(model_feats), dtype=torch.float32)

    preds, probs = [], []

    with torch.no_grad():
        for i in range(0, len(X_seq), 64):
            s = X_seq[i:i+64].to(device)
            f = X_feat[i:i+64].to(device)
            out = model(s, f)
            # Column 1 of the softmax is the lncRNA class (label 1 in training).
            prob = torch.softmax(out, dim=1)[:, 1].cpu().numpy()
            label = (prob > 0.5).astype(int)
            preds.extend(label)
            probs.extend(prob)

    # ========== Save Results ==========
    os.makedirs("output", exist_ok=True)

    # Save predictions
    df_preds = pd.DataFrame({
        "ID": ids,
        "Prediction": ["lncRNA" if p == 1 else "mRNA" for p in preds],
        "lncRNA_Probability": probs
    })
    df_preds.to_csv("output/predictions.csv", index=False)

    # Save features
    df_feat = pd.DataFrame(features, columns=["GC_Content", "ORF_Length", "Norm_Length", "AT_Content"])
    df_feat.insert(0, "ID", ids)
    df_feat.to_csv("output/features.csv", index=False)

    print("‚úÖ Prediction saved to output/predictions.csv")
    print("‚úÖ Features saved to output/features.csv")

# Entry point: classify the chickpea FASTA file when this cell/script is executed directly.
if __name__ == "__main__":
    predict("chickpea_data.fa")


  model.load_state_dict(torch.load("hybrid_diffusion_model.pt", map_location=device))


‚úÖ Prediction saved to output/predictions.csv
‚úÖ Features saved to output/features.csv
