## MicroModelling with STOs

This is a very simple example of how to use STOs (Script Table Operators) together with the new tdextensions library to create MicroModels, that is, one model per data partition. The example is contrived: the dataset is synthetic and was generated so that every partition has an identical distribution.

This is not how the real world looks, but we will be expanding this example over the coming months to show you how to deal with unbalanced partitions, partitions where there is not enough data, and so on.

Note that the methods from the tdextensions library will be included in teradataml at the end of Q1 2021.


In [3]:
from teradataml.dataframe.dataframe import DataFrame
from teradataml import create_context
import getpass

engine = create_context(host="3.238.151.85", username="AOA_DEMO", password=getpass.getpass("password"))

password ··········


### Training a Model per Partition

We will use the `map_partition` function from tdextensions to seamlessly "push down" the Python code that trains a model on the data in a given partition. An independent model is trained for each partition and written as one row of a models table, keyed by model version and partition id. We also record the training metadata relevant to that partition, which allows us to track the result of hyper-parameter optimization or anything else that is specific to that model.

Behind the scenes, this actually generates and installs an STO file. The big difference is in the user experience: the user can work "natively" in Python to achieve their goals.
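
To make the per-partition contract concrete, here is a minimal local sketch of what a `map_partition`-style call does conceptually: split the data by a key, hand each partition to a Python function as a pandas DataFrame, and stack the rows that the function returns. This is not how tdextensions is implemented. `local_map_partition`, `summarise_partition` and the toy data are hypothetical names made up for illustration, and everything runs in local memory instead of inside the database.

```python
import numpy as np
import pandas as pd


def local_map_partition(df, func, partition_by, returns):
    """Local stand-in: apply func to each partition and stack the returned rows."""
    column_names = [name for name, _sql_type in returns]
    rows = []
    for _key, partition in df.groupby(partition_by, sort=True):
        # func is expected to return a 2-D array, one row per output record
        rows.extend(np.asarray(func(partition), dtype=object).tolist())
    return pd.DataFrame(rows, columns=column_names)


def summarise_partition(partition):
    # Hypothetical per-partition function: partition id, row count and mean of Y1
    partition_id = partition["partition_id"].iloc[0]
    return np.array(
        [[partition_id, partition.shape[0], partition["Y1"].mean()]],
        dtype=object,
    )


# Toy data with two partitions (assumed values, not the synthetic dataset)
toy = pd.DataFrame({
    "partition_id": [1, 1, 2, 2, 2],
    "Y1": [1.0, 3.0, 2.0, 4.0, 6.0],
})

result = local_map_partition(
    toy,
    summarise_partition,
    partition_by="partition_id",
    returns=[["partition_id", "VARCHAR(255)"],
             ["num_rows", "BIGINT"],
             ["mean_y1", "FLOAT"]],
)
print(result)
```

The training function in this notebook follows the same shape: it receives one partition and returns a single row containing the partition id, the model version, the row count, the metadata and the serialized model.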

In [5]:
from teradataml import create_context
from teradataml.dataframe.dataframe import DataFrame
from tdextensions.distributed import DistDataFrame, DistMode
from sklearn.preprocessing import RobustScaler, OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from aoa.sto.util import save_metadata, cleanup_cli

import os
import numpy as np
import json
import base64
import dill
import uuid

model_version = str(uuid.uuid4())
hyperparams = {
    "max_depth": 5
}
data_conf = {
    "table": "STO_SYNTHETIC_TRAIN_V"
}

def train_partition(partition, model_version, hyperparams):
    numeric_features = ["X"+str(i) for i in range(1,10)]
    for i in numeric_features:
        partition[i] = partition[i].astype("float")

    numeric_transformer = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", RobustScaler()),
        ("pca", PCA(0.95))
    ])

    categorical_features = ["flag"]
    for i in categorical_features:
        partition[i] = partition[i].astype("category")

    categorical_transformer = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="constant", fill_value=0)),
        ("onehot", OneHotEncoder(handle_unknown="ignore"))])

    preprocessor = ColumnTransformer(transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)])

    features = numeric_features + categorical_features
    pipeline = Pipeline([("preprocessor", preprocessor),
                         ("rf", RandomForestRegressor(max_depth=hyperparams["max_depth"]))])
    pipeline.fit(partition[features], partition[['Y1']])
    pipeline.features = features

    partition_id = partition.partition_ID.iloc[0]
    artefact = base64.b64encode(dill.dumps(pipeline))

    # record whatever partition level information you want like rows, data stats, explainability, etc
    partition_metadata = json.dumps({
        "num_rows": partition.shape[0],
        "hyper_parameters": hyperparams
    })

    return np.array([[partition_id, model_version, partition.shape[0], partition_metadata, artefact]])

print("Starting training...")

query = "SELECT * FROM {table} WHERE fold_ID='train'".format(table=data_conf["table"])
df = DistDataFrame(query=query, dist_mode=DistMode.STO, sto_id="model_train")
model_df = df.map_partition(lambda partition: train_partition(partition, model_version, hyperparams),
                            partition_by="partition_id",
                            returns=[["partition_id", "VARCHAR(255)"],
                                     ["model_version", "VARCHAR(255)"],
                                     ["num_rows", "BIGINT"],
                                     ["partition_metadata", "CLOB"],
                                     ["model_artefact", "CLOB"]])
# materialize as we reuse result
model_df = DataFrame(model_df._table_name, materialize=True)

# append to models table
model_df.to_sql("aoa_sto_models", if_exists="append")

print("Finished training")


Starting training...
Finished training


In [8]:
model_df.head(5)

  partition_id                         model_version  num_rows                                 partition_metadata                                     model_artefact
0           12  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {"num_rows": 7999, "hyper_parameters": {"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...
1            2  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {"num_rows": 7999, "hyper_parameters": {"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...
2           14  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {"num_rows": 7999, "hyper_parameters": {"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...
3           10  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {"num_rows": 7999, "hyper_parameters": {"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...
4            1  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {"num_rows": 7999, "hyper_parameters": {"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...
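
The `model_artefact` column holds the fitted pipeline as base64 text, which is what lets a binary model travel through a CLOB column. The round trip can be sketched as follows. Note the assumption: the notebook uses `dill`, while this sketch swaps in the standard library `pickle`, which exposes the same `dumps`/`loads` interface, so that it runs without extra packages. `encode_artefact`, `decode_artefact` and `toy_model` are hypothetical names.

```python
import base64
import pickle


def encode_artefact(model) -> str:
    # Serialize to bytes, then base64-encode so the result is plain ASCII text
    return base64.b64encode(pickle.dumps(model)).decode("ascii")


def decode_artefact(artefact):
    # Reverse the two steps: base64 text -> bytes -> Python object
    return pickle.loads(base64.b64decode(artefact))


# Stand-in for a fitted pipeline (assumed toy object, not a real model)
toy_model = {"coefficients": [0.5, -1.25], "features": ["X1", "X2"]}

encoded = encode_artefact(toy_model)
restored = decode_artefact(encoded)
print(restored == toy_model)
```

As with any pickle-based format, an artefact should only be decoded if it comes from a trusted source, because loading it can execute arbitrary code.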

### Evaluating the MicroModels

After training, we need to evaluate the models trained on each partition. We will need to do this multiple times, on different datasets, if we want to track the performance of the model against existing and new labelled data.

The following example also uses the tdextensions `map_partition` to achieve this. It calculates and returns metrics per partition, which we record against the model version. We also support calculating the average metric value across all partitions (the AOA does this automatically) for metrics that can be aggregated.


In [9]:
from sklearn import metrics
from aoa.sto.util import save_metadata, save_evaluation_metrics

data_conf = {
    "table": "STO_SYNTHETIC_TEST_V"
}


def eval_partition(partition):
    model_artefact = partition.loc[partition['n_row'] == 1, 'model_artefact'].iloc[0]
    model = dill.loads(base64.b64decode(model_artefact))

    X_test = partition[model.features]
    y_test = partition[['Y1']]

    y_pred = model.predict(X_test)

    partition_id = partition.partition_ID.iloc[0]

    # record whatever partition level information you want like rows, data stats, metrics, explainability, etc
    partition_metadata = json.dumps({
        "num_rows": partition.shape[0],
        "metrics": {
            "MAE": "{:.2f}".format(metrics.mean_absolute_error(y_test, y_pred)),
            "MSE": "{:.2f}".format(metrics.mean_squared_error(y_test, y_pred)),
            "R2": "{:.2f}".format(metrics.r2_score(y_test, y_pred))
        }
    })

    return np.array([[partition_id, partition.shape[0], partition_metadata]])

# we join the model artefact to the 1st row of the data table so we can load it in the partition
query = f"""
SELECT d.*, CASE WHEN n_row=1 THEN m.model_artefact ELSE null END AS model_artefact 
    FROM (SELECT x.*, ROW_NUMBER() OVER (PARTITION BY x.partition_id ORDER BY x.partition_id) AS n_row FROM {data_conf["table"]} x) AS d
    LEFT JOIN aoa_sto_models m
    ON d.partition_id = m.partition_id
    WHERE m.model_version = '{model_version}'
"""

df = DistDataFrame(query=query, dist_mode=DistMode.STO, sto_id="model_eval")
eval_df = df.map_partition(lambda partition: eval_partition(partition),
                           partition_by="partition_id",
                           returns=[["partition_id", "VARCHAR(255)"],
                                    ["num_rows", "BIGINT"],
                                    ["partition_metadata", "CLOB"]])

# materialize as we reuse result
eval_df = DataFrame(eval_df._table_name, materialize=True)

#save_metadata(eval_df)
#save_evaluation_metrics(eval_df, ["MAE", "MSE", "R2"])

print("Finished evaluation")


Finished evaluation


In [10]:
eval_df.head()

  partition_id  num_rows                                 partition_metadata
0           12      2001  {"num_rows": 2001, "metrics": {"MAE": "1.91", ...
1            2      2001  {"num_rows": 2001, "metrics": {"MAE": "0.75", ...
2           29      2001  {"num_rows": 2001, "metrics": {"MAE": "0.99", ...
3            4      2001  {"num_rows": 2001, "metrics": {"MAE": "0.83", ...
4            7      2001  {"num_rows": 2001, "metrics": {"MAE": "1.43", ...
5            9      2001  {"num_rows": 2001, "metrics": {"MAE": "0.68", ...
6            6      2001  {"num_rows": 2001, "metrics": {"MAE": "1.74", ...
7           14      2001  {"num_rows": 2001, "metrics": {"MAE": "0.71", ...
8           10      2001  {"num_rows": 2001, "metrics": {"MAE": "0.61", ...
9            1      2001  {"num_rows": 2001, "metrics": {"MAE": "0.83", ...
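
The per-partition metrics stored in `partition_metadata` can be rolled up into a single figure per metric. The sketch below shows one way to do that, a row-weighted mean, and is not a description of how the AOA computes its averages. `average_metrics` and the example rows are hypothetical, and the metric values are made up for illustration. Because the evaluation code formats metrics as strings, they are converted back to floats first.

```python
import json

# Hypothetical rows shaped like eval_df: (partition_id, num_rows, partition_metadata)
# The metric values are invented for this sketch, not taken from the run above.
eval_rows = [
    ("1", 100, '{"num_rows": 100, "metrics": {"MAE": "1.00", "MSE": "2.00"}}'),
    ("2", 300, '{"num_rows": 300, "metrics": {"MAE": "2.00", "MSE": "4.00"}}'),
]


def average_metrics(rows, metric_names):
    """Row-weighted mean of each named metric across partitions."""
    totals = {name: 0.0 for name in metric_names}
    total_rows = 0
    for _partition_id, num_rows, metadata_json in rows:
        metrics = json.loads(metadata_json)["metrics"]
        for name in metric_names:
            # Metrics were stored as formatted strings, so cast to float
            totals[name] += float(metrics[name]) * num_rows
        total_rows += num_rows
    return {name: totals[name] / total_rows for name in metric_names}


overall = average_metrics(eval_rows, ["MAE", "MSE"])
print(overall)
```

Weighting by row count makes the averaged MAE and MSE equal to the values you would get by pooling all rows. R2 is left out on purpose: it depends on the variance of the target within each partition, so a weighted mean of per-partition R2 values is generally not the R2 of the pooled data.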

### Scoring 

You have already seen how to perform scoring based on the evaluation code. However, we can also separate the scoring logic into its own simple function which can be called independently by a scheduler or a user on-demand. 

When this code is executed, it stores the model code in TD (Teradata). This allows the code to be called later via pure SQL, for example, instead of via the Python code shown here, which provides extra flexibility.

Note that the AOA can automatically schedule the scoring logic shown below to execute at a defined interval.

In [12]:

data_conf = {
    "table": "STO_SYNTHETIC_TEST_V",
    "predictions": "STO_SYNTHETIC_PREDICTIONS"
}

def score_partition(partition):
    model_artefact = partition.loc[partition['n_row'] == 1, 'model_artefact'].iloc[0]
    model = dill.loads(base64.b64decode(model_artefact))

    X = partition[model.features]

    return model.predict(X)

# we join the model artefact to the 1st row of the data table so we can load it in the partition
query = f"""
SELECT d.*, CASE WHEN n_row=1 THEN m.model_artefact ELSE null END AS model_artefact 
    FROM (SELECT x.*, ROW_NUMBER() OVER (PARTITION BY x.partition_id ORDER BY x.partition_id) AS n_row FROM {data_conf["table"]} x) AS d
    LEFT JOIN aoa_sto_models m
    ON d.partition_id = m.partition_id
    WHERE m.model_version = '{model_version}'
"""

df = DistDataFrame(query=query, dist_mode=DistMode.STO, sto_id="my_model_score")
scored_df = df.map_partition(lambda partition: score_partition(partition),
                             partition_by="partition_id",
                             returns=[["prediction", "VARCHAR(255)"]])

scored_df.to_sql(data_conf["predictions"], if_exists="append")
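
Both the evaluation and the scoring queries attach the model artefact only to the first row of each partition, presumably so that the large serialized model is carried once per partition instead of being repeated on every row. The pandas sketch below emulates that query shape locally to show why `partition.loc[partition['n_row'] == 1, 'model_artefact'].iloc[0]` finds the model. The toy tables and the `artefact_for` helper are hypothetical, and the artefact strings are placeholders.

```python
import pandas as pd

# Toy stand-ins for the data table and the models table (assumed values)
data = pd.DataFrame({
    "partition_id": [1, 1, 1, 2, 2],
    "X1": [0.1, 0.2, 0.3, 0.4, 0.5],
})
models = pd.DataFrame({
    "partition_id": [1, 2],
    "model_artefact": ["artefact-for-1", "artefact-for-2"],
})

# Counterpart of ROW_NUMBER() OVER (PARTITION BY partition_id ...)
data["n_row"] = data.groupby("partition_id").cumcount() + 1

# Counterpart of the join on partition_id
joined = data.merge(models, on="partition_id", how="left")

# Counterpart of CASE WHEN n_row=1 THEN model_artefact ELSE null END
joined["model_artefact"] = joined["model_artefact"].where(joined["n_row"] == 1)


def artefact_for(partition):
    # Same lookup as in eval_partition and score_partition
    return partition.loc[partition["n_row"] == 1, "model_artefact"].iloc[0]


artefacts = {pid: artefact_for(part) for pid, part in joined.groupby("partition_id")}
print(joined)
print(artefacts)
```

Every row other than the first in each partition has a null `model_artefact`, so the lookup has to filter on `n_row == 1` and cannot simply take the value from an arbitrary row.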