In [3]:
import torch
import itertools
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler, TensorDataset

In [5]:
from google.colab import files

# Select the compute device once: CUDA when PyTorch reports a usable GPU, CPU otherwise
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [9]:
def compute_sample_weights(df, label_col='label', taxa_col='taxa', meiosis_taxa=None):
    """
    Compute per-row sample weights that upweight positive rows from selected taxa.

    Rows whose label equals 1 AND whose taxon is in `meiosis_taxa` receive the
    global class ratio (count of label-0 rows / count of label-1 rows); every
    other row receives 1.0.

    Parameters:
        df (pd.DataFrame): DataFrame containing the data.
        label_col (str): Name of the label column (1 = meiosis, 0 = non-meiosis).
        taxa_col (str): Name of the column with taxonomic group info.
        meiosis_taxa (list or set): Taxa whose positive rows are upweighted.
            Defaults to chordates, arthropods, fungi, plants and other animals.

    Returns:
        pd.Series: Float sample weights aligned with df rows (same index).
        If df contains no label-1 rows there is nothing to upweight, so every
        weight is 1.0 (previously this case raised ZeroDivisionError).
    """
    if meiosis_taxa is None:
        meiosis_taxa = {'chordates', 'arthropods', 'fungi', 'plants', 'other animals'}

    is_meiosis = df[label_col] == 1
    n_meiosis = int(is_meiosis.sum())
    n_non_meiosis = int((df[label_col] == 0).sum())

    # No positives: the class ratio is undefined and no row qualifies for upweighting.
    if n_meiosis == 0:
        return pd.Series(1.0, index=df.index, dtype='float64')

    global_weight = n_non_meiosis / n_meiosis  # global class weight

    # Vectorised replacement for the row-by-row loop; np.where works positionally,
    # so a non-unique or non-default index is preserved as-is.
    upweight = is_meiosis & df[taxa_col].isin(list(meiosis_taxa))
    return pd.Series(np.where(upweight, global_weight, 1.0), index=df.index, dtype='float64')


In [16]:
# Tensor-backed dataset yielding (features, label) pairs
class ProteinDataset(Dataset):
    """In-memory dataset that stores features as float32 and labels as int64 tensors."""

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        labels = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features, labels

    def __len__(self):
        # The sample count is taken from the label tensor.
        return len(self.y)

    def __getitem__(self, idx):
        sample = (self.X[idx], self.y[idx])
        return sample

# Baseline MLP with a single hidden layer (a later cell redefines MLPClassifier)
class MLPClassifier(nn.Module):
    """Linear -> ReLU -> Dropout -> Linear network that emits two logits per sample."""

    def __init__(self, input_dim, hidden_dim=128, dropout=0.3):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        # Hidden representation: ReLU on the first projection, then dropout.
        hidden = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(hidden)

In [41]:
# MLP with a configurable number of hidden layers
class MLPClassifier(nn.Module):
    """
    Feed-forward classifier producing two logits per sample.

    The hidden stack is `num_layers` repetitions of Linear followed by Dropout;
    the activation, looked up by name in torch.nn.functional, is applied to the
    output of every hidden Linear before the Dropout that follows it.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=1, dropout=0.3, activation='relu'):
        super().__init__()
        self.activation_fn = getattr(F, activation)

        # Input width of each hidden Linear: the first sees the raw features,
        # any further ones see the previous hidden layer.
        in_widths = [input_dim] + [hidden_dim] * (num_layers - 1)
        stack = []
        for width in in_widths:
            stack += [nn.Linear(width, hidden_dim), nn.Dropout(dropout)]

        self.hidden = nn.Sequential(*stack)
        self.output = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        for module in self.hidden:
            x = module(x)
            # Only Linear outputs go through the activation; Dropout outputs pass on unchanged.
            if isinstance(module, nn.Linear):
                x = self.activation_fn(x)
        return self.output(x)

In [42]:
# Early-stopping helper: tracks the best score and keeps a copy of the best weights
class EarlyStopping:
    """
    Signal when a monitored score has stopped improving.

    Call the instance once per epoch with the current score and the model.
    On the first call, and whenever the score improves by more than `delta`,
    a deep copy of `model.state_dict()` is stored in `best_model_state`.
    After `patience` consecutive non-improving calls `early_stop` becomes True.

    Parameters:
        patience (int): Non-improving calls tolerated before stopping.
        delta (float): Minimum change that counts as an improvement.
        mode (str): 'max' if larger scores are better, 'min' if smaller are.

    Raises:
        ValueError: If `mode` is neither 'max' nor 'min' (such a mode could
            never register an improvement).
    """

    def __init__(self, patience=5, delta=0, mode='max'):
        if mode not in ('max', 'min'):
            raise ValueError(f"mode must be 'max' or 'min', got {mode!r}")
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.delta = delta
        self.mode = mode
        self.best_model_state = None

    @staticmethod
    def _snapshot(model):
        """Return an independent copy of the model's current state dict."""
        import copy  # local import keeps this cell self-contained

        # state_dict() hands back references to the live parameter tensors, which the
        # optimizer keeps updating in place; without a copy the stored "best" state
        # would silently follow the latest weights.
        return copy.deepcopy(model.state_dict())

    def __call__(self, score, model):
        if self.best_score is None:
            self.best_score = score
            self.best_model_state = self._snapshot(model)
            return

        if self.mode == 'max':
            improved = score > self.best_score + self.delta
        else:
            improved = score < self.best_score - self.delta

        if improved:
            self.best_score = score
            self.best_model_state = self._snapshot(model)
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

In [53]:
def _validation_f1(model, loader):
    """Return the binary F1 score of `model` over all (features, label) batches in `loader`."""
    model.eval()
    y_pred, y_true = [], []
    with torch.no_grad():
        for xb, yb in loader:
            logits = model(xb.to(device))
            y_pred.extend(torch.argmax(logits, dim=1).cpu().numpy())
            y_true.extend(yb.numpy())
    # zero_division=0 yields the same 0 score sklearn falls back to when no positive
    # is predicted, without emitting a warning for every such epoch.
    return f1_score(y_true, y_pred, zero_division=0)


def tune_mlp_hyperparameters(X, y, sample_weights, param_grid=None, n_splits=10, epochs=20, patience=5):

    """
    Perform grid search to tune hyperparameters for a multi-layer perceptron (MLP) classifier using cross-validation.

    Every combination in the grid is trained once per stratified fold. Within a fold the
    validation F1 score is computed after each epoch and fed to an EarlyStopping instance;
    when training ends the best recorded weights are restored and the fold's F1 is
    re-computed. The combination with the highest mean F1 across folds wins.

    Parameters:
    ----------
    X : np.ndarray
        Feature matrix of shape (n_samples, n_features).

    y : np.ndarray
        Target labels of shape (n_samples,).

    sample_weights : array-like
        Per-sample weights, used only to drive the WeightedRandomSampler for the
        training split of each fold. Converted to a NumPy array so it is always
        indexed by position.

    param_grid : dict, optional
        Dictionary specifying the hyperparameter search space. Keys are parameter names and values are lists
        of possible values. Required keys: 'hidden_dim', 'dropout', 'lr', 'batch_size',
        'num_layers', 'activation', 'weight_decay', 'optimizer' ('adam', 'sgd' or 'adamW').
        If None, a default grid is used.

    n_splits : int, default=10
        Number of cross-validation splits (StratifiedKFold, shuffled with random_state=42).

    epochs : int, default=20
        Maximum number of training epochs per fold. Early stopping may halt training sooner.

    patience : int, default=5
        Number of consecutive non-improving epochs tolerated before early stopping.

    Returns:
    -------
    best_model : torch.nn.Module
        The model from the LAST fold of the best hyperparameter combination, with its
        early-stopping weights restored. It has not been trained on the full data set.

    best_params : dict
        Dictionary of the best-performing hyperparameter settings.

    Raises:
    ------
    ValueError
        If a grid entry names an unsupported optimizer.

    Notes:
    -----
    - Evaluation is based on the average F1 score over all folds; the same validation
      fold drives early stopping, so the reported score is optimistic.
    - Models and batches are moved to the module-level `device`.
    - Relies on `ProteinDataset(X, y)` yielding (features, label) pairs.
    - No random seed is set for model initialisation or sampling.
    """

    if param_grid is None:
        param_grid = {
            'hidden_dim': [64, 128],
            'dropout': [0.2, 0.3],
            'lr': [1e-3, 5e-4],
            'batch_size': [32],
            'num_layers': [1, 2],
            'activation': ['relu'],
            'weight_decay': [0, 1e-4],
            'optimizer': ['adamW']
    }

    # Fold indices are positions; a pandas Series would otherwise be indexed by label.
    sample_weights = np.asarray(sample_weights)

    keys, values = zip(*param_grid.items())
    grid = [dict(zip(keys, v)) for v in itertools.product(*values)]

    best_f1 = -1
    best_params = None
    best_model = None

    for params in grid:
        print(f"\n🔍 Trying: {params}")
        f1s = []
        skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

        for train_idx, val_idx in skf.split(X, y):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            weights = sample_weights[train_idx]

            train_dataset = ProteinDataset(X_train, y_train)
            val_dataset = ProteinDataset(X_val, y_val)

            # Class imbalance is handled by resampling the training split, not by the loss.
            sampler = WeightedRandomSampler(weights, len(weights), replacement=True)
            train_loader = DataLoader(train_dataset, batch_size=params['batch_size'], sampler=sampler)
            val_loader = DataLoader(val_dataset, batch_size=params['batch_size'], shuffle=False)

            model = MLPClassifier(
                input_dim=X.shape[1],
                hidden_dim=params['hidden_dim'],
                dropout=params['dropout'],
                num_layers=params['num_layers'],
                activation=params['activation']
            ).to(device)

            if params['optimizer'] == 'adam':
                optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
            elif params['optimizer'] == 'sgd':
                optimizer = torch.optim.SGD(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
            elif params['optimizer'] == 'adamW':
                optimizer = torch.optim.AdamW(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])
            else:
                raise ValueError(f"Unsupported optimizer: {params['optimizer']}")

            criterion = torch.nn.CrossEntropyLoss()
            early_stopper = EarlyStopping(patience=patience, mode='max')

            for epoch in range(epochs):
                model.train()
                for xb, yb in train_loader:
                    xb, yb = xb.to(device), yb.to(device)
                    optimizer.zero_grad()
                    loss = criterion(model(xb), yb)
                    loss.backward()
                    optimizer.step()

                early_stopper(_validation_f1(model, val_loader), model)

                if early_stopper.early_stop:
                    print(f"⏹️ Early stopping at epoch {epoch + 1}")
                    break

            # With epochs=0 nothing was recorded, so there is nothing to restore.
            if early_stopper.best_model_state is not None:
                model.load_state_dict(early_stopper.best_model_state)

            f1s.append(_validation_f1(model, val_loader))

        avg_f1 = np.mean(f1s)
        print(f"✅ Avg F1: {avg_f1:.4f}")

        if avg_f1 > best_f1:
            best_f1 = avg_f1
            best_params = params.copy()
            # `model` is whatever the final fold left behind for this combination.
            best_model = model

    print(f"\n🎯 Best params: {best_params}, F1: {best_f1:.4f}")
    return best_model, best_params


In [7]:
# Open the Colab upload dialog (`files` comes from google.colab); per the cell output
# this run saved mrmr_selected_train_dmc1_esm_50.csv, which the next cell reads.
uploaded=files.upload()

Saving mrmr_selected_train_dmc1_esm_50.csv to mrmr_selected_train_dmc1_esm_50.csv


In [10]:
# Read the training CSV into `df`; the data-preparation cell slices its columns by position.
df = pd.read_csv("mrmr_selected_train_dmc1_esm_50.csv")
# Weights computed with the default column names ('label', 'taxa').
# NOTE(review): `weights` is not referenced by any later cell visible here; the
# data-preparation cell recomputes the same quantity as `sample_weights`.
weights=compute_sample_weights(df)

In [12]:
# ---- Prepare data ----
# Requires `df` from the loading cell. Columns are addressed by position:
# first = identifier, last = label, second to last = taxa, everything in between = features.
id_col, taxa_col, label_col = df.columns[0], df.columns[-2], df.columns[-1]
feature_cols = df.columns[1:-2]

# NumPy views of the features, labels and per-row weights used for training
sample_weights = compute_sample_weights(df, label_col=label_col, taxa_col=taxa_col).values
y_train = df[label_col].values
X_train = df[feature_cols].values

In [None]:
# ---- Train model with CV ----
# Runs the grid search with its default grid, fold count and epoch budget.
# `model` is the network left over from the last CV fold of the winning combination.
# NOTE(review): `best_params` returned here is overwritten by the hard-coded dict in a later cell.
model, best_params = tune_mlp_hyperparameters(X_train, y_train, sample_weights)

In [14]:
# Hyperparameters used for the final model, hard-coded here
# (presumably copied from a previous grid-search run -- confirm before reuse).
best_params = dict(
    hidden_dim=128,
    dropout=0.3,
    lr=0.001,
    batch_size=32,
    num_layers=2,
    activation='relu',
    weight_decay=0,
    optimizer='adamW',
)

In [21]:
# --- 2. Prepare Dataset ---
class ProteinDataset(Dataset):
    """
    In-memory dataset of features, labels and optional per-sample weights.

    This definition replaces the earlier two-argument ProteinDataset. Because
    `sample_weights` is optional, code written against the earlier version
    (`ProteinDataset(X, y)` yielding `(features, label)`) keeps working after
    this cell has been run.

    Parameters:
        X: Array-like of features; stored as a float32 tensor.
        y: Array-like of integer labels; stored as an int64 tensor.
        sample_weights: Optional array-like of per-sample weights; stored as a
            float32 tensor. When omitted, items carry no weight.

    Raises:
        ValueError: If `sample_weights` is given but its length differs from X.
    """

    def __init__(self, X, y, sample_weights=None):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)
        if sample_weights is None:
            self.sample_weights = None
        else:
            self.sample_weights = torch.tensor(sample_weights, dtype=torch.float32)
            if len(self.sample_weights) != len(self.X):
                raise ValueError(
                    f"sample_weights has {len(self.sample_weights)} entries "
                    f"but X has {len(self.X)} rows"
                )

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        # Two-tuple without weights, three-tuple with them.
        if self.sample_weights is None:
            return self.X[idx], self.y[idx]
        return self.X[idx], self.y[idx], self.sample_weights[idx]

In [18]:
class MLPClassifier(nn.Module):
    """
    Feed-forward classifier built as a single nn.Sequential stored in `self.model`.

    Layout: `num_layers` blocks of Linear -> activation -> Dropout, followed by
    a Linear layer that emits two logits.

    Parameters:
        input_dim (int): Number of input features.
        hidden_dim (int): Width of every hidden layer.
        num_layers (int): Number of hidden blocks; values below 1 still give one block.
        dropout (float): Dropout probability after each activation.
        activation (str): Case-insensitive name, one of 'relu', 'tanh',
            'sigmoid', 'leaky_relu', 'elu'. Any other name falls back to ReLU
            and emits a UserWarning, so a misspelt name cannot go unnoticed.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=1, dropout=0.3, activation='relu'):
        super(MLPClassifier, self).__init__()
        import warnings  # local import keeps this cell self-contained

        # Activation function
        activation_types = {
            'relu': nn.ReLU,
            'tanh': nn.Tanh,
            'sigmoid': nn.Sigmoid,
            'leaky_relu': nn.LeakyReLU,
            'elu': nn.ELU
        }
        key = activation.lower()
        if key not in activation_types:
            warnings.warn(
                f"Unknown activation {activation!r}; falling back to ReLU. "
                f"Supported: {sorted(activation_types)}"
            )
        # One activation instance is shared by every block.
        self.activation = activation_types.get(key, nn.ReLU)()

        # Input layer
        layers = [nn.Linear(input_dim, hidden_dim), self.activation, nn.Dropout(dropout)]

        # Hidden layers
        for _ in range(num_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(self.activation)
            layers.append(nn.Dropout(dropout))

        # Output layer (2 for binary classification logits)
        layers.append(nn.Linear(hidden_dim, 2))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

In [27]:
# Build the final model from the recorded best hyperparameters.
# NOTE(review): MLPClassifier and ProteinDataset are each defined more than once in this
# notebook; which definition is bound here depends on cell execution order. This cell
# needs the ProteinDataset that takes (X, y, sample_weights) and yields three-tuples.
best_model = MLPClassifier(
    input_dim=X_train.shape[1],
    hidden_dim=best_params['hidden_dim'],
    dropout=best_params['dropout'],
    num_layers=best_params['num_layers'],
    activation=best_params['activation']
).to(device)

# Optimizer: always AdamW here; best_params['optimizer'] is not consulted
optimizer = torch.optim.AdamW(
    best_model.parameters(),
    lr=best_params['lr'],
    weight_decay=best_params['weight_decay']
)

# --- 6. Prepare data ---
# The training CSV is read again only to compute weights (default 'label'/'taxa' columns);
# X_train and y_train come from the earlier data-preparation cell.
df_train = pd.read_csv("mrmr_selected_train_dmc1_esm_50.csv")
# Rebinds the notebook-level `sample_weights` to a pd.Series (an earlier cell stored an ndarray).
sample_weights = compute_sample_weights(df_train)  # Pass your training dataframe here
train_dataset = ProteinDataset(X_train, y_train, sample_weights)
# Plain shuffling: imbalance is handled by the weighted loss below, not by a sampler.
train_loader = DataLoader(train_dataset, batch_size=best_params['batch_size'], shuffle=True)

# --- 7. Train with weighted loss ---
# Fixed epoch budget on the full training set; no validation split or early stopping in this cell.
num_epochs = 50
for epoch in range(num_epochs):
    best_model.train()
    epoch_loss = 0.0

    for batch_x, batch_y, batch_w in train_loader:
        batch_x, batch_y, batch_w = batch_x.to(device), batch_y.to(device), batch_w.to(device)
        optimizer.zero_grad()

        logits = best_model(batch_x)

        # Compute per-sample losses
        losses = F.cross_entropy(logits, batch_y, reduction='none')
        # Scale each sample's loss by its weight, then average over the batch size
        # (the mean is not normalised by the sum of the weights).
        weighted_loss = (losses * batch_w).mean()

        weighted_loss.backward()
        optimizer.step()

        # Running sum of batch losses; only the commented-out print below would use it.
        epoch_loss += weighted_loss.item()

#    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

In [24]:
# Second Colab upload dialog; per the cell output this run saved
# mrmr_selected_test_dmc1_esm_50.csv, which the evaluation cell reads.
# Rebinds `uploaded` from the earlier upload cell.
uploaded=files.upload()

Saving mrmr_selected_test_dmc1_esm_50.csv to mrmr_selected_test_dmc1_esm_50.csv


In [26]:
# Prepare test set
# Same positional layout as the training frame: column 0 is skipped, the last column is
# the label, the second to last is skipped, and everything in between is a feature.
df_test = pd.read_csv("mrmr_selected_test_dmc1_esm_50.csv")
X_test = df_test.iloc[:, 1:-2].values  # features
y_test = df_test.iloc[:, -1].values   # labels
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)

# Predict
# The whole test set goes through the model in a single forward pass (no batching).
best_model.eval()
with torch.no_grad():
    logits = best_model(X_test_tensor)
    # Predicted class = index of the larger of the two logits
    y_pred = torch.argmax(logits, dim=1).cpu().numpy()

# Evaluate
# NOTE(review): this import repeats the one in the notebook's first cell.
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# The metric prints below are commented out, so this cell computes `y_pred` but reports nothing.
# print("✅ Accuracy: ", accuracy_score(y_test, y_pred))
# print("✅ Precision:", precision_score(y_test, y_pred))
# print("✅ Recall:   ", recall_score(y_test, y_pred))
# print("✅ F1 Score: ", f1_score(y_test, y_pred))