In [None]:
import torch, numpy as np, random

seed = 40
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
import torch
torch.cuda.empty_cache()
torch.cuda.ipc_collect()

In [None]:
import pandas as pd
from Bio import SeqIO
local_fasta_path = 'TF_Training_Labeled_Combined.fasta'

# Load FASTA file using Biopython
sequences = []
for record in SeqIO.parse(local_fasta_path, "fasta"):
    # Split the description to extract label
    description_parts = record.description.split("%")
    label = int(description_parts[-1].split("LABEL=")[1])  # Extracting the numeric part of the label
    sequences.append([record.name, str(record.seq), label])

# Create dataframe
df = pd.DataFrame(sequences, columns=["name", "sequence", "label"])

# Display the dataframe
df.head(5)

In [None]:
from datasets import Dataset
# Convert label column to int (already done)
df['label'] = df['label'].astype(int)

# Optional: Map label to class names if needed
label2id = {label: i for i, label in enumerate(sorted(df['label'].unique()))}
id2label = {i: label for label, i in label2id.items()}

# Convert dataframe to HuggingFace dataset
dataset = Dataset.from_pandas(df)

In [None]:
type(dataset)

In [None]:
import torch.nn as nn
import torch.nn.functional as F


class CustomHeadClassifier(nn.Module):
    def __init__(self, embedding_size, hidden_size, dropout, num_classes):
        super(CustomHeadClassifier, self).__init__()
        self.fc1 = nn.Linear(embedding_size, hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x, attention_mask=None):
        # x shape: (batch_size, seq_len, embedding_size)
        if attention_mask is not None:
            # masked mean pooling
            mask = attention_mask.unsqueeze(-1)  # (batch_size, seq_len, 1)
            x = (x * mask).sum(1) / mask.sum(1)
        else:
            x = x.mean(dim=1)  # mean pooling over sequence
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
#Load tokenzier and model

from transformers import AutoTokenizer, AutoModelForSequenceClassification

model_name = "facebook/esm2_t12_35M_UR50D"  # or other ESM-2 variant

tokenizer = AutoTokenizer.from_pretrained(model_name)

model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=2,
    problem_type="single_label_classification"
)


embedding_size = model.config.hidden_size
model.classifier = CustomHeadClassifier(embedding_size, hidden_size=128, dropout=0.3, num_classes=2)
model = model.to("cuda:0")

In [None]:
# Freeze classification head
# for param in model.classifier.parameters():  
#     param.requires_grad = False

In [None]:
for name, param in model.named_parameters():
    if param.requires_grad:
        print(name)

In [None]:
# Count trainable parameters
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
all_params = sum(p.numel() for p in model.parameters())

print(f"Trainable parameters: {trainable_params:,}")
print(f"Total parameters: {all_params:,}")

In [None]:
def tokenize_function(example):
    return tokenizer(example["sequence"], truncation=True, padding="max_length", max_length=1024)

tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)

In [None]:
trainable = [name for name, param in model.named_parameters() if param.requires_grad]
frozen = [name for name, param in model.named_parameters() if not param.requires_grad]

print(f"Trainable layers: {len(trainable)}")
print(trainable[:5])  # print a few

print("\n")

print(f"Frozen layers: {len(frozen)}")
print(frozen[:5])  # print a few



from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="./ESM2-35M-with-Finetune",
    logging_dir="./logs",
    logging_strategy="epoch",
    eval_strategy="epoch",
    save_strategy="epoch",
    num_train_epochs=10,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2, #Effective value should be 8
    learning_rate=1e-5,
    weight_decay=0.01,
    fp16=True,
    report_to="none"
)

In [None]:
from sklearn.metrics import accuracy_score

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)
    acc = accuracy_score(labels, predictions)
    return {"accuracy": acc}

In [None]:
from transformers import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    processing_class=tokenizer,
    compute_metrics=compute_metrics,
)

In [None]:
import os
os.environ["WANDB_DISABLED"] = "true"

trainer.train()

In [None]:
import matplotlib.pyplot as plt

# Example data; replace these with your real log-derived lists
#epochs = [1, 2, 3, 4, 5]
# train_loss = [0.5202, 0.309, 0.2708, 0.2419, 0.2284]
# eval_loss = [0.4150600731372833, 0.3777799904346466,0.43656858801841736,0.36971646547317505,0.3870314955711365]
# eval_accuracy = [0.8383233532934131,0.8622754491017964,0.874251497005988,0.8922155688622755,0.8922155688622755]


train_loss, eval_loss, eval_accuracy, epochs = [], [], [], []

for entry in trainer.state.log_history:
    if "loss" in entry and "epoch" in entry:
        train_loss.append(entry["loss"])
    if "eval_loss" in entry:
        eval_loss.append(entry["eval_loss"])
    if "eval_accuracy" in entry:
        eval_accuracy.append(entry["eval_accuracy"])
    if "epoch" in entry:
        epochs.append(entry["epoch"])

# Make sure x-axis aligns
x = list(range(1, len(train_loss)+1))

fig, ax1 = plt.subplots(figsize=(5, 3))

# Plot loss curves on the left y-axis
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss')
ax1.plot(x, train_loss,   marker='o')
ax1.plot(x, eval_loss,   marker='s')
ax1.set_xticks(x)
ax1.tick_params(axis='y')

# Instantiate a second axes sharing the same x-axis for accuracy
ax2 = ax1.twinx()
ax2.set_ylabel('Accuracy')
ax2.plot(x, eval_accuracy, color='green', marker='^')
ax2.tick_params(axis='y')

# Combine legends
lines_1, labels_1 = ax1.get_legend_handles_labels()
lines_2, labels_2 = ax2.get_legend_handles_labels()
#ax1.legend(lines_1 + lines_2, labels_1 + labels_2, loc='upper center')
ax2.set_ylim(0.8, 0.92)
plt.title('Fine tuning ESM2 35M', fontsize=11)
#plt.grid(True)
plt.tight_layout()
plt.savefig('../Figures/With_FineTuning/Fine_tuning_ESM2_35M_without_LoRA.png', dpi=400)  # Save the figure
plt.show()

In [None]:
# Directory to save
# Save model, tokenzier, and Custom Head Classifier separately
# The standard save_pretrained method from Transformers won’t automatically save the custom head class code. 
# We need to save both the model weights and the custom head properly.

save_dir = "Saved_Models/fine_tuned_ESM2_35M_model_with_custom_head"

# Save Hugging Face model weights (excluding custom head)
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)

# Save custom head weights separately
torch.save(model.classifier.state_dict(), f"{save_dir}/custom_head.pt")

## Load the Saved Model

In [None]:
#Load the Saved Model

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

import torch.nn as nn
import torch.nn.functional as F

class CustomHeadClassifier(nn.Module):
    def __init__(self, embedding_size, hidden_size, dropout, num_classes):
        super(CustomHeadClassifier, self).__init__()
        self.fc1 = nn.Linear(embedding_size, hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x, attention_mask=None):
        if attention_mask is not None:
            mask = attention_mask.unsqueeze(-1)
            x = (x * mask).sum(1) / mask.sum(1)
        else:
            x = x.mean(dim=1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
save_dir = "Saved_Models/fine_tuned_ESM2_35M_model_with_custom_head"
tokenizer = AutoTokenizer.from_pretrained(save_dir)
model_load = AutoModelForSequenceClassification.from_pretrained(save_dir, num_labels=2)
embedding_size = model_load.config.hidden_size
model_load.classifier = CustomHeadClassifier(embedding_size, hidden_size=128, dropout=0.3, num_classes=2)

# Load the trained weights
model_load.classifier.load_state_dict(torch.load(f"{save_dir}/custom_head.pt"))

# Move to GPU if needed
model_laod = model_load.to("cuda:0")
model_load.eval()  # set to evaluation mode

In [None]:
# Check if CUDA is available
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

## Test on the Independent Set

In [None]:
from Bio import SeqIO
import pandas as pd
local_fasta_path = 'TF_Ind_Labeled_Combined.txt'

# Load FASTA file using Biopython
sequences = []
for record in SeqIO.parse(local_fasta_path, "fasta"):
    # Split the description to extract label
    description_parts = record.description.split("%")
    label = int(description_parts[-1].split("LABEL=")[1])  # Extracting the numeric part of the label
    sequences.append([record.name, str(record.seq), label])

# Create dataframe
df_test = pd.DataFrame(sequences, columns=["name", "sequence", "label"])

# Display the dataframe
df_test.head(5)

In [None]:
df_test.shape

In [None]:
from datasets import Dataset

df_test["label"] = df_test["label"].astype(int)
test_dataset = Dataset.from_pandas(df_test)

In [None]:
def tokenize_function(example):
    return tokenizer(example["sequence"], padding="max_length", truncation=True, max_length=1024)

test_dataset = test_dataset.map(tokenize_function, batched=True)

In [None]:
from transformers import Trainer

trainer = Trainer(model=model_load, tokenizer=tokenizer)

predictions = trainer.predict(test_dataset)

In [None]:
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    confusion_matrix,
    roc_auc_score,
    accuracy_score,
    classification_report,
)
from sklearn.utils import resample

# --- Extract predictions ---
logits = predictions.predictions
labels = predictions.label_ids
preds = np.argmax(logits, axis=-1)
probs = F.softmax(torch.tensor(logits), dim=-1).numpy()  # shape: [N, num_classes]

# --- Confusion Matrix ---
cm = confusion_matrix(labels, preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

# --- Define function to compute metrics ---
def compute_metrics(labels, preds, probs):
    cm = confusion_matrix(labels, preds)
    TN, FP, FN, TP = cm.ravel()

    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    numerator = (TP * TN) - (FP * FN)
    denominator = np.sqrt((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN))
    mcc = numerator / denominator if denominator > 0 else 0
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    acc = accuracy_score(labels, preds)
    auc = roc_auc_score(labels, probs[:, 1]) if probs.shape[1] > 1 else np.nan

    return {
        "Accuracy": acc,
        "Precision": precision,
        "Specificity": specificity,
        "Recall": recall,
        "MCC": mcc,
        "F1": f1,
        "AUC": auc,
    }

# # --- Compute metrics on full dataset ---
# metrics = compute_metrics(labels, preds, probs)
# print("=== Metrics on Full Data ===")
# for k, v in metrics.items():
#     print(f"{k}: {v:.4f}")

# print("\nClassification Report:")
# print(classification_report(labels, preds))

# --- Bootstrapping for Std Dev ---
n_bootstraps = 1000
rng = np.random.RandomState(42)
boot_metrics = {k: [] for k in metrics.keys()}

for i in range(n_bootstraps):
    indices = rng.randint(0, len(labels), len(labels))  # sample with replacement
    boot_labels = labels[indices]
    boot_preds = preds[indices]
    boot_probs = probs[indices]

    m = compute_metrics(boot_labels, boot_preds, boot_probs)
    for k in m:
        boot_metrics[k].append(m[k])

# --- Compute mean and std ---
print("\n=== Bootstrapped Metrics (Mean ± Std) ===")
for k in metrics.keys():
    mean_val = np.mean(boot_metrics[k])
    std_val = np.std(boot_metrics[k])
    print(f"{k}: {mean_val:.4f} ± {std_val:.4f}")
