In [None]:
import polars as pl
import polars.selectors as cs
from polars_ds.pipeline import Pipeline, Blueprint

# Builtin Pipeline Functions

To run this demo, use the latest release of polars_ds, or the latest commit if the latest release doesn't work.

Requires polars_ds v0.5.2 or above.

In [None]:
# Load the raw example data (column names are in their original mixed case, e.g. "City_Category").
df = pl.read_parquet("../examples/dependency.parquet")
df.head()

In [None]:
# Create a blueprint first.
# A blueprint is a plan for a pipeline. No hard work is done until the blueprint is materialized, which
# is when the transforms are fitted (e.g. scale learns the mean and std from the base data).
# If a target is specified for the blueprint, the target is excluded from all transformations that require a fit,
# and it is auto-filled for transformations that need a target field when no target field is explicitly given.

bp = (
    Blueprint(df, name = "example", target = "approved") # You can optionally put the target of the ML model here
    .lowercase() # lowercase all columns
    # Say you want to remove a population from your data pipeline.
    .filter( 
        "city_category is not null" # or equivalently, you can do: pl.col("city_category").is_not_null()
    )
    # Select only the columns we need
    .select(cs.numeric() | cs.by_name(["gender", "employer_category1", "city_category"]))
    # Pass target explicitly here, since loan_period is not the target for prediction.
    # Use a linear regression with x1 = var1, x2 = existing_emi to predict missing values in loan_period
    .linear_impute(features = ["var1", "existing_emi"], target = "loan_period") 
    .impute(["existing_emi"], method = "median")
    .append_expr( # generate some features
        pl.col("existing_emi").log1p().alias("existing_emi_log1p"),
        pl.col("loan_amount").log1p().alias("loan_amount_log1p"),
        # NOTE(review): despite its name, this clips the *raw* loan_amount (no log1p) to [0, 1000].
        # In the saved output rows this column is null after scaling. Presumably most loan amounts
        # exceed 1000, so the clipped column is (near-)constant. Confirm the intended bounds/transform.
        pl.col("loan_amount").clip(lower_bound = 0, upper_bound = 1000).alias("loan_amount_log1p_clipped"),
        pl.col("loan_amount").sqrt().alias("loan_amount_sqrt"),
        # A lag of 1 is the previous row's value, i.e. shift(1). shift(-1) would be a lead
        # (the next row's value), which leaks the following row into the current one.
        pl.col("loan_amount").shift(1).alias("loan_amount_lag_1") # any kind of lag transform
    )
    .scale( # target is numerical, but will be excluded automatically because bp is initialized with a target
        cs.numeric().exclude(["var1", "existing_emi_log1p"]), method = "standard"
    ) # Scale the columns up to this point. The columns below won't be scaled
    .append_expr(
        # Add missing flags
        pl.col("employer_category1").is_null().cast(pl.UInt8).alias("employer_category1_is_missing")
    )
    .one_hot_encode("gender", drop_first=True)
    .woe_encode("city_category") # No need to specify target because we initialized bp with a target
    .target_encode("employer_category1", min_samples_leaf = 20, smoothing = 10.0) # same as above
    .shrink_dtype(force_f32 = True) # shrink dtype to smallest possible. Force floats to be f32
)

print(bp)

In [None]:
# Materialize the blueprint: this is the point where the planned transforms are fitted on the data
# (e.g. scale learns the mean and std), producing a Pipeline.
pipe:Pipeline = bp.materialize()
# Text representation of the pipeline
pipe

Naive Query Steps: 

Step 1:
col("ID").alias("id"),
col("Gender").alias("gender"),
col("DOB").alias("dob"),
col("Lead_Creation_Date").alias("lead_creation_date"),
col("City_Code").alias("city_code"),
col("City_Category").alias("city_category"),
col("Employer_Code").alias("employer_code"),
col("Employer_Category1").alias("employer_category1"),
col("Employer_Category2").alias("employer_category2"),
col("Monthly_Income").alias("monthly_income"),
col("Customer_Existing_Primary_Bank_Code").alias("customer_existing_primary_bank_code"),
col("Primary_Bank_Type").alias("primary_bank_type"),
col("Contacted").alias("contacted"),
col("Source").alias("source"),
col("Source_Category").alias("source_category"),
col("Existing_EMI").alias("existing_emi"),
col("Loan_Amount").alias("loan_amount"),
col("Loan_Period").alias("loan_period"),
col("Interest_Rate").alias("interest_rate"),
col("EMI").alias("emi"),
col("Var1").alias("var1"),
col("Approved").alias("approved")

Step 2:
col("city_category").is_not_n

In [None]:
# Apply the fitted pipeline to the raw data (the pipeline itself lowercases the column names).
df_transformed = pipe.transform(df)
df_transformed.head()

city_category,employer_category1,employer_category2,monthly_income,existing_emi,loan_amount,loan_period,interest_rate,emi,var1,approved,existing_emi_log1p,loan_amount_log1p,loan_amount_log1p_clipped,loan_amount_sqrt,loan_amount_lag_1,employer_category1_is_missing,gender_Male
f32,f32,f32,f32,f32,f32,f32,f32,f32,i8,i8,f32,f32,f32,f32,f32,u8,u8
0.114988,0.010887,0.346539,-0.020724,-0.156214,,-1.269339,,,0,0,0.0,,,,-0.632338,0,0
0.114988,0.021223,-3.370307,-0.012192,-0.156214,-0.632338,-0.299292,-1.019936,-0.197259,10,0,0.0,-0.586105,,-0.658025,0.181273,0,1
-0.606987,0.021223,0.346539,-0.019302,-0.156214,0.181273,0.670754,,,0,0,0.0,0.537137,,0.375948,1.710861,0,1
-0.606987,0.010887,0.346539,-0.012192,-0.156214,1.710861,1.155778,,,7,0,0.0,1.527696,,1.709278,0.343995,0,1
0.114988,0.010887,0.346539,0.024783,0.93203,0.343995,-0.299292,,,10,0,7.824446,0.683076,,0.543738,,0,1


In [None]:
# Expected to be empty: the blueprint's filter step drops rows where city_category is null.
# Check it instead of relying on reading the output.
null_city_rows = df_transformed.filter(
    pl.col("city_category").is_null()
)
assert null_city_rows.height == 0, "rows with null city_category survived the pipeline"
null_city_rows

city_category,employer_category1,employer_category2,monthly_income,existing_emi,loan_amount,loan_period,interest_rate,emi,var1,approved,existing_emi_log1p,loan_amount_log1p,loan_amount_log1p_clipped,loan_amount_sqrt,loan_amount_lag_1,employer_category1_is_missing,gender_Male
f32,f32,f32,f32,f32,f32,f32,f32,f32,i8,i8,f32,f32,f32,f32,f32,u8,u8


# Serialization Methods

A materialized pipeline can be saved and reloaded with either pickle or JSON.

In [None]:
import pickle
# The pipe object can be pickled.
# Writes "pipe.pickle" to the current working directory.
with open("pipe.pickle", "wb") as f:
    pickle.dump(pipe, f)

In [None]:
# Reload the pipeline written by the previous cell.
# Only unpickle files you trust: pickle.load can execute arbitrary code from the file.
with open("pipe.pickle", "rb") as f:
    pipe2 = pickle.load(f)

pipe2

Naive Query Steps: 

Step 1:
col("ID").alias("id"),
col("Gender").alias("gender"),
col("DOB").alias("dob"),
col("Lead_Creation_Date").alias("lead_creation_date"),
col("City_Code").alias("city_code"),
col("City_Category").alias("city_category"),
col("Employer_Code").alias("employer_code"),
col("Employer_Category1").alias("employer_category1"),
col("Employer_Category2").alias("employer_category2"),
col("Monthly_Income").alias("monthly_income"),
col("Customer_Existing_Primary_Bank_Code").alias("customer_existing_primary_bank_code"),
col("Primary_Bank_Type").alias("primary_bank_type"),
col("Contacted").alias("contacted"),
col("Source").alias("source"),
col("Source_Category").alias("source_category"),
col("Existing_EMI").alias("existing_emi"),
col("Loan_Amount").alias("loan_amount"),
col("Loan_Period").alias("loan_period"),
col("Interest_Rate").alias("interest_rate"),
col("EMI").alias("emi"),
col("Var1").alias("var1"),
col("Approved").alias("approved")

Step 2:
col("city_category").is_not_n

In [None]:
# Transform the same raw data with the unpickled pipeline; it should match df_transformed.
df_transformed_2 = pipe2.transform(df)
df_transformed_2

city_category,employer_category1,employer_category2,monthly_income,existing_emi,loan_amount,loan_period,interest_rate,emi,var1,approved,existing_emi_log1p,loan_amount_log1p,loan_amount_log1p_clipped,loan_amount_sqrt,loan_amount_lag_1,employer_category1_is_missing,gender_Male
f32,f32,f32,f32,f32,f32,f32,f32,f32,i8,i8,f32,f32,f32,f32,f32,u8,u8
0.114988,0.010887,0.346539,-0.020724,-0.156214,,-1.269339,,,0,0,0.0,,,,-0.632338,0,0
0.114988,0.021223,-3.370307,-0.012192,-0.156214,-0.632338,-0.299292,-1.019936,-0.197259,10,0,0.0,-0.586105,,-0.658025,0.181273,0,1
-0.606987,0.021223,0.346539,-0.019302,-0.156214,0.181273,0.670754,,,0,0,0.0,0.537137,,0.375948,1.710861,0,1
-0.606987,0.010887,0.346539,-0.012192,-0.156214,1.710861,1.155778,,,7,0,0.0,1.527696,,1.709278,0.343995,0,1
0.114988,0.010887,0.346539,0.024783,0.93203,0.343995,-0.299292,,,10,0,7.824446,0.683076,,0.543738,,0,1
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
0.114988,0.010887,-3.370307,-0.004228,-0.156214,,1.203359,,,10,0,0.0,,,,,0,0
-0.606987,0.010887,0.346539,0.008799,0.474968,,0.456202,,,7,0,7.280008,,,,-0.50216,0,0
-0.091923,0.021223,0.346539,-0.023,-0.156214,-0.50216,0.670754,2.785431,-0.210546,2,0,0.0,-0.333569,,-0.46065,1.320328,0,0
0.114988,0.021223,-0.89241,0.024175,0.438403,1.320328,1.155778,,,10,0,7.220374,1.334103,,1.40992,0.962339,0,1


In [None]:
from polars.testing import assert_frame_equal
# The unpickled pipeline must reproduce the original output exactly.
# assert_frame_equal returns None when the frames match and raises AssertionError otherwise,
# so a silent run of this cell means the round trip is lossless.
assert_frame_equal(df_transformed, df_transformed_2)

In [None]:
# Plain-dict view of the pipeline: name, target, input/output feature names, and the serialized transforms.
pipe.to_dict()

{'name': 'example',
 'target': 'approved',
 'feature_names_in_': ['ID',
  'Gender',
  'DOB',
  'Lead_Creation_Date',
  'City_Code',
  'City_Category',
  'Employer_Code',
  'Employer_Category1',
  'Employer_Category2',
  'Monthly_Income',
  'Customer_Existing_Primary_Bank_Code',
  'Primary_Bank_Type',
  'Contacted',
  'Source',
  'Source_Category',
  'Existing_EMI',
  'Loan_Amount',
  'Loan_Period',
  'Interest_Rate',
  'EMI',
  'Var1',
  'Approved'],
 'feature_names_out_': ['city_category',
  'employer_category1',
  'employer_category2',
  'monthly_income',
  'existing_emi',
  'loan_amount',
  'loan_period',
  'interest_rate',
  'emi',
  'var1',
  'approved',
  'existing_emi_log1p',
  'loan_amount_log1p',
  'loan_amount_log1p_clipped',
  'loan_amount_sqrt',
  'loan_amount_lag_1',
  'employer_category1_is_missing',
  'gender_Male'],
 'transforms': [{'SelectStep': ['{"Alias":[{"Column":"ID"},"id"]}',
    '{"Alias":[{"Column":"Gender"},"gender"]}',
    '{"Alias":[{"Column":"DOB"},"dob"]}'

In [None]:
# To save the pipeline as JSON (writes "test.json" to the current working directory)
# and load it back as a new Pipeline.

pipe.to_json("test.json")
pipe3 = Pipeline.from_json("test.json")
# Raises AssertionError if the JSON round trip changed the output; silent success means equal.
assert_frame_equal(df_transformed, pipe3.transform(df))

# Custom Transformations in Pipeline

Requires version v0.4.6 or above (already satisfied by the v0.5.2 requirement at the top of this notebook).

In [None]:
# Reload the raw data so this section starts from the original (mixed-case) columns.
df = pl.read_parquet("../examples/dependency.parquet")
df.head()

In [None]:
from typing import Union, List

# Any custom function must satisfy the following function signature:
# func(df:Union[pl.DataFrame, pl.LazyFrame], cols: List[str], ...) -> List[pl.Expr]
# where ... means extra keyword arguments (kwargs).
# Here is a custom imputer:

def smallest_abs_impute(df:Union[pl.DataFrame, pl.LazyFrame], cols: List[str], epsilon:float = 0.01) -> List[pl.Expr]:
    """
    Imputes columns by the min of the absolute values for c in columns, plus epsilon.

    Parameters
    ----------
    df
        Data the fill values are learned from. It is made lazy and collected once.
    cols
        Columns to impute (assumed numeric, since abs() is applied).
    epsilon
        Offset added to each column's minimum absolute value.

    Returns
    -------
    One expression per column in `cols`, in the same order, that fills nulls with the
    learned value. A column with no non-null values has no minimum to learn; it is
    passed through unchanged, because `fill_null(None)` raises in Polars.
    """
    if not cols:
        # Nothing to learn; also avoids calling .row(0) on an empty selection.
        return []
    fill_values = df.lazy().select(pl.col(cols).abs().min() + epsilon).collect().row(0)
    return [
        pl.col(c) if m is None else pl.col(c).fill_null(m)
        for c, m in zip(cols, fill_values)
    ]


In [None]:
bp = (
    Blueprint(df, name = "example", target = "approved")
    .lowercase() # lowercase all columns
    # Use append_fit_func for custom transforms: pass the function, the columns, then any extra kwargs.
    .append_fit_func(smallest_abs_impute, ["var1", "existing_emi", "loan_amount"], epsilon = 0.5)
)
# In the pipeline repr below, the value to impute is 0.5: the min abs of each of these columns is 0, plus epsilon = 0.5.
pipe:Pipeline = bp.materialize()
pipe

In [None]:
# After the custom imputer, these columns should have no nulls left.
pipe.transform(df).null_count().select(["var1", "existing_emi", "loan_amount"])

# Scriptable Steps in Pipeline

What is a scriptable step? It is a step that can be described in plain text, such as a JSON or YAML file. As long as the text can be turned into
a valid Python dictionary (method name plus its arguments), the step (the transformation) can be added to a blueprint.

In [None]:
# Number of nulls in Existing_EMI before any imputation (raw data).
df.select(
    pl.col("Existing_EMI").null_count() 
)

In [None]:
# No .lowercase() step in this blueprint, so the target must use the raw column name "Approved".
bp = Blueprint(df, name = "example", target = "Approved")

# Takes in a dict with 3 fields: `name`, `args`, and `kwargs`. 
# `args` and `kwargs` are optional, depending on whether the method call needs certain arguments.
# `name` is the Blueprint method to call; presumably step_dict_1 is equivalent to
# bp.impute(cols=["Existing_EMI"], method="median").
step_dict_1 = {
    "name": "impute",
    "kwargs": {"cols": ["Existing_EMI"], "method": "median"} 
}

# A step whose name is not a Blueprint method errors (see the commented-out call below).
step_dict_2 = {
    "name": "does_not_exist",
    "kwargs": {"test": 1} 
}

# A step with positional arguments, e.g. a filter, would look like:
# filter_step = {
#     "name": "filter",
#     "args": ["Employer_Category1 is not null"]
# }

bp.append_step_from_dict(
    step_dict_1
)

# bp.append_step_from_dict(step_dict_2) # Will error
pipe = bp.materialize()

df_transformed = pipe.transform(df)
assert df_transformed["Existing_EMI"].null_count() == 0, "Existing_EMI was not imputed"
df_transformed.select(
    pl.col("Existing_EMI").null_count() # Imputed. So 0
)

# Custom Transforms as a Scriptable Step in Pipeline

You need to subclass the Blueprint class. Once the blueprint is materialized (learned), you do not need this class any more, because all the "learned" information is encoded as Polars expressions.

In [None]:
from polars_ds.pipeline import Blueprint, FitStep
from typing import Self, Union, List
from functools import partial

def smallest_abs_impute(df:Union[pl.DataFrame, pl.LazyFrame], cols: List[str], epsilon:float = 0.01) -> List[pl.Expr]:
    """
    Imputes columns by the min of the absolute values for c in columns, plus epsilon.

    NOTE: this redefines (shadows) the smallest_abs_impute from the earlier section. This
    version keeps the original columns and writes the imputed values to new `<col>_imputed`
    columns.

    Parameters
    ----------
    df
        Data the fill values are learned from. It is made lazy and collected once.
    cols
        Columns to impute (assumed numeric, since abs() is applied).
    epsilon
        Offset added to each column's minimum absolute value.

    Returns
    -------
    One expression per column in `cols`, in the same order, producing `<col>_imputed`.
    A column with no non-null values has no minimum to learn; it is copied unchanged
    (still under the suffixed name), because `fill_null(None)` raises in Polars.
    """
    if not cols:
        # Nothing to learn; also avoids calling .row(0) on an empty selection.
        return []
    fill_values = df.lazy().select(pl.col(cols).abs().min() + epsilon).collect().row(0)
    return [
        (pl.col(c) if m is None else pl.col(c).fill_null(m)).name.suffix("_imputed")
        for c, m in zip(cols, fill_values)
    ]

class ExtendedBlueprint(Blueprint):
    """
    Blueprint with extra custom steps. Because they are regular methods, they can also
    be referenced by name from `append_step_from_dict`.

    Each method only records a FitStep and returns self for chaining; the fit itself runs
    when the blueprint is materialized.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def smallest_abs_impute(self, cols: List[str], epsilon:float = 0.01) -> Self:
        """
        Add a step that fills nulls in `cols` with min(|col|) + epsilon, using the
        module-level smallest_abs_impute function.
        """
        # bind all arguments, except df and cols.
        # If you don't want to use partial from functools, you can define an inner function
        partial_func = partial(smallest_abs_impute, epsilon=epsilon)
        self._steps.append(
            FitStep(partial_func, cols, self.exclude)
        )
        return self

    def smallest_abs_impute2(self, cols: List[str], epsilon:float = 0.01) -> Self:
        """
        Same idea as smallest_abs_impute, but written with an inner function (a closure
        over epsilon) and producing `<col>_imputed2` columns.
        """
        # bind all arguments, except df and cols.
        # Example of using an inner function
        def inner_func(df:Union[pl.DataFrame, pl.LazyFrame], cols: List[str]) -> List[pl.Expr]:
            if not cols:
                # Nothing to learn; also avoids calling .row(0) on an empty selection.
                return []
            fill_values = df.lazy().select(pl.col(cols).abs().min() + epsilon).collect().row(0)
            # A column with no non-null values has no minimum, and fill_null(None) raises in
            # Polars, so copy such a column through unchanged (still under the suffixed name).
            return [
                (pl.col(c) if m is None else pl.col(c).fill_null(m)).name.suffix("_imputed2")
                for c, m in zip(cols, fill_values)
            ]

        self._steps.append(
            FitStep(inner_func, cols, self.exclude)
        )
        return self


In [None]:
EPSILON = 0.01  # shared by both custom steps and by the sanity check below, so they cannot drift apart

# No .lowercase() step in this blueprint, so the target must use the raw column name "Approved".
bp = ExtendedBlueprint(df, name = "example", target = "Approved")

# Takes in a dict with 3 fields: `name`, `args`, and `kwargs`. 
# `args` and `kwargs` are optional, depending on whether the method call needs certain arguments.
# `name` can also refer to the methods added by ExtendedBlueprint.
step_dict_1 = {
    "name": "smallest_abs_impute",
    "kwargs": {"cols": ["Existing_EMI"], "epsilon": EPSILON} 
}

step_dict_2 = {
    "name": "smallest_abs_impute2",
    "kwargs": {"cols": ["Existing_EMI"], "epsilon": EPSILON} 
}

bp.append_step_from_dict(
    step_dict_1
).append_step_from_dict(
    step_dict_2
)

pipe = bp.materialize()
df_transformed = pipe.transform(df)

# On the rows where Existing_EMI was null, both imputed columns should equal
# min(|Existing_EMI|) + EPSILON, shown alongside as impute_value.
df_transformed.with_columns(
    impute_value = pl.col("Existing_EMI").abs().min() + EPSILON
).filter(
    pl.col("Existing_EMI").is_null()
).select(
    pl.col("Existing_EMI"),
    pl.col("Existing_EMI_imputed"),
    pl.col("Existing_EMI_imputed2"),
    pl.col("impute_value")
)