<a href="https://colab.research.google.com/github/1261945528/DSEE/blob/main/DSEE_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')
!pip install torch datasets
!pip install -q biopython huggingface_hub
!pip install transformers[torch] accelerate -U
!pip install catboost
!pip install bayesian-optimization

Mounted at /content/drive
Collecting datasets
  Downloading datasets-2.21.0-py3-none-any.whl.metadata (21 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl.metadata (1.

# Import the necessary libraries

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from sklearn.base import BaseEstimator, ClassifierMixin
from catboost import CatBoostClassifier
from torch.utils.data import DataLoader, TensorDataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, EarlyStoppingCallback
from datasets import Dataset load_dataset, DatasetDict
from sklearn.metrics import accuracy_score, f1_score, matthews_corrcoef, log_loss, make_scorer
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from bayes_opt import BayesianOptimization

import lightgbm as lgb
import xgboost as xgb
import pandas as pd
import numpy as np
import random
import os


Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



# Data preparation

## NT

In [None]:
# Define a list of subsets
subsets = [
    "H3",
    "H3K14ac",
    "H3K36me3",
    "H3K4me1",
    "H3K4me2",
    "H3K4me3",
    "H3K79me3",
    "H3K9ac",
    "H4",
    "H4ac",
    "enhancers",
    "enhancers_types",
    "promoter_all",
    "promoter_no_tata",
    "promoter_tata",
    "splice_sites_acceptors",
    "splice_sites_all",
    "splice_sites_donors"
]

# Process each subset
for subset in subsets:

    dataset_name = "InstaDeepAI/nucleotide_transformer_downstream_tasks"

    # Split into training and test sets
    train_dataset = load_dataset(dataset_name, name=subset, split='train')
    test_dataset = load_dataset(dataset_name, name=subset, split='test')

    # Create a save subdirectory path
    save_path = 'Your File Path/data/NT'
    os.makedirs(save_path, exist_ok=True)

    # Save the training and test sets
    train_dataset.to_csv(os.path.join(save_path, f'{subset}_train.csv'), index=False)
    test_dataset.to_csv(os.path.join(save_path, f'{subset}_test.csv'), index=False)

    print(f"{subset} training and test sets have been saved to {save_path}")


## GB

In [None]:
# Define a list of subset tuples containing dataset names and API names
subsets = [
    ("Mouse Enhancers",       "katarinagresova/Genomic_Benchmarks_dummy_mouse_enhancers_ensembl"),
    ("Human Enhancers (Cohn)",   "katarinagresova/Genomic_Benchmarks_human_enhancers_cohn"),
    ("Human Enhancers (Ensembl)",  "katarinagresova/Genomic_Benchmarks_human_enhancers_ensembl"),
    ("Coding vs Intergenomic",   "katarinagresova/Genomic_Benchmarks_demo_coding_vs_intergenomic_seqs"),
    ("Human vs Worm",        "katarinagresova/Genomic_Benchmarks_demo_human_or_worm"),
    ("Human Regulatory Elements",  "katarinagresova/Genomic_Benchmarks_human_ensembl_regulatory"),
    ("Human Promoter (Non-TATA)",  "katarinagresova/Genomic_Benchmarks_human_nontata_promoters"),
    ("Human OCR (Ensembl)",     "katarinagresova/Genomic_Benchmarks_human_ocr_ensembl"),
]

# Define custom column names
column_names = ["sequence", "label"]  # Replace with the desired column names

# Process each subset
for subset, dataset_name in subsets:

    # Split into training and test sets
    train_dataset = load_dataset(dataset_name, split='train')
    test_dataset = load_dataset(dataset_name, split='test')

    # Convert to pandas DataFrame and rename columns
    train_df = pd.DataFrame(train_dataset)
    test_df = pd.DataFrame(test_dataset)

    # If you want to ensure the order and names of the columns, use the following code
    train_df.columns = column_names
    test_df.columns = column_names

    # Create save subdirectory path
    save_path = 'Your File Path/data/GB'
    os.makedirs(save_path, exist_ok=True)

    # Save the training and test sets
    train_df.to_csv(os.path.join(save_path, f'{subset}_train.csv'), index=False)
    test_df.to_csv(os.path.join(save_path, f'{subset}_test.csv'), index=False)

    print(f"{subset} training and test sets have been saved to {save_path}")


In [None]:
# Subset list
NT_subsets = [
    "H3",
    "H3K14ac",
    "H3K36me3",
    "H3K4me1",
    "H3K4me2",
    "H3K4me3",
    "H3K79me3",
    "H3K9ac",
    "H4",
    "H4ac",
    "enhancers",
    "enhancers_types",
    "promoter_all",
    "promoter_no_tata",
    "promoter_tata",
    "splice_sites_acceptors",
    "splice_sites_all",
    "splice_sites_donors"
]

GB_subsets = [
    "Mouse Enhancers",
    "Human Enhancers (Cohn)",
    "Human Enhancers (Ensembl)",
    "Coding vs Intergenomic",
    "Human vs Worm",
    "Human Regulatory Elements",
    "Human Promoter (Non-TATA)",
    "Human OCR (Ensembl)"
]

# Config parameters
num_label = 2
dataset = 'NT'  # Select dataset
subset = NT_subsets[0]

# Load and preprocess data
main_path = 'Your File Path'
train_file_name = f'data/{dataset}/{subset}_train.csv'
test_file_name = f'data/{dataset}/{subset}_test.csv'

# Define full path
train_file = os.path.join(main_path, train_file_name)
test_file = os.path.join(main_path, test_file_name)
results_file = os.path.join(main_path, "model_results(stacking).csv")

test_dataset = Dataset.from_pandas(pd.read_csv(test_file))
true_labels = test_dataset['label']

# DataFrame to save results
columns = ["model", "dataset", "subset", "loss", "epoch", "accuracy", "F1", "MCC"]
results_df = pd.DataFrame(columns=columns)

#Submodels

## DNABert2 model class

In [None]:
class DNABert2Model:
    def __init__(self, model_name="farhan-shaik/Fine-Tuned-DNABERT2-For-Epigenetic-Mark-Prediction", num_labels=2, max_length=512):
        self.model_name = model_name
        self.num_labels = num_labels
        self.max_length = max_length
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels).to(self.device)
        self.cross_val_results = []

    def preprocess_function(self, examples):
        return self.tokenizer(examples['sequence'], padding="max_length", truncation=True, max_length=self.max_length)

    def load_and_preprocess_data(self, train_file, test_file):
        # Read csv file
        train_dataset = Dataset.from_pandas(pd.read_csv(train_file))
        test_dataset = Dataset.from_pandas(pd.read_csv(test_file))

        # Preprocess the data into a format acceptable to the model
        encoded_train_dataset = train_dataset.map(self.preprocess_function, batched=True)
        encoded_test_dataset = test_dataset.map(self.preprocess_function, batched=True)

        # split validation
        train_valid_split = encoded_train_dataset.train_test_split(test_size=0.05)
        self.train_dataset = train_valid_split['train']
        self.valid_dataset = train_valid_split['test']
        self.test_dataset = encoded_test_dataset

    def train(self, output_dir, logging_dir):
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=5,  # Increase the number of periods for early stop observation
            per_device_train_batch_size=8,
            per_device_eval_batch_size=16,
            learning_rate=3e-5,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir=logging_dir,
            logging_steps=100,
            eval_strategy="epoch",  # Evaluate at the end of each epoch
            save_strategy="epoch",  # Save the model at the end of each epoch
            save_total_limit=2,
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss"  # Use evaluation loss as a metric to select the best model
        )
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.valid_dataset,
            tokenizer=self.tokenizer,
            compute_metrics=self.compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
        )
        self.trainer.train()

    def compute_metrics(self, p):
        preds = p.predictions.argmax(-1)
        accuracy = accuracy_score(p.label_ids, preds)
        f1 = f1_score(p.label_ids, preds, average='weighted')
        mcc = matthews_corrcoef(p.label_ids, preds)
        return {"accuracy": accuracy, "f1": f1, "mcc": mcc}

    def evaluate(self):
        self.trainer.model.eval()
        results = self.trainer.evaluate(eval_dataset=self.test_dataset)
        results["epoch"] = int(results["epoch"])  # Convert epochs to integers
        print("Evaluation Results:")
        for key, value in results.items():
            print(f"{key}: {value:.4f}")
        return results

    def get_logits(self):
        self.model.eval()
        logits = []
        for text in self.test_dataset['sequence']:
            inputs = self.tokenizer(text, return_tensors="pt", padding="max_length", truncation=True, max_length=self.max_length)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}  # Move the input tensor to the same device
            with torch.no_grad():
                outputs = self.model(**inputs)
                logits.append(outputs.logits.squeeze().tolist())
        return logits

##  Nucleotide Transformer model class

In [None]:
class NTModel:
    def __init__(self, model_name="InstaDeepAI/nucleotide-transformer-v2-500m-multi-species", num_labels=2, max_length=64):
        self.model_name = model_name
        self.num_labels = num_labels
        self.max_length = max_length
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels, trust_remote_code=True).to(self.device)


    def preprocess_function(self, examples):
        return self.tokenizer(examples['sequence'], padding="max_length", truncation=True, max_length=self.max_length)

    def load_and_preprocess_data(self, train_file, test_file):
        # Read csv file
        train_dataset = Dataset.from_pandas(pd.read_csv(train_file))
        test_dataset = Dataset.from_pandas(pd.read_csv(test_file))

        # Preprocess the data into a format acceptable to the model
        encoded_train_dataset = train_dataset.map(self.preprocess_function, batched=True)
        encoded_test_dataset = test_dataset.map(self.preprocess_function, batched=True)

        # split validation
        train_valid_split = encoded_train_dataset.train_test_split(test_size=0.05)
        self.train_dataset = train_valid_split['train']
        self.valid_dataset = train_valid_split['test']
        self.test_dataset = encoded_test_dataset

    def train(self, output_dir, logging_dir):
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=2,
            learning_rate=1e-5,
            per_device_train_batch_size=8,
            per_device_eval_batch_size=64,
            warmup_steps=500,
            weight_decay=0.001,
            logging_steps=100,
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss", # The eval_loss score is evaluated as a metric for selecting the best model
            dataloader_drop_last=True,
        )
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.valid_dataset,
            tokenizer=self.tokenizer,
            compute_metrics=self.compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
        )
        self.trainer.train()

    def compute_metrics(self, p):
        preds = p.predictions.argmax(-1)
        accuracy = accuracy_score(p.label_ids, preds)
        f1 = f1_score(p.label_ids, preds, average='weighted')
        mcc = matthews_corrcoef(p.label_ids, preds)
        return {"accuracy": accuracy, "f1": f1, "mcc": mcc}

    def evaluate(self):
        self.trainer.model.eval()
        results = self.trainer.evaluate(eval_dataset=self.test_dataset)
        results["epoch"] = int(results["epoch"])  # Convert epochs to integers
        print("Evaluation Results:")
        for key, value in results.items():
            print(f"{key}: {value:.4f}")
        return results

    def get_logits(self):
        self.model.eval()
        logits = []
        for text in self.test_dataset['sequence']:
            inputs = self.tokenizer(text, return_tensors="pt", padding="max_length", truncation=True, max_length=self.max_length)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}  # Move the input tensor to the same device
            with torch.no_grad():
                outputs = self.model(**inputs)
                logits.append(outputs.logits.squeeze().tolist())
        return logits

## HyenaDNA model class

In [None]:
class HyenadnaModel:
    def __init__(self, model_name="LongSafari/hyenadna-small-32k-seqlen-hf", num_labels=2, max_length=512):
        self.model_name = model_name
        self.num_labels = num_labels
        self.max_length = max_length
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True).to(self.device)


    def preprocess_function(self, examples):
        return self.tokenizer(examples['sequence'], padding="max_length", truncation=True, max_length=self.max_length)

    def load_and_preprocess_data(self, train_file, test_file):
        # Read csv file
        train_dataset = Dataset.from_pandas(pd.read_csv(train_file))
        test_dataset = Dataset.from_pandas(pd.read_csv(test_file))

        # Preprocess the data into a format acceptable to the model
        encoded_train_dataset = train_dataset.map(self.preprocess_function, batched=True)
        encoded_test_dataset = test_dataset.map(self.preprocess_function, batched=True)

        # split validation
        train_valid_split = encoded_train_dataset.train_test_split(test_size=0.05)
        self.train_dataset = train_valid_split['train']
        self.valid_dataset = train_valid_split['test']
        self.test_dataset = encoded_test_dataset

    def train(self, output_dir, logging_dir):
        training_args = TrainingArguments(
            output_dir=output_dir,
            save_total_limit=2,
            num_train_epochs=100,
            learning_rate=2e-5,
            per_device_train_batch_size=128,
            per_device_eval_batch_size=512,
            gradient_accumulation_steps=4,
            logging_steps=100,
            eval_strategy="epoch",  # Evaluate at the end of each epoch
            save_strategy="epoch",  # Save the model at the end of each epoch
            load_best_model_at_end=True,
            save_safetensors=False,
            metric_for_best_model="eval_loss"
        )

        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.valid_dataset,
            tokenizer=self.tokenizer,
            compute_metrics=self.compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]  # Stop the callback early
        )
        self.trainer.train()

    def compute_metrics(self, p):
        preds = p.predictions.argmax(-1)
        accuracy = accuracy_score(p.label_ids, preds)
        f1 = f1_score(p.label_ids, preds, average='weighted')
        mcc = matthews_corrcoef(p.label_ids, preds)
        return {"accuracy": accuracy, "f1": f1, "mcc": mcc}

    def evaluate(self):
        self.trainer.model.eval()
        results = self.trainer.evaluate(eval_dataset=self.test_dataset)
        results["epoch"] = int(results["epoch"])  # Convert epochs to integers
        print("Evaluation Results:")
        for key, value in results.items():
            print(f"{key}: {value:.4f}")
        return results

    def get_logits(self):
        self.model.eval()
        logits = []
        for text in self.test_dataset['sequence']:
            inputs = self.tokenizer(text, return_tensors="pt", padding="max_length", truncation=True, max_length=self.max_length)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}  # Move the input tensor to the same device
            with torch.no_grad():
                outputs = self.model(**inputs)
                logits.append(outputs.logits.squeeze().tolist())
        return logits



## usage

In [None]:
# Initialize and train the DNABERT-2 model
dnabert2_model = DNABert2Model(num_labels=num_label)
dnabert2_model.load_and_preprocess_data(train_file, test_file)
dnabert2_model.train(output_dir='./results_dnabert2', logging_dir='./logs_dnabert2')
dnabert2_logits = dnabert2_model.get_logits()
dnabert2_results = dnabert2_model.evaluate()

# Initialize and train the Nucleotide Transformer model
nt_model = NTModel(num_labels=num_label)
nt_model.load_and_preprocess_data(train_file, test_file)
nt_model.train(output_dir='./results_nt', logging_dir='./logs_nt')
nt_model_logits = nt_model.get_logits()
nt_results = nt_model.evaluate()

# Initialize and train the Hyenadna model
hyenadna_model = HyenadnaModel(num_labels=num_label)
hyenadna_model.load_and_preprocess_data(train_file, test_file)
hyenadna_model.train(output_dir='./results_hyenadna', logging_dir='./logs_hyenadna')
hyenadna_logits = hyenadna_model.get_logits()
hyenadna_results = hyenadna_model.evaluate()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/267 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/168k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.13k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/468M [00:00<?, ?B/s]

Some weights of the model checkpoint at farhan-shaik/Fine-Tuned-DNABERT2-For-Epigenetic-Mark-Prediction were not used when initializing BertForSequenceClassification: ['bert.encoder.layer.0.attention.self.Wqkv.bias', 'bert.encoder.layer.0.attention.self.Wqkv.weight', 'bert.encoder.layer.0.mlp.gated_layers.weight', 'bert.encoder.layer.0.mlp.layernorm.bias', 'bert.encoder.layer.0.mlp.layernorm.weight', 'bert.encoder.layer.0.mlp.wo.bias', 'bert.encoder.layer.0.mlp.wo.weight', 'bert.encoder.layer.1.attention.self.Wqkv.bias', 'bert.encoder.layer.1.attention.self.Wqkv.weight', 'bert.encoder.layer.1.mlp.gated_layers.weight', 'bert.encoder.layer.1.mlp.layernorm.bias', 'bert.encoder.layer.1.mlp.layernorm.weight', 'bert.encoder.layer.1.mlp.wo.bias', 'bert.encoder.layer.1.mlp.wo.weight', 'bert.encoder.layer.10.attention.self.Wqkv.bias', 'bert.encoder.layer.10.attention.self.Wqkv.weight', 'bert.encoder.layer.10.mlp.gated_layers.weight', 'bert.encoder.layer.10.mlp.layernorm.bias', 'bert.encoder.lay

Map:   0%|          | 0/968 [00:00<?, ? examples/s]

Map:   0%|          | 0/242 [00:00<?, ? examples/s]

Epoch,Training Loss,Validation Loss,Accuracy,F1,Mcc
1,0.7704,0.662802,0.571429,0.543554,0.327789
2,0.6534,0.752145,0.530612,0.487774,0.269322
3,0.7281,0.573576,0.714286,0.712857,0.519533
4,0.6455,0.448772,0.857143,0.85861,0.71321
5,0.5585,0.587471,0.734694,0.734694,0.547368


Evaluation Results:
eval_loss: 0.8619
eval_accuracy: 0.7603
eval_f1: 0.7561
eval_mcc: 0.5399
eval_runtime: 3.6412
eval_samples_per_second: 66.4610
eval_steps_per_second: 4.3940
epoch: 5.0000


tokenizer_config.json:   0%|          | 0.00/129 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/28.7k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/101 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.06k [00:00<?, ?B/s]

esm_config.py:   0%|          | 0.00/14.9k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/InstaDeepAI/nucleotide-transformer-v2-500m-multi-species:
- esm_config.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


modeling_esm.py:   0%|          | 0.00/58.2k [00:00<?, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/InstaDeepAI/nucleotide-transformer-v2-500m-multi-species:
- modeling_esm.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


model.safetensors:   0%|          | 0.00/1.99G [00:00<?, ?B/s]

Some weights of the model checkpoint at InstaDeepAI/nucleotide-transformer-v2-500m-multi-species were not used when initializing EsmForSequenceClassification: ['lm_head.bias', 'lm_head.decoder.weight', 'lm_head.dense.bias', 'lm_head.dense.weight', 'lm_head.layer_norm.bias', 'lm_head.layer_norm.weight']
- This IS expected if you are initializing EsmForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing EsmForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of EsmForSequenceClassification were not initialized from the model checkpoint at InstaDeepAI/nucleotide-transformer-v2-500m-multi-species and are newly initialized: ['classifier.dense.bias', 

Map:   0%|          | 0/968 [00:00<?, ? examples/s]

Map:   0%|          | 0/242 [00:00<?, ? examples/s]

Epoch,Training Loss,Validation Loss


AttributeError: 'NoneType' object has no attribute 'get'

In [None]:
logits_list=[dnabert2_logits, nt_model_logits, hyenadna_logits]

# Base Learners

In [None]:
# 准备数据
X = np.hstack(logits_list)
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, true_labels, test_size=0.2, random_state=42)

## Logistic Regression

In [None]:
# Logistic Regression
param_grid_lr = {'C': [0.01, 0.1, 1, 10, 100]}
scorer = make_scorer(f1_score, average='weighted')
grid_search_lr = GridSearchCV(LogisticRegression(max_iter=100), param_grid_lr, scoring=scorer, cv=5)
grid_search_lr.fit(X_train, y_train)
best_lr = grid_search_lr.best_estimator_
logits_lr = best_lr.predict_proba(X_test)

## SVM

In [None]:
# SVM
param_grid_svm = {'C': [0.01, 0.1, 1, 10, 100], 'gamma': [0.001, 0.01, 0.1, 1]}
grid_search_svm = GridSearchCV(SVC(probability=True), param_grid_svm, scoring=scorer, cv=5)
grid_search_svm.fit(X_train, y_train)
best_svm = grid_search_svm.best_estimator_
logits_svm = best_svm.predict_proba(X_test)

## Random Forest

In [None]:
# Random Forest
def rf_black_box_function(n_estimators, max_depth):
    model = RandomForestClassifier(n_estimators=int(n_estimators), max_depth=int(max_depth), random_state=42)
    model.fit(X_train, y_train)
    val_predictions = model.predict(X_test)
    return f1_score(y_test, val_predictions, average='weighted')

optimizer = BayesianOptimization(
    f=rf_black_box_function,
    pbounds={'n_estimators': (50, 500), 'max_depth': (5, 50)},
    random_state=42,
    verbose=2
)

optimizer.maximize(init_points=10, n_iter=50)

best_params = optimizer.max['params']
best_rf = RandomForestClassifier(n_estimators=int(best_params['n_estimators']), max_depth=int(best_params['max_depth']), random_state=42)
best_rf.fit(X_train, y_train)
logits_rf = best_rf.predict_proba(X_test)

## XGBoost

In [None]:
# XGBoost
param_grid_xgb = {
    'max_depth': [3, 6, 9],
    'learning_rate': [0.01, 0.05, 0.1, 0.15, 0.2],
    'n_estimators': [100, 200, 300]
}
xgb_model = xgb.XGBClassifier(objective='binary:logistic', eval_metric='logloss')
grid_search_xgb = GridSearchCV(xgb_model, param_grid_xgb, scoring='f1_weighted', cv=5)
grid_search_xgb.fit(X_train, y_train)
best_xgb = grid_search_xgb.best_estimator_
logits_xgb = best_xgb.predict_proba(X_test)

## CatBoost

In [None]:
# CatBoost
param_grid_cat = {
    'depth': [4, 6, 8, 10],
    'learning_rate': [0.01, 0.05, 0.1, 0.15],
    'iterations': [100, 200, 300]
}
cat_model = CatBoostClassifier(loss_function='Logloss', eval_metric='Accuracy', verbose=0)
grid_search_cat = GridSearchCV(cat_model, param_grid_cat, scoring='f1_weighted', cv=5)
grid_search_cat.fit(X_train, y_train)
best_cat = grid_search_cat.best_estimator_
logits_cat = best_cat.predict_proba(X_test)

##KNN

In [None]:
# KNN
param_grid_knn = {
    'n_neighbors': [3, 5, 7, 9, 11],
    'weights': ['uniform', 'distance'],
    'p': [1, 2]  # p=1 for Manhattan distance, p=2 for Euclidean distance
}
knn_model = KNeighborsClassifier()
grid_search_knn = GridSearchCV(knn_model, param_grid_knn, scoring='f1_weighted', cv=5)
grid_search_knn.fit(X_train, y_train)
best_knn = grid_search_knn.best_estimator_
logits_knn = best_knn.predict_proba(X_test)

## Evaluate

In [None]:
# 评估每个基模型
def evaluate_model(model, X_test, y_test):
    test_predictions = model.predict(X_test)
    test_accuracy = accuracy_score(y_test, test_predictions)
    test_f1 = f1_score(y_test, test_predictions, average='weighted')
    test_mcc = matthews_corrcoef(y_test, test_predictions)
    return test_accuracy, test_f1, test_mcc

lr_accuracy, lr_f1, lr_mcc = evaluate_model(best_lr, X_test, y_test)
svm_accuracy, svm_f1, svm_mcc = evaluate_model(best_svm, X_test, y_test)
rf_accuracy, rf_f1, rf_mcc = evaluate_model(best_rf, X_test, y_test)
xgb_accuracy, xgb_f1, xgb_mcc = evaluate_model(best_xgb, X_test, y_test)
cat_accuracy, cat_f1, cat_mcc = evaluate_model(best_cat, X_test, y_test)
knn_accuracy, knn_f1, knn_mcc = evaluate_model(best_knn, X_test, y_test)

print(f"Logistic Regression Test Accuracy: {lr_accuracy:.4f}")
print(f"Logistic Regression Test F1 Score: {lr_f1:.4f}")
print(f"Logistic Regression Test MCC: {lr_mcc:.4f}\n")

print(f"SVM Test Accuracy: {svm_accuracy:.4f}")
print(f"SVM Test F1 Score: {svm_f1:.4f}")
print(f"SVM Test MCC: {svm_mcc:.4f}\n")

print(f"Random Forest Test Accuracy: {rf_accuracy:.4f}")
print(f"Random Forest Test F1 Score: {rf_f1:.4f}")
print(f"Random Forest Test MCC: {rf_mcc:.4f}\n")

print(f"XGBoost Test Accuracy: {xgb_accuracy:.4f}")
print(f"XGBoost Test F1 Score: {xgb_f1:.4f}")
print(f"XGBoost Test MCC: {xgb_mcc:.4f}\n")

print(f"CatBoost Test Accuracy: {cat_accuracy:.4f}")
print(f"CatBoost Test F1 Score: {cat_f1:.4f}")
print(f"CatBoost Test MCC: {cat_mcc:.4f}\n")


print(f"KNN Test Accuracy: {knn_accuracy:.4f}")
print(f"KNN Test F1 Score: {knn_f1:.4f}")
print(f"KNN Test MCC: {knn_mcc:.4f}\n")

In [None]:
# 获取基模型的logit
logits_lr = best_lr.predict_proba(X_test)
logits_svm = best_svm.predict_proba(X_test)
logits_rf = best_rf.predict_proba(X_test)
logits_cat = best_cat.predict_proba(X_test)
logits_xgb = best_xgb.predict_proba(X_test)
logits_knn = best_knn.predict_proba(X_test)
logits_dnn = dnn_wrapper.predict_proba(X_test)
true_labels = y_test

# 创建logits列表
bs_logits = [logits_lr, logits_svm, logits_rf, logits_cat, logits_xgb, logits_knn] # base model

# 确认logits_list维度
print(f"Logits shape: {[logits.shape for logits in bs_logits]}")

# Meta Learner



# Ensemble model class (Stacking)

In [None]:
def set_seed(seed):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

# Set the seed at the entry point of the code
set_seed(42)

class StackingAdjustableMetaLearner(nn.Module):
    def __init__(self, input_dim, num_classes, layers, width):
        super(StackingAdjustableMetaLearner, self).__init__()
        self.layers = nn.ModuleList()
        self.layers.append(nn.Linear(input_dim, width))
        self.bns = nn.ModuleList([nn.BatchNorm1d(width) for _ in range(layers)])
        self.dropouts = nn.ModuleList([nn.Dropout(0.5) for _ in range(layers)])
        for _ in range(layers - 1):
            self.layers.append(nn.Linear(width, width))
        self.output_layer = nn.Linear(width, num_classes)

    def forward(self, x):
        for layer, bn, dropout in zip(self.layers, self.bns, self.dropouts):
            x = F.relu(bn(layer(x)))
            x = dropout(x)
        x = self.output_layer(x)
        return x

def Stacking_evaluate_network(network, X_val, y_val):
    network.eval()
    with torch.no_grad():
        val_outputs = network(X_val)
        val_loss = nn.CrossEntropyLoss()(val_outputs, y_val).item()
    return val_loss

def Stacking_mutate_network(network, input_dim, num_classes, mutation_rate):
    layers = len(network.layers) + (1 if random.random() < mutation_rate else 0)
    width = network.layers[0].out_features + (32 if random.random() < mutation_rate else 0)
    return StackingAdjustableMetaLearner(input_dim, num_classes, layers, width)

class StackingEnsembleModel:
    def __init__(self, logits_list, true_labels, num_labels=2, max_epochs=1000, learning_rate=0.001, early_stopping_patience=10, population_size=20, mutation_rate=0.1, generations=100, seed=42):
        set_seed(seed)  # Ensure to set the random seed for reproducibility
        self.logits_list = logits_list
        self.true_labels = true_labels
        self.num_labels = num_labels
        self.max_epochs = max_epochs
        self.learning_rate = learning_rate
        self.early_stopping_patience = early_stopping_patience
        self.population_size = population_size
        self.mutation_rate = mutation_rate
        self.generations = generations
        self.seed = seed  # Save the seed
        self.train_losses = []
        self.val_losses = []
        self.test_losses = []

        # Calculate input_dim
        input_dim = len(logits_list) * logits_list[0].shape[1]

        # Initialize population
        self.population = [StackingAdjustableMetaLearner(input_dim, num_labels, random.randint(1, 5), random.randint(64, 512)) for _ in range(population_size)]

    def prepare_data(self):
        X = np.hstack(self.logits_list)
        y = self.true_labels
        scaler = StandardScaler()
        X = scaler.fit_transform(X)
        # Use the same seed to ensure reproducibility of data splitting
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=self.seed)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=self.seed)
        return X_train, X_val, X_test, y_train, y_val, y_test


    def train(self):
        X_train, X_val, X_test, y_train, y_val, y_test = self.prepare_data()
        X_train = torch.tensor(X_train, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.long)
        X_val = torch.tensor(X_val, dtype=torch.float32)
        y_val = torch.tensor(y_val, dtype=torch.long)
        X_test = torch.tensor(X_test, dtype=torch.float32)
        y_test = torch.tensor(y_test, dtype=torch.long)

        best_network = self.evolve_networks(X_val, y_val)
        self.meta_learner = best_network

        self.optimizer = optim.Adam(self.meta_learner.parameters(), lr=self.learning_rate)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode='min', factor=0.1, patience=2, verbose=True)
        self.criterion = nn.CrossEntropyLoss()
        best_val_loss = float('inf')
        epochs_no_improve = 0

        for epoch in range(self.max_epochs):
            self.meta_learner.train()
            self.optimizer.zero_grad()
            outputs = self.meta_learner(X_train)
            loss = self.criterion(outputs, y_train)
            loss.backward()
            self.optimizer.step()

            train_loss = loss.item()
            self.train_losses.append(train_loss)

            # Validation loss
            self.meta_learner.eval()
            with torch.no_grad():
                val_outputs = self.meta_learner(X_val)
                val_loss = self.criterion(val_outputs, y_val).item()
                self.val_losses.append(val_loss)

            print(f"Epoch {epoch+1}/{self.max_epochs} - Training loss: {train_loss:.4f} - Validation loss: {val_loss:.4f}")

            # Update learning rate scheduler
            self.scheduler.step(val_loss)

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= self.early_stopping_patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break

        # Test loss
        self.meta_learner.eval()
        with torch.no_grad():
            test_outputs = self.meta_learner(X_test)
            test_loss = self.criterion(test_outputs, y_test).item()
            self.test_losses.append(test_loss)
            print(f"Test loss: {test_loss:.4f}")

    def evolve_networks(self, X_val, y_val, patience=10):
        best_val_loss = float('inf')
        generations_no_improve = 0

        for generation in range(self.generations):
            print(f"Generation {generation + 1}/{self.generations}")
            scores = [Stacking_evaluate_network(net, X_val, y_val) for net in self.population]
            sorted_population = [net for _, net in sorted(zip(scores, self.population), key=lambda pair: pair[0])]
            self.population = sorted_population[:len(self.population)//2]

            new_population = []
            while len(new_population) < len(sorted_population):
                parent = random.choice(self.population)
                child = Stacking_mutate_network(parent, X_val.shape[1], self.num_labels, self.mutation_rate)
                new_population.append(child)

            self.population = new_population

            current_best_val_loss = min(scores)
            if current_best_val_loss < best_val_loss:
                best_val_loss = current_best_val_loss
                generations_no_improve = 0
            else:
                generations_no_improve += 1
                if generations_no_improve >= patience:
                    print(f"Early stopping at generation {generation + 1}")
                    break

        best_network = sorted_population[0]
        self.optimizer = optim.Adam(best_network.parameters(), lr=self.learning_rate, weight_decay=0.01)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode='min', factor=0.1, patience=2, verbose=True)
        self.criterion = nn.CrossEntropyLoss()

        return best_network

    def evaluate(self):
        X_train, X_val, X_test, y_train, y_val, y_test = self.prepare_data()
        X_test = torch.tensor(X_test, dtype=torch.float32)
        y_test = torch.tensor(y_test, dtype=torch.long)

        self.meta_learner.eval()
        with torch.no_grad():
            outputs = self.meta_learner(X_test)
            _, y_pred = torch.max(outputs, 1)

        accuracy = accuracy_score(y_test.numpy(), y_pred.numpy())
        f1 = f1_score(y_test.numpy(), y_pred.numpy(), average='weighted')
        mcc = matthews_corrcoef(y_test.numpy(), y_pred.numpy())
        print(f"Ensemble Model Accuracy: {accuracy:.4f}")
        print(f"Ensemble Model F1 Score: {f1:.4f}")
        print(f"Ensemble Model MCC: {mcc:.4f}")
        return {
            "accuracy": accuracy,
            "f1": f1,
            "mcc": mcc,
            "epoch": len(self.train_losses),
            "train_losses": self.train_losses,
            "val_losses": self.val_losses,
            "test_losses": self.test_losses
        }

In [None]:
# Initialize and train the Stacking ensembleModel
stacking_ensemble_model = StackingEnsembleModel(bs_logits, true_labels, num_labels=2, max_epochs=1000, learning_rate=0.001, early_stopping_patience=10)
stacking_ensemble_model.train()

# Evaluate the model
stacking_ensemble_results = stacking_ensemble_model.evaluate()

# Save result

In [None]:
# DataFrame to save results
columns = ["model", "dataset", "subset", "loss", "epoch", "accuracy", "F1", "MCC"]
results_df = pd.DataFrame(columns=columns)

def save_results_to_csv(results_df, results_file):
    # Check if the file exists
    if not os.path.isfile(results_file):
        # File does not exist, create file and write data including header
        results_df.to_csv(results_file, index=False)
    else:
        # File exists, append data without writing the header
        results_df.to_csv(results_file, mode='a', header=False, index=False)

# Save DNABERT-2 model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "DNABERT-2",
    "dataset": dataset,
    "subset": subset,
    "loss": dnabert2_results['eval_loss'],
    "epoch": dnabert2_results['epoch'],
    "accuracy": dnabert2_results['eval_accuracy'],
    "F1": dnabert2_results['eval_f1'],
    "MCC": dnabert2_results['eval_mcc']
}])])
save_results_to_csv(results_df, results_file)

# Save Nucleotide Transformer model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "Nucleotide Transformer",
    "dataset": dataset,
    "subset": subset,
    "loss": nt_results['eval_loss'],
    "epoch": nt_results['epoch'],
    "accuracy": nt_results['eval_accuracy'],
    "F1": nt_results['eval_f1'],
    "MCC": nt_results['eval_mcc']
}])])
save_results_to_csv(results_df, results_file)

# Save Hyenadna model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "Hyenadna",
    "dataset": dataset,
    "subset": subset,
    "loss": hyenadna_results['eval_loss'],
    "epoch": hyenadna_results['epoch'],
    "accuracy": hyenadna_results['eval_accuracy'],
    "F1": hyenadna_results['eval_f1'],
    "MCC": hyenadna_results['eval_mcc']
}])])
save_results_to_csv(results_df, results_file)

# Save Logistic Regression model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "LogisticRegression",
    "dataset": dataset,
    "subset": subset,
    "accuracy": lr_accuracy,
    "F1": lr_f1,
    "MCC": lr_mcc
}])])
save_results_to_csv(results_df, results_file)

# Save Random Forest model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "RandomForest",
    "dataset": dataset,
    "subset": subset,
    "accuracy": rf_accuracy,
    "F1": rf_f1,
    "MCC": rf_mcc
}])])
save_results_to_csv(results_df, results_file)

# Save XGBoost model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "XGBoost",
    "dataset": dataset,
    "subset": subset,
    "accuracy": xgb_accuracy,
    "F1": xgb_f1,
    "MCC": xgb_mcc
}])])
save_results_to_csv(results_df, results_file)

# Save CatBoost model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "CatBoost",
    "dataset": dataset,
    "subset": subset,
    "accuracy": cat_accuracy,
    "F1": cat_f1,
    "MCC": cat_mcc
}])])
save_results_to_csv(results_df, results_file)

# Save SVM model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "SVM",
    "dataset": dataset,
    "subset": subset,
    "accuracy": svm_accuracy,
    "F1": svm_f1,
    "MCC": svm_mcc
}])])
save_results_to_csv(results_df, results_file)

# Save KNN model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "KNN",
    "dataset": dataset,
    "subset": subset,
    "accuracy": knn_accuracy,
    "F1": knn_f1,
    "MCC": knn_mcc
}])])
save_results_to_csv(results_df, results_file)

# Save DNN model results to DataFrame
results_df = pd.DataFrame(columns=columns)
results_df = pd.concat([results_df, pd.DataFrame([{
    "model": "Stacking",
    "dataset": dataset,
    "subset": subset,
    "loss": stacking_ensemble_results['test_losses'],
    "epoch": stacking_ensemble_results['epoch'],
    "accuracy": stacking_ensemble_results['accuracy'],
    "F1": stacking_ensemble_results['f1'],
    "MCC": stacking_ensemble_results['mcc']
}])])
save_results_to_csv(results_df, results_file)

print("Saved successfully")
