Using the car price dataset as an example, this tutorial briefly shows how to build and train an ML pipeline, and how to translate the trained pipeline into pure SQL so that it can be executed natively inside a database.

#### Environment set-up

In [1]:
# Install dependencies
%pip install -r requirements.txt

# Add project root path to Python path
import os
os.environ['PYTHONPATH'] = os.getcwd()

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Note: you may need to restart the kernel to use updated packages.


#### Build and train a pipeline with the Craftsman `training_helper` tool

Import modules

In [10]:
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline

import pandas as pd
from craftsman.utility.loader import save_model
from craftsman.base.defs import OperatorName, ModelName
import craftsman.base.defs as defs
from craftsman.utility.training_helper import *

Define the data and model paths, then load the training data

In [11]:
train_data_path = "./dataset/Car_price/train.csv"
pipeline_save_path = "./car_dt6.joblib"
test_data_path = "./dataset/Car_price/test.csv"

# load dataset
data = pd.read_csv(train_data_path)
y = data["Price"]
X = data.drop(["Price"], axis=1)

Define the features used to fit the ML model

In [12]:
columns = X.columns.tolist()

ordinal_cols = ['Owner_Type']
binary_cols = ['Location','Fuel_Type','Transmission','Name']
kbins_cols = ['Year','Kilometers_Driven','Engine','Power','Mileage','Seats']
count_cols = ['Brand']

all_cols =  ordinal_cols + binary_cols + kbins_cols + count_cols
X = X[all_cols]
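
Because `all_cols` is assembled by hand from four lists, a misspelled column or a column placed in two groups would only show up later as a confusing error during fitting. A minimal sketch of an optional sanity check follows. `check_column_groups` is a hypothetical helper written for this tutorial, not part of Craftsman, and the sample lists are shortened versions of the ones above.

```python
from collections import Counter


def check_column_groups(available, groups):
    """Return (missing, duplicated) column names for the given groups.

    `available` is the list of columns in the DataFrame
    (e.g. X.columns.tolist()); `groups` maps a group label to the
    list of columns assigned to that group.
    """
    counts = Counter(col for cols in groups.values() for col in cols)
    available_set = set(available)
    missing = sorted(col for col in counts if col not in available_set)
    duplicated = sorted(col for col, n in counts.items() if n > 1)
    return missing, duplicated


# Illustrative input only: a shortened version of the groups defined above.
available = ["Owner_Type", "Location", "Fuel_Type", "Year", "Brand"]
groups = {
    "ordinal": ["Owner_Type"],
    "binary": ["Location", "Fuel_Type"],
    "kbins": ["Year"],
    "count": ["Brand"],
}
missing, duplicated = check_column_groups(available, groups)
print("missing:", missing, "duplicated:", duplicated)
```

In the notebook, the same call could be made with `columns` as `available` and the four `*_cols` lists as `groups`, before `X = X[all_cols]` is executed.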

Define the pipeline. Some important points:
1. Replace scikit-learn's `ColumnTransformer` with `CraftsmanColumnTransformer`. `CraftsmanColumnTransformer` takes an additional input parameter, `input_data`, which is the training data as transformed by the preceding steps.
2. Some preprocessing operators are wrapped by Craftsman, including `CraftsmanSimpleImputer`, `CraftsmanLabelEncoder`, `CraftsmanTargetEncoder`, `CraftsmanCatBoostEncoder`, `CraftsmanKBinsDiscretizer`, `CraftsmanOrdinalEncoder`, `CraftsmanCountEncoder`, `CraftsmanLeaveOneOutEncoder`, `CraftsmanOneHotEncoder`, `CraftsmanBinaryEncoder`, etc.; others are used directly from scikit-learn: `StandardScaler`, `MinMaxScaler`, `RobustScaler`, etc. (Refer to /craftsman/utility/training_helper.py for details).
3. Models are used directly from scikit-learn. Supported models are `DecisionTreeClassifier`, `DecisionTreeRegressor`, `RandomForestClassifier`, `RandomForestRegressor`, `LinearRegression`, and `LogisticRegression`.
4. The way `CraftsmanColumnTransformer` instances are defined is shown in the figure below, where each `CraftsmanColumnTransformer` is treated as one step.
5. The data must be transformed between steps, i.e., by calling `fit_transform()` on the previous step before defining the next one.

![craftsman_pipeline_define.jpg](craftsman_pipeline_define.jpg "/craftsman_pipeline_define.jpg")
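
Point 5 matters because each step learns its parameters from the data it actually receives. The sketch below illustrates the pattern with two toy transformers written in plain Python. `FillWithMode` and `MinMax` are stand-ins invented for this illustration; they are not Craftsman or scikit-learn classes.

```python
from collections import Counter


class FillWithMode:
    """Toy imputer: replaces None with the most frequent value."""

    def fit_transform(self, values):
        present = [v for v in values if v is not None]
        self.mode_ = Counter(present).most_common(1)[0][0]
        return [self.mode_ if v is None else v for v in values]


class MinMax:
    """Toy scaler: maps values linearly onto [0, 1]."""

    def fit_transform(self, values):
        self.min_, self.max_ = min(values), max(values)
        span = (self.max_ - self.min_) or 1  # avoid division by zero
        return [(v - self.min_) / span for v in values]


raw = [2010, None, 2014, 2014, 2018]

# Step 1 is fitted on the raw data and its output is kept.
step1 = FillWithMode()
after_step1 = step1.fit_transform(raw)

# Step 2 is fitted on the output of step 1, not on the raw data;
# fitting it on `raw` would fail here because of the None value.
step2 = MinMax()
after_step2 = step2.fit_transform(after_step1)

print(after_step1)
print(after_step2)
```

In the notebook the same role is played by `X_copy`, which is reassigned after each `fit_transform()` call and then passed as `input_data` to the next transformer.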

In [13]:
# define preprocessors
type_categories = ["First","Second","Third","Fourth & Above"]
ordinal_encoder = CraftsmanOrdinalEncoder(categories=[type_categories])
kbins = CraftsmanKBinsDiscretizer(encode="ordinal",n_bins=15)
binary_encoder = CraftsmanBinaryEncoder()
imputer = CraftsmanSimpleImputer(strategy="most_frequent")
count_encoder = CraftsmanCountEncoder()
minmax_scaler = MinMaxScaler()

# define model
dt = DecisionTreeRegressor(max_depth=6,random_state=24)

# define steps
X_copy = X.copy()

X_copy = imputer.fit_transform(X_copy)

transformer1 = CraftsmanColumnTransformer(
    remainder="passthrough",
    transformers=[
        (
            OperatorName.ORDINALENCODER.value,
            ordinal_encoder,
            ordinal_cols,
        ),
        (
            OperatorName.BINARYENCODER.value,
            binary_encoder,
            binary_cols,
        ),
        (
            OperatorName.KBINSDISCRETIZER.value,
            kbins,
            kbins_cols,
        ),
        (
            OperatorName.COUNTENCODER.value,
            count_encoder,
            count_cols,
        ),
    ],
    input_data=X_copy
)

X_copy = transformer1.fit_transform(X_copy,y)

transformer2 = CraftsmanColumnTransformer(
    remainder="passthrough",
    transformers=[
        (
            OperatorName.MINMAXSCALER.value,
            minmax_scaler,
            ['Year','Kilometers_Driven','Engine'],
        )
    ],
    input_data=X_copy
)

# compose pipeline
pipeline = Pipeline(
    steps=[
        ('Imputer', imputer),
        ("step2", transformer1),
        ("step3", transformer2),        
        (ModelName.DECISIONTREEREGRESSOR.value, dt)
    ]
)



Add some configuration information to the pipeline

In [14]:
# number of rows in the training dataset
pipeline.data_rows = len(X)
# target database dialect
defs.DBMS = 'duckdb'

Fit the pipeline, save it to a local directory as a joblib file, and evaluate it on the test set

In [15]:
# train model
pipeline.fit(X, y)

# save model to the file
save_model(pipeline, pipeline_save_path)
print(f'Pipeline has been saved at: {pipeline_save_path}')

# test model
data_test = pd.read_csv(test_data_path)
y_test = data_test["Price"]
X_test = data_test.drop("Price", axis=1)
X_test = X_test[all_cols]

# evaluate the test result
y_predict = pipeline.predict(X_test)

# mean_squared_error returns the MSE (not its square root) by default
print(f"Model's MSE: {mean_squared_error(y_test, y_predict)}")



Pipeline has been saved at: ./car_dt6.joblib
Model's MSE: 19.864693159722353
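
Note that scikit-learn's `mean_squared_error` returns the mean of the squared errors by default; the RMSE is the square root of that value. The sketch below shows the relationship on made-up numbers using only the standard library. The helper names `mse` and `rmse` are illustrative and are not part of the notebook.

```python
import math


def mse(y_true, y_pred):
    """Mean of squared differences between targets and predictions."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def rmse(y_true, y_pred):
    """Square root of the MSE, expressed in the same unit as the target."""
    return math.sqrt(mse(y_true, y_pred))


# Made-up values for illustration; not taken from the car dataset.
y_true = [3.0, 5.0, 8.0]
y_pred = [2.0, 5.0, 10.0]
print(mse(y_true, y_pred), rmse(y_true, y_pred))
```

Applied to the value printed by the cell above, the square root of 19.8647 is roughly 4.457, which is the error expressed in the unit of `Price`.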


### Generate a pure SQL query for the trained pipeline with the Craftsman `transformer_manager` tool

Parameters for `manager.generate_query`:
- `pre_sql`: additional SQL to put in front of the generated query, such as 'EXPLAIN ANALYZE'
- `group`: the graph selection algorithm, either 'enum' (enumeration-based algorithm) or 'prune' (greedy algorithm)
- `max_process_num`: the degree of parallelism of the enumeration-based algorithm
- `pipeline_save_path`: path to the trained pipeline file

In [8]:
import time
from craftsman.transformer_manager import TransformerManager
import os

manager = TransformerManager()
table_name = "car_price"
dbms = 'duckdb'
pre_sql = "EXPLAIN ANALYZE "
group = 'prune'

t1 = time.time()
query = manager.generate_query(
    pipeline_save_path,
    table_name,
    dbms,
    pre_sql=pre_sql,
    group=group,
    cost_model='craftsman'
)
t2 = time.time()
print(f'total compile time: {(t2-t1):.2f}s')


prune plan num: 24
prune sql generate time: 0.0011 s
total compile time: 0.68s
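
To give a feel for what translating a model into pure SQL can mean, the sketch below renders a tiny hand-written decision tree as a nested `CASE WHEN` expression. It only illustrates the general idea: the tree, its thresholds, the `toy_cars` table, and the `tree_to_sql` helper are all invented here, and the SQL that Craftsman actually generates may look quite different. SQLite from the standard library stands in for the database so that the example runs without extra dependencies.

```python
import sqlite3

# A toy regression tree: internal nodes are (column, threshold, left, right),
# leaves are plain numbers. All values are made up for illustration.
TOY_TREE = ("Year", 2012, ("Power", 90, 3.5, 6.0), 12.0)


def tree_to_sql(node):
    """Recursively render a tree as a nested SQL CASE expression."""
    if not isinstance(node, tuple):
        return str(node)  # leaf: the predicted value
    column, threshold, left, right = node
    return (
        f"CASE WHEN {column} <= {threshold} "
        f"THEN {tree_to_sql(left)} ELSE {tree_to_sql(right)} END"
    )


expr = tree_to_sql(TOY_TREE)
toy_query = f"SELECT {expr} AS prediction FROM toy_cars ORDER BY rowid"

# Evaluate the expression inside the database on three made-up rows.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE toy_cars (Year INTEGER, Power REAL)")
con.executemany(
    "INSERT INTO toy_cars VALUES (?, ?)",
    [(2010, 80.0), (2011, 120.0), (2016, 100.0)],
)
predictions = [row[0] for row in con.execute(toy_query)]
con.close()

print(toy_query)
print(predictions)
```

The point of the exercise is that prediction happens entirely inside the database engine: no rows are pulled into Python for inference, only the final results.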


Save the generated SQL to a file

In [9]:
generated_file_path = f"./tutorial_{group}.sql"
with open(generated_file_path, "w") as sql_file:
    sql_file.write(query)
print(f'Generated SQL file has been saved at: {generated_file_path}')

Generated SQL file has been saved at: ./tutorial_prune.sql
