In [1]:
# =============================
# Standard Libraries
# =============================
import os
import time
import math
import io
import zipfile
import requests
from urllib.parse import urlparse
from itertools import chain, combinations
import json
import re
import warnings
import logging

# progress / kaggle
from tqdm.auto import tqdm
import kagglehub
from IPython.display import display

# =============================
# Data Science Libraries
# =============================
import numpy as np
import pandas as pd
import optuna
import scipy.stats as stats
from scipy import sparse
from scipy.stats import loguniform, randint, uniform
from scipy.special import expit, logit

# =============================
# Visualization
# =============================
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.ticker as mticker
import seaborn as sns
from matplotlib.colors import ListedColormap


# =============================
# Scikit-learn
# =============================
from sklearn import clone
from sklearn.model_selection import (
    train_test_split,
    cross_val_score,
    GridSearchCV,
    RandomizedSearchCV,
    RepeatedKFold,
    ParameterGrid,
    ParameterSampler,
    KFold,
    RepeatedStratifiedKFold,
    StratifiedKFold,
)
import sklearn.model_selection._search as sk_search
import sklearn.model_selection._validation as sk_validation

from sklearn.preprocessing import (
    StandardScaler,
    OrdinalEncoder,
    MultiLabelBinarizer,
    Normalizer,
    MinMaxScaler,
)
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    r2_score,
    accuracy_score,
    confusion_matrix,
    ConfusionMatrixDisplay,
    classification_report,
    roc_auc_score,
    roc_curve,
    auc,
    get_scorer,
)
from sklearn.feature_selection import (
    SequentialFeatureSelector,
    f_regression,
    SelectKBest,
    mutual_info_classif,
)
from sklearn.linear_model import (
    LinearRegression,
    Ridge,
    Lasso,
    ElasticNet,
    RidgeClassifier,
    LogisticRegression,
    RidgeCV,
    LassoCV,
    ElasticNetCV,
)
from sklearn.ensemble import (
    BaggingRegressor,
    RandomForestRegressor,
    GradientBoostingRegressor,
    RandomForestClassifier,
    GradientBoostingClassifier,
)
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.cross_decomposition import PLSRegression
from sklearn.decomposition import PCA
from sklearn.utils import resample
from sklearn.exceptions import NotFittedError
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.svm import LinearSVC, SVC
from sklearn.calibration import CalibratedClassifierCV
from sklearn.compose import ColumnTransformer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.kernel_approximation import Nystroem
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.inspection import permutation_importance
from sklearn.neighbors import KNeighborsRegressor

# extra joblib tools
from joblib import Parallel, delayed, parallel_backend

import math
import numpy as np
import pandas as pd
import warnings

from sklearn.model_selection import cross_val_score, StratifiedKFold, KFold, RandomizedSearchCV, cross_val_predict
from sklearn.pipeline import Pipeline as SKPipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_regression, f_classif, VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import f1_score, confusion_matrix

from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, GradientBoostingRegressor, RandomForestRegressor
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.svm import LinearSVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.dummy import DummyClassifier, DummyRegressor

from sklearn.base import BaseEstimator, TransformerMixin

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, TimeSeriesSplit, RandomizedSearchCV
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.feature_selection import SequentialFeatureSelector, SelectKBest
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.metrics import f1_score, confusion_matrix, mean_absolute_error
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import RandomOverSampler


# =============================
# Global Settings
# =============================
warnings.filterwarnings("ignore", category=UserWarning)
logging.getLogger("optuna").setLevel(logging.WARNING)

random_state = 42
N_ROWS = 1_000_000
pd.set_option("display.float_format", lambda x: f"{x:.6f}")  # no scientific notation

In [2]:
def robust_eda(df, name):
    # simple settings
    top_k_categories = 20
    max_corr_cols = 30
    max_rows_to_show = 25

    # make printing wide and avoid scientific notation
    with pd.option_context(
        "display.max_rows", max_rows_to_show,
        "display.max_columns", None,
        "display.width", 1000,
        "display.max_colwidth", 200,
        "display.float_format", lambda x: f"{x:.6f}"
    ):
        report_lines = []

        # title
        report_lines.append(f"=== Robust EDA Report: {name} ===")

        # shapes and memory
        info_df = pd.DataFrame(
            {
                "rows": [df.shape[0]],
                "columns": [df.shape[1]],
                "memory_bytes": [int(df.memory_usage(deep=True).sum())],
            }
        )
        report_lines.append("\n=== Info ===")
        report_lines.append(info_df.to_string(index=False))

        # dtypes
        dtypes_df = (
            df.dtypes.rename("dtype")
            .astype(str)
            .reset_index()
            .rename(columns={"index": "column"})
            .sort_values("column")
            .reset_index(drop=True)
        )
        report_lines.append("\n=== Dtypes ===")
        report_lines.append(dtypes_df.head(max_rows_to_show).to_string(index=False))

        # missing values
        total_rows = len(df)
        missing_counts = df.isna().sum()
        if total_rows > 0:
            missing_percent = (missing_counts / total_rows * 100).round(2)
        else:
            missing_percent = pd.Series([0] * len(df.columns), index=df.columns)
        missing_df = (
            pd.DataFrame(
                {
                    "column": df.columns,
                    "missing_count": missing_counts.values,
                    "missing_percent": missing_percent.values,
                }
            )
            .sort_values(["missing_count", "missing_percent"], ascending=False)
            .reset_index(drop=True)
        )
        report_lines.append("\n=== Missing Values ===")
        report_lines.append(missing_df.head(max_rows_to_show).to_string(index=False))

        # duplicates (safe fallback for unhashable types)
        try:
            duplicate_count = int(df.duplicated().sum())
            duplicate_index = df.index[df.duplicated(keep=False)]
            duplicates_preview_df = df.loc[duplicate_index].head(20)
        except TypeError:
            df_hashable = df.astype(str)
            duplicate_count = int(df_hashable.duplicated().sum())
            duplicate_index = df_hashable.index[df_hashable.duplicated(keep=False)]
            duplicates_preview_df = df.loc[duplicate_index].head(20)

        duplicates_summary_df = pd.DataFrame({"duplicate_rows": [duplicate_count]})
        report_lines.append("\n=== Duplicates Summary ===")
        report_lines.append(duplicates_summary_df.to_string(index=False))
        report_lines.append("\n=== Duplicates Preview (up to 20 rows) ===")
        if len(duplicates_preview_df) > 0:
            report_lines.append(duplicates_preview_df.to_string(index=False))
        else:
            report_lines.append("(No duplicate rows found.)")

        # column groups
        numeric_columns = df.select_dtypes(include=np.number).columns.tolist()
        categorical_columns = df.select_dtypes(exclude=np.number).columns.tolist()

        # numeric summary
        if len(numeric_columns) > 0:
            percentiles = [0.05, 0.25, 0.50, 0.75, 0.95]
            numeric_summary_df = (
                df[numeric_columns]
                .describe(percentiles=percentiles)
                .T.reset_index()
                .rename(columns={"index": "column"})
            )
            report_lines.append("\n=== Numeric Summary (5%..95%) ===")
            report_lines.append(numeric_summary_df.head(max_rows_to_show).to_string(index=False))

            # skew and kurtosis
            skew_kurt_df = pd.DataFrame(
                {
                    "column": numeric_columns,
                    "skew": df[numeric_columns].skew(numeric_only=True).values,
                    "kurtosis": df[numeric_columns].kurtosis(numeric_only=True).values,
                }
            )
            report_lines.append("\n=== Skew and Kurtosis ===")
            report_lines.append(skew_kurt_df.head(max_rows_to_show).to_string(index=False))

            # IQR outliers per column
            q1 = df[numeric_columns].quantile(0.25)
            q3 = df[numeric_columns].quantile(0.75)
            iqr = q3 - q1
            outlier_mask = (df[numeric_columns] < (q1 - 1.5 * iqr)) | (df[numeric_columns] > (q3 + 1.5 * iqr))
            iqr_outliers_df = (
                outlier_mask.sum()
                .rename("outlier_count")
                .reset_index()
                .rename(columns={"index": "column"})
            )
            report_lines.append("\n=== IQR Outlier Counts ===")
            report_lines.append(iqr_outliers_df.head(max_rows_to_show).to_string(index=False))

            # correlation on first N numeric columns
            if len(numeric_columns) > 1:
                selected_cols = numeric_columns[:max_corr_cols]
                correlation_df = df[selected_cols].corr(method="pearson", numeric_only=True)
                correlation_df.index.name = "column"
                report_lines.append(f"\n=== Correlation (first {max_corr_cols} numeric columns) ===")
                report_lines.append(correlation_df.to_string())
        else:
            report_lines.append("\n(No numeric columns found.)")

        # categorical value counts (top K each)
        if len(categorical_columns) > 0:
            cat_rows = []
            for col in categorical_columns:
                try:
                    vc = df[col].value_counts(dropna=False).head(top_k_categories)
                except TypeError:
                    vc = df[col].astype(str).value_counts(dropna=False).head(top_k_categories)
                for value, count in vc.items():
                    percent = (count / total_rows * 100) if total_rows > 0 else 0
                    cat_rows.append(
                        {"column": col, "value": value, "count": int(count), "percent": round(percent, 2)}
                    )
            categorical_values_df = pd.DataFrame(cat_rows)
            report_lines.append(f"\n=== Categorical Values (Top {top_k_categories} per column) ===")
            report_lines.append(categorical_values_df.head(max_rows_to_show).to_string(index=False))
        else:
            report_lines.append("\n(No categorical columns found.)")

        # unique counts per column
        def _safe_nunique(series):
            try:
                return int(series.nunique(dropna=False))
            except TypeError:
                return np.nan

        unique_counts_df = pd.DataFrame(
            {"column": df.columns, "unique_values": [_safe_nunique(df[c]) for c in df.columns]}
        )
        report_lines.append("\n=== Unique Counts Per Column ===")
        report_lines.append(unique_counts_df.head(max_rows_to_show).to_string(index=False))

        # sample head
        report_lines.append("\n=== Head (10 rows) ===")
        report_lines.append(df.head(10).to_string(index=False))

        # end
        report_lines.append("\n=== End of EDA Report ===")

        # one giant print
        print("\n".join(report_lines))


def is_sparse_dtype(dtype):
    # check if a dtype is pandas sparse
        return pd.api.types.is_sparse(dtype)

def dollar_format(x, pos=None):
    # format money like $12,345
    return f"${x:,.0f}"

def format_hms(seconds):
    # format seconds to H:M:S and handle >24 hours
    seconds = int(seconds)
    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    secs = seconds % 60
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"

def try_read_csv(folder_path, file_name, **kwargs):
    # try to read a csv; file_name can be a full path
    full_path = os.path.join(folder_path, file_name) if folder_path else file_name
    if full_path and os.path.exists(full_path):
        try:
            return pd.read_csv(full_path, **kwargs)
        except Exception:
            return None
    return None

def list_csvs(folder_path):
    # list csv files (sorted)
    if not folder_path or not os.path.exists(folder_path):
        return []
    return sorted([f for f in os.listdir(folder_path) if f.lower().endswith(".csv")])

def simple_random_sample(data_frame, n_rows=None, frac=None, random_state=42):
    # sample without replacement
    if data_frame is None:
        raise ValueError("data_frame is None")
    total_rows = len(data_frame)
    if (n_rows is None) == (frac is None):
        raise ValueError("pass exactly one of n_rows or frac")

    if frac is not None:
        if not (0 < frac <= 1):
            raise ValueError("frac must be between 0 and 1")
        pick_rows = int(np.floor(frac * total_rows))
    else:
        if int(n_rows) <= 0:
            raise ValueError("n_rows must be > 0")
        pick_rows = min(int(n_rows), total_rows)

    if pick_rows >= total_rows:
        print("simple_random_sample: taking all rows")
        return data_frame.copy()

    start = time.perf_counter()
    rng = np.random.default_rng(random_state)
    pick_index = rng.choice(total_rows, size=pick_rows, replace=False)
    pick_index = np.sort(pick_index)  # keep original order
    out_df = data_frame.iloc[pick_index].copy()
    end = time.perf_counter()
    print(f"simple_random_sample: picked {len(out_df)} of {total_rows} rows in {round(end - start, 3)} sec")
    return out_df

def stratified_sample(data_frame, y, n_rows=None, frac=None, random_state=42):
    # stratified sample on labels y
    if data_frame is None:
        raise ValueError("data_frame is None")

    y_array = data_frame[y].to_numpy() if isinstance(y, str) else np.asarray(y)
    total_rows = len(data_frame)
    if len(y_array) != total_rows:
        raise ValueError("X and y length mismatch")

    # prefer n_rows if both given
    if n_rows is not None and frac is not None:
        frac = None
    if n_rows is None and frac is None:
        raise ValueError("provide n_rows or frac")

    if frac is not None:
        if not (0 < frac <= 1):
            raise ValueError("frac must be between 0 and 1")
        test_size = float(frac)
        use_frac, use_n = frac, None
    else:
        if int(n_rows) <= 0:
            raise ValueError("n_rows must be > 0")
        test_size = min(float(n_rows) / total_rows, 1.0)
        use_frac, use_n = None, int(n_rows)

    if test_size >= 1.0:
        print("stratified_sample: taking all rows")
        return data_frame.copy()

    _, counts = np.unique(y_array, return_counts=True)
    min_count = counts.min()

    # need at least 1 per class in both splits
    if min_count < 2 or (min_count * test_size < 1) or (min_count * (1.0 - test_size) < 1):
        print("stratified_sample: class counts too small for requested size, falling back to simple sample")
        return simple_random_sample(data_frame, n_rows=use_n, frac=use_frac, random_state=random_state)

    start = time.perf_counter()
    index_array = np.arange(total_rows)
    _, test_idx, _, _ = train_test_split(
        index_array,
        y_array,
        test_size=test_size,
        stratify=y_array,
        random_state=random_state
    )
    out_df = data_frame.iloc[np.sort(test_idx)].copy()  # keep original order
    end = time.perf_counter()
    print(f"stratified_sample: picked {len(out_df)} of {total_rows} rows in {round(end - start, 3)} sec")
    return out_df

def safe_kaggle_download(dataset_name):
    # download from kaggle with timing and errors
    print(f"download: starting {dataset_name}")
    start = time.perf_counter()
    try:
        path = kagglehub.dataset_download(dataset_name)
        end = time.perf_counter()
        print(f"download: done {dataset_name} -> {path} in {round(end - start, 3)} sec")
        return path
    except Exception as e:
        end = time.perf_counter()
        print(f"download: error {dataset_name} -> {str(e)} in {round(end - start, 3)} sec")
        return None

def coerce_datetime_columns(df):
    # convert likely date/time columns if they are strings
    if df is None:
        return None
    print("dates: converting possible date/time columns")
    for col_name in df.columns:
        lower = col_name.lower()
        if ("date" in lower) or ("time" in lower):
            s = df[col_name]
            try:
                if pd.api.types.is_object_dtype(s) or pd.api.types.is_string_dtype(s):
                    df[col_name] = pd.to_datetime(s, errors="coerce")
            except Exception:
                pass
    return df

def float_range(start, stop, step):
    # float range with guards and tolerance
    if step == 0:
        raise ValueError("step must not be 0")
    values = []
    value = float(start)
    tolerance = abs(step) / 1_000_000
    if step > 0:
        while value <= stop + tolerance:
            values.append(round(value, 12))
            value += step
    else:
        while value >= stop - tolerance:
            values.append(round(value, 12))
            value += step
    return values


In [3]:
# =============================
# Steam Loader
# =============================
def load_steam_dataset(base_path, n_rows=100_000, seed=42):
    print("steam: start")
    if base_path is None:
        print("steam: skip because base_path is None")
        return None

    games = try_read_csv(base_path, "games.csv", low_memory=False)
    users = try_read_csv(base_path, "users.csv", low_memory=False)
    recommendations = try_read_csv(base_path, "recommendations.csv", low_memory=False)

    metadata = None
    meta_path = os.path.join(base_path, "games_metadata.json")
    if os.path.exists(meta_path):
        try:
            metadata = pd.read_json(meta_path, lines=True)
        except Exception as e:
            print(f"steam: metadata read error -> {str(e)}")

    print(
        f"steam: shapes games={None if games is None else games.shape}, "
        f"users={None if users is None else users.shape}, "
        f"recs={None if recommendations is None else recommendations.shape}, "
        f"meta={None if metadata is None else metadata.shape}"
    )

    steam_table = None
    if recommendations is not None:
        if "is_recommended" in recommendations.columns:
            recs_sample = stratified_sample(recommendations, y="is_recommended", n_rows=n_rows, random_state=seed)
        else:
            recs_sample = simple_random_sample(recommendations, n_rows=n_rows, random_state=seed)

        games_plus = games
        if (
            metadata is not None
            and games is not None
            and "app_id" in metadata.columns
            and "app_id" in games.columns
        ):
            print("steam: merge games with metadata")
            games_plus = games.merge(metadata, on="app_id", how="left", suffixes=("", "_meta"))

        steam_table = recs_sample
        if games_plus is not None and "app_id" in recs_sample.columns and "app_id" in games_plus.columns:
            print("steam: merge recommendations with games")
            steam_table = steam_table.merge(games_plus, on="app_id", how="left", suffixes=("", "_game"))

        if users is not None and "user_id" in steam_table.columns and "user_id" in users.columns:
            print("steam: merge with users")
            steam_table = steam_table.merge(users, on="user_id", how="left", suffixes=("", "_user"))

        steam_table = coerce_datetime_columns(steam_table)
        print(f"steam: done shape={None if steam_table is None else steam_table.shape}")
    else:
        print("steam: skip because recommendations.csv is missing")

    return steam_table


# =============================
# Olist Loader
# =============================
def load_olist_dataset(base_path, n_rows=1_000_000, seed=42):
    print("olist: start")
    if base_path is None:
        print("olist: skip because base_path is None")
        return None

    olist_customers = try_read_csv(base_path, "olist_customers_dataset.csv", low_memory=False)
    olist_geolocation = try_read_csv(base_path, "olist_geolocation_dataset.csv", low_memory=False)
    olist_items = try_read_csv(base_path, "olist_order_items_dataset.csv", low_memory=False)
    olist_payments = try_read_csv(base_path, "olist_order_payments_dataset.csv", low_memory=False)
    olist_reviews = try_read_csv(base_path, "olist_order_reviews_dataset.csv", low_memory=False)
    olist_orders = try_read_csv(base_path, "olist_orders_dataset.csv", low_memory=False)
    olist_products = try_read_csv(base_path, "olist_products_dataset.csv", low_memory=False)
    olist_sellers = try_read_csv(base_path, "olist_sellers_dataset.csv", low_memory=False)
    olist_cat_trans = try_read_csv(base_path, "product_category_name_translation.csv", low_memory=False)

    print(
        "olist: shapes "
        f"customers={None if olist_customers is None else olist_customers.shape}, "
        f"geolocation={None if olist_geolocation is None else olist_geolocation.shape}, "
        f"items={None if olist_items is None else olist_items.shape}, "
        f"payments={None if olist_payments is None else olist_payments.shape}, "
        f"reviews={None if olist_reviews is None else olist_reviews.shape}, "
        f"orders={None if olist_orders is None else olist_orders.shape}, "
        f"products={None if olist_products is None else olist_products.shape}, "
        f"sellers={None if olist_sellers is None else olist_sellers.shape}, "
        f"cat_trans={None if olist_cat_trans is None else olist_cat_trans.shape}"
    )

    if not all(x is not None for x in [olist_orders, olist_items, olist_products, olist_sellers, olist_customers]):
        print("olist: skip because core tables are missing")
        return None

    print("olist: sample orders")
    orders_small = simple_random_sample(olist_orders, n_rows=min(n_rows, len(olist_orders)), random_state=seed)

    print("olist: filter items for sampled orders")
    items_small = olist_items[olist_items["order_id"].isin(orders_small["order_id"])].copy()

    if olist_cat_trans is not None and "product_category_name" in olist_products.columns:
        print("olist: merge category translation")
        products_en = olist_products.merge(olist_cat_trans, on="product_category_name", how="left")
    else:
        products_en = olist_products

    if olist_reviews is not None:
        print("olist: build product review stats")
        product_reviews = (
            items_small[["order_id", "product_id"]]
            .merge(olist_reviews[["order_id", "review_score"]], on="order_id", how="inner")
        )
        product_reviews = product_reviews.drop_duplicates(["order_id", "product_id"])
        product_stats = (
            product_reviews.groupby("product_id", as_index=False)
            .agg(
                review_count_product=("review_score", "count"),
                review_score_mean_product=("review_score", "mean"),
            )
        )
    else:
        product_stats = None

    print("olist: merge items, products, and sellers")
    items_ext = (
        items_small.merge(products_en, on="product_id", how="left")
        .merge(olist_sellers, on="seller_id", how="left", suffixes=("", "_seller"))
    )

    if olist_geolocation is not None:
        print("olist: build basic zip geo")
        geo_zip = (
            olist_geolocation.groupby("geolocation_zip_code_prefix", as_index=False).agg(
                geolocation_lat=("geolocation_lat", "mean"),
                geolocation_lng=("geolocation_lng", "mean"),
                geo_points=("geolocation_city", "count"),
            )
        )
        print("olist: merge customers with geo")
        customers_geo = (
            olist_customers.merge(
                geo_zip,
                left_on="customer_zip_code_prefix",
                right_on="geolocation_zip_code_prefix",
                how="left",
            )
            .drop(columns=["geolocation_zip_code_prefix"])
        )
    else:
        customers_geo = olist_customers

    if olist_payments is not None:
        print("olist: aggregate payments")
        payments_agg = (
            olist_payments.groupby("order_id", as_index=False).agg(
                payment_value_total=("payment_value", "sum"),
                payment_installments_max=("payment_installments", "max"),
                payment_count=("payment_type", "count"),
            )
        )
    else:
        payments_agg = None

    print("olist: assemble main table")
    olist_full = (
        orders_small.merge(customers_geo, on="customer_id", how="left")
        .merge(items_ext, on="order_id", how="left")
    )

    if payments_agg is not None:
        print("olist: merge payments")
        olist_full = olist_full.merge(payments_agg, on="order_id", how="left")

    if product_stats is not None:
        print("olist: merge product stats")
        olist_full = olist_full.merge(product_stats, on="product_id", how="left")

    olist_full = coerce_datetime_columns(olist_full)

    print(f"olist: shape after assemble {olist_full.shape}")
    print("olist: done")
    return olist_full


# =============================
# VG2019 Loader
# =============================
def load_vg2019_dataset(base_path, n_rows=1_000_000, seed=42):
    print("vg2019: start")
    if base_path is None:
        print("vg2019: skip because base_path is None")
        return None

    csv_files = list_csvs(base_path)
    pick = None
    for f in csv_files:
        if "vgsales" in f.lower():
            pick = f
            break
    target_csv = pick if pick else (csv_files[0] if csv_files else None)

    if target_csv is None:
        print("vg2019: skip because no csv found")
        return None

    full_path = os.path.join(base_path, target_csv)
    try:
        sales = pd.read_csv(full_path, low_memory=False)
    except Exception as e:
        print(f"vg2019: read error -> {str(e)}")
        return None

    print(f"vg2019: loaded {target_csv} with shape {sales.shape}")

    if "Genre" in sales.columns:
        print("vg2019: stratified sample by Genre")
        sales = stratified_sample(sales, y="Genre", n_rows=n_rows, random_state=seed)
    else:
        print("vg2019: simple random sample")
        sales = simple_random_sample(sales, n_rows=n_rows, random_state=seed)

    print(f"vg2019: done shape={sales.shape}")
    return sales


In [4]:
class KeepTrainColumns(BaseEstimator, TransformerMixin):
    # remembers training columns and reindexes any input to match
    def fit(self, X, y=None):
        if hasattr(X, "columns"):
            self.keep_columns_ = list(X.columns)
        else:
            self.keep_columns_ = None
        return self

    def transform(self, X):
        if self.keep_columns_ is None:
            return X
        if hasattr(X, "reindex"):
            return X.reindex(columns=self.keep_columns_, fill_value=0)
        return X


def predict_with_threshold(model, X, threshold=0.5):
    # turn scores into 0/1 using a chosen threshold
    import numpy as np
    if hasattr(model, "predict_proba"):
        scores = model.predict_proba(X)[:, 1]
    elif hasattr(model, "decision_function"):
        raw = model.decision_function(X)
        raw_min, raw_max = float(raw.min()), float(raw.max())
        scores = (raw - raw_min) / (raw_max - raw_min + 1e-9)
    else:
        scores = model.predict(X).astype(float)
    return (scores >= threshold).astype(int)


# =============================
# Model builder + tuner
# =============================
def build_and_tune_models(
    X_train, y_train,
    task_type,
    num_folds,
    num_iterations,
    oversample=False,
    oversample_method="random"
):

    # optional oversampling tools
    ImbPipeline = None
    RandomOverSampler = None
    SMOTE = None
    if oversample and str(task_type).strip().lower() == "classification":
        try:
            from imblearn.pipeline import Pipeline as ImbPipeline
            from imblearn.over_sampling import RandomOverSampler, SMOTE
        except Exception:
            print("imblearn not available. Oversampling disabled.")
            oversample = False

    # task settings
    task = str(task_type).strip().lower()
    if task == "classification":
        scoring = "f1_macro"
        selector_score_func = f_classif
        min_class = int(y_train.value_counts().min())
        eff_folds = max(2, min(int(num_folds), min_class))
        baseline_cv = StratifiedKFold(n_splits=eff_folds, shuffle=True, random_state=42)
        search_cv = StratifiedKFold(n_splits=eff_folds, shuffle=True, random_state=42)

        sampler_obj = None
        if oversample:
            if oversample_method == "smote":
                k_neighbors_for_smote = max(1, min(5, min_class - 1))
                if k_neighbors_for_smote < 1:
                    print("SMOTE not possible (minority class too small). Using RandomOverSampler.")
                    sampler_obj = RandomOverSampler(random_state=42)
                else:
                    sampler_obj = SMOTE(random_state=42, k_neighbors=k_neighbors_for_smote)
            else:
                sampler_obj = RandomOverSampler(random_state=42)

        class_weight_choice = None if oversample else "balanced"

        model_space = {
            "GBT": GradientBoostingClassifier(random_state=42),
            "RandomForest": RandomForestClassifier(random_state=42, class_weight=class_weight_choice, n_jobs=-1),
            "DecisionTree": DecisionTreeClassifier(random_state=42, class_weight=class_weight_choice),
            "LogisticRegression": LogisticRegression(solver="saga", max_iter=5000, class_weight=class_weight_choice),
            "LinearSVM": LinearSVC(max_iter=5000, class_weight=class_weight_choice),
            "NaiveBayes": GaussianNB(),
            "KNN": KNeighborsClassifier(),
            "Dummy": DummyClassifier(strategy="most_frequent", random_state=42),
        }
        metric_name = "F1_macro"
    elif task == "regression":
        scoring = "neg_mean_absolute_error"
        selector_score_func = f_regression
        eff_folds = max(2, int(num_folds))
        baseline_cv = KFold(n_splits=eff_folds, shuffle=True, random_state=42)
        search_cv = KFold(n_splits=eff_folds, shuffle=True, random_state=42)

        model_space = {
            "GBT": GradientBoostingRegressor(random_state=42),
            "RandomForest": RandomForestRegressor(random_state=42, n_jobs=-1),
            "DecisionTree": DecisionTreeRegressor(random_state=42),
            "LinearRegression": LinearRegression(),
            "Ridge": Ridge(max_iter=5000),
            "Lasso": Lasso(max_iter=5000),
            "ElasticNet": ElasticNet(max_iter=5000),
            "KNN": KNeighborsRegressor(),
            "Dummy": DummyRegressor(strategy="mean"),
        }
        metric_name = "CV_MAE"
        sampler_obj = None
    else:
        raise ValueError('task_type must be "classification" or "regression"')

    total_features = X_train.shape[1]
    feature_fractions = [0.10, 0.25, 0.50, 0.75, 1.00]

    # which models need scaling and selection
    needs_scaling = {"LogisticRegression", "LinearSVM", "KNN", "LinearRegression", "Ridge", "Lasso", "ElasticNet", "NaiveBayes"}
    skip_selection = {"Dummy"}
    tree_like = {"RandomForest", "DecisionTree", "GBT"}

    def k_from_fraction(frac, total_cols):
        if frac >= 1.0:
            return "all"
        k = int(max(1, math.ceil(frac * total_cols)))
        return min(k, total_cols)

    # dynamic KNN neighbors cap
    per_fold_train = int(len(X_train) * (eff_folds - 1) / eff_folds)
    max_knn_k = max(3, min(101, per_fold_train - 1))
    knn_ks = list(range(3, max_knn_k + 1, 2))

    def logspace_list(low_exp, high_exp, num):
        return list(np.logspace(low_exp, high_exp, num))

    def linspace_list(low_val, high_val, num):
        return list(np.linspace(low_val, high_val, num))

    param_spaces_classification = {
        "GBT": {
            "model__n_estimators": [100, 200, 300, 500],
            "model__learning_rate": logspace_list(-3, 0, 12),
            "model__max_depth": [2, 3, 4, 5],
            "model__subsample": linspace_list(0.6, 1.0, 5),
        },
        "RandomForest": {
            "model__n_estimators": [200, 400, 700],
            "model__max_depth": [None, 20, 40],
            "model__min_samples_split": [2, 5, 10],
            "model__min_samples_leaf": [1, 2, 4],
            "model__max_features": ["sqrt", "log2", None],
        },
        "DecisionTree": {
            "model__max_depth": [None, 10, 20, 40],
            "model__min_samples_split": [2, 5, 10],
            "model__min_samples_leaf": [1, 2, 4],
            "model__splitter": ["best", "random"],
        },
        "LogisticRegression": {
            "model__C": logspace_list(-3, 3, 20),
            "model__penalty": ["l1", "l2"],
            "model__solver": ["saga"],
        },
        "LinearSVM": {
            "model__C": logspace_list(-3, 3, 20),
            "model__loss": ["hinge", "squared_hinge"],
        },
        "NaiveBayes": {
            "model__var_smoothing": list(10 ** np.linspace(-11, -7, 9))
        },
        "KNN": {
            "model__n_neighbors": knn_ks,
            "model__weights": ["uniform", "distance"],
            "model__p": [1, 2],
            "model__leaf_size": list(range(10, 61, 10)),
        },
        "Dummy": {"model__strategy": ["most_frequent", "stratified", "uniform"]},
    }

    param_spaces_regression = {
        "GBT": {
            "model__n_estimators": [100, 200, 300, 500],
            "model__learning_rate": logspace_list(-3, 0, 12),
            "model__max_depth": [2, 3, 4, 5],
            "model__subsample": linspace_list(0.6, 1.0, 5),
        },
        "RandomForest": {
            "model__n_estimators": [200, 400, 700],
            "model__max_depth": [None, 20, 40],
            "model__min_samples_split": [2, 5, 10],
            "model__min_samples_leaf": [1, 2, 4],
            "model__max_features": ["sqrt", "log2", None],
        },
        "DecisionTree": {
            "model__max_depth": [None, 10, 20, 40],
            "model__min_samples_split": [2, 5, 10],
            "model__min_samples_leaf": [1, 2, 4],
            "model__splitter": ["best", "random"],
        },
        "LinearRegression": {},
        "Ridge": {"model__alpha": logspace_list(-3, 3, 20), "model__fit_intercept": [True, False]},
        "Lasso": {"model__alpha": logspace_list(-4, 1, 20), "model__fit_intercept": [True, False]},
        "ElasticNet": {"model__alpha": logspace_list(-4, 1, 20), "model__l1_ratio": linspace_list(0.1, 0.9, 9), "model__fit_intercept": [True, False]},
        "KNN": {
            "model__n_neighbors": knn_ks,
            "model__weights": ["uniform", "distance"],
            "model__p": [1, 2],
            "model__leaf_size": list(range(10, 61, 10)),
        },
        "Dummy": {"model__strategy": ["mean", "median"]},
    }

    # build a pipeline for a given k
    def make_pipeline_for_k(model_name, model_obj, k_value):
        # order: align -> impute -> variance -> select -> scale -> sampler -> model
        align_step = ("align", KeepTrainColumns())
        impute_step = ("impute", SimpleImputer(strategy="median"))
        variance_step = ("variance", VarianceThreshold(threshold=0.0))

        if model_name in skip_selection or model_name in tree_like:
            select_step = ("select", "passthrough")
        else:
            select_step = ("select", SelectKBest(score_func=selector_score_func, k=k_value))

        scale_step = ("scale", StandardScaler() if model_name in needs_scaling else "passthrough")

        steps = [align_step, impute_step, variance_step, select_step, scale_step]

        if task == "classification" and oversample and sampler_obj is not None:
            steps.append(("sampler", sampler_obj))

        steps.append(("model", model_obj))

        if oversample and ImbPipeline is not None and task == "classification":
            return ImbPipeline(steps)
        else:
            return SKPipeline(steps)

    # baseline sweep across models × k
    rows = []
    total_steps = len(model_space) * len(feature_fractions)
    step = 0
    print("Streaming results (each line is one model × feature count):")
    for model_name, model_obj in model_space.items():
        for frac in feature_fractions:
            step += 1
            k_val = k_from_fraction(frac, total_features)
            k_print = total_features if k_val == "all" else int(k_val)
            pipeline = make_pipeline_for_k(model_name, model_obj, k_val)
            scores = cross_val_score(pipeline, X_train, y_train, cv=baseline_cv, scoring=scoring, n_jobs=1)
            mean_score = float(np.mean(scores))
            std_score = float(np.std(scores))
            if task == "regression":
                display_mean = -mean_score
                display_std = float(np.std(-scores))
            else:
                display_mean = mean_score
                display_std = std_score
            rows.append({"Model": model_name, "K_features": k_print, "MeanScore": display_mean, "StdDev": display_std, "Metric": metric_name})
            print(f"[{step}/{total_steps}] {model_name} | k={k_print} | {metric_name}={display_mean:.6f} ± {display_std:.6f}", flush=True)

    results_df = pd.DataFrame(rows)
    if task == "classification":
        results_df = results_df.sort_values(by=["MeanScore", "Model"], ascending=[False, True]).reset_index(drop=True)
    else:
        results_df = results_df.sort_values(by=["MeanScore", "Model"], ascending=[True, True]).reset_index(drop=True)

    print("\n=== Baseline results (CV) ===")
    print(results_df[["Model", "K_features", "MeanScore", "StdDev", "Metric"]])

    best_row = results_df.iloc[0]
    best_model_name = str(best_row["Model"])
    best_k = int(best_row["K_features"])
    best_model_obj = model_space[best_model_name]
    k_val_for_search = "all" if best_k >= total_features else best_k
    best_pipeline = make_pipeline_for_k(best_model_name, best_model_obj, k_val_for_search)

    # pick search space
    search_space = (param_spaces_classification if task == "classification" else param_spaces_regression).get(best_model_name, {})
    if len(search_space) == 0:
        best_pipeline.fit(X_train, y_train)
        try:
            best_pipeline.input_columns_ = list(X_train.columns)
        except Exception:
            pass
        # threshold tuning only for classification
        if task == "classification":
            try:
                _tune_threshold_inplace(best_pipeline, X_train, y_train, search_cv)
            except Exception as e:
                print(f"[warn] threshold tuning failed: {e}")
        print("\nBest model had no tunable params. Returning fitted pipeline.")
        return best_pipeline

    # hyperparameter search
    search = RandomizedSearchCV(
        estimator=best_pipeline,
        param_distributions=search_space,
        n_iter=int(max(1, num_iterations)),
        scoring=scoring,
        cv=search_cv,
        random_state=42,
        n_jobs=-1,
        verbose=2
    )
    search.fit(X_train, y_train)

    # print tuned CV result
    if task == "regression":
        tuned_score_display = -float(search.best_score_)
        tuned_metric_name = "CV MAE"
    else:
        tuned_score_display = float(search.best_score_)
        tuned_metric_name = "F1 macro"

    print("\n=== Best model after randomized search ===")
    print(f"Model name: {best_model_name}")
    print(f"Number of features: {best_k}")
    print(f"Best hyperparameters: {search.best_params_}")
    print(f"Best CV score ({tuned_metric_name}): {tuned_score_display:.6f}")

    # remember training columns
    try:
        search.best_estimator_.input_columns_ = list(X_train.columns)
    except Exception:
        pass

    # threshold tuning only for classification
    if task == "classification":
        try:
            _tune_threshold_inplace(search.best_estimator_, X_train, y_train, search_cv)
        except Exception as e:
            print(f"[warn] threshold tuning failed: {e}")

    return search.best_estimator_


def _tune_threshold_inplace(fitted_estimator, X, y, cv):
    """
    Finds a good decision threshold using out-of-fold scores on the training set.
    Stores results on the estimator as .best_threshold_ and .best_threshold_cv_f1_.
    """
    import numpy as np
    from sklearn.model_selection import cross_val_predict
    from sklearn.metrics import f1_score, confusion_matrix

    # try probabilities first
    scores = None
    try:
        proba_oof = cross_val_predict(fitted_estimator, X, y, cv=cv, method="predict_proba", n_jobs=1)  # shape (n, 2)
        scores = proba_oof[:, 1]
    except Exception:
        pass

    # fallback to decision function
    if scores is None:
        try:
            decision_oof = cross_val_predict(fitted_estimator, X, y, cv=cv, method="decision_function", n_jobs=1)
            dec_min, dec_max = float(decision_oof.min()), float(decision_oof.max())
            scores = (decision_oof - dec_min) / (dec_max - dec_min + 1e-9)
        except Exception:
            pass

    # if no scores available, keep default threshold
    if scores is None:
        fitted_estimator.best_threshold_ = 0.5
        fitted_estimator.best_threshold_cv_f1_ = None
        print("[info] model does not expose scores for thresholding. Using 0.5.")
        return

    # sweep thresholds
    best_threshold = 0.5
    best_f1_macro = -1.0
    thresholds_to_try = np.linspace(0.05, 0.95, 19)

    for t in thresholds_to_try:
        y_hat = (scores >= t).astype(int)
        f1_macro_val = float(f1_score(y, y_hat, average="macro"))
        if f1_macro_val > best_f1_macro:
            best_f1_macro = f1_macro_val
            best_threshold = float(t)

    # show OOF result at best threshold
    y_hat_final = (scores >= best_threshold).astype(int)
    print("\n=== Threshold tuning (OOF on train) ===")
    print(f"Best threshold: {best_threshold:.2f} | F1_macro: {best_f1_macro:.6f}")
    print("Confusion matrix at best threshold:")
    print(confusion_matrix(y, y_hat_final))

    # store on estimator
    fitted_estimator.best_threshold_ = best_threshold
    fitted_estimator.best_threshold_cv_f1_ = best_f1_macro


# =============================
# Holdout evaluation helper (uses tuned threshold if available)
# =============================
def evaluate_on_holdout(model, X_test, y_test, task_type, threshold=None):
    import pandas as pd
    from sklearn.metrics import f1_score, mean_absolute_error, confusion_matrix

    # align columns to what the model saw at fit
    try:
        if hasattr(model, "input_columns_") and hasattr(X_test, "reindex"):
            X_test = X_test.reindex(columns=model.input_columns_, fill_value=0)
        elif hasattr(model, "named_steps") and "align" in getattr(model, "named_steps", {}):
            keep_cols = getattr(model.named_steps["align"], "keep_columns_", None)
            if keep_cols is not None and hasattr(X_test, "reindex"):
                X_test = X_test.reindex(columns=list(keep_cols), fill_value=0)
    except Exception as e:
        print(f"[warn] could not align columns: {e}")

    # choose prediction path
    if str(task_type).strip().lower() == "classification":
        final_threshold = threshold
        if final_threshold is None and hasattr(model, "best_threshold_"):
            final_threshold = float(model.best_threshold_)
        if final_threshold is not None:
            y_pred = predict_with_threshold(model, X_test, threshold=final_threshold)
        else:
            y_pred = model.predict(X_test)
    else:
        y_pred = model.predict(X_test)

    print("\n=== Holdout (time split) ===")
    if str(task_type).strip().lower() == "classification":
        f1 = float(f1_score(y_test, y_pred, average="macro"))
        print(f"F1 macro: {f1:.6f}")
        print("Confusion matrix:")
        print(confusion_matrix(y_test, y_pred))
        return f1
    else:
        mae = float(mean_absolute_error(y_test, y_pred))
        print(f"MAE: {mae:.6f}")
        return mae


In [5]:
# =========================
# Imports
# =========================
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.feature_selection import SelectKBest, mutual_info_classif, mutual_info_regression


# =========================
# Timer + memory helpers
# =========================
class SimpleTimer:
    def __init__(self, enabled=True):
        self.enabled = enabled
        self.t0 = time.perf_counter()

    def tick(self, label):
        if not self.enabled:
            return
        t = time.perf_counter() - self.t0
        print(f"[timer] {label}: {t:.2f} s")
        self.t0 = time.perf_counter()

def df_mem_gb(df):
    try:
        return float(df.memory_usage(deep=True).sum()) / (1024**3)
    except Exception:
        return float("nan")

def show_shape_mem(label, X_train=None, X_test=None):
    parts = [label]
    if X_train is not None:
        parts.append(f"X_train shape={tuple(X_train.shape)} mem={df_mem_gb(X_train):.3f} GB")
    if X_test is not None:
        parts.append(f"X_test shape={tuple(X_test.shape)} mem={df_mem_gb(X_test):.3f} GB")
    print("[info]", " | ".join(parts))


# =========================
# Text feature helpers (fast)
# =========================
def clean_keyword_name(s):
    s = str(s).lower().strip().replace(" ", "_")
    keep = []
    for ch in s:
        if ch.isalnum() or ch == "_":
            keep.append(ch)
    return "".join(keep)[:60]

def text_features_fit(X, keyword_map):
    keyword_map = keyword_map or {}
    new_cols = []
    new_parts = []

    for col, keywords in keyword_map.items():
        if col not in X.columns:
            continue
        if str(X[col].dtype) not in ["object", "category"]:
            continue
        if col == "tags":
            continue

        col_str = X[col].fillna("").astype(str).str.lower()
        len_col = f"{col}_len"
        wc_col = f"{col}_wc"

        part = {
            len_col: col_str.str.len(),
            wc_col: col_str.str.split().apply(len),
        }
        for kw in keywords:
            safe = clean_keyword_name(kw)
            name = f"{col}_has_{safe}"
            part[name] = col_str.str.contains(str(kw).lower(), regex=False).astype(np.uint8)

        df_part = pd.DataFrame(part, index=X.index)
        new_parts.append(df_part)
        new_cols.extend(df_part.columns.tolist())

    if new_parts:
        X_new = pd.concat(new_parts, axis=1)
        X = pd.concat([X, X_new], axis=1)

    return {"new_cols": new_cols, "keyword_map": keyword_map}

def text_features_apply(X, text_info):
    keyword_map = text_info.get("keyword_map") or {}
    new_parts = []

    for col, keywords in keyword_map.items():
        len_col = f"{col}_len"
        wc_col = f"{col}_wc"

        if col not in X.columns:
            part = {
                len_col: pd.Series(0, index=X.index, dtype=np.int64),
                wc_col: pd.Series(0, index=X.index, dtype=np.int64),
            }
            for kw in keywords:
                name = f"{col}_has_{clean_keyword_name(kw)}"
                part[name] = pd.Series(0, index=X.index, dtype=np.uint8)
            new_parts.append(pd.DataFrame(part, index=X.index))
            continue

        if str(X[col].dtype) not in ["object", "category"] or col == "tags":
            continue

        col_str = X[col].fillna("").astype(str).str.lower()
        part = {
            len_col: col_str.str.len(),
            wc_col: col_str.str.split().apply(len),
        }
        for kw in keywords:
            name = f"{col}_has_{clean_keyword_name(kw)}"
            part[name] = col_str.str.contains(str(kw).lower(), regex=False).astype(np.uint8)

        new_parts.append(pd.DataFrame(part, index=X.index))

    if new_parts:
        X_new = pd.concat(new_parts, axis=1)
        X = pd.concat([X, X_new], axis=1)

    for c in text_info.get("new_cols", []):
        if c not in X.columns:
            X[c] = 0

    return X


# =========================
# General helpers
# =========================
def datetimes_to_numeric_inplace(X):
    for c in X.columns:
        if np.issubdtype(X[c].dtype, np.datetime64):
            mask = X[c].isna()
            vals_int = X[c].values.astype("datetime64[ns]").astype("int64")
            arr = vals_int.astype("float64") / 1000000000.0
            if mask.any():
                arr[mask.values] = np.nan
            X[c] = arr
    return X

def downcast_numeric_inplace(X):
    for c in X.columns:
        dt = X[c].dtype
        if np.issubdtype(dt, np.floating):
            X[c] = X[c].astype("float32")
        elif np.issubdtype(dt, np.integer) and X[c].nunique(dropna=True) > 2:
            X[c] = X[c].astype("int32")
    return X

def scale_fit(method, X_train_num):
    if method == "standard":
        return StandardScaler().fit(X_train_num)
    if method == "minmax":
        return MinMaxScaler().fit(X_train_num)
    return None

def scale_numeric_only(X_train, X_test, scale_method):
    num_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
    if not num_cols:
        return X_train, X_test
    scaler = scale_fit(scale_method, X_train[num_cols])
    if scaler is None:
        return X_train, X_test
    X_train[num_cols] = scaler.transform(X_train[num_cols]).astype("float32")
    X_test[num_cols] = scaler.transform(X_test[num_cols]).astype("float32")
    return X_train, X_test


# =========================
# Safer OHE with caps and drop of excluded
# =========================
def _auto_exclude_mask(series, max_unique=500, max_avg_len=25):
    s = series.fillna("Unknown").astype(str)
    nunq = int(s.nunique(dropna=False))
    avg_len = float(s.map(len).mean())
    return (nunq > max_unique) or (avg_len > max_avg_len and nunq > 50)

def _cap_categories(series, top_k=50, min_freq=5, other_label="Other"):
    s = series.fillna("Unknown").astype(str)
    vc = s.value_counts()
    kept = vc[vc >= min_freq].index.tolist()
    if top_k is not None and len(kept) > top_k:
        kept = vc.index[:top_k].tolist()
    mapped = s.where(s.isin(kept), other_label)
    return mapped.astype("category"), kept

def ohe_fit(
    X,
    exclude_cols=None,
    top_k_per_col=50,
    min_freq_per_col=5,
    auto_exclude=True,
    high_card_threshold=500,
    long_text_avglen=25,
):
    exclude = set(exclude_cols or [])
    value_map = {}

    X_tmp = X.drop(columns=list(exclude), errors="ignore").copy()

    obj_all = [c for c in X_tmp.select_dtypes(include=["object", "category"]).columns]

    auto_excluded = []
    for c in obj_all:
        s = X_tmp[c]
        if auto_exclude and _auto_exclude_mask(s, high_card_threshold, long_text_avglen):
            auto_excluded.append(c)

    excluded = list(exclude) + auto_excluded
    X_tmp = X_tmp.drop(columns=auto_excluded, errors="ignore")

    obj_cols = [c for c in X_tmp.select_dtypes(include=["object", "category"]).columns]
    for c in obj_cols:
        capped, kept = _cap_categories(X_tmp[c], top_k=top_k_per_col, min_freq=min_freq_per_col)
        X_tmp[c] = capped
        value_map[c] = kept

    X_ohe = pd.get_dummies(X_tmp, columns=obj_cols, dummy_na=False)
    schema_cols = X_ohe.columns.tolist()

    return {
        "obj_cols": obj_cols,
        "schema_cols": schema_cols,
        "value_map": value_map,
        "excluded": excluded,
        "other_label": "Other",
    }

def ohe_apply(X, ohe_info):
    obj_cols = ohe_info["obj_cols"]
    schema_cols = ohe_info["schema_cols"]
    value_map = ohe_info["value_map"]
    other = ohe_info.get("other_label", "Other")
    excluded = ohe_info.get("excluded", [])

    X_tmp = X.drop(columns=excluded, errors="ignore").copy()

    for c in obj_cols:
        if c in X_tmp.columns:
            s = X_tmp[c].fillna("Unknown").astype(str)
            kept = set(value_map.get(c, []))
            s = s.where(s.isin(kept), other).astype("category")
            X_tmp[c] = s

    X_ohe = pd.get_dummies(X_tmp, columns=[c for c in obj_cols if c in X_tmp.columns], dummy_na=False)
    X_ohe = X_ohe.reindex(columns=schema_cols, fill_value=0)
    return X_ohe


# =========================
# Outliers
# =========================
def outlier_bounds_fit(X_num, lower_q=0.025, upper_q=0.975, exclude_binary=True, sample_rows=200000):
    bounds = {}
    if lower_q is None or upper_q is None:
        return bounds
    X_use = X_num
    if len(X_num) > sample_rows:
        X_use = X_num.sample(n=sample_rows, random_state=123)
    for c in X_use.columns:
        vals = X_use[c].astype("float32")
        if exclude_binary and X_use[c].nunique(dropna=True) <= 2:
            continue
        lo = np.nanquantile(vals, lower_q)
        hi = np.nanquantile(vals, upper_q)
        if np.isfinite(lo) and np.isfinite(hi) and hi >= lo:
            bounds[c] = (float(lo), float(hi))
    return bounds

def outlier_mask(X, bounds):
    if not bounds:
        return pd.Series(True, index=X.index)
    m = pd.Series(True, index=X.index)
    for c, (lo, hi) in bounds.items():
        if c in X.columns:
            col = X[c].astype("float32")
            m &= (col >= lo) & (col <= hi)
    return m


# =========================
# Feature selection
# =========================
def forward_feature_selection(X, y, model,
                              scoring="neg_mean_absolute_error",
                              cv=5, tol=None, max_features=None, n_jobs=-1, verbose=False):
    try:
        feature_names = list(X.columns)
        X_arr = X.values
    except AttributeError:
        X_arr = X
        feature_names = [f"f{i}" for i in range(X_arr.shape[1])]

    selected_idx = []
    remaining_idx = list(range(X_arr.shape[1]))
    best_scores = []
    previous_score = float("inf")
    best_feature_set_idx = []
    best_score = float("inf")

    while remaining_idx:
        scores = {}
        for idx in remaining_idx:
            trial_idx = selected_idx + [idx]
            cv_score = -cross_val_score(
                model, X_arr[:, trial_idx], y,
                scoring=scoring, cv=cv, n_jobs=n_jobs
            ).mean()
            scores[idx] = cv_score

        best_idx = min(scores, key=scores.get)
        current_score = scores[best_idx]

        if tol is not None and previous_score - current_score < tol:
            if verbose:
                print("Stopping early (improvement < tol).")
            break

        selected_idx.append(best_idx)
        remaining_idx.remove(best_idx)
        best_scores.append(current_score)
        previous_score = current_score

        if verbose:
            name = feature_names[best_idx]
            print(f"Added {name} -> CV score = {current_score:.4f}")

        if current_score < best_score:
            best_score = current_score
            best_feature_set_idx = selected_idx.copy()

        if max_features is not None and len(selected_idx) >= max_features:
            break

    selected_features = [feature_names[i] for i in selected_idx]
    best_feature_set = [feature_names[i] for i in best_feature_set_idx]

    if not best_feature_set:
        best_feature_set = selected_features[:]
        best_score = best_scores[-1] if best_scores else float("inf")

    try:
        index = np.argmax(np.array(selected_features) == best_feature_set[-1])
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, len(best_scores) + 1), best_scores, marker=".")
        plt.plot([index + 1], [best_score], marker="x")
        plt.xticks(range(1, len(selected_features) + 1),
                   selected_features, rotation=60, ha="right", fontsize=6)
        plt.title("Forward Feature Selection and CV Scores")
        plt.xlabel("Features Added")
        plt.ylabel("CV Score (MAE)")
        plt.grid()
        plt.tight_layout()
        plt.show()
    except Exception:
        pass

    print(f"Best Features: {best_feature_set}")
    print(f"Best CV MAE Score: {best_score:.4f}")
    return selected_features, best_scores, best_feature_set, best_score

def select_features(method, max_features, task_type, random_state, X_train, y_train):
    if method is None or method == "none":
        return X_train.columns.tolist()

    k = min(max_features, X_train.shape[1]) if max_features else X_train.shape[1]
    if k < 1:
        return X_train.columns.tolist()

    if method == "tree":
        est = RandomForestClassifier(random_state=random_state) if task_type == "classification" else RandomForestRegressor(random_state=random_state)
        est.fit(X_train, y_train)
        imp = pd.Series(est.feature_importances_, index=X_train.columns)
        return imp.nlargest(k).index.tolist()

    if method == "forward":
        if task_type != "regression":
            raise ValueError("Forward selection is only supported for regression tasks.")
        model = RandomForestRegressor(random_state=random_state)
        _, _, best_set, _ = forward_feature_selection(
            X=X_train, y=y_train, model=model,
            scoring="neg_mean_absolute_error", cv=3,
            tol=None, max_features=max_features, n_jobs=-1, verbose=True
        )
        return best_set

    if method == "mutual_info":
        sel = SelectKBest(mutual_info_classif if task_type == "classification" else mutual_info_regression, k=k)
        sel.fit(X_train, y_train)
        return X_train.columns[sel.get_support()].tolist()

    return X_train.columns.tolist()


# =========================
# Batched poly features
# =========================
def add_poly_features_batched(X_train, X_test, squares, pairs):
    train_parts = {}
    for c in squares:
        if c in X_train.columns:
            train_parts[f"{c}_sq"] = X_train[c].astype("float32") ** 2
    for a, b in pairs:
        if (a in X_train.columns) and (b in X_train.columns):
            name = f"{a}_x_{b}"
            train_parts[name] = (X_train[a].astype("float32") * X_train[b].astype("float32"))

    test_parts = {}
    for name in train_parts:
        if name.endswith("_sq"):
            c = name[:-3]
            if c in X_test.columns:
                test_parts[name] = X_test[c].astype("float32") ** 2
            else:
                test_parts[name] = pd.Series(0.0, index=X_test.index, dtype="float32")
        else:
            a, b = name.split("_x_")
            if (a in X_test.columns) and (b in X_test.columns):
                test_parts[name] = (X_test[a].astype("float32") * X_test[b].astype("float32"))
            else:
                test_parts[name] = pd.Series(0.0, index=X_test.index, dtype="float32")

    if train_parts:
        X_train = pd.concat([X_train, pd.DataFrame(train_parts, index=X_train.index)], axis=1)
    if test_parts:
        X_test = pd.concat([X_test, pd.DataFrame(test_parts, index=X_test.index)], axis=1)

    X_train = downcast_numeric_inplace(X_train.copy())
    X_test = downcast_numeric_inplace(X_test.copy())
    return X_train, X_test


# =========================
# Main prep
# =========================
def prepare_data(steam_df, olist_df, sales_df, test_size, random_state,
                 feature_selection, max_features, task_type, scale_method,
                 tag_min_count=5, tag_top_k=200,
                 outlier_lower_q=0.025, outlier_upper_q=0.975,
                 verbose=False,
                 ohe_top_k_per_col=50,
                 ohe_min_freq_per_col=5,
                 ohe_auto_exclude=True,
                 ohe_high_card_threshold=500,
                 ohe_long_text_avglen=25):
    feature_selection = (feature_selection or "none").lower()
    outputs = {}
    timer = SimpleTimer(enabled=verbose)

    # ---------- STEAM ----------
    steam = steam_df.copy()
    if task_type == "classification":
        steam["target"] = (steam["positive_ratio"] >= 80).astype(int)
    else:
        steam["target"] = steam["positive_ratio"]
    steam.dropna(subset=["target"], inplace=True)

    steam.drop(columns=["app_id", "user_id", "review_id", "positive_ratio"],
               errors="ignore", inplace=True)

    if {"date", "date_release"}.issubset(steam.columns):
        steam["days_since_release"] = (steam["date"] - steam["date_release"]).dt.days

    for col in ["is_recommended", "mac", "linux", "win", "steam_deck"]:
        if col in steam.columns:
            steam[col] = steam[col].astype(int)

    if "hours" in steam.columns:
        steam["log_hours"] = np.log1p(steam["hours"])
    if {"hours", "user_reviews"}.issubset(steam.columns):
        steam["reviews_per_hour"] = steam["user_reviews"] / (steam["hours"] + 0.000000001)

    X = steam.drop(columns=["target", "rating"], errors="ignore")
    y = steam["target"]
    strat = y if task_type == "classification" else None
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=strat
    )
    timer.tick("steam split")
    show_shape_mem("steam post split", X_train, X_test)

    datetimes_to_numeric_inplace(X_train)
    datetimes_to_numeric_inplace(X_test)
    timer.tick("steam datetime to numeric")

    steam_kw = {
        "title": ["vr", "dlc", "multiplayer", "co-op", "coop", "online", "free", "demo", "survival"],
        "description": ["vr", "dlc", "multiplayer", "co-op", "open world", "story", "puzzle", "horror", "early access"],
    }
    steam_text_info = text_features_fit(X_train, steam_kw)
    X_test = text_features_apply(X_test, steam_text_info)
    timer.tick("steam text features")
    show_shape_mem("steam after text", X_train, X_test)

    if "tags" in X_train.columns:
        from collections import Counter

        def tag_col_name(t):
            s = str(t).lower().strip().replace(" ", "_")
            return "tag_" + "".join(ch for ch in s if ch.isalnum() or ch == "_")[:60]

        cnt = Counter()
        for v in X_train["tags"].fillna("").values:
            lst = v if isinstance(v, list) else []
            for t in lst:
                cnt[t] += 1

        items = [(t, n) for t, n in cnt.items() if n >= tag_min_count]
        items.sort(key=lambda x: x[1], reverse=True)
        vocab = [t for t, _ in items[:tag_top_k]]
        tag_cols = [tag_col_name(t) for t in vocab]
        if verbose:
            print(f"[info] steam tags unique={len(cnt)}, kept={len(vocab)} (min_count={tag_min_count}, top_k={tag_top_k})")

        def add_tag_cols_fast(df):
            if "tags" in df.columns:
                tag_lists = df["tags"].apply(lambda v: v if isinstance(v, list) else [])
            else:
                tag_lists = pd.Series([[]] * len(df), index=df.index)
            new_data = {}
            for tag, col_name in zip(vocab, tag_cols):
                new_data[col_name] = np.fromiter(
                    (1 if tag in lst else 0 for lst in tag_lists),
                    dtype=np.uint8,
                    count=len(df)
                )
            new_df = pd.DataFrame(new_data, index=df.index)
            return pd.concat([df.drop(columns=["tags"], errors="ignore"), new_df], axis=1)

        X_train = add_tag_cols_fast(X_train)
        X_test = add_tag_cols_fast(X_test)
        timer.tick("steam tags multi-hot")
        show_shape_mem("steam after tags", X_train, X_test)

    explicit_exclude = [c for c in ["title", "description"] if c in X_train.columns]
    ohe_info = ohe_fit(
        X_train,
        exclude_cols=explicit_exclude,
        top_k_per_col=ohe_top_k_per_col,
        min_freq_per_col=ohe_min_freq_per_col,
        auto_exclude=ohe_auto_exclude,
        high_card_threshold=ohe_high_card_threshold,
        long_text_avglen=ohe_long_text_avglen,
    )
    if verbose and ohe_info.get("excluded"):
        print(f"[info] OHE auto-excluded (steam): {sorted(ohe_info['excluded'])[:10]}{'...' if len(ohe_info['excluded'])>10 else ''}")

    X_train = ohe_apply(X_train, ohe_info)
    X_test = ohe_apply(X_test, ohe_info)
    timer.tick("steam OHE")
    show_shape_mem("steam after OHE", X_train, X_test)

    # numeric-only safety
    X_train = X_train.select_dtypes(include=["number"]).copy()
    X_test = X_test.select_dtypes(include=["number"]).copy()
    if verbose:
        print("steam non-numeric after OHE:", X_train.select_dtypes(exclude=["number"]).columns.tolist())

    num_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
    if num_cols:
        simp = SimpleImputer(strategy="median").fit(X_train[num_cols])
        X_train[num_cols] = simp.transform(X_train[num_cols])
        X_test[num_cols] = simp.transform(X_test[num_cols])
    timer.tick("steam impute")

    X_train = downcast_numeric_inplace(X_train)
    X_test = downcast_numeric_inplace(X_test)
    show_shape_mem("steam after impute+downcast", X_train, X_test)

    steam_squares = []
    if "log_hours" in X_train.columns:
        steam_squares.append("log_hours")
    elif "hours" in X_train.columns:
        steam_squares.append("hours")
    if "discount" in X_train.columns:
        steam_squares.append("discount")
    if "days_since_release" in X_train.columns:
        steam_squares.append("days_since_release")

    steam_pairs = []
    if ("price_final" in X_train.columns) and ("discount" in X_train.columns):
        steam_pairs.append(("price_final", "discount"))
    if ("price_original" in X_train.columns) and ("discount" in X_train.columns):
        steam_pairs.append(("price_original", "discount"))
    if ("days_since_release" in X_train.columns) and ("discount" in X_train.columns):
        steam_pairs.append(("days_since_release", "discount"))
    if ("user_reviews" in X_train.columns) and ("reviews" in X_train.columns):
        steam_pairs.append(("user_reviews", "reviews"))

    X_train, X_test = add_poly_features_batched(X_train, X_test, steam_squares, steam_pairs)
    timer.tick("steam poly")
    show_shape_mem("steam after poly", X_train, X_test)

    num_cols2 = X_train.select_dtypes(include=["number"]).columns.tolist()
    bounds = outlier_bounds_fit(
        X_train[num_cols2],
        lower_q=outlier_lower_q,
        upper_q=outlier_upper_q,
        exclude_binary=True,
        sample_rows=200000
    )
    m_tr = outlier_mask(X_train, bounds)
    m_te = outlier_mask(X_test, bounds)
    X_train = X_train[m_tr]
    y_train = y[y.index.isin(X_train.index)]
    X_test = X_test[m_te]
    y_test = y[y.index.isin(X_test.index)]
    timer.tick("steam outlier filter")
    show_shape_mem("steam after outlier", X_train, X_test)

    X_train, X_test = scale_numeric_only(X_train, X_test, scale_method)
    timer.tick("steam scale")

    keep_cols = select_features(feature_selection, max_features, task_type, random_state, X_train, y_train)
    X_train = X_train[keep_cols]
    X_test = X_test[keep_cols]
    timer.tick("steam select features")
    show_shape_mem("steam after select", X_train, X_test)

    outputs["steam"] = (X_train, X_test, y_train, y_test)

    # ---------- OLIST ----------
    olist = olist_df.copy()
    if task_type == "classification":
        olist["target"] = (olist["review_score_mean_product"] >= 4.0).astype(int)
    else:
        olist["target"] = olist["review_score_mean_product"]
    olist.dropna(subset=["target"], inplace=True)
    olist.drop(columns=["order_id", "customer_id", "customer_unique_id"],
               errors="ignore", inplace=True)

    olist["delivery_delay"] = (olist["order_estimated_delivery_date"] - olist["order_purchase_timestamp"]).dt.days
    denom = olist["payment_installments_max"].replace(0, 1)
    olist["avg_installment"] = olist["payment_value_total"] / denom

    if {"product_length_cm", "product_width_cm", "product_height_cm"}.issubset(olist.columns):
        olist["product_volume_cm3"] = (
            olist["product_length_cm"] * olist["product_width_cm"] * olist["product_height_cm"]
        )

    X = olist.drop(columns=["review_score_mean_product", "target"], errors="ignore")
    y = olist["target"]
    strat = y if task_type == "classification" else None
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=strat
    )
    timer.tick("olist split")
    show_shape_mem("olist post split", X_train, X_test)

    datetimes_to_numeric_inplace(X_train)
    datetimes_to_numeric_inplace(X_test)
    timer.tick("olist datetime to numeric")

    olist_kw = {
        "order_status": ["delivered", "shipped", "canceled", "invoiced", "processing"],
        "product_category_name": ["moveis", "auto", "pet", "perfumaria", "utilidades", "brinquedos"]
    }
    olist_text_info = text_features_fit(X_train, olist_kw)
    X_test = text_features_apply(X_test, olist_text_info)
    timer.tick("olist text features")

    ohe_info = ohe_fit(
        X_train,
        exclude_cols=[],  # add explicit excludes if needed
        top_k_per_col=ohe_top_k_per_col,
        min_freq_per_col=ohe_min_freq_per_col,
        auto_exclude=ohe_auto_exclude,
        high_card_threshold=ohe_high_card_threshold,
        long_text_avglen=ohe_long_text_avglen,
    )
    if verbose and ohe_info.get("excluded"):
        print(f"[info] OHE auto-excluded (olist): {sorted(ohe_info['excluded'])[:10]}{'...' if len(ohe_info['excluded'])>10 else ''}")
    X_train = ohe_apply(X_train, ohe_info)
    X_test = ohe_apply(X_test, ohe_info)
    timer.tick("olist OHE")
    show_shape_mem("olist after OHE", X_train, X_test)

    X_train = X_train.select_dtypes(include=["number"]).copy()
    X_test = X_test.select_dtypes(include=["number"]).copy()

    num_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
    if num_cols:
        simp = SimpleImputer(strategy="median").fit(X_train[num_cols])
        X_train[num_cols] = simp.transform(X_train[num_cols])
        X_test[num_cols] = simp.transform(X_test[num_cols])
    timer.tick("olist impute")

    X_train = downcast_numeric_inplace(X_train)
    X_test = downcast_numeric_inplace(X_test)
    show_shape_mem("olist after impute+downcast", X_train, X_test)

    olist_squares = []
    if "delivery_delay" in X_train.columns:
        olist_squares.append("delivery_delay")
    if "price" in X_train.columns:
        olist_squares.append("price")
    if "freight_value" in X_train.columns:
        olist_squares.append("freight_value")

    olist_pairs = []
    if ("freight_value" in X_train.columns) and ("product_weight_g" in X_train.columns):
        olist_pairs.append(("freight_value", "product_weight_g"))
    if ("freight_value" in X_train.columns) and ("product_volume_cm3" in X_train.columns):
        olist_pairs.append(("freight_value", "product_volume_cm3"))
    if ("delivery_delay" in X_train.columns) and ("price" in X_train.columns):
        olist_pairs.append(("delivery_delay", "price"))
    if ("delivery_delay" in X_train.columns) and ("freight_value" in X_train.columns):
        olist_pairs.append(("delivery_delay", "freight_value"))
    if ("payment_installments_max" in X_train.columns) and ("price" in X_train.columns):
        olist_pairs.append(("payment_installments_max", "price"))

    X_train, X_test = add_poly_features_batched(X_train, X_test, olist_squares, olist_pairs)
    timer.tick("olist poly")
    show_shape_mem("olist after poly", X_train, X_test)

    num_cols2 = X_train.select_dtypes(include=["number"]).columns.tolist()
    bounds = outlier_bounds_fit(
        X_train[num_cols2],
        lower_q=outlier_lower_q,
        upper_q=outlier_upper_q,
        exclude_binary=True,
        sample_rows=200000
    )
    m_tr = outlier_mask(X_train, bounds)
    m_te = outlier_mask(X_test, bounds)
    X_train = X_train[m_tr]
    y_train = y[y.index.isin(X_train.index)]
    X_test = X_test[m_te]
    y_test = y[y.index.isin(X_test.index)]
    timer.tick("olist outlier filter")
    show_shape_mem("olist after outlier", X_train, X_test)

    X_train, X_test = scale_numeric_only(X_train, X_test, scale_method)
    timer.tick("olist scale")

    keep_cols = select_features(feature_selection, max_features, task_type, random_state, X_train, y_train)
    X_train = X_train[keep_cols]
    X_test = X_test[keep_cols]
    timer.tick("olist select features")
    show_shape_mem("olist after select", X_train, X_test)

    outputs["olist"] = (X_train, X_test, y_train, y_test)

    # ---------- SALES ----------
    sales = sales_df.copy()
    if task_type == "classification":
        sales["target"] = (sales["Critic_Score"] >= 8.0).astype(int)
    else:
        sales["target"] = sales["Critic_Score"]
    sales.dropna(subset=["target"], inplace=True)

    for c in ["ESRB_Rating", "Genre", "Platform", "Publisher", "Developer"]:
        if c in sales.columns:
            sales[c] = sales[c].fillna("Unknown")

    X = sales.drop(columns=["target", "Critic_Score"], errors="ignore")
    y = sales["target"]
    strat = y if task_type == "classification" else None
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=strat
    )
    timer.tick("sales split")
    show_shape_mem("sales post split", X_train, X_test)

    datetimes_to_numeric_inplace(X_train)
    datetimes_to_numeric_inplace(X_test)
    timer.tick("sales datetime to numeric")

    sales_kw = {
        "Name": ["mario", "pokemon", "zelda", "call of duty", "fifa", "minecraft", "final fantasy"],
        "Genre": ["action", "sports", "shooter", "racing", "role", "adventure", "platform", "puzzle"],
        "Publisher": ["nintendo", "electronic arts", "ea", "activision", "ubisoft", "sony", "sega"],
        "ESRB_Rating": ["e", "t", "m"]
    }
    sales_text_info = text_features_fit(X_train, sales_kw)
    X_test = text_features_apply(X_test, sales_text_info)
    timer.tick("sales text features")

    ohe_info = ohe_fit(
        X_train,
        exclude_cols=[],  # add explicit excludes if needed
        top_k_per_col=ohe_top_k_per_col,
        min_freq_per_col=ohe_min_freq_per_col,
        auto_exclude=ohe_auto_exclude,
        high_card_threshold=ohe_high_card_threshold,
        long_text_avglen=ohe_long_text_avglen,
    )
    if verbose and ohe_info.get("excluded"):
        print(f"[info] OHE auto-excluded (sales): {sorted(ohe_info['excluded'])[:10]}{'...' if len(ohe_info['excluded'])>10 else ''}")
    X_train = ohe_apply(X_train, ohe_info)
    X_test = ohe_apply(X_test, ohe_info)
    timer.tick("sales OHE")
    show_shape_mem("sales after OHE", X_train, X_test)

    X_train = X_train.select_dtypes(include=["number"]).copy()
    X_test = X_test.select_dtypes(include=["number"]).copy()

    num_cols = X_train.select_dtypes(include=["number"]).columns.tolist()
    if num_cols:
        simp = SimpleImputer(strategy="median").fit(X_train[num_cols])
        X_train[num_cols] = simp.transform(X_train[num_cols])
        X_test[num_cols] = simp.transform(X_test[num_cols])
    timer.tick("sales impute")

    X_train = downcast_numeric_inplace(X_train)
    X_test = downcast_numeric_inplace(X_test)
    show_shape_mem("sales after impute+downcast", X_train, X_test)

    sales_squares = []
    if "Year" in X_train.columns:
        sales_squares.append("Year")
    if "User_Score" in X_train.columns:
        sales_squares.append("User_Score")

    sales_pairs = []
    if ("NA_Sales" in X_train.columns) and ("PAL_Sales" in X_train.columns):
        sales_pairs.append(("NA_Sales", "PAL_Sales"))
    if ("NA_Sales" in X_train.columns) and ("JP_Sales" in X_train.columns):
        sales_pairs.append(("NA_Sales", "JP_Sales"))
    if ("PAL_Sales" in X_train.columns) and ("JP_Sales" in X_train.columns):
        sales_pairs.append(("PAL_Sales", "JP_Sales"))

    X_train, X_test = add_poly_features_batched(X_train, X_test, sales_squares, sales_pairs)
    timer.tick("sales poly")
    show_shape_mem("sales after poly", X_train, X_test)

    num_cols2 = X_train.select_dtypes(include=["number"]).columns.tolist()
    bounds = outlier_bounds_fit(
        X_train[num_cols2],
        lower_q=outlier_lower_q,
        upper_q=outlier_upper_q,
        exclude_binary=True,
        sample_rows=200000
    )
    m_tr = outlier_mask(X_train, bounds)
    m_te = outlier_mask(X_test, bounds)
    X_train = X_train[m_tr]
    y_train = y[y.index.isin(X_train.index)]
    X_test = X_test[m_te]
    y_test = y[y.index.isin(X_test.index)]
    timer.tick("sales outlier filter")
    show_shape_mem("sales after outlier", X_train, X_test)

    X_train, X_test = scale_numeric_only(X_train, X_test, scale_method)
    timer.tick("sales scale")

    keep_cols = select_features(feature_selection, max_features, task_type, random_state, X_train, y_train)
    X_train = X_train[keep_cols]
    X_test = X_test[keep_cols]
    timer.tick("sales select features")
    show_shape_mem("sales after select", X_train, X_test)

    outputs["sales"] = (X_train, X_test, y_train, y_test)

    # ---------- Shape summary ----------
    for name, parts in outputs.items():
        Xtr, Xte, ytr, yte = parts
        print(f"[{name}] X_train: {Xtr.shape} | X_test: {Xte.shape} | y_train: {ytr.shape} | y_test: {yte.shape}")

    return outputs


In [6]:
# Download Paths
steam_path = safe_kaggle_download("antonkozyriev/game-recommendations-on-steam")
olist_path = safe_kaggle_download("olistbr/brazilian-ecommerce")
vg2019_path = safe_kaggle_download("ashaheedq/video-games-sales-2019")

# Load All
start_total = time.perf_counter()
steam = load_steam_dataset(steam_path, n_rows=N_ROWS, seed=random_state)
olist = load_olist_dataset(olist_path, n_rows=N_ROWS, seed=random_state)
sales = load_vg2019_dataset(vg2019_path, n_rows=N_ROWS, seed=random_state)
end_total = time.perf_counter()
print(f"main: load all done in {round(end_total - start_total, 3)} sec ({format_hms(end_total - start_total)})")

# Download Shapes
print("download: shapes summary")
print(f"download: steam shape = {None if steam is None else steam.shape}")
print(f"download: olist shape = {None if olist is None else olist.shape}")
print(f"download: sales shape = {None if sales is None else sales.shape}")

download: starting antonkozyriev/game-recommendations-on-steam
download: done antonkozyriev/game-recommendations-on-steam -> /Users/chandlercampbell/.cache/kagglehub/datasets/antonkozyriev/game-recommendations-on-steam/versions/28 in 0.315 sec
download: starting olistbr/brazilian-ecommerce
download: done olistbr/brazilian-ecommerce -> /Users/chandlercampbell/.cache/kagglehub/datasets/olistbr/brazilian-ecommerce/versions/2 in 0.173 sec
download: starting ashaheedq/video-games-sales-2019
download: done ashaheedq/video-games-sales-2019 -> /Users/chandlercampbell/.cache/kagglehub/datasets/ashaheedq/video-games-sales-2019/versions/2 in 0.184 sec
steam: start
steam: shapes games=(50872, 13), users=(14306064, 3), recs=(41154794, 8), meta=(50872, 3)
stratified_sample: picked 1000001 of 41154794 rows in 5.889 sec
steam: merge games with metadata
steam: merge recommendations with games
steam: merge with users
dates: converting possible date/time columns
steam: done shape=(1000001, 24)
olist: sta

In [7]:
robust_eda(steam, name="steam")
robust_eda(olist, name="olist")
robust_eda(sales, name="sales")


=== Robust EDA Report: steam ===

=== Info ===
   rows  columns  memory_bytes
1000001       24     588511404

=== Dtypes ===
        column          dtype
        app_id          int64
          date datetime64[ns]
  date_release datetime64[ns]
   description         object
      discount        float64
         funny          int64
       helpful          int64
         hours        float64
is_recommended           bool
         linux           bool
           mac           bool
positive_ratio          int64
   price_final        float64
price_original        float64
      products          int64
        rating         object
     review_id          int64
       reviews          int64
    steam_deck           bool
          tags         object
         title         object
       user_id          int64
  user_reviews          int64
           win           bool

=== Missing Values ===
        column  missing_count  missing_percent
        app_id              0         0.000000
       

# Classification

In [10]:

# Classification call
splits = prepare_data(
    steam, olist, sales,
    test_size=0.2,
    random_state=42,
    feature_selection="tree",  # try none first
    max_features=50,
    task_type="classification",
    scale_method="standard",
    tag_min_count=5, tag_top_k=100,
    verbose=True,
    ohe_top_k_per_col=50,        # cap per column
    ohe_min_freq_per_col=5,
    ohe_auto_exclude=True,       # auto skip long/free-text
    ohe_high_card_threshold=500, # tune if needed
    ohe_long_text_avglen=25
)

X_train_steam, X_test_steam, y_train_steam, y_test_steam = splits["steam"]
X_train_olist, X_test_olist, y_train_olist, y_test_olist = splits["olist"]
X_train_sales, X_test_sales, y_train_sales, y_test_sales = splits["sales"]

print("\n=== STEAM Dataset ===")
best_steam_model = build_and_tune_models(
    X_train_steam, y_train_steam,
    task_type="classification",
    num_folds=3,
    num_iterations=20,
    oversample=True
)

score_steam = evaluate_on_holdout(best_steam_model, X_test_steam, y_test_steam, task_type="classification")

print("steam threshold:", getattr(best_steam_model, "best_threshold_", None))

print("\n=== OLIST Dataset ===")
best_olist_model = build_and_tune_models(
    X_train_olist, y_train_olist,
    task_type="classification",
    num_folds=3,
    num_iterations=20,
    oversample=True
)

score_olist = evaluate_on_holdout(best_olist_model, X_test_olist, y_test_olist, task_type="classification")

print("olist threshold:", getattr(best_olist_model, "best_threshold_", None))

print("\n=== SALES Dataset ===")
best_sales_model = build_and_tune_models(
    X_train_sales, y_train_sales,
    task_type="classification",
    num_folds=3,
    num_iterations=20,
    oversample=True
) 

score_sales = evaluate_on_holdout(best_sales_model, X_test_sales, y_test_sales, task_type="classification")

print("sales threshold:", getattr(best_sales_model, "best_threshold_", None))


[timer] steam split: 0.33 s
[info] steam post split | X_train shape=(800000, 22) mem=0.438 GB | X_test shape=(200001, 22) mem=0.109 GB
[timer] steam datetime to numeric: 0.20 s
[timer] steam text features: 4.03 s
[info] steam after text | X_train shape=(800000, 22) mem=0.438 GB | X_test shape=(200001, 43) mem=0.119 GB
[info] steam tags unique=441, kept=100 (min_count=5, top_k=100)
[timer] steam tags multi-hot: 11.02 s
[info] steam after tags | X_train shape=(800000, 121) mem=0.399 GB | X_test shape=(200001, 142) mem=0.109 GB
[info] OHE auto-excluded (steam): ['description', 'title']
[timer] steam OHE: 0.33 s
[info] steam after OHE | X_train shape=(800000, 119) mem=0.194 GB | X_test shape=(200001, 119) mem=0.048 GB
steam non-numeric after OHE: []
[timer] steam impute: 3.96 s
[info] steam after impute+downcast | X_train shape=(800000, 119) mem=0.361 GB | X_test shape=(200001, 119) mem=0.090 GB
[timer] steam poly: 0.34 s
[info] steam after poly | X_train shape=(800000, 126) mem=0.381 GB |

KeyboardInterrupt: 

# Regression

In [None]:

# Regression call
splits = prepare_data(
    steam, olist, sales,
    task_type="regression",
    random_state=42,
    feature_selection="none",  # try none first
    max_features=None,
    scale_method="standard",
    tag_min_count=5, tag_top_k=200,
    verbose=True,
    ohe_top_k_per_col=50,        # cap per column
    ohe_min_freq_per_col=5,
    ohe_auto_exclude=True,       # auto skip long/free-text
    ohe_high_card_threshold=500, # tune if needed
    ohe_long_text_avglen=25,
    test_size=0.2
)

X_train_steam, X_test_steam, y_train_steam, y_test_steam = splits["steam"]
X_train_olist, X_test_olist, y_train_olist, y_test_olist = splits["olist"]
X_train_sales, X_test_sales, y_train_sales, y_test_sales = splits["sales"]

best_steam_model = build_and_tune_models(
    X_train_steam, y_train_steam,
    task_type="regression",
    num_folds=3,
    num_iterations=20
)

best_olist_model = build_and_tune_models(
    X_train_olist, y_train_olist,
    task_type="regression",
    num_folds=3,
    num_iterations=20
)

best_sales_model = build_and_tune_models(
    X_train_sales, y_train_sales,
    task_type="regression",
    num_folds=3,
    num_iterations=20
)

NameError: name 'prepare_all' is not defined