In [1]:
import numpy as np
import pandas as pd

from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import mean_squared_error
from scipy.stats import pearsonr

from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

from joblib import Parallel, delayed

def build_mlp_pipeline(hidden_layer_sizes=(64, 32),
                       alpha=1e-3,
                       learning_rate_init=1e-3,
                       random_state=42):
    """
    Build a Pipeline: StandardScaler -> MLPRegressor
    with the given hyperparameters.
    """
    mlp = MLPRegressor(
        hidden_layer_sizes=hidden_layer_sizes,
        activation="relu",
        solver="adam",
        alpha=alpha,                  # L2 regularization
        learning_rate="adaptive",
        learning_rate_init=learning_rate_init,
        max_iter=300,
        early_stopping=True,
        validation_fraction=0.1,
        n_iter_no_change=10,
        random_state=random_state
    )
    return Pipeline([
        ("scaler", StandardScaler()),
        ("mlp", mlp)
    ])


def cv_tune_mlp_on_train(X_train, y_train,
                         param_grid,
                         n_splits=5,
                         random_state=42):
    """
    Hyperparameter tuning using K-fold CV on the 80% train set.

    For each hyperparameter configuration:
      - Run K-fold CV on (X_train, y_train)
      - Compute mean RMSE across folds
    Choose the configuration with the lowest mean RMSE.

    Returns:
      best_params (dict), best_mean_rmse (float)
    """
    cv = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    best_rmse = np.inf
    best_params = None

    for params in param_grid:
        fold_rmses = []

        for train_idx, val_idx in cv.split(X_train, y_train):
            X_tr, X_val = X_train[train_idx], X_train[val_idx]
            y_tr, y_val = y_train[train_idx], y_train[val_idx]

            model = build_mlp_pipeline(
                hidden_layer_sizes=params["hidden_layer_sizes"],
                alpha=params["alpha"],
                learning_rate_init=params["learning_rate_init"],
                random_state=random_state
            )

            model.fit(X_tr, y_tr)
            y_val_pred = model.predict(X_val)
            rmse = np.sqrt(mean_squared_error(y_val, y_val_pred))
            fold_rmses.append(rmse)

        mean_rmse = np.mean(fold_rmses)

        if mean_rmse < best_rmse:
            best_rmse = mean_rmse
            best_params = params

    return best_params, best_rmse


def run_tf_training_with_heldout_test(X, y, pert_name,
                                      train_idx,
                                      test_idx,
                                      param_grid,
                                      cv_splits=5,
                                      random_state=42):
    """
    For one TF (one target y):
      - Use global train_idx/test_idx to define 80%/20% split.
      - On the 80% (train) perform CV-based hyperparameter tuning.
      - Retrain best model on full 80%.
      - Evaluate once on the 20% held-out test set.
    """

    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Hyperparameter tuning on train set only
    best_params, best_cv_rmse = cv_tune_mlp_on_train(
        X_train, y_train,
        param_grid=param_grid,
        n_splits=cv_splits,
        random_state=random_state
    )

    # Train final model on full 80% using best hyperparams
    final_model = build_mlp_pipeline(
        hidden_layer_sizes=best_params["hidden_layer_sizes"],
        alpha=best_params["alpha"],
        learning_rate_init=best_params["learning_rate_init"],
        random_state=random_state
    )
    final_model.fit(X_train, y_train)

    # Evaluate on the untouched 20%
    y_pred = final_model.predict(X_test)
    test_rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    test_corr, _ = pearsonr(y_test, y_pred)

    return {
        "perturbation": pert_name,
        "rmse_test": test_rmse,
        "pearson_corr_test": test_corr,
        "cv_rmse_mean": best_cv_rmse,
        "hidden_layer_sizes": best_params["hidden_layer_sizes"],
        "alpha": best_params["alpha"],
        "learning_rate_init": best_params["learning_rate_init"]
    }

In [13]:

# ======================
# Load input data
# ======================
EMBEDDING_TYPE = "tss"  # or "gencode"

input_file = "../../../data/active_guides_CRISPRa_mean_pop_mean.csv"
mean_pop = pd.read_csv(input_file, index_col=0)

# Load embeddings and metadata based on selected type
if EMBEDDING_TYPE == "tss":
    embeddings_path = "../../../embeddings/embeddings_enformer_tss.npy"
    metadata_path = "../../../embeddings/enformer_gene_names.txt"
    print(f"Loading TSS embeddings from: {embeddings_path}")
elif EMBEDDING_TYPE == "gencode":
    embeddings_path = "../../../embeddings/embeddings_enformer_gencode.v49.pc_transcripts.npy"
    metadata_path = "../../../embeddings/gencode.v49.pc_transcripts_gene_names.txt"
    print(f"Loading Gencode v49 PC transcripts embeddings from: {embeddings_path}")
else:
    raise ValueError(f"Unknown embedding type: {EMBEDDING_TYPE}. Use 'tss' or 'gencode'.")

embeddings = np.load(embeddings_path, allow_pickle=True)
print(f"Embeddings shape: {embeddings.shape}")

meta_table = pd.read_csv(metadata_path, sep="\t", header=None, names=["index", "gene"])
print(f"Loaded {len(meta_table)} gene names from metadata")

embeddings = pd.DataFrame(embeddings, index=meta_table['gene'])

# Align datasets: same genes in embeddings and mean_pop
embeddings = embeddings.groupby(embeddings.index).mean()
embeddings.head()
common_genes = mean_pop.columns.intersection(embeddings.index)
X = embeddings.loc[common_genes]            # genes x features
Y = mean_pop[common_genes].T               # genes x perturbations

perturb_names = Y.columns

print("X shape:", X.shape, "| Y shape:", Y.shape)

# ======================
# Create a single 80/20 train/test split for genes
# ======================
n_genes = X.shape[0]
all_indices = np.arange(n_genes)

train_idx, test_idx = train_test_split(
    all_indices,
    test_size=0.2,
    shuffle=True,
    random_state=42
)

print(f"Train genes: {len(train_idx)}, Test genes: {len(test_idx)}")

# Convert X to numpy once
X_np = X.values



Loading TSS embeddings from: ../../../embeddings/embeddings_enformer_tss.npy
Embeddings shape: (19685, 32)
Loaded 19685 gene names from metadata
X shape: (4773, 32) | Y shape: (4773, 231)
Train genes: 3818, Test genes: 955
