In [1]:
from typing import Dict, Tuple

import duckdb
import numpy as np
import pandas as pd
import pyarrow as pa


def set_attr(conn: duckdb.DuckDBPyConnection) -> None:
    """Apply this notebook's DuckDB session settings to ``conn``.

    The statements are executed one by one, in the order listed: memory cap,
    thread count, progress bar, temp directory, Parquet prefetching off and
    Parquet metadata cache on.

    Args:
        conn: Open DuckDB connection that receives the ``SET`` statements.
    """
    session_settings = (
        "SET max_memory='64GB';",
        "SET threads=4;",
        "SET enable_progress_bar=1;",
        "SET temp_directory = 'C:/Projet/tmp/';",
        "SET disable_parquet_prefetching = TRUE;",
        "SET parquet_metadata_cache = TRUE;",
    )
    for statement in session_settings:
        conn.execute(statement)


# Shared random seed: used further down as ``random_state`` for BorutaPy,
# both LGBMRegressor instances and the KFold splitter.
seed = 42

In [2]:
def _build_projection(mapping: Dict[str, Dict[str, Tuple]]) -> str:
    """Return the SELECT list that converts invalid values to NULL and applies alias/type."""
    parts = []
    for raw, info in mapping.items():
        alias = info["alias"]
        inval = info["invalid"]
        typ = info["type"]

        if not inval:
            parts.append(f"CAST({raw} AS {typ}) AS {alias}")
            continue

        # Proper quoting for string invalids
        in_list = ", ".join(f"'{v}'" if isinstance(v, str) else str(v) for v in inval)
        parts.append(
            f"CAST(CASE WHEN {raw} IN ({in_list}) THEN NULL ELSE {raw} END AS {typ}) AS {alias}"
        )
    return ",\n       ".join(parts)


def extract(
    conn: duckdb.DuckDBPyConnection,
    parquet_path: str,
    mapping: Dict[str, Dict[str, Tuple]],
    *,
    show: bool = False,
) -> None:
    """Load a Parquet file into ``stg.transactions`` with cleaned, typed columns.

    The SELECT list comes from ``_build_projection(mapping)``: every mapped
    column has its invalid values replaced by NULL, is cast to its target type
    and is renamed to its alias.

    Args:
        conn: Open, writable DuckDB connection.
        parquet_path: Path (or glob) handed to DuckDB's ``read_parquet``.
        mapping: Column mapping in the format expected by ``_build_projection``.
        show: When true, print the first rows of the new table (at most 20).
    """
    projection = _build_projection(mapping)
    # Double single quotes so the path always stays one SQL string literal.
    path_literal = str(parquet_path).replace("'", "''")

    conn.execute("CREATE SCHEMA IF NOT EXISTS stg")

    # CREATE OR REPLACE swaps the table in a single statement: if the load
    # fails (bad path, cast error, ...) a previously built stg.transactions is
    # kept instead of having already been dropped.
    conn.execute(
        f"""
        CREATE OR REPLACE TABLE stg.transactions AS
        SELECT {projection}
        FROM read_parquet('{path_literal}');
        """
    )
    if show:
        conn.table("stg.transactions").show(max_rows=20)


In [3]:
# Raw column name -> {"alias": output name, "invalid": sentinel values that
# are turned into NULL, "type": SQL type the column is cast to}.
# Each row below is (raw column, alias, invalid values, type).
columns_map = {
    raw: {"alias": alias, "invalid": invalid, "type": typ}
    for raw, alias, invalid, typ in (
        # Amounts -------------------------------------------------------
        ("FLT_DEP_MNT", "mnt_depassement", (), "DOUBLE"),
        ("FLT_PAI_MNT", "mnt_pay", (), "DOUBLE"),
        ("FLT_REM_MNT", "mnt_rem", (), "DOUBLE"),
        # Period --------------------------------------------------------
        ("SOI_ANN", "ann_soin", ("0000", "0001"), "USMALLINT"),
        ("SOI_MOI", "mois_soin", ("00",), "UTINYINT"),
        # Beneficiary ---------------------------------------------------
        ("AGE_BEN_SNDS", "age", (99,), "UTINYINT"),
        ("BEN_RES_REG", "region_ben", (99,), "UTINYINT"),
        ("BEN_SEX_COD", "sexe", (99,), "UTINYINT"),
        # Service -------------------------------------------------------
        ("ASU_NAT", "nat_assurance", (0, 99), "UTINYINT"),
        ("CPT_ENV_TYP", "type_envlp", (9, 98), "UTINYINT"),
        ("DRG_AFF_NAT", "nat_destinataire", (0, 99), "UTINYINT"),
        ("PRS_PPU_SEC", "code_secteur", (9,), "UTINYINT"),
        ("PRS_REM_TYP", "type_remb", (99,), "UTINYINT"),
        # Provider ------------------------------------------------------
        ("PRS_FJH_TYP", "forfait_journalier", (8, 9), "UTINYINT"),
        ("DDP_SPE_COD", "discipline_exe", (0, 121), "UTINYINT"),
        ("ETE_CAT_SNDS", "categorie_etablissement", (9999,), "USMALLINT"),
        ("ETE_REG_COD", "region_exe", (99,), "UTINYINT"),
        ("ETE_TYP_SNDS", "type_exe", (99,), "UTINYINT"),
        ("EXE_INS_REG", "region_ins", (99,), "UTINYINT"),
        ("MDT_TYP_COD", "mode_traitement", (9,), "UTINYINT"),
        ("PSE_ACT_CAT", "categorie_exe", (99,), "UTINYINT"),
        ("PSE_ACT_SNDS", "nature_exe", (0, 99), "UTINYINT"),
        ("PSE_SPE_SNDS", "specialite_exe", (0, 99), "UTINYINT"),
        ("PSE_STJ_SNDS", "stat_juridique_exe", (9,), "UTINYINT"),
    )
}


In [4]:
# Build stg.transactions from the A202401 Parquet file and preview it.
# The path is a raw string: "\P", "\R" and "\A" are not valid escape sequences,
# and a plain literal containing them triggers a SyntaxWarning on Python 3.12+.
with duckdb.connect(database="projet.db", read_only=False) as conn:
    set_attr(conn)
    extract(conn, r"C:\Projet\Raw_data\A202401.parquet", columns_map, show=True)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌─────────────────┬─────────┬─────────┬──────────┬───────────┬───────┬────────────┬───────┬───────────────┬────────────┬──────────────────┬──────────────┬───────────┬────────────────────┬────────────────┬─────────────────────────┬────────────┬──────────┬────────────┬─────────────────┬───────────────┬────────────┬────────────────┬────────────────────┐
│ mnt_depassement │ mnt_pay │ mnt_rem │ ann_soin │ mois_soin │  age  │ region_ben │ sexe  │ nat_assurance │ type_envlp │ nat_destinataire │ code_secteur │ type_remb │ forfait_journalier │ discipline_exe │ categorie_etablissement │ region_exe │ type_exe │ region_ins │ mode_traitement │ categorie_exe │ nature_exe │ specialite_exe │ stat_juridique_exe │
│     double      │ double  │ double  │  uint16  │   uint8   │ uint8 │   uint8    │ uint8 │     uint8     │   uint8    │      uint8       │    uint8     │   uint8   │       uint8        │     uint8      │         uint16          │   uint8    │  uint8   │   uint8    │      uint8      │     uint

In [5]:
def exploratoire(
    conn: duckdb.DuckDBPyConnection,
    stg_table: str,
    columns_map: Dict[str, Dict[str, Tuple]],
    *,
    show: bool = False,
) -> duckdb.DuckDBPyRelation:
    """Compute per-column NULL statistics of a staging table.

    (Re)creates ``stg.exploratoire`` and inserts one row per mapped column
    that declares at least one invalid value; columns whose ``"invalid"``
    entry is empty are skipped. Each row holds the alias, the total row count,
    the number of NULLs in that column and their ratio rounded to 6 decimals.

    Args:
        conn: Open, writable DuckDB connection.
        stg_table: Name of the table to analyse (inserted verbatim into SQL).
        columns_map: Column mapping; only ``"alias"`` and ``"invalid"`` are read.
        show: When true, also print the resulting table.

    Returns:
        A relation over ``stg.exploratoire``. It is returned whether or not
        ``show`` is set, and is tied to ``conn``.
    """
    # The result table lives in the stg schema; make sure it exists even when
    # this function is called on a database where extract() never ran.
    conn.execute("CREATE SCHEMA IF NOT EXISTS stg")
    conn.execute(
        """
        CREATE OR REPLACE TABLE stg.exploratoire (
            column_name TEXT,
            nb_total BIGINT,
            nb_inutilisables BIGINT,
            pct_inutilisables DOUBLE
        );
        """
    )

    for meta in columns_map.values():
        if not meta["invalid"]:
            continue
        col = meta["alias"]
        # One INSERT ... SELECT (and therefore one pass over stg_table) per column.
        conn.execute(
            f"""
            INSERT INTO stg.exploratoire
            SELECT
                '{col}',
                COUNT(*) AS nb_total,
                COUNT(*) FILTER (WHERE {col} IS NULL) AS nb_inutilisables,
                ROUND(nb_inutilisables / nb_total, 6) AS pct_inutilisables
            FROM {stg_table};
            """
        )

    result = conn.table("stg.exploratoire")
    if show:
        result.show()
    # Always return the relation so the function honours its annotation
    # (previously show=True returned None).
    return result

In [6]:
# Recompute the NULL statistics of stg.transactions and print them.
with duckdb.connect("projet.db", read_only=False) as conn:
    set_attr(conn)
    exploratoire(
        conn,
        stg_table="stg.transactions",
        columns_map=columns_map,
        show=True,
    )

┌─────────────────────────┬──────────┬──────────────────┬───────────────────┐
│       column_name       │ nb_total │ nb_inutilisables │ pct_inutilisables │
│         varchar         │  int64   │      int64       │      double       │
├─────────────────────────┼──────────┼──────────────────┼───────────────────┤
│ ann_soin                │ 38758176 │            15608 │          0.000403 │
│ mois_soin               │ 38758176 │            15604 │          0.000403 │
│ age                     │ 38758176 │            40558 │          0.001046 │
│ region_ben              │ 38758176 │          2427121 │          0.062622 │
│ sexe                    │ 38758176 │                0 │               0.0 │
│ nat_assurance           │ 38758176 │                1 │               0.0 │
│ type_envlp              │ 38758176 │         15826621 │          0.408343 │
│ nat_destinataire        │ 38758176 │         15827773 │          0.408372 │
│ code_secteur            │ 38758176 │                0 │       

In [12]:
import pprint

from boruta import BorutaPy
from joblib import Parallel, delayed, dump, load
from lightgbm import LGBMRegressor
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.metrics import r2_score
from sklearn.model_selection import KFold, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.utils.validation import check_is_fitted


class BorutaFeatureSelector(BaseEstimator, TransformerMixin):
    """Pipeline-compatible transformer that keeps the features Boruta confirms.

    Args:
        estimator: Estimator handed to ``BorutaPy`` to compute feature
            importances. It is cloned in ``fit`` and never modified.
        output: When true, print the support/ranking table after fitting.

    Attributes set by ``fit``:
        feature_names_: Column names of the training frame.
        selector: The fitted ``BorutaPy`` instance.
    """

    def __init__(self, estimator, output=False):
        self.estimator = estimator
        self.output = output

    def fit(self, X, y):
        """Run Boruta on ``X``/``y`` and remember which columns are supported.

        Args:
            X: pandas DataFrame of features (``.columns`` and ``.values`` are used).
            y: Target values, passed through to ``BorutaPy.fit``.

        Returns:
            self
        """
        # Local import: the file-level import only brings in the base classes.
        from sklearn.base import clone

        self.feature_names_ = X.columns.tolist()
        self.selector = BorutaPy(
            # BorutaPy reconfigures and fits the estimator it receives; work on
            # a clone so the constructor argument stays untouched across fits.
            estimator=clone(self.estimator),
            n_estimators="auto",
            max_iter=10,
            random_state=seed,  # module-level seed
        )
        self.selector.fit(X.values, y)
        self._sklearn_is_fitted = True
        if self.output:
            self._show_ranking()

        return self

    def transform(self, X):
        """Return only the columns of ``X`` that Boruta marked as supported.

        ``X`` must be a DataFrame with the same column order as in ``fit``,
        because selection is positional (boolean mask through ``iloc``).
        """
        check_is_fitted(self, attributes=["selector"])
        return X.iloc[:, self.selector.support_]

    def _show_ranking(self):
        """Print every feature with its support flag, sorted by rank then name."""
        print("------Support and Ranking for each feature------")
        feature_info = sorted(
            zip(self.selector.support_, self.selector.ranking_, self.feature_names_),
            key=lambda item: (item[1], item[2]),
        )
        # Pad ranks to the width of the largest possible rank.
        digit = len(str(len(self.feature_names_)))

        for support, ranking, feature in feature_info:
            status = "✅" if support else "❌"
            print(f"{status} Rank: {ranking:<{digit}} - {feature}")


def optimize_hyperparameters(
    X,
    y,
    estimator,
    param_distributions,
    *,
    scoring="accuracy",
    n_jobs=-1,
    cv=5,
    random_state=42,
) -> "Tuple[BaseEstimator, dict]":
    """Tune ``estimator`` with ``RandomizedSearchCV`` and return the winner.

    Args:
        X: Training features.
        y: Training target.
        estimator: Estimator or pipeline to tune.
        param_distributions: Search space passed to ``RandomizedSearchCV``.
        scoring: Scorer used to rank candidates. The default, ``"accuracy"``,
            is a classification metric; pass a regression scorer such as
            ``"r2"`` when tuning a regressor.
        n_jobs: Parallelism of the search.
        cv: Cross-validation splitter or number of folds.
        random_state: Seed for the random candidate sampling.

    Returns:
        ``(best_estimator, best_params)``: the best candidate as refit by the
        search, and its parameter setting.
    """
    randomized_search = RandomizedSearchCV(
        estimator=estimator,
        param_distributions=param_distributions,
        scoring=scoring,
        n_jobs=n_jobs,
        cv=cv,
        return_train_score=True,
        random_state=random_state,
    )
    randomized_search.fit(X, y)
    return randomized_search.best_estimator_, randomized_search.best_params_

In [8]:
# A feature column is kept only when its NULL ratio is strictly below this value.
threshold = 0.4

with duckdb.connect(database="projet.db", read_only=False) as conn:
    set_attr(conn)
    # Raw string: "\P", "\R" and "\A" are not valid escape sequences, and a
    # plain literal containing them triggers a SyntaxWarning on Python 3.12+.
    extract(conn, r"C:\Projet\Raw_data\A202401.parquet", columns_map)

    # Feature columns: aliases listed in stg.exploratoire under the threshold.
    X_cols = (
        exploratoire(conn, "stg.transactions", columns_map)
        .filter(f"pct_inutilisables < {threshold}")
        .project("column_name")
        .to_df()["column_name"]
        .tolist()
    )
    # Targets: the three amount columns.
    Y_cols = ["mnt_depassement", "mnt_pay", "mnt_rem"]

    # Materialise features and targets as one Arrow table while the connection is open.
    ar = conn.table("stg.transactions").project(*X_cols, *Y_cols).fetch_arrow_table()

X = ar.select(X_cols).to_pandas(split_blocks=True)
# Every feature is cast to pandas' category dtype, one column at a time.
for col in X.columns:
    X[col] = X[col].astype("category")
Y = ar.select(Y_cols).to_pandas()


print(X.shape)
print(Y.shape)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

(38758176, 12)
(38758176, 3)


In [None]:
import gc
from pathlib import Path
from tempfile import mkdtemp

# Remember the column names before the DataFrame is released.
feature_names = X.columns.to_list()

# Dump the feature matrix into a fresh temporary directory with joblib, then
# reopen the first file it wrote as a read-only memory map.
# NOTE(review): X was cast to category dtype earlier, so to_numpy() presumably
# yields an object-dtype array; confirm that the memmap round-trip still works
# as intended for that dtype.
memdir = Path(mkdtemp())
X_mm = dump(X.to_numpy(copy=False), memdir / "X.mmap")[0]  # path of the dump
X_mm = load(X_mm, mmap_mode="r")  # ndarray view

# Drop the in-memory copies and reclaim their memory right away.
del X, ar
gc.collect()


class MemMapFrame(pd.DataFrame):
    """DataFrame subclass whose derived frames are MemMapFrame objects too."""

    @property
    def _constructor(self):
        """Class pandas instantiates when an operation yields a new frame."""
        return MemMapFrame


# DataFrame view over the memory-mapped matrix, with the original column names.
X_proxy = MemMapFrame(X_mm, columns=feature_names)

# Outer CV used for the out-of-fold predictions.
kf = KFold(n_splits=5, shuffle=True, random_state=seed)

# Search space: only the final model's hyper-parameters are tuned.
param_distributions = {
    "model__num_leaves": [64, 128],
    "model__learning_rate": [0.05, 0.1],
}

# Impute (most frequent value, pandas output) -> Boruta selection -> LightGBM.
pipeline = Pipeline(
    steps=[
        (
            "imputer",
            SimpleImputer(strategy="most_frequent").set_output(transform="pandas"),
        ),
        (
            "feature_selector",
            BorutaFeatureSelector(
                estimator=LGBMRegressor(random_state=seed, n_jobs=-1),
                output=True,
            ),
        ),
        (
            "model",
            LGBMRegressor(random_state=seed, n_jobs=-1),
        ),
    ]
)


def train_one_target(target):
    """Tune, cross-validate and return a model for one target column.

    Reads the module-level ``Y``, ``X_proxy``, ``pipeline``,
    ``param_distributions`` and ``kf``.

    Args:
        target: Name of the column of ``Y`` to predict.

    Returns:
        Dict with the target name, the best hyper-parameters, the R² of the
        out-of-fold predictions, the out-of-fold predictions themselves and
        the best pipeline as refit by the search on the full data.
    """
    # Local import: ``clone`` is not part of the notebook's import cell.
    from sklearn.base import clone

    y = Y[target]

    # The pipeline ends in a regressor, so candidates must be ranked with a
    # regression scorer; the helper's default ("accuracy") is a classification
    # metric that cannot score continuous targets.
    best_pipeline, best_params = optimize_hyperparameters(
        X_proxy,
        y,
        pipeline,
        param_distributions=param_distributions,
        scoring="r2",
        cv=3,
        n_jobs=1,
    )

    # build OOF preds
    oof = np.zeros(len(y))
    for tr, te in kf.split(X_proxy):
        # Fit a fresh clone per fold so best_pipeline (fit on all rows by the
        # search) is not overwritten by a model trained on a subset.
        fold_model = clone(best_pipeline)
        fold_model.fit(X_proxy.iloc[tr], y.iloc[tr])
        oof[te] = fold_model.predict(X_proxy.iloc[te])
    score = r2_score(y, oof)

    return {
        "target": target,
        "best_params": best_params,
        "oof_score": score,
        "oof_pred": oof,
        "estimator": best_pipeline,
    }


# Train the three targets in parallel, one worker per target.
results = Parallel(n_jobs=3)(delayed(train_one_target)(t) for t in Y_cols)

summary = {
    r["target"]: {"R²": r["oof_score"], "params": r["best_params"]} for r in results
}
# One column of out-of-fold predictions per target: shape (n_rows, n_targets).
oof_matrix = np.column_stack([r["oof_pred"] for r in results])
# axis=1 averages across the model columns and yields one value per row;
# axis=0 would instead collapse the rows into a single mean per target.
final_pred = oof_matrix.mean(axis=1)  # per-row mean of three models

print("\n── Summary ──")
pprint.pprint(summary, compact=True)
print("Final per-row averaged prediction shape:", final_pred.shape)