In [4]:
import polars as pl
import numpy as np
from typing import List, Dict, Optional

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, brier_score_loss
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import make_scorer, recall_score, precision_score, f1_score

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE, ADASYN, RandomOverSampler


def create_model_pipeline(
    on_base_lf: pl.LazyFrame,
    responses: List[str],
    cat_predictors_drop: List[str] = [],
    cat_predictors_mode: List[str] = [],
    num_predictors_drop: List[str] = [],
    num_predictors_median: List[str] = [],
    model_type: str = "LogisticRegression",
    oversampling_method: str = "SMOTE",
    param_grid: Optional[Dict] = None,
    scoring: Dict = {'brier_score': 'neg_brier_score'},
    refit: str = "brier_score",
    cv: int = 5,
    test_size: float = 0.30,
    random_state: int = 123,
    verbose: bool = True):
    """
    Create and train a machine learning pipeline with preprocessing,
    oversampling, and model selection.
    
    Args:
        on_base_lf: Polars LazyFrame with the data
        responses: List of response variable column names
        cat_predictors_drop: Categorical predictors with drop imputation
        cat_predictors_mode: Categorical predictors with mode imputation  
        num_predictors_drop: Numerical predictors with drop imputation
        num_predictors_median: Numerical predictors with median imputation
        model_type: Type of model to use
        oversampling_method: Method for handling class imbalance
        param_grid: Parameters for GridSearchCV
        scoring: Scoring metric for model selection
        cv: Cross-validation folds
        test_size: Proportion of data for testing
        random_state: Random seed for reproducibility
        verbose: Whether to print progress information
    
    Returns:
        dict: Contains trained pipeline, test data, predictions, and performance metrics
    """
    # ==== Preprocessing Pipeline ====
    # Column specific preprocessing steps
    numeric_median_pipeline = Pipeline([
        ("imputer", SimpleImputer(strategy="median")), 
        ("scaler", StandardScaler())
    ])
    
    numeric_drop_pipeline = Pipeline([
        ("scaler", StandardScaler())
    ])
    
    categorical_drop_pipeline = Pipeline([
        ("encoder", OneHotEncoder(handle_unknown="ignore"))
    ])
    
    categorical_mode_pipeline = Pipeline([
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("encoder", OneHotEncoder(handle_unknown="ignore"))
    ])
    
    # Combine column specific preprocessing steps into a preprocessing pipeline
    transformers = []
    if num_predictors_drop:
        transformers.append(("num_drop", numeric_drop_pipeline, num_predictors_drop))
    if num_predictors_median:
        transformers.append(("num_median", numeric_median_pipeline, num_predictors_median))
    if cat_predictors_drop:
        transformers.append(("cat_drop", categorical_drop_pipeline, cat_predictors_drop))
    if cat_predictors_mode:
        transformers.append(("cat_mode", categorical_mode_pipeline, cat_predictors_mode))
    
    preprocessor = ColumnTransformer(transformers=transformers)
    
    # Over sampling selection
    if oversampling_method == "SMOTE":
        oversampler = SMOTE(random_state=random_state)
    elif oversampling_method == "ADASYN":
        oversampler = ADASYN(random_state=random_state)
    elif oversampling_method == "RandomOverSampler":
        oversampler = RandomOverSampler(random_state=random_state)
    else:
        raise ValueError(f"Unknown oversampling method: {oversampling_method}")
    
    # Model selection
    if model_type == "LogisticRegression":
        base_model = LogisticRegression(random_state=random_state)
    elif model_type == "RandomForestClassifier":
        base_model = RandomForestClassifier(random_state=random_state)
    elif model_type == "GradientBoostingClassifier":
        base_model = GradientBoostingClassifier(random_state=random_state)
    elif model_type == "KNeighborsClassifier":
        base_model = KNeighborsClassifier()
    elif model_type == "MLPClassifier":
        base_model = MLPClassifier(random_state=random_state)
    else:
        raise ValueError(f"Unknown model type: {model_type}")
    
    # Combine preprocessor, over sampler, and model into one pipeline
    pipeline = ImbPipeline([
        ("preprocessor", preprocessor),
        ("oversampler", oversampler),
        ("classifier", base_model)
    ])
    
    # Wrap pipeline in a Gridsearch. Each CV set will have its own pipeline.
    # For param_grid, start with classifier__{parameter} as the name.
    grid_search = GridSearchCV(
        estimator=pipeline,
        param_grid=param_grid,
        cv=cv,
        scoring=scoring,
        verbose=1 if verbose else 0,
        n_jobs=-1,
        refit=refit
    )

    # ==== Data Preparation ====
    all_features = (responses + cat_predictors_drop + cat_predictors_mode + 
                    num_predictors_drop + num_predictors_median)
    all_predictors = (cat_predictors_drop + cat_predictors_mode + 
                      num_predictors_drop + num_predictors_median)
    drop_null_features = cat_predictors_drop + num_predictors_drop + responses
    
    if verbose:
        print(f"Total features: {len(all_features)}")
        print(f"Total Predictors: {len(all_predictors)}")
        print(f"Total Responses: {len(responses)}")
    
    # Select all features and perform drop imputation on specific columns
    on_base_pl = (on_base_lf
        .select(all_features)
        .drop_nulls(drop_null_features)
        .collect()
    )
    
    # Create predictor and response sets
    X = on_base_pl.select(all_predictors).to_pandas()
    y = on_base_pl.select(responses).to_pandas().squeeze()
    
    if verbose:
        print(f"\nDataset shape after drop imputation: {X.shape}")
        print(f"Response distribution: {y.value_counts().to_dict()}")
    
    # Split the predictor and response sets into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, 
        test_size=test_size, 
        shuffle=True,
        stratify=y,
        random_state=random_state
    )
    
    # Train the model
    grid_search.fit(X_train, y_train)
    
    if verbose:
        best_param = [f"{param} = {value}" for param, value in grid_search.best_params_.items()]
        print(f"\nBest parameters: \n{"\n".join(best_param)}")
        print(f"\nBest cross-validation score: {grid_search.best_score_:.4f}\n")
    
    # ==== Model Evaluation ====
    best_pipeline = grid_search.best_estimator_
    y_pred = best_pipeline.predict(X_test)
    y_pred_proba = best_pipeline.predict_proba(X_test)[:, 1]
    
    # Calculate metrics
    brier_score = brier_score_loss(y_test, y_pred_proba)
    
    if verbose:
        print(classification_report(y_test, y_pred))
        print(f"Brier Score: {brier_score:.4f}")
        print(f"\nPredictors:\n{"\n".join(all_predictors)}")
        print(f"\nResponse:\n{"\n".join(responses)}")
    
    # Return important objects
    results = {
        'pipeline': best_pipeline,
        'grid_search': grid_search,
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
        'y_pred': y_pred,
        'y_pred_proba': y_pred_proba,
        'brier_score': brier_score,
        'best_params': grid_search.best_params_,
        'best_score': grid_search.best_score_,
        'feature_names': all_predictors,
        'response_names': responses
    }
    
    return results

In [5]:
import os

# Load data
on_base_path = "../data/throw_home_runner_on_third_wide_sprint_arm.parquet"
on_base_lf = pl.scan_parquet(on_base_path)

# Feature column names
responses = ["is_successful"]
num_predictors_drop = [
    "hang_time",
    "distance_catch_to_home", 
    "distance_traveled_fielder",
    "distance_traveled_all_fielders",
    "distance_to_home_diff"
]
num_predictors_median = [
    "seconds_since_hit_085_mlb_person_id_R3",
    "arm_overall_fielder_mlb_person_id"
]

# Parameter grid
param_grid = {
    'classifier__penalty': ['elasticnet'],
    'classifier__C': [0.001, 0.01, 0.1, 1, 10, 100],
    'classifier__l1_ratio': [0.1, 0.3, 0.5, 0.7, 0.9],
    'classifier__solver': ['saga'],
    'classifier__max_iter': [100, 250, 500, 1000]
}

# Scoring method
specificity_scorer = make_scorer(recall_score, pos_label=0)
precision_neg_scorer = make_scorer(precision_score, pos_label=0, zero_division=0)
f1_neg_scorer = make_scorer(f1_score, pos_label=0, zero_division=0)
scoring = {
    'log_loss': 'neg_log_loss',
    'brier_score': 'neg_brier_score'
}
refit = "brier_score"

# Train model
results = create_model_pipeline(
    on_base_lf=on_base_lf,
    responses=responses,
    num_predictors_drop=num_predictors_drop,
    num_predictors_median=num_predictors_median,
    model_type="LogisticRegression",
    oversampling_method="SMOTE",
    param_grid=param_grid,
    scoring=scoring,
    refit=refit,
    cv=5,
    verbose=True
)

Total features: 8
Total Predictors: 7
Total Responses: 1

Dataset shape after drop imputation: (6183, 7)
Response distribution: {True: 4814, False: 1369}
Fitting 5 folds for each of 120 candidates, totalling 600 fits



Best parameters: 
classifier__C = 1
classifier__l1_ratio = 0.9
classifier__max_iter = 100
classifier__penalty = elasticnet
classifier__solver = saga

Best cross-validation score: -0.0686

              precision    recall  f1-score   support

       False       0.71      0.92      0.80       411
        True       0.97      0.89      0.93      1444

    accuracy                           0.90      1855
   macro avg       0.84      0.90      0.87      1855
weighted avg       0.92      0.90      0.90      1855

Brier Score: 0.0739

Predictors:
hang_time
distance_catch_to_home
distance_traveled_fielder
distance_traveled_all_fielders
distance_to_home_diff
seconds_since_hit_085_mlb_person_id_R3
arm_overall_fielder_mlb_person_id

Response:
is_successful


In [7]:
# Parameter grid
param_grid = {
    'classifier__hidden_layer_sizes': [
        (50,), (100,), (150,),
        (50, 50), (100, 50), (100, 100),
        (100, 50, 25)                   
    ],
    'classifier__activation': ['relu', 'tanh'],
    'classifier__solver': ['adam', 'lbfgs'],
    'classifier__alpha': [0.0001, 0.001, 0.01],     
    'classifier__learning_rate': ['constant', 'adaptive'],
    'classifier__max_iter': [500, 1000]
}


# Train model
results = create_model_pipeline(
    on_base_lf=on_base_lf,
    responses=responses,
    num_predictors_drop=num_predictors_drop,
    num_predictors_median=num_predictors_median,
    model_type="MLPClassifier",
    oversampling_method="SMOTE",
    param_grid=param_grid,
    scoring=scoring,
    refit=refit,
    cv=5,
    verbose=True
)

Total features: 8
Total Predictors: 7
Total Responses: 1

Dataset shape after drop imputation: (6183, 7)
Response distribution: {True: 4814, False: 1369}
Fitting 5 folds for each of 336 candidates, totalling 1680 fits


STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)
STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)
STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)
STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
  self.n_iter_ = _check_optimize_result("


Best parameters: 
classifier__activation = relu
classifier__alpha = 0.01
classifier__hidden_layer_sizes = (50,)
classifier__learning_rate = constant
classifier__max_iter = 500
classifier__solver = adam

Best cross-validation score: -0.0675

              precision    recall  f1-score   support

       False       0.72      0.91      0.80       411
        True       0.97      0.90      0.93      1444

    accuracy                           0.90      1855
   macro avg       0.84      0.90      0.87      1855
weighted avg       0.91      0.90      0.90      1855

Brier Score: 0.0734

Predictors:
hang_time
distance_catch_to_home
distance_traveled_fielder
distance_traveled_all_fielders
distance_to_home_diff
seconds_since_hit_085_mlb_person_id_R3
arm_overall_fielder_mlb_person_id

Response:
is_successful
