In this example, we focused on integrating Polars DataFrames with scikit-learn's pipeline and preprocessing functionality to create a streamlined and efficient machine learning workflow. The main objective was to leverage the power of Polars for data manipulation and the flexibility of scikit-learn for model training and evaluation.

Here's a summary of what we accomplished:

1. Data Preparation:
   - We defined a `convert_utf8_to_enum` function that casts a UTF-8 (string) column in a Polars DataFrame to the Enum data type when its ratio of unique values to rows is at or below a specified threshold.
   - We created Pydantic classes (`Feature`, `NumericalFeature`, `EmbeddingFeature`, `CategoricalFeature`, `FeatureSet`, and `InputConfig`) to define and validate the feature sets used in the machine learning pipeline.
   - We generated simulated data using the `generate_simulated_data` function to demonstrate the workflow.

2. Pipeline Creation:
   - We defined a `create_pipeline` function that takes an `InputConfig` instance and creates a scikit-learn `Pipeline` object.
   - The pipeline consists of a `ColumnTransformer` for preprocessing numerical and categorical features, followed by a `LinearSVC` classifier.
   - For numerical features, we used `StandardScaler` to standardize the data.
   - For categorical features, we used `"passthrough"` to pass the physical representation of the Enum columns directly through the pipeline, avoiding the need for `OneHotEncoder`.
   - We set the output of the pipeline to "polars" using `pipeline.set_output(transform="polars")` to ensure that the pipeline returns a Polars DataFrame.

3. Model Training and Evaluation:
   - We simulated cross-validation by filtering the DataFrame based on a "fold" column to obtain the training, validation, and test sets.
   - We fit the pipeline on the training data using `pipeline.fit()`.
   - We evaluated the model's performance on the validation and test data using the `evaluate_model` function, which calculates the accuracy using scikit-learn's `accuracy_score`.

4. Integration with Polars:
   - Throughout the example, we used Polars DataFrames for data manipulation and preprocessing.
   - We converted the categorical columns to their physical representation using `pl.col(col).to_physical()` to ensure compatibility with the pipeline.
   - We used Polars' `filter` and `drop` methods to select the appropriate subsets of data for training, validation, and testing.

In our next working session, we will focus on the following tasks:

1. Refactoring the Cynde-related methods to integrate the new approaches developed in this example.
2. Introducing equivalent Pydantic models for the classifier configurations, replacing the current dictionary-based approach.
3. Cleaning and generalizing all the cross-validation methods under a common framework to improve code organization and reusability.

By building upon the foundation established in this example and incorporating the planned enhancements, we aim to create a more robust, efficient, and user-friendly machine learning framework within the Cynde project.

In [1]:
import polars as pl
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from typing import List, Optional
from pydantic import BaseModel, ValidationInfo, model_validator
import numpy as np
from sklearn.metrics import accuracy_score




In [2]:
from enum import Enum
from typing import Optional, Union, Dict, Literal
from pydantic import BaseModel, Field, model_validator
import polars as pl
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler, PowerTransformer, QuantileTransformer, Normalizer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

class ScalerType(str, Enum):
    STANDARD_SCALER = "StandardScaler"
    MIN_MAX_SCALER = "MinMaxScaler"
    MAX_ABS_SCALER = "MaxAbsScaler"
    ROBUST_SCALER = "RobustScaler"
    POWER_TRANSFORMER = "PowerTransformer"
    QUANTILE_TRANSFORMER = "QuantileTransformer"
    NORMALIZER = "Normalizer"

class Feature(BaseModel):
    column_name: str
    name: str
    description: Optional[str] = None

    @model_validator(mode='before')
    @classmethod
    def validate_column_name(cls, values):
        column_name = values.get("column_name")
        context = values.get("context")
        if context is not None and isinstance(context, pl.DataFrame):
            if column_name not in context.columns:
                raise ValueError(f"Column '{column_name}' not found in the DataFrame.")
        return values

    class Config:
        arbitrary_types_allowed = True
        extra = "allow"

class NumericalFeature(Feature):
    scaler_type: ScalerType = Field(ScalerType.STANDARD_SCALER, description="The type of scaler to apply to the numerical feature.")

    def get_scaler(self):
        scaler_map = {
            ScalerType.STANDARD_SCALER: StandardScaler(),
            ScalerType.MIN_MAX_SCALER: MinMaxScaler(),
            ScalerType.MAX_ABS_SCALER: MaxAbsScaler(),
            ScalerType.ROBUST_SCALER: RobustScaler(),
            ScalerType.POWER_TRANSFORMER: PowerTransformer(),
            ScalerType.QUANTILE_TRANSFORMER: QuantileTransformer(),
            ScalerType.NORMALIZER: Normalizer(),
        }
        return scaler_map[self.scaler_type]

    @model_validator(mode='before')
    @classmethod
    def validate_numerical_column(cls, values):
        column_name = values.get("column_name")
        context = values.get("context")
        if context is not None and isinstance(context, pl.DataFrame):
            if column_name not in context.columns:
                raise ValueError(f"Column '{column_name}' not found in the DataFrame.")
            if context[column_name].dtype not in [
                pl.Boolean,
                pl.Int8,
                pl.Int16,
                pl.Int32,
                pl.Int64,
                pl.UInt8,
                pl.UInt16,
                pl.UInt32,
                pl.UInt64,
                pl.Float32,
                pl.Float64,
                pl.Decimal,
            ]:
                raise ValueError(
                    f"Column '{column_name}' must be of a numeric type (Boolean, Integer, Unsigned Integer, Float, or Decimal)."
                )
        return values

class EmbeddingFeature(NumericalFeature):
    @model_validator(mode='before')
    @classmethod
    def validate_embedding_column(cls, values):
        column_name = values.get("column_name")
        context = values.get("context")
        if context is not None and isinstance(context, pl.DataFrame):
            if column_name not in context.columns:
                raise ValueError(f"Column '{column_name}' not found in the DataFrame.")
            if context[column_name].dtype not in [pl.List(pl.Float32), pl.List(pl.Float64)]:
                raise ValueError(f"Column '{column_name}' must be of type pl.List(pl.Float32) or pl.List(pl.Float64).")
        return values

class CategoricalFeature(Feature):
    one_hot_encoding: bool = Field(True, description="Whether to apply one-hot encoding to the categorical feature.")

    @model_validator(mode='before')
    @classmethod
    def validate_categorical_column(cls, values):
        column_name = values.get("column_name")
        context = values.get("context")
        if context is not None and isinstance(context, pl.DataFrame):
            if column_name not in context.columns:
                raise ValueError(f"Column '{column_name}' not found in the DataFrame.")
            if context[column_name].dtype not in [
                pl.Utf8,
                pl.Categorical,
                pl.Enum,
                pl.Int8,
                pl.Int16,
                pl.Int32,
                pl.Int64,
                pl.UInt8,
                pl.UInt16,
                pl.UInt32,
                pl.UInt64,
            ]:
                raise ValueError(
                    f"Column '{column_name}' must be of type pl.Utf8, pl.Categorical, pl.Enum, or an integer type."
                )
        return values

class FeatureSet(BaseModel):
    numerical: List[NumericalFeature] = []
    embeddings: List[EmbeddingFeature] = []
    categorical: List[CategoricalFeature] = []

    class Config:
        arbitrary_types_allowed = True
        extra = "allow"

class InputConfig(BaseModel):
    feature_sets: List[FeatureSet]

    def validate_with_dataframe(self, df: pl.DataFrame):
        for feature_set in self.feature_sets:
            for feature_type in ["numerical", "embeddings", "categorical"]:
                for feature in getattr(feature_set, feature_type):
                    # Re-run the validators with the DataFrame attached; raises on a mismatch.
                    feature.model_validate({"context": df, **feature.model_dump()})

    class Config:
        arbitrary_types_allowed = True
        extra = "allow"



In [3]:
def generate_simulated_data(n_samples: int, n_classes: int) -> pl.DataFrame:
    # Note: `n_classes` is currently unused; the simulation always draws two classes.
    # Split the samples so that all columns have n_samples rows even when n_samples is odd.
    n_class_0 = n_samples // 2
    n_class_1 = n_samples - n_class_0
    class_0 = np.random.multivariate_normal(mean=[30, 50000], cov=[[100, 0], [0, 1000000]], size=n_class_0)
    class_1 = np.random.multivariate_normal(mean=[50, 80000], cov=[[100, 0], [0, 1000000]], size=n_class_1)
    data = {
        "age": np.concatenate((class_0[:, 0], class_1[:, 0])),
        "income": np.concatenate((class_0[:, 1], class_1[:, 1])),
        "gender": np.random.choice(["Male", "Female"], size=n_samples),
        "education": np.random.choice(["Bachelor's", "Master's", "PhD"], size=n_samples),
        "target": np.concatenate((np.zeros(n_class_0), np.ones(n_class_1))),
        "fold": np.random.choice([0, 1, 2], size=n_samples),
    }
    return pl.DataFrame(data)
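
The generator above draws from NumPy's global random state, so the data and the fold assignment differ between runs. A possible variant, sketched here under the assumption that reproducibility is wanted, threads a seed through `np.random.default_rng`. The function name `simulate_two_classes` and its `seed` parameter are hypothetical, and only the numerical columns and the target are reproduced to keep the sketch short.

```python
import numpy as np


def simulate_two_classes(n_samples: int, seed: int = 0) -> dict:
    """Seeded variant of the two-class simulation (illustrative only)."""
    rng = np.random.default_rng(seed)
    n_class_0 = n_samples // 2
    n_class_1 = n_samples - n_class_0  # keeps lengths consistent for odd n_samples
    cov = [[100, 0], [0, 1_000_000]]
    class_0 = rng.multivariate_normal([30, 50_000], cov, size=n_class_0)
    class_1 = rng.multivariate_normal([50, 80_000], cov, size=n_class_1)
    features = np.concatenate((class_0, class_1))
    return {
        "age": features[:, 0],
        "income": features[:, 1],
        "target": np.concatenate((np.zeros(n_class_0), np.ones(n_class_1))),
    }


# The same seed yields the same draws.
first = simulate_two_classes(7, seed=42)
second = simulate_two_classes(7, seed=42)
```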



In [4]:
from enum import Enum
from typing import Optional, Union, Dict, Literal, Any, List, Tuple
from pydantic import BaseModel, Field

class ClassifierName(str, Enum):
    LOGISTIC_REGRESSION = "LogisticRegression"
    RANDOM_FOREST = "RandomForestClassifier"
    HIST_GRADIENT_BOOSTING = "HistGradientBoostingClassifier"

class BaseClassifierConfig(BaseModel):
    classifier_name: ClassifierName

class LogisticRegressionConfig(BaseClassifierConfig):
    classifier_name: Literal[ClassifierName.LOGISTIC_REGRESSION] = ClassifierName.LOGISTIC_REGRESSION
    penalty: str = Field("l2", description="Specify the norm of the penalty.")
    dual: bool = Field(False, description="Dual or primal formulation.")
    tol: float = Field(1e-4, description="Tolerance for stopping criteria.")
    C: float = Field(1.0, description="Inverse of regularization strength.")
    fit_intercept: bool = Field(True, description="Specifies if a constant should be added to the decision function.")
    intercept_scaling: float = Field(1, description="Scaling factor for the constant.")
    class_weight: Optional[Union[str, Dict[Any, float]]] = Field(None, description="Weights associated with classes.")
    random_state: Optional[int] = Field(None, description="Seed for random number generation.")
    solver: str = Field("lbfgs", description="Algorithm to use in the optimization problem.")
    max_iter: int = Field(100, description="Maximum number of iterations.")
    multi_class: str = Field("auto", description="Approach for handling multi-class targets.")
    verbose: int = Field(0, description="Verbosity level.")
    warm_start: bool = Field(False, description="Reuse the solution of the previous call to fit.")
    n_jobs: Optional[int] = Field(None, description="Number of CPU cores to use.")
    l1_ratio: Optional[float] = Field(None, description="Elastic-Net mixing parameter.")

class RandomForestClassifierConfig(BaseClassifierConfig):
    classifier_name: Literal[ClassifierName.RANDOM_FOREST] = ClassifierName.RANDOM_FOREST
    n_estimators: int = Field(100, description="The number of trees in the forest.")
    criterion: str = Field("gini", description="The function to measure the quality of a split.")
    max_depth: Optional[int] = Field(None, description="The maximum depth of the tree.")
    min_samples_split: Union[int, float] = Field(2, description="The minimum number of samples required to split an internal node.")
    min_samples_leaf: Union[int, float] = Field(1, description="The minimum number of samples required to be at a leaf node.")
    min_weight_fraction_leaf: float = Field(0.0, description="The minimum weighted fraction of the sum total of weights required to be at a leaf node.")
    max_features: Union[str, int, float] = Field("sqrt", description="The number of features to consider when looking for the best split.")
    max_leaf_nodes: Optional[int] = Field(None, description="Grow trees with max_leaf_nodes in best-first fashion.")
    min_impurity_decrease: float = Field(0.0, description="A node will be split if this split induces a decrease of the impurity greater than or equal to this value.")
    bootstrap: bool = Field(True, description="Whether bootstrap samples are used when building trees.")
    oob_score: bool = Field(False, description="Whether to use out-of-bag samples to estimate the generalization score.")
    n_jobs: Optional[int] = Field(None, description="Number of CPU cores to use.")
    random_state: Optional[int] = Field(None, description="Seed for random number generation.")
    verbose: int = Field(0, description="Verbosity level.")
    warm_start: bool = Field(False, description="Reuse the solution of the previous call to fit and add more estimators to the ensemble.")
    class_weight: Optional[Union[str, Dict[Any, float]]] = Field(None, description="Weights associated with classes.")
    ccp_alpha: float = Field(0.0, description="Complexity parameter used for Minimal Cost-Complexity Pruning.")
    max_samples: Optional[Union[int, float]] = Field(None, description="If bootstrap is True, the number of samples to draw from X to train each base estimator.")
    monotonic_cst: Optional[Dict[str, int]] = Field(None, description="Monotonic constraint to enforce on each feature.")

class HistGradientBoostingClassifierConfig(BaseClassifierConfig):
    classifier_name: Literal[ClassifierName.HIST_GRADIENT_BOOSTING] = ClassifierName.HIST_GRADIENT_BOOSTING
    loss: str = Field("log_loss", description="The loss function to use in the boosting process.")
    learning_rate: float = Field(0.1, description="The learning rate, also known as shrinkage.")
    max_iter: int = Field(100, description="The maximum number of iterations of the boosting process.")
    max_leaf_nodes: int = Field(31, description="The maximum number of leaves for each tree.")
    max_depth: Optional[int] = Field(None, description="The maximum depth of each tree.")
    min_samples_leaf: int = Field(20, description="The minimum number of samples per leaf.")
    l2_regularization: float = Field(0.0, description="The L2 regularization parameter.")
    max_features: Union[str, int, float] = Field(1.0, description="Proportion of randomly chosen features in each and every node split.")
    max_bins: int = Field(255, description="The maximum number of bins to use for non-missing values.")
    categorical_features: Optional[Union[str, List[int], List[bool]]] = Field("warn", description="Indicates the categorical features.")
    monotonic_cst: Optional[Dict[str, int]] = Field(None, description="Monotonic constraint to enforce on each feature.")
    interaction_cst: Optional[Union[str, List[Tuple[int, ...]]]] = Field(None, description="Specify interaction constraints, the sets of features which can interact with each other in child node splits.")
    warm_start: bool = Field(False, description="Reuse the solution of the previous call to fit and add more estimators to the ensemble.")
    early_stopping: Union[str, bool] = Field("auto", description="Whether to use early stopping to terminate training when validation score is not improving.")
    scoring: Optional[str] = Field("loss", description="Scoring parameter to use for early stopping.")
    validation_fraction: float = Field(0.1, description="Proportion of training data to set aside as validation data for early stopping.")
    n_iter_no_change: int = Field(10, description="Used to determine when to stop if validation score is not improving.")
    tol: float = Field(1e-7, description="The absolute tolerance to use when comparing scores.")
    verbose: int = Field(0, description="Verbosity level.")
    random_state: Optional[int] = Field(None, description="Seed for random number generation.")
    class_weight: Optional[Union[str, Dict[Any, float]]] = Field(None, description="Weights associated with classes.")

class ClassifierConfig(BaseModel):
    classifier: Union[LogisticRegressionConfig, RandomForestClassifierConfig, HistGradientBoostingClassifierConfig]

In [5]:
def convert_utf8_to_enum(df: pl.DataFrame, threshold: float = 0.2) -> pl.DataFrame:
    if not 0 < threshold < 1:
        raise ValueError("Threshold must be between 0 and 1 (exclusive).")

    for column in df.columns:
        if df[column].dtype == pl.Utf8 and len(df[column]) > 0:
            # Drop nulls and sort so the Enum categories are valid and deterministic.
            unique_values = df[column].drop_nulls().unique().sort()
            unique_ratio = len(unique_values) / len(df[column])

            if unique_ratio <= threshold:
                enum_dtype = pl.Enum(unique_values.to_list())
                df = df.with_columns(df[column].cast(enum_dtype))
            else:
                print(f"Column '{column}' has a high ratio of unique values ({unique_ratio:.2f}). Skipping conversion to Enum.")
        elif df[column].dtype == pl.Utf8 and len(df[column]) == 0:
            print(f"Column '{column}' is empty. Skipping conversion to Enum.")

    return df


def evaluate_model(pipeline: Pipeline, X, y):
    predictions = pipeline.predict(X)
    accuracy = accuracy_score(y, predictions)
    return accuracy

def convert_enum_to_physical(df: pl.DataFrame) -> pl.DataFrame:
    df_physical = df.with_columns(
        [pl.col(col).to_physical() for col in df.columns if df[col].dtype == pl.Enum]
    )
    return df_physical

def create_pipeline(df: pl.DataFrame, input_config: InputConfig, classifier_config: ClassifierConfig) -> Pipeline:
    transformers = []
    for feature_set in input_config.feature_sets:
        numerical_features = [feature.column_name for feature in feature_set.numerical]
        if numerical_features:
            scaler = feature_set.numerical[0].get_scaler()  # Assuming all numerical features use the same scaler
            transformers.append(("numerical", scaler, numerical_features))

        categorical_features = [feature.column_name for feature in feature_set.categorical]
        if categorical_features:
            for feature in feature_set.categorical:
                if feature.one_hot_encoding:
                    if df[feature.column_name].dtype == pl.Categorical:
                        categories = [df[feature.column_name].unique().to_list()]
                    elif df[feature.column_name].dtype == pl.Enum:
                        categories = [df[feature.column_name].dtype.categories]
                    else:
                        raise ValueError(f"Column '{feature.column_name}' must be of type pl.Categorical or pl.Enum for one-hot encoding.")
                    one_hot_encoder = OneHotEncoder(categories=categories, handle_unknown='error', sparse_output=False)
                    transformers.append((f"categorical_{feature.column_name}", one_hot_encoder, [feature.column_name]))
                else:
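                    # The physical representation of an Enum or Categorical column is an
                    # integer code, so accept any numeric dtype rather than floats only.
                    if not df[feature.column_name].dtype.is_numeric():
                        raise ValueError(f"Column '{feature.column_name}' must be numeric (for example, the output of `to_physical()`) to be passed through.")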
                    transformers.append((f"categorical_{feature.column_name}", "passthrough", [feature.column_name]))

    preprocessor = ColumnTransformer(transformers)

    # Create the classifier based on the classifier configuration
    # `model_dump` is the Pydantic v2 replacement for the deprecated `.dict()`.
    if isinstance(classifier_config.classifier, LogisticRegressionConfig):
        classifier = LogisticRegression(**classifier_config.classifier.model_dump(exclude={"classifier_name"}))
    elif isinstance(classifier_config.classifier, RandomForestClassifierConfig):
        classifier = RandomForestClassifier(**classifier_config.classifier.model_dump(exclude={"classifier_name"}))
    elif isinstance(classifier_config.classifier, HistGradientBoostingClassifierConfig):
        classifier = HistGradientBoostingClassifier(**classifier_config.classifier.model_dump(exclude={"classifier_name"}))
    else:
        raise ValueError(f"Unsupported classifier: {type(classifier_config.classifier)}")

    pipeline = Pipeline([("preprocessor", preprocessor), ("classifier", classifier)])
    pipeline.set_output(transform="polars")
    return pipeline

# Example usage
n_samples = 1000
n_classes = 2
df = generate_simulated_data(n_samples, n_classes)
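# Convert low-cardinality string columns to Enum
df_enum = convert_utf8_to_enum(df, threshold=0.8)
# One-hot encoding is enabled for the categorical features below, so the Enum
# columns are kept as-is here instead of being passed through convert_enum_to_physical.
df_physical = df_enum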

# Declare feature sets using Pydantic classes
numerical_features = [
    NumericalFeature(column_name="age", name="Age"),
    NumericalFeature(column_name="income", name="Income"),
]
categorical_features = [
    CategoricalFeature(column_name="gender", name="Gender"),
    CategoricalFeature(column_name="education", name="Education"),
]
feature_set = FeatureSet(
    numerical=numerical_features,
    categorical=categorical_features,
)
input_config = InputConfig(feature_sets=[feature_set])

# Declare classifier configurations
classifier_configs = [
    ClassifierConfig(
        classifier=LogisticRegressionConfig(
            penalty="l2",
            C=1.0,
            solver="lbfgs",
            max_iter=100,
        )
    ),
    ClassifierConfig(
        classifier=RandomForestClassifierConfig(
            n_estimators=100,
            max_depth=5,
            min_samples_split=2,
            min_samples_leaf=1,
            random_state=42,
        )
    ),
    ClassifierConfig(
        classifier=HistGradientBoostingClassifierConfig(
            learning_rate=0.1,
            max_iter=100,
            max_leaf_nodes=31,
            min_samples_leaf=20,
            random_state=42,
        )
    ),
]

# Simulate cross-validation
fold_name = "fold"
train_df = df_physical.filter(pl.col(fold_name) == 0)
val_df = df_physical.filter(pl.col(fold_name) == 1)
test_df = df_physical.filter(pl.col(fold_name) == 2)

# Evaluate each classifier configuration
for classifier_config in classifier_configs:
    print(f"Evaluating {type(classifier_config.classifier).__name__}")

    # Create the pipeline
    pipeline = create_pipeline(df_enum, input_config, classifier_config)

    # Fit the pipeline on the training data
    pipeline.fit(train_df.drop([fold_name, "target"]), train_df["target"])

    # Evaluate the model on the validation and test data
    val_accuracy = evaluate_model(pipeline, val_df.drop([fold_name, "target"]), val_df["target"])
    test_accuracy = evaluate_model(pipeline, test_df.drop([fold_name, "target"]), test_df["target"])

    print("Validation accuracy:", val_accuracy)
    print("Test accuracy:", test_accuracy)
    print()

Evaluating LogisticRegressionConfig
Validation accuracy: 1.0
Test accuracy: 1.0

Evaluating RandomForestClassifierConfig
Validation accuracy: 1.0
Test accuracy: 1.0

Evaluating HistGradientBoostingClassifierConfig
Validation accuracy: 1.0
Test accuracy: 1.0
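
The perfect scores are plausible given how the simulated data is drawn: the two classes sit very far apart along `income`. The short calculation below uses only the means and variances that appear in `generate_simulated_data` and expresses the gap between class means in units of standard deviation.

```python
import math

# Means and variances taken from generate_simulated_data.
income_means = (50_000, 80_000)
income_variance = 1_000_000
age_means = (30, 50)
age_variance = 100

# Gap between the class means, measured in standard deviations.
income_gap_in_std = (income_means[1] - income_means[0]) / math.sqrt(income_variance)
age_gap_in_std = (age_means[1] - age_means[0]) / math.sqrt(age_variance)

print(income_gap_in_std)  # 30.0
print(age_gap_in_std)     # 2.0
```

A gap of 30 standard deviations on a single feature makes the classes effectively linearly separable, so an accuracy of 1.0 here says little about how the classifiers compare on harder data.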


