In [3]:
#@title Install necessary libraries
!pip install transformers[torch]  # Install transformers with PyTorch support
!pip install biopython  # Install Biopython for sequence handling



# -- **giusto**

In [4]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import matthews_corrcoef
from torch.utils.data import DataLoader, TensorDataset
from Bio import SeqIO
from tqdm import tqdm
from transformers import EsmTokenizer, EsmForSequenceClassification
import random
import logging



In [5]:
# Load the tokenizer and model
model_name = "facebook/esm2_t33_650M_UR50D"
tokenizer = EsmTokenizer.from_pretrained(model_name)
model = EsmForSequenceClassification.from_pretrained(model_name)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of EsmForSequenceClassification were not initialized from the model checkpoint at facebook/esm2_t33_650M_UR50D and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


EsmForSequenceClassification(
  (esm): EsmModel(
    (embeddings): EsmEmbeddings(
      (word_embeddings): Embedding(33, 1280, padding_idx=1)
      (dropout): Dropout(p=0.0, inplace=False)
      (position_embeddings): Embedding(1026, 1280, padding_idx=1)
    )
    (encoder): EsmEncoder(
      (layer): ModuleList(
        (0-32): 33 x EsmLayer(
          (attention): EsmAttention(
            (self): EsmSelfAttention(
              (query): Linear(in_features=1280, out_features=1280, bias=True)
              (key): Linear(in_features=1280, out_features=1280, bias=True)
              (value): Linear(in_features=1280, out_features=1280, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
              (rotary_embeddings): RotaryEmbedding()
            )
            (output): EsmSelfOutput(
              (dense): Linear(in_features=1280, out_features=1280, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (LayerNorm): LayerNorm((1280,

In [6]:
# Load training and benchmark data
pos_train = pd.read_csv('pos_training.tsv', sep='\t', header=None, names=["Accession", "Organism", "Kingdom", "Length", "Cleavage", "Type", "Fold"])
neg_train = pd.read_csv('neg_training.tsv', sep='\t', header=None, names=["Accession", "Organism", "Kingdom", "Length", "Transmembrane", "Type", "Fold"])

pos_bench = pd.read_csv('pos_benchmark.tsv', sep='\t', header=None, names=['Accession', 'Organism', 'Kingdom', 'Length', 'Cleavage', 'Type'])
neg_bench = pd.read_csv('neg_benchmark.tsv', sep='\t', header=None, names=['Accession', 'Organism', 'Kingdom', 'Length', 'Transmembrane', 'Type'])

# Combine training and benchmark data
full_training_df = pd.concat([pos_train, neg_train], ignore_index=True)
full_benchmark_df = pd.concat([pos_bench, neg_bench], ignore_index=True)

# Create label column based on 'Type'
full_training_df['Label'] = full_training_df['Type'].apply(lambda x: 1 if x == '+' else 0)
full_benchmark_df['Label'] = full_benchmark_df['Type'].apply(lambda x: 1 if x == '+' else 0)



In [7]:
def parse_fasta(fasta_file):
    fasta_dict = {}
    with open(fasta_file, "r") as fasta:
        current_id, current_seq = "", ""

        for line in fasta:
            line = line.strip()
            if line.startswith(">"):
                if current_id:
                    fasta_dict[current_id] = current_seq
                current_id = line[1:]  # Remove the '>' character
                current_seq = ""
            else:
                current_seq += line

        if current_id:
            fasta_dict[current_id] = current_seq  # Add the last record

    return fasta_dict

In [8]:
# Parse the FASTA files for positive and negative sequences
total_pos_fasta = parse_fasta("pos_cluster-results_rep_seq.fasta")
total_neg_fasta = parse_fasta("neg_cluster-results_rep_seq.fasta")

# Map sequences from FASTA files to the DataFrames
full_training_df['Sequence'] = full_training_df['Accession'].map(total_pos_fasta).combine_first(
    full_training_df['Accession'].map(total_neg_fasta))

full_benchmark_df['Sequence'] = full_benchmark_df['Accession'].map(total_pos_fasta).combine_first(
    full_benchmark_df['Accession'].map(total_neg_fasta))

# Save updated DataFrames to new files
full_training_df.to_csv('full_training_with_sequences.tsv', sep='\t', index=False)
full_benchmark_df.to_csv('full_benchmark_with_sequences.tsv', sep='\t', index=False)

In [9]:
# Function to compute embeddings
def compute_embeddings(sequence):
  inputs = tokenizer(sequence, return_tensors="pt").to(device)
  outputs = model(**inputs, output_hidden_states=True)
  last_hidden_state = outputs.hidden_states[-1]  # Last layer
  embeddings = last_hidden_state[0, 1:91, :] # Extract embeddings from the last layer
  return embeddings



In [10]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to extract embeddings and labels
def extract_XY(df):
    M, Y = [], []
    folds = df.get('Fold', None).values if 'Fold' in df.columns else None

    total_rows = df.shape[0]
    logger.info(f"Starting to process {total_rows} records.")

    # Process sequences from the DataFrame
    for index, row in tqdm(df.iterrows(), total=total_rows, desc="Processing records"):
        sequence = row['Sequence']

        # Check if sequence is a valid string
        if isinstance(sequence, str):
            sequence = sequence[:1022]  # Ensure slicing only for string types
        else:
            logger.warning(f"Skipping row {index} due to invalid sequence type: {type(sequence)}")
            continue  # Skip this iteration if the sequence is not valid

        label = row['Label']

        # Compute embeddings
        embedding = compute_embeddings(sequence)
        mean_embedding = np.mean(embedding.detach().cpu().numpy(), axis=0)  # Average embedding

        # Append results
        M.append(mean_embedding)
        Y.append(label)

    logger.info("Processing completed successfully.")

    # Return folds only if they were present in the DataFrame
    if folds:
        return np.array(M), np.array(Y), folds
    else:
        return np.array(M), np.array(Y)

In [None]:
train_M, train_Y, train_folds = extract_XY(full_training_df)


Processing records:   0%|          | 3/7901 [01:03<51:48:26, 23.61s/it]

In [None]:
benchmark_M, benchmark_Y = extract_XY(full_benchmark_df)


In [None]:
#  Combine training embeddings into a DataFrame
training_data = pd.DataFrame(train_M)
training_data['Label'] = train_Y
training_data['Fold'] = train_folds  # Only add fold data for the training set

# Combine benchmark embeddings into a separate DataFrame
benchmark_data = pd.DataFrame(benchmark_M)
benchmark_data['Label'] = benchmark_Y  # Labels for benchmark data

# Save DataFrames if needed
training_data.to_csv('full_training_with_embeddings.csv', index=False)
benchmark_data.to_csv('full_benchmark_with_embeddings.csv', index=False)


Layer Construction:

Layers list: layers = [] creates an empty list to store each layer.
Loop through hidden_layer_sizes: For each size in hidden_layer_sizes, the following layers are added in sequence:
*   nn.Linear(in_features, size): A fully connected layer where the input has in_features neurons, and the output has size neurons.
*   nn.ReLU(): A ReLU activation function to introduce non-linearity.
*   nn.Dropout(dropout_p): A dropout layer to randomly set some neurons' output to zero, helping prevent overfitting.


After each loop iteration, in_features is updated to size, preparing it for the next hidden layer.

logits = self.model(x) passes the input x through all layers in self.model

Apply Sigmoid: torch.sigmoid(logits) applies the Sigmoid, which maps the output to values between 0 and 1 (binary class)

In [None]:
class SPMLP(nn.Module):
    def __init__(self, input_size, hidden_layer_sizes, output_size, dropout_p=0.5):
        super(SPMLP, self).__init__()
        layers = []
        in_features = input_size

        for size in hidden_layer_sizes:
            layers.append(nn.Linear(in_features, size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_p))
            in_features = size

        layers.append(nn.Linear(in_features, output_size))  # Output layer
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.model(x)
        return torch.sigmoid(logits)  # Use sigmoid for output

class SPDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        # Convert string labels to numerical labels for categorical classification
        unique_labels = np.unique(y)
        label_mapping = {label: i for i, label in enumerate(unique_labels)}
        labels = np.array([label_mapping[label] for label in y])
        self.y = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [None]:
class ModelArchitectureManager:
    def __init__(self, device='cpu'):
        self.best_model_state_dict = None
        self.best_score = -1
        self.best_hidden_layer_sizes = None
        self.device = device
        logging.basicConfig(level=logging.INFO)

    def _generate_random_hyperparameters(self):
        num_layers = random.randint(1, 5)
        hidden_layer_sizes = [random.randint(20, 100) for _ in range(num_layers)]
        dropout_p = random.uniform(0.1, 0.5)
        learning_rate = random.uniform(1e-5, 1e-2)
        return num_layers, hidden_layer_sizes, dropout_p, learning_rate

    def train_model(self, model, train_loader, criterion, optimizer, epochs):
        model.train()
        for epoch in range(epochs):
            for batch_X, batch_y in train_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y.float())
                loss.backward()
                optimizer.step()

    def evaluate_model(self, model, val_loader):
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                outputs = model(batch_X)
                preds = torch.round(outputs).cpu().numpy()
                all_preds.extend(preds)
                all_labels.extend(batch_y.cpu().numpy())
        return matthews_corrcoef(all_labels, all_preds)

    def random_search(self, num_trials, input_size, output_size, train_loader, val_loader, epochs=100):
        best_mcc = -1
        best_model_state = None

        for trial in range(num_trials):
            logging.info(f"Random Search Trial {trial + 1}/{num_trials}")

            num_layers, hidden_layer_sizes, dropout_p, learning_rate = self._generate_random_hyperparameters()

            model = SPMLP(input_size, hidden_layer_sizes, output_size, dropout_p=dropout_p).to(self.device)
            criterion = nn.BCEWithLogitsLoss()
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)

            self.train_model(model, train_loader, criterion, optimizer, epochs=epochs)

            mcc = self.evaluate_model(model, val_loader)
            logging.info(f"Trial {trial + 1}: Validation MCC: {mcc:.4f}")

            if mcc > best_mcc:
                best_mcc = mcc
                best_model_state = model.state_dict()
                self.best_hidden_layer_sizes = hidden_layer_sizes  # Store best architecture sizes

        self.best_model_state_dict = best_model_state
        self.best_score = best_mcc
        logging.info(f"Best MCC: {best_mcc:.4f} with hidden layers: {self.best_hidden_layer_sizes}")
        return self.best_model_state_dict, self.best_score

    def test_model(self, model, test_loader):
        model.eval()
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                outputs = model(batch_X)
                preds = torch.round(outputs).cpu().numpy()
                all_preds.extend(preds)
                all_labels.extend(batch_y.cpu().numpy())
        return matthews_corrcoef(all_labels, all_preds)


In [None]:
def find_best_model(combined_data):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    manager = ModelArchitectureManager(device=device)

    input_size = combined_data.shape[1] - 2  # Adjusting for label and Fold columns
    output_size = 1  # Output size should be 1 for binary classification
    all_mcc_scores = []

    for fold in range(combined_data['Fold'].nunique()):
        logging.info(f"\nFold {fold + 1}")

        # Define test, validation, and training data based on the Fold column
        test_data = combined_data[combined_data['Fold'] == fold]
        val_data = combined_data[combined_data['Fold'] == (fold + 1) % combined_data['Fold'].nunique()]
        train_data = combined_data[combined_data['Fold'] != fold]
        train_data = train_data[train_data['Fold'] != (fold + 1) % combined_data['Fold'].nunique()]

        # Create DataLoaders using SPDataset
        train_dataset = SPDataset(train_data.iloc[:, :-2].values, train_data.iloc[:, -1].values)
        val_dataset = SPDataset(val_data.iloc[:, :-2].values, val_data.iloc[:, -1].values)
        test_dataset = SPDataset(test_data.iloc[:, :-2].values, test_data.iloc[:, -1].values)

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

        # Optimize hyperparameters using the validation fold
        manager.random_search(
            num_trials=10,
            input_size=input_size,
            output_size=output_size,
            train_loader=train_loader,
            val_loader=val_loader
        )

        # Create the best model with the best architecture
        best_model = SPMLP(input_size, manager.best_hidden_layer_sizes, output_size).to(device)
        best_model.load_state_dict(manager.best_model_state_dict)

        # Test the best model on the test fold
        test_score = manager.evaluate_model(best_model, test_loader)
        logging.info(f"Best architecture for Fold {fold + 1}: {manager.best_hidden_layer_sizes} with Test MCC: {test_score:.4f}")

        all_mcc_scores.append(test_score)

    # Print results summary
    logging.info(f"\nAll MCC scores: {all_mcc_scores}")
    logging.info(f"Average MCC: {np.mean(all_mcc_scores):.4f}")




In [None]:
# Find the best model
find_best_model(training_data)

# After finding the best model, test it on the benchmark data
final_model = SPMLP(input_size, manager.best_hidden_layer_sizes, output_size).to(device)
final_model.load_state_dict(manager.best_model_state_dict)

# Create DataLoader for benchmark dataset
benchmark_dataset = SPDataset(benchmark_data.iloc[:, :-2].values, benchmark_data.iloc[:, -1].values)
benchmark_loader = DataLoader(benchmark_dataset, batch_size=32, shuffle=False)

# Evaluate the best model on the benchmark data using the new test method
benchmark_score = manager.test_model(final_model, benchmark_loader)
print(f"Benchmark Model MCC: {benchmark_score:.4f}")