<a href="https://colab.research.google.com/github/PalepuRohith/Major_Project/blob/main/MUTATION_CLASSIFICATION.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive. This import only exists inside Google Colab.
# NOTE(review): the CSV loaded further down is read from /content, not from the
# mounted drive — confirm whether this mount is still required.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, BertForSequenceClassification
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import random
from torch.optim.lr_scheduler import ReduceLROnPlateau
from huggingface_hub import login


In [None]:
!pip install transformers datasets

In [None]:
# Load the labelled sequence table. Later cells read its "Sequence" and
# "mutation_class" columns.
# NOTE(review): hard-coded absolute Colab path — the file has to be present in
# /content before this cell runs.
df = pd.read_csv("/content/humanvirus_updated.csv")

In [None]:
def create_kmers(sequence, k=6):
    """Split a sequence into overlapping k-mers joined by single spaces.

    A window of width ``k`` slides over ``sequence`` one position at a time.
    When the sequence is shorter than ``k`` there is no full window and the
    result is an empty string.

    Args:
        sequence: The sequence to split (anything supporting ``len`` and slicing
            into strings).
        k: Window width.

    Returns:
        The k-mers in order of appearance, separated by spaces.
    """
    window_count = len(sequence) - k + 1
    kmers = (sequence[start:start + k] for start in range(window_count))
    return " ".join(kmers)


In [None]:
# Build the space-separated 6-mer string for every row's "Sequence" value.
df["kmer_sequence"] = [create_kmers(sequence, k=6) for sequence in df["Sequence"]]

In [None]:
# Encode the class names as binary targets (wildtype -> 0, mutated -> 1).
# Values outside this mapping become NaN; a later cell drops those rows.
df["label"] = df["mutation_class"].map({"wildtype": 0, "mutated": 1})


In [None]:
# Tokenizer used by tokenize_function, loaded from the "zhihan1996/DNABERT-2-117M" checkpoint.
# NOTE(review): the classifier backbone is loaded from a different checkpoint
# ("Peltarion/dnabert-minilm") further down — confirm the two share a vocabulary,
# otherwise the token ids produced here will not line up with the model's embeddings.
tokenizer = AutoTokenizer.from_pretrained("zhihan1996/DNABERT-2-117M")

In [None]:
def tokenize_function(seq):
    """Encode one k-mer string with the notebook-level ``tokenizer``.

    The text is truncated or padded so the encoding is exactly 512 tokens long.

    Args:
        seq: The text to encode.

    Returns:
        A dict with ``"input_ids"`` and ``"attention_mask"`` tensors, each with
        its singleton dimensions squeezed away.
    """
    encoding = tokenizer(
        seq, padding="max_length", truncation=True, max_length=512, return_tensors="pt"
    )
    fields = ("input_ids", "attention_mask")
    return {field: encoding[field].squeeze() for field in fields}


In [None]:
# Tokenize every k-mer string up front; each cell of "tokens" holds the dict
# returned by tokenize_function.
# NOTE(review): this runs before rows with a missing label are dropped, so those
# rows are tokenized as well.
df["tokens"] = df["kmer_sequence"].apply(tokenize_function)


In [None]:
# Keep only the rows that actually have a label.
df = df[df["label"].notna()]

In [None]:
# Hold out 20% of the rows for validation. stratify keeps the label proportions
# the same in both splits and random_state=42 makes the split repeatable.
# The "texts" are the per-row token dicts from the "tokens" column, not raw strings.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df["tokens"], df["label"], test_size=0.2, random_state=42, stratify=df["label"]
)


In [None]:
class SequenceDataset(Dataset):
    """Dataset pairing pre-tokenized sequences with their labels.

    Args:
        texts: pandas Series whose entries are dicts holding ``"input_ids"``
            and ``"attention_mask"``.
        labels: pandas Series of numeric labels, positionally aligned with
            ``texts``.
    """

    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from the label series."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(input_ids, attention_mask, label)`` for position ``idx``.

        Lookup is positional (``iloc``), so the original DataFrame index is
        irrelevant. The label is returned as a float tensor.
        """
        encoding = self.texts.iloc[idx]
        input_ids = encoding["input_ids"]
        attention_mask = encoding["attention_mask"]
        target = torch.tensor(self.labels.iloc[idx], dtype=torch.float)
        return input_ids, attention_mask, target

In [None]:
# Wrap each split in a SequenceDataset and batch it. Only the training loader
# shuffles; the validation loader keeps a fixed order.
# NOTE(review): the batch size is hard-coded to 32 here; the `batch_size`
# variable is only defined in a later cell and is not used by these loaders.
train_dataset = SequenceDataset(train_texts, train_labels)
val_dataset = SequenceDataset(val_texts, val_labels)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [None]:
# Pre-trained encoder that BERT_LSTM wraps. output_hidden_states=True makes the
# forward pass expose hidden_states, which BERT_LSTM.forward reads.
# NOTE(review): this checkpoint differs from the tokenizer's
# ("zhihan1996/DNABERT-2-117M") — confirm the vocabularies are compatible.
bert_model = BertForSequenceClassification.from_pretrained(
    "Peltarion/dnabert-minilm", num_labels=1,output_hidden_states=True
)

In [None]:
class BERT_LSTM(nn.Module):
    """Transformer encoder followed by a 2-layer bidirectional LSTM and an MLP head.

    The wrapped ``bert_model`` must return an object exposing ``hidden_states``
    (i.e. it was created with ``output_hidden_states=True``); its last entry is
    fed to the LSTM. The head emits one raw logit per sequence, suitable for
    ``nn.BCEWithLogitsLoss``.

    Args:
        bert_model: Pre-trained encoder. Its ``config.hidden_size`` sets the LSTM
            input width; 768 is used when no such attribute is available.
    """

    def __init__(self, bert_model):
        super(BERT_LSTM, self).__init__()
        self.bert = bert_model
        # Read the encoder width from its config instead of assuming 768, so a
        # checkpoint with a different hidden size does not fail on a shape mismatch.
        encoder_config = getattr(bert_model, "config", None)
        hidden_size = getattr(encoder_config, "hidden_size", 768)
        self.lstm = nn.LSTM(
            hidden_size, 128, num_layers=2, batch_first=True, bidirectional=True, dropout=0.3
        )
        self.dropout = nn.Dropout(0.4)
        self.fc1 = nn.Linear(128 * 2, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, input_ids, attention_mask):
        """Compute one logit per input sequence.

        Args:
            input_ids: Token ids, shape ``(batch, seq_len)``.
            attention_mask: 1 for real tokens and 0 for padding, same shape.
                Padding is assumed to be on the right (real tokens first).

        Returns:
            Tensor of shape ``(batch,)`` holding raw (pre-sigmoid) logits.
        """
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        token_states = bert_output.hidden_states[-1]

        # Sequences are padded to a fixed length, so the last time step is
        # usually padding. Pack by true length so the LSTM stops at the last real
        # token and its final states summarise only real tokens.
        # pack_padded_sequence needs CPU lengths that are all >= 1.
        lengths = attention_mask.sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            token_states, lengths, batch_first=True, enforce_sorted=False
        )
        _, (final_hidden, _) = self.lstm(packed)
        # final_hidden is (num_layers * 2, batch, 128); the last two entries are
        # the top layer's forward and backward final states.
        features = torch.cat((final_hidden[-2], final_hidden[-1]), dim=1)

        # Apply the dropout layer that was declared but never used, and put a
        # non-linearity after fc1 so fc1/fc2 do not collapse into one linear map.
        output = torch.relu(self.fc1(self.dropout(features)))
        output = torch.relu(self.fc2(output))
        # squeeze(-1) keeps the batch dimension even when the batch has one sample.
        return self.fc3(output).squeeze(-1)

In [None]:
# Training hyper-parameters.
# NOTE(review): in the visible cells only learning_rate is consumed (by the
# optimizer). The DataLoaders hard-code batch_size=32 and train_model is called
# with epochs=5, so batch_size and num_epochs appear unused — confirm.
batch_size = 32  # Increased batch size for stability
learning_rate = 3e-5  # Lower learning rate for better convergence
num_epochs = 8

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Wrap the pre-trained encoder in the LSTM classifier and move it to that device.
model = BERT_LSTM(bert_model).to(device)


In [None]:
# Count the rows per label so the loss can compensate for class imbalance.
class_counts = df["label"].value_counts().to_dict()
# Make sure both labels exist as keys even when one class is absent.
class_counts.setdefault(0, 0)
class_counts.setdefault(1, 0)
# Inverse-frequency weight per class (1.0 for an absent class, to avoid dividing by zero).
weights = [1.0 / class_counts[i] if class_counts[i] != 0 else 1.0 for i in range(2)]
class_weights = torch.tensor(weights).to(device)
# BCEWithLogitsLoss multiplies the positive term by pos_weight, so the balancing
# value is (#negatives / #positives). The previous value, 1 / #positives, is close
# to zero and effectively switched off the loss for the positive class.
if class_counts[0] and class_counts[1]:
    pos_weight = class_counts[0] / class_counts[1]
else:
    pos_weight = 1.0  # One class is missing: there is nothing to balance.
criterion = nn.BCEWithLogitsLoss(
    pos_weight=torch.tensor(pos_weight, dtype=torch.float, device=device)
)

In [None]:
# AdamW over every parameter of the combined model (encoder, LSTM and head).
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)
# Scale the learning rate by 0.3 when the monitored value stops decreasing
# (patience=2); train_model feeds it the validation loss via scheduler.step(val_loss).
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm the
# installed version still accepts it.
scheduler = ReduceLROnPlateau(optimizer, mode="min", factor=0.3, patience=2, verbose=True)

In [None]:
# Early-stopping state shared with train_model through `global`:
# training stops after this many consecutive epochs without a new best validation loss.
early_stopping_patience = 3
# Lowest validation loss seen so far; train_model updates it in place.
best_val_loss = float("inf")
# Consecutive epochs without improvement; train_model updates it in place.
patience_counter = 0

In [None]:
def train_model(model, train_loader, val_loader, epochs=8):
    """Train ``model`` with per-epoch validation, LR scheduling and early stopping.

    Uses the notebook-level ``optimizer``, ``criterion``, ``scheduler``,
    ``device``, ``early_stopping_patience`` and ``evaluate_model``.
    ``best_val_loss`` and ``patience_counter`` are module-level state updated in
    place, so they carry over between calls unless reset by the caller.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and
            returning one logit per sample.
        train_loader: Iterable of ``(input_ids, attention_mask, labels)`` batches.
        val_loader: Validation batches in the same format.
        epochs: Maximum number of epochs to run.

    Returns:
        None. The weights of the best epoch (lowest validation loss) are written
        to ``best_model.pth``.
    """
    global best_val_loss, patience_counter
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for input_ids, attention_mask, labels in train_loader:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            # reshape(-1) instead of squeeze(): a final batch holding one sample
            # would otherwise collapse to a 0-d tensor and the loss would raise a
            # shape mismatch against the (1,)-shaped labels.
            outputs = model(input_ids, attention_mask).reshape(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_loss += loss.item()

        avg_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_loss:.4f}")

        val_loss, val_acc, val_f1 = evaluate_model(model, val_loader)
        print(f"Validation - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}")

        # The scheduler monitors the validation loss.
        scheduler.step(val_loss)

        # Early stopping: checkpoint on improvement, otherwise count towards patience.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), "best_model.pth")  # Save best model
        else:
            patience_counter += 1
            if patience_counter >= early_stopping_patience:
                print("Early stopping triggered!")
                break


In [None]:
def evaluate_model(model, val_loader):
    """Evaluate ``model`` on ``val_loader`` without updating any weights.

    Uses the notebook-level ``criterion`` and ``device``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and
            returning one logit per sample.
        val_loader: Iterable of ``(input_ids, attention_mask, labels)`` batches.

    Returns:
        Tuple ``(val_loss, acc, f1)``: the loss averaged over batches, then
        accuracy and F1 computed from the sigmoid probabilities rounded to 0/1.
    """
    model.eval()
    val_loss, preds, true_labels = 0, [], []
    with torch.no_grad():
        for input_ids, attention_mask, labels in val_loader:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            # reshape(-1) instead of squeeze(): a batch holding one sample would
            # otherwise become a 0-d tensor, breaking both the loss (shape
            # mismatch) and preds.extend (a 0-d array is not iterable).
            outputs = model(input_ids, attention_mask).reshape(-1)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            preds.extend(torch.sigmoid(outputs).cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    val_loss /= len(val_loader)
    # Round the probabilities once and reuse the result for both metrics.
    predicted_labels = np.round(np.array(preds))
    acc = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels)
    return val_loss, acc, f1

In [None]:
 train_model(model, train_loader, val_loader, epochs=5)

In [None]:
# Final validation metrics for the model as it stands after training.
# evaluate_model returns exactly (loss, accuracy, f1); unpacking a fourth value
# raised ValueError, so ROC-AUC is computed here from the sigmoid probabilities.
val_loss, acc, f1 = evaluate_model(model, val_loader)

model.eval()
val_probs, val_targets = [], []
with torch.no_grad():
    for input_ids, attention_mask, labels in val_loader:
        # reshape(-1) keeps a 1-d tensor even when a batch holds a single sample.
        logits = model(input_ids.to(device), attention_mask.to(device)).reshape(-1)
        val_probs.extend(torch.sigmoid(logits).cpu().numpy())
        val_targets.extend(labels.reshape(-1).cpu().numpy())
# ROC-AUC is threshold-free, so it takes the probabilities rather than rounded labels.
roc_auc = roc_auc_score(val_targets, val_probs)

print(f"Final Model - Accuracy: {acc:.4f}, F1-Score: {f1:.4f}, ROC-AUC: {roc_auc:.4f}")