In [17]:
import os
import gc
import sys
import uuid
import math
import copy
import time
import glob
import joblib
import zipfile
import pathlib
import warnings
import itertools
from pprint import pprint
from typing import List, Tuple
from itertools import combinations

warnings.filterwarnings("ignore")

In [7]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm


In [13]:
from sklearn.datasets import load_iris
from sklearn.datasets import make_classification

X, y = load_iris(return_X_y=True)
X, y = make_classification(n_samples=100_000, n_features=50, random_state=42)

X.shape, y.shape

((100000, 50), (100000,))

# Feature preprocessing

In [None]:
from sklearn.feature_extraction import DictVectorizer

vec = DictVectorizer()
vec.fit_transform(measurements).toarray()

# Feature Rescaling

In [20]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing import StandardScaler


X_scaled = MinMaxScaler().fit_transform(X)
X_scaled = RobustScaler().fit_transform(X)
X_scaled = StandardScaler().fit_transform(X)


X_scaled.shape, y.shape

((100000, 50), (100000,))

# Feature Engineering

In [21]:
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import KBinsDiscretizer


poly = PolynomialFeatures(degree=2, include_bias=False)
binning = KBinsDiscretizer(n_bins=3, encode="ordinal", strategy="uniform")

# Label Engineering

In [19]:
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder


# label preprocessing
le = LabelEncoder()
le = OneHotEncoder()

# Feature Decomposition

In [23]:
from sklearn.decomposition import PCA

pca = PCA(n_components=8)
X_pca = pca.fit_transform(X_scaled)

print("Original shape:", X.shape)
print("Transformed shape:", X_pca.shape)
print("Explained variance ratio:", pca.explained_variance_ratio_)

Original shape: (100000, 50)
Transformed shape: (100000, 8)
Explained variance ratio: [0.05729842 0.02281557 0.02078212 0.02073949 0.02065715 0.02062331
 0.02058258 0.02053005]


# time feature extraction

In [29]:
df_time["year"] = df_time["timestamp"].dt.year
df_time["month"] = df_time["timestamp"].dt.month
df_time["dayofweek"] = df_time["timestamp"].dt.dayofweek
df_time["is_weekend"] = df_time["dayofweek"] >= 5

NameError: name 'df_time' is not defined

# Feature Selection

In [31]:
from sklearn.datasets import load_iris
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_classif


X, y = load_iris(return_X_y=True)
X.shape
X_new = SelectKBest(f_classif, k=2).fit_transform(X, y)
X_new.shape

(150, 2)

In [32]:
from sklearn.svm import LinearSVC
from sklearn.datasets import load_iris
from sklearn.feature_selection import SelectFromModel


X, y = load_iris(return_X_y=True)
X.shape
lsvc = LinearSVC(C=0.01, penalty="l1", dual=False).fit(X, y)
model = SelectFromModel(lsvc, prefit=True)
X_new = model.transform(X)
X_new.shape

(150, 3)

In [33]:
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.datasets import load_iris
from sklearn.feature_selection import SelectFromModel


X, y = load_iris(return_X_y=True)
X.shape
clf = ExtraTreesClassifier(n_estimators=50)
clf = clf.fit(X, y)
clf.feature_importances_  
model = SelectFromModel(clf, prefit=True)
X_new = model.transform(X)
X_new.shape               

(150, 2)

# Regression Task

In [None]:
# sklearn ≥1.2
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import make_scorer, mean_squared_error, mean_absolute_error, r2_score

# Models (linear, neighbors, SVM, trees, boosting)
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor
from sklearn.ensemble import HistGradientBoostingRegressor  # fast tree boosting

# ---------------------------
# 1) Data
# ---------------------------
X, y = fetch_california_housing(return_X_y=True, as_frame=True)
num_cols = X.columns  # all numeric here
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# ---------------------------
# 2) Preprocessing blocks
# ---------------------------
# For models that benefit from scaling
pre_scaled = ColumnTransformer([
    ("num", Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler())
    ]), num_cols)
])

# For tree-based models (no scaling needed)
pre_tree = ColumnTransformer([
    ("num", SimpleImputer(strategy="median"), num_cols)
])

def pipe(estimator, scaled: bool):
    return Pipeline([
        ("pre", pre_scaled if scaled else pre_tree),
        ("model", estimator)
    ])

# ---------------------------
# 3) Models to compare
# ---------------------------
models = {
    # scaled
    "Linear":   pipe(LinearRegression(), True),
    "Ridge":    pipe(Ridge(alpha=10.0, random_state=42), True),
    "Lasso":    pipe(Lasso(alpha=0.001, random_state=42, max_iter=10000), True),
    "Elastic":  pipe(ElasticNet(alpha=0.001, l1_ratio=0.5, random_state=42, max_iter=10000), True),
    "KNN":      pipe(KNeighborsRegressor(n_neighbors=10), True),
    "SVR":      pipe(SVR(C=10.0, epsilon=0.1, kernel="rbf"), True),

    # trees/boosting (unscaled)
    "RF":       pipe(RandomForestRegressor(n_estimators=400, random_state=42, n_jobs=-1), False),
    "ET":       pipe(ExtraTreesRegressor(n_estimators=400, random_state=42, n_jobs=-1), False),
    "GBRT":     pipe(GradientBoostingRegressor(random_state=42), False),
    "HGB":      pipe(HistGradientBoostingRegressor(random_state=42), False),
}

# ---------------------------
# 4) CV evaluation
# ---------------------------
scoring = {
    "rmse": make_scorer(mean_squared_error, squared=False),
    "mae":  make_scorer(mean_absolute_error),
    "r2":   make_scorer(r2_score),
}
cv = KFold(n_splits=5, shuffle=True, random_state=42)

summary = []
for name, estimator in models.items():
    cv_res = cross_validate(estimator, X_train, y_train, cv=cv, scoring=scoring, n_jobs=-1)
    summary.append({
        "model": name,
        "rmse_mean": np.mean(cv_res["test_rmse"]),
        "rmse_std":  np.std(cv_res["test_rmse"]),
        "mae_mean":  np.mean(cv_res["test_mae"]),
        "r2_mean":   np.mean(cv_res["test_r2"]),
    })

summary_df = pd.DataFrame(summary).sort_values("rmse_mean")
print(summary_df.to_string(index=False))

# ---------------------------
# 5) Fit best model on full training set and test
# ---------------------------
best_name = summary_df.iloc[0]["model"]
best_model = models[best_name].fit(X_train, y_train)
y_pred = best_model.predict(X_test)

rmse = mean_squared_error(y_test, y_pred, squared=False)
mae  = mean_absolute_error(y_test, y_pred)
r2   = r2_score(y_test, y_pred)

print(f"\nBest CV model: {best_name}")
print(f"Test RMSE: {rmse:.4f} | MAE: {mae:.4f} | R2: {r2:.4f}")


# Classification Task

In [36]:
# sklearn ≥1.2
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_validate, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    make_scorer, accuracy_score, f1_score, roc_auc_score, log_loss
)

# Models
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, GradientBoostingClassifier
from sklearn.ensemble import HistGradientBoostingClassifier

# ---------------------------
# 1) Data (binary classification)
# ---------------------------
X_np, y = load_breast_cancer(return_X_y=True, as_frame=False)
X = pd.DataFrame(X_np)  # all numeric
num_cols = X.columns

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# ---------------------------
# 2) Preprocessing
# ---------------------------
pre_scaled = ColumnTransformer([
    ("num", Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler())
    ]), num_cols)
])

pre_tree = ColumnTransformer([
    ("num", SimpleImputer(strategy="median"), num_cols)
])

def pipe(estimator, scaled: bool):
    return Pipeline([
        ("pre", pre_scaled if scaled else pre_tree),
        ("model", estimator)
    ])

# ---------------------------
# 3) Models to compare
# ---------------------------
models = {
    # scaled (need standardization)
    "LogReg": pipe(LogisticRegression(max_iter=500, n_jobs=-1, random_state=42), True),
    "KNN":    pipe(KNeighborsClassifier(n_neighbors=11), True),
    "SVC":    pipe(SVC(C=2.0, kernel="rbf", probability=True, random_state=42), True),

    # trees/boosting (no scaling required)
    "RF":     pipe(RandomForestClassifier(n_estimators=400, random_state=42, n_jobs=-1), False),
    "ET":     pipe(ExtraTreesClassifier(n_estimators=400, random_state=42, n_jobs=-1), False),
    "GBDT":   pipe(GradientBoostingClassifier(random_state=42), False),
    "HGB":    pipe(HistGradientBoostingClassifier(random_state=42), False),
}

# ---------------------------
# 4) CV evaluation
# ---------------------------
scoring = {
    "accuracy": "accuracy",
    "f1":       make_scorer(f1_score),  # binary by default; use average="macro" for multiclass
    "roc_auc":  "roc_auc",
    "logloss":  "neg_log_loss",
}
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

summary = []
for name, estimator in models.items():
    cv_res = cross_validate(estimator, X_train, y_train, cv=cv, scoring=scoring, n_jobs=-1)
    summary.append({
        "model": name,
        "acc_mean":  np.mean(cv_res["test_accuracy"]),
        "f1_mean":   np.mean(cv_res["test_f1"]),
        "auc_mean":  np.mean(cv_res["test_roc_auc"]),
        "logloss_mean": -np.mean(cv_res["test_logloss"]),  # negate back
    })

summary_df = pd.DataFrame(summary).sort_values("auc_mean", ascending=False)
print(summary_df.to_string(index=False))

# ---------------------------
# 5) Fit best model on full train and test
# ---------------------------
best_name = summary_df.iloc[0]["model"]
best_model = models[best_name].fit(X_train, y_train)

# Use probabilities for AUC/logloss when available
try:
    y_prob = best_model.predict_proba(X_test)[:, 1]
except Exception:
    # fallback for models without predict_proba (shouldn’t happen here)
    y_prob = best_model.decision_function(X_test)
    # squish to (0,1) if needed
    from scipy.special import expit
    y_prob = expit(y_prob)

y_pred = (y_prob >= 0.5).astype(int)

print(f"\nBest CV model: {best_name}")
print("Test Accuracy:", accuracy_score(y_test, y_pred))
print("Test F1:", f1_score(y_test, y_pred))
print("Test ROC AUC:", roc_auc_score(y_test, y_prob))
print("Test LogLoss:", log_loss(y_test, y_prob))


 model  acc_mean  f1_mean  auc_mean  logloss_mean
   SVC  0.967033 0.973782  0.996078      0.080202
LogReg  0.978022 0.982544  0.995872      0.072316
    ET  0.973626 0.978883  0.993602      0.112830
   HGB  0.969231 0.975528  0.993292      0.130091
   KNN  0.962637 0.970849  0.992157      0.175421
  GBDT  0.951648 0.961363  0.991950      0.129491
    RF  0.960440 0.968348  0.990557      0.122395

Best CV model: SVC
Test Accuracy: 0.9824561403508771
Test F1: 0.9861111111111112
Test ROC AUC: 0.9966931216931216
Test LogLoss: 0.07921591211872678


# Classification Metrics

In [None]:
from sklearn.metrics import (
    accuracy_score, f1_score, roc_auc_score, average_precision_score,
    log_loss, brier_score_loss, confusion_matrix, classification_report,
    top_k_accuracy_score
)

# y_true: 1D ints; y_pred: predicted labels; y_prob: prob for positive class
acc  = accuracy_score(y_true, y_pred)
f1   = f1_score(y_true, y_pred, average="binary")      # "macro"/"micro" for multiclass
auc  = roc_auc_score(y_true, y_prob)                   # binary (use OvR for multiclass probs)
ap   = average_precision_score(y_true, y_prob)         # PR AUC (handles imbalance well)
ll   = log_loss(y_true, y_proba_all_classes)           # requires prob for all classes
bs   = brier_score_loss(y_true, y_prob)                # calibration (lower is better)
cm   = confusion_matrix(y_true, y_pred)
rep  = classification_report(y_true, y_pred)
topk = top_k_accuracy_score(y_true, y_proba_all_classes, k=3)  # multiclass@k


# Regression Metrics

In [25]:
from sklearn.metrics import (
    r2_score, mean_squared_error, mean_absolute_error,
    median_absolute_error, mean_absolute_percentage_error,
    explained_variance_score, mean_pinball_loss
)

r2   = r2_score(y_true, y_pred)
rmse = mean_squared_error(y_true, y_pred, squared=False)
mae  = mean_absolute_error(y_true, y_pred)
medae= median_absolute_error(y_true, y_pred)
mape = mean_absolute_percentage_error(y_true, y_pred)
evs  = explained_variance_score(y_true, y_pred)
q90  = mean_pinball_loss(y_true, y_pred_q90, alpha=0.90)  # quantile/Pinball loss


NameError: name 'y_true' is not defined

# Clustering Metrics

In [None]:
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score, adjusted_rand_score

# Unsupervised quality (requires features X and cluster labels)
sil  = silhouette_score(X, labels)              # higher is better
ch   = calinski_harabasz_score(X, labels)       # higher is better
db   = davies_bouldin_score(X, labels)          # lower is better

# If you *do* have true labels for evaluation
from sklearn.metrics import normalized_mutual_info_score
ari = adjusted_rand_score(y_true, labels)
nmi = normalized_mutual_info_score(y_true, labels)

In [28]:
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score, make_scorer
from lightgbm import LGBMClassifier

# Split feature types
num_cols = X.select_dtypes(include=np.number).columns
cat_cols = X.columns.difference(num_cols)

pre = ColumnTransformer([
    ("num", Pipeline([("scaler", StandardScaler())]), num_cols),
    ("cat", OneHotEncoder(handle_unknown="ignore")),  # safe for categoricals
])

clf = LGBMClassifier(
    n_estimators=2000,          # big enough; rely on early stopping in manual CV or reduce if slow
    learning_rate=0.05,
    random_state=42,
    n_jobs=-1
)

pipe = Pipeline([
    ("pre", pre),
    ("clf", clf),
])

param_dist = {
    "clf__num_leaves":        np.arange(31, 256),
    "clf__min_child_samples": np.arange(5, 200),
    "clf__subsample":         np.linspace(0.5, 1.0, 11),
    "clf__colsample_bytree":  np.linspace(0.5, 1.0, 11),
    "clf__reg_alpha":         10.0 ** np.linspace(-3, 1, 9),   # L1
    "clf__reg_lambda":        10.0 ** np.linspace(-3, 1, 9),   # L2
    "clf__learning_rate":     10.0 ** np.linspace(-2.0, -0.7, 10),
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
search = RandomizedSearchCV(
    pipe,
    param_distributions=param_dist,
    n_iter=60,
    scoring=make_scorer(roc_auc_score, needs_proba=True),
    cv=cv,
    n_jobs=-1,
    verbose=1,
    refit=True,
    random_state=42,
)
search.fit(X, y)

print("Best AUC:", search.best_score_)
print("Best params:", search.best_params_)
best_model = search.best_estimator_


AttributeError: 'numpy.ndarray' object has no attribute 'select_dtypes'

In [26]:
from sklearn.experimental import enable_halving_search_cv  # noqa
from sklearn.model_selection import HalvingRandomSearchCV

halving = HalvingRandomSearchCV(
    pipe,
    param_distributions=param_dist,
    factor=3,               # how aggressively to prune
    scoring="roc_auc",
    cv=cv,
    n_jobs=-1,
    verbose=1,
    random_state=42
)
halving.fit(X, y)
print("Best AUC:", halving.best_score_)


NameError: name 'pipe' is not defined

In [27]:
import optuna
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
from lightgbm import LGBMClassifier

def objective(trial):
    params = {
        "n_estimators":      3000,
        "learning_rate":     trial.suggest_float("lr", 1e-2, 2e-1, log=True),
        "num_leaves":        trial.suggest_int("num_leaves", 31, 255),
        "min_child_samples": trial.suggest_int("min_child_samples", 5, 200),
        "subsample":         trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree":  trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "reg_alpha":         trial.suggest_float("reg_alpha", 1e-3, 10, log=True),
        "reg_lambda":        trial.suggest_float("reg_lambda", 1e-3, 10, log=True),
        "random_state": 42,
        "n_jobs": -1,
    }

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    scores = []
    for tr, va in cv.split(X, y):
        # fit on raw X; or wrap the same ColumnTransformer as above
        model = LGBMClassifier(**params)
        model.fit(X.iloc[tr], y[tr])
        p = model.predict_proba(X.iloc[va])[:, 1]
        scores.append(roc_auc_score(y[va], p))
    return float(np.mean(scores))

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=60, show_progress_bar=True)
print("Best score:", study.best_value)
print("Best params:", study.best_trial.params)


[I 2025-09-08 01:29:41,018] A new study created in memory with name: no-name-0be7bf26-75de-4739-aa9b-01621f0af954


  0%|          | 0/60 [00:00<?, ?it/s]

[W 2025-09-08 01:29:41,036] Trial 0 failed with parameters: {'lr': 0.024520933100169007, 'num_leaves': 204, 'min_child_samples': 30, 'subsample': 0.980222949723482, 'colsample_bytree': 0.7041112579202558, 'reg_alpha': 1.6932713331747067, 'reg_lambda': 2.1667000557877443} because of the following error: AttributeError("'numpy.ndarray' object has no attribute 'iloc'").
Traceback (most recent call last):
  File "/data/sahand/home/quera/mapna-ai-2025/venv/lib/python3.10/site-packages/optuna/study/_optimize.py", line 201, in _run_trial
    value_or_values = func(trial)
  File "/tmp/ipykernel_249556/2915676693.py", line 25, in objective
    model.fit(X.iloc[tr], y[tr])
AttributeError: 'numpy.ndarray' object has no attribute 'iloc'
[W 2025-09-08 01:29:41,037] Trial 0 failed with value None.


AttributeError: 'numpy.ndarray' object has no attribute 'iloc'

In [24]:
def generate_features(
    df: pd.DataFrame,
    interactions: bool = True,
    nonlinear: bool = True,
    nonlinear_funcs: Tuple[str, ...] = ("square", "cube", "log"),
    interaction_degree: int = 2,                       # 2 = pairwise (original behavior)
    include_ratios: bool = False,                      # add col_i / col_j (i != j)
    include_diffs: bool = False,                       # add col_i - col_j
    drop_constant: bool = True,                        # drop zero-variance new features
    eps: float = 1e-12                                 # numerical stability for log/reciprocal/ratios
) -> pd.DataFrame:
    """
    Generate interaction and nonlinear features from a DataFrame.

    Defaults replicate the original behavior:
      - pairwise products when interactions=True
      - square, cube, (safe) log when nonlinear=True

    EXTRA OPTIONS:
      - columns: limit to selected columns (default = numeric columns)
      - nonlinear_funcs: choose from {"square","cube","sqrt","log","exp","abs","reciprocal"}
      - interaction_degree: k-way products (k>=2), e.g. 3 => x1*x2*x3
      - include_ratios: pairwise ratios x_i / x_j
      - include_diffs: pairwise differences x_i - x_j
      - return_only_new: return only engineered cols (not the originals)
      - drop_constant: remove zero-variance engineered cols
    """
    cols = df.select_dtypes(include=[np.number]).columns.tolist()

    # keep original columns unless return_only_new
    df_new = df.copy()

    # ------------ Nonlinear transforms ------------
    if nonlinear and nonlinear_funcs:
        for col in cols:
            s = df[col].astype(float)

            if "square" in nonlinear_funcs:
                df_new[f"{col}^2"] = s ** 2
            if "cube" in nonlinear_funcs:
                df_new[f"{col}^3"] = s ** 3
            if "sqrt" in nonlinear_funcs:
                # sqrt only where non-negative; else NaN
                df_new[f"sqrt_{col}"] = np.where(s >= 0, np.sqrt(s), np.nan)
            if "abs" in nonlinear_funcs:
                df_new[f"abs_{col}"] = np.abs(s)
            if "exp" in nonlinear_funcs:
                df_new[f"exp_{col}"] = np.exp(np.clip(s, a_min=None, a_max=50))  # avoid overflow
            if "reciprocal" in nonlinear_funcs:
                df_new[f"1/{col}"] = np.where(np.abs(s) > eps, 1.0 / s, np.nan)
            if "log" in nonlinear_funcs:
                # safe log: shift only rows where s <= -1 to keep log1p valid
                # (minimal shift per column so s_shifted >= -1 + eps)
                min_allowed = (-1 + eps)
                shift = 0.0
                min_val = float(np.nanmin(s.values))
                if min_val <= min_allowed:
                    shift = (min_allowed - min_val)
                df_new[f"log_{col}"] = np.log1p(s + shift)

    # ------------ Interaction features ------------
    if interactions and interaction_degree >= 2:
        # k-way distinct-feature products
        for k in range(2, interaction_degree + 1):
            for combo in combinations(cols, k):
                name = "*".join(combo)
                prod = np.ones(len(df), dtype=float)
                for c in combo:
                    prod = prod * df[c].astype(float)
                df_new[name] = prod

    # ------------ Pairwise ratios & differences ------------
    if include_ratios or include_diffs:
        for a, b in combinations(cols, 2):
            a_s, b_s = df[a].astype(float), df[b].astype(float)
            if include_diffs:
                df_new[f"{a}-{b}"] = a_s - b_s
                df_new[f"{b}-{a}"] = b_s - a_s
            if include_ratios:
                df_new[f"{a}/{b}"] = np.where(np.abs(b_s) > eps, a_s / b_s, np.nan)
                df_new[f"{b}/{a}"] = np.where(np.abs(a_s) > eps, b_s / a_s, np.nan)

    # ------------ Clean up ------------
    # Replace inf with NaN (may appear in ratios/reciprocal/exp)
    df_new = df_new.replace([np.inf, -np.inf], np.nan)

    if drop_constant:
        # drop newly created constant columns
        constant_cols = [c for c in df_new.columns if df_new[c].nunique(dropna=True) <= 1]
        # but keep original columns if return_only_new=False
        if return_only_new:
            df_new = df_new.drop(columns=constant_cols)
        else:
            # drop only constants that were NOT in the original frame
            extras = set(df_new.columns) - set(df.columns)
            to_drop = list(set(constant_cols) & extras)
            if to_drop:
                df_new = df_new.drop(columns=to_drop)

    return df_new

In [None]:
import numpy as np
import pandas as pd
from typing import List, Tuple, Optional

def select_by_target_corr(
    X: pd.DataFrame,
    y: pd.Series | np.ndarray,
    *,
    method: str = "pearson",          # "pearson" or "spearman"
    top_k: Optional[int] = None,      # keep k strongest features
    threshold: Optional[float] = None,# keep |corr| >= threshold
    drop_constant: bool = True,
    return_scores: bool = False
) -> List[str] | Tuple[List[str], pd.Series]:
    """
    Select features by absolute correlation with the target.

    - For non-numeric y, factorizes to integers (OK for binary classification).
    - Only numeric X columns are considered.

    Returns:
        - list of selected feature names
        - (optional) full absolute correlation Series (sorted desc)
    """
    # 1) numeric features only
    num_cols = X.select_dtypes(include=[np.number]).columns
    Xn = X[num_cols].copy()

    # 2) drop constant cols
    if drop_constant:
        const_cols = Xn.columns[Xn.nunique(dropna=True) <= 1]
        Xn = Xn.drop(columns=const_cols)

    # 3) make y numeric if needed
    y = pd.Series(y, index=X.index if isinstance(X, pd.DataFrame) else None)
    if not pd.api.types.is_numeric_dtype(y):
        y, _ = pd.factorize(y)  # unseen labels at test-time are not handled here
        y = pd.Series(y, index=X.index)

    # 4) compute correlations (pairwise NaN handling by pandas)
    corr = Xn.apply(lambda s: s.corr(y, method=method)).abs().sort_values(ascending=False)

    # 5) select by threshold/top_k
    selected = corr.index
    if threshold is not None:
        selected = corr[corr >= float(threshold)].index
    if top_k is not None:
        selected = selected[:int(top_k)]

    selected = list(selected)

    return (selected, corr) if return_scores else selected
