In [None]:
import pandas as pd
import numpy as np

# Preview the raw inputs hosted in the Aqueduct repo: the customer attributes
# and the churn labels, joined on their shared `cust_id` key.
customers_table = pd.read_csv(
    "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/customers.csv"
)
churn_table = pd.read_csv(
    "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/churn_data.csv"
)
customers_table.merge(churn_table, on="cust_id").head()

In [None]:
from zenml.steps import step

In [None]:
# `@step` registers this function as a ZenML pipeline step. The wrapped
# function can still be called directly via `log_featurize.entrypoint(...)`,
# which the next cell does to preview its output.
@step
def log_featurize(cust: pd.DataFrame) -> pd.DataFrame:
    """Append log-transformed copies of the customer feature columns.

    Every column other than ``cust_id``, ``using_deep_learning`` and
    ``using_dbt`` gets an extra ``log_<column>`` column holding
    ``np.log(value + 1.0)``. The excluded columns are not numeric quantities
    that should be log-scaled. Original columns are kept unchanged, so e.g.
    ``company_size`` stays and ``log_company_size`` is appended.

    Args:
        cust: Customer table. Must contain ``cust_id``, and every
            non-excluded column must support ``+ 1.0`` and ``np.log``.

    Returns:
        A new DataFrame without ``cust_id``; ``cust`` itself is not mutated.
    """
    skip_cols = ["cust_id", "using_deep_learning", "using_dbt"]
    # Resolved up front, so the log_* columns added below are never re-transformed.
    columns_to_log = cust.columns.difference(skip_cols)

    featurized = cust.copy()
    for column in columns_to_log:
        # The +1 keeps zero-valued entries finite (log(0) would be -inf).
        featurized["log_" + column] = np.log(featurized[column] + 1.0)

    # The identifier is not a model feature.
    featurized = featurized.drop(columns="cust_id")
    return featurized

In [None]:
# `.entrypoint(...)` calls the function wrapped by `@step` directly, so the
# featurization runs eagerly on the table loaded above, outside of any
# pipeline. This lets us inspect the step's output before wiring it into one.
features_table = log_featurize.entrypoint(customers_table)

# Sanity checks: the identifier column is dropped and no rows are lost.
assert "cust_id" not in features_table.columns, "cust_id should not be a feature"
assert len(features_table) == len(customers_table), "featurization changed the row count"
features_table.head()

### Training the Model

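In this example, we will train two basic classifiers, a logistic regression and a decision tree, and deploy each of them. In practice, you would probably do something more interesting, such as combining the two models in an ensemble post-processing step. Still, this is enough to illustrate how training and deployment fit into a pipeline.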

In [None]:
import sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

In [None]:
@step
def data_loader() -> pd.DataFrame:
    """Pipeline step: download the customer table from the Aqueduct example data."""
    return pd.read_csv(
        "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/customers.csv"
    )

In [None]:
@step
def label_loader() -> pd.DataFrame:
    """Pipeline step: download the churn labels from the Aqueduct example data.

    The training steps use this table's ``churn`` column as the target.
    """
    return pd.read_csv(
        "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/churn_data.csv"
    )

In [None]:
import mlflow
from zenml.client import Client

# The training steps below pass `experiment_tracker.name` to `@step`, so the
# active ZenML stack must include an experiment tracker. Fail here with an
# actionable message rather than with an AttributeError on `None.name` later.
experiment_tracker = Client().active_stack.experiment_tracker
if experiment_tracker is None:
    raise RuntimeError(
        "The active ZenML stack has no experiment tracker. Register an MLflow "
        "experiment tracker and add it to the active stack before running this notebook."
    )

Instead of training static models like in our previous notebook, let's include the training steps in the pipeline itself and deploy the trained models using ZenML's MLflow integration!

In [None]:
@step(enable_cache=False, experiment_tracker=experiment_tracker.name)
def train_linear_model(
    feature_table: pd.DataFrame,
    churn_table: pd.DataFrame
) -> sklearn.linear_model.LogisticRegression:
    """Fit a logistic-regression churn classifier in an MLflow-tracked step.

    Args:
        feature_table: Model inputs produced by the featurization step.
        churn_table: Labels; its ``churn`` column is the training target.

    Returns:
        The fitted ``LogisticRegression`` model.
    """
    mlflow.sklearn.autolog()  # log all model hparams and metrics to MLflow
    linear_model = LogisticRegression(max_iter=10000)
    # Train on this step's own input, not the notebook-level `features_table`
    # global: the global bypasses the pipeline's data and does not exist when
    # the step runs outside this notebook's kernel.
    # NOTE(review): features and labels are paired by row position (cust_id
    # was dropped during featurization), so this assumes both tables arrive
    # in the same row order; confirm, or join on cust_id upstream.
    linear_model.fit(feature_table, churn_table["churn"])
    return linear_model

In [None]:
@step(enable_cache=False, experiment_tracker=experiment_tracker.name)
def train_decision_tree_model(
    feature_table: pd.DataFrame,
    churn_table: pd.DataFrame
) -> sklearn.tree.DecisionTreeClassifier:
    """Fit a decision-tree churn classifier in an MLflow-tracked step.

    Args:
        feature_table: Model inputs produced by the featurization step.
        churn_table: Labels; its ``churn`` column is the training target.

    Returns:
        The fitted ``DecisionTreeClassifier``.
    """
    mlflow.sklearn.autolog()  # log all model hparams and metrics to MLflow
    decision_tree_model = DecisionTreeClassifier(max_depth=10, min_samples_split=3)
    # Train on this step's own input, not the notebook-level `features_table`
    # global: the global bypasses the pipeline's data and does not exist when
    # the step runs outside this notebook's kernel.
    # NOTE(review): features and labels are paired by row position (cust_id
    # was dropped during featurization), so this assumes both tables arrive
    # in the same row order; confirm, or join on cust_id upstream.
    decision_tree_model.fit(feature_table, churn_table["churn"])
    return decision_tree_model

In [None]:
@step
def deployment_trigger() -> bool:
    """Decide whether the freshly trained models should be deployed.

    There is no gating logic yet: deployment is always approved.
    """
    should_deploy = True
    return should_deploy

In [None]:
from zenml.pipelines import pipeline

In [None]:
@pipeline(enable_cache=False)
def churn_training_pipeline(
    data_loader,
    label_loader,
    log_featurize,
    train_linear,
    train_tree,
    deployment_trigger,
    model_deployer_linear,
    model_deployer_tree,
):
    """Wire the churn training DAG: load, featurize, train two models, deploy both.

    Each argument is a step instance; the parameter names are the keywords
    used when this pipeline is instantiated.
    """
    # Inputs and features.
    raw_customers = data_loader()
    labels = label_loader()
    features = log_featurize(raw_customers)

    # Both models are trained on the same features and labels.
    linear_clf = train_linear(features, labels)
    tree_clf = train_tree(features, labels)

    # One trigger output gates the deployment of both models.
    deploy_decision = deployment_trigger()
    model_deployer_linear(deploy_decision, linear_clf)
    model_deployer_tree(deploy_decision, tree_clf)

In [None]:
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step

In [None]:
# Two separately named instances of ZenML's MLflow deployer step, one per model.
# The linear deployer's name is the `pipeline_step_name` the inference
# pipeline's service loader searches for.
model_deployer_linear_step = mlflow_model_deployer_step()
model_deployer_tree_step = mlflow_model_deployer_step()
model_deployer_linear_step.configure(name="mlflow_linear_model_deployer")
model_deployer_tree_step.configure(name="mlflow_tree_model_deployer")

In [None]:
# Bind a concrete step instance to each slot of the training pipeline, then run it.
training_pipeline = churn_training_pipeline(
    data_loader=data_loader(),
    label_loader=label_loader(),
    log_featurize=log_featurize(),
    train_linear=train_linear_model(),
    train_tree=train_decision_tree_model(),
    deployment_trigger=deployment_trigger(),
    model_deployer_linear=model_deployer_linear_step,
    model_deployer_tree=model_deployer_tree_step,
)
training_pipeline.run()

Now that we've built a training pipeline, let's build an inference pipeline that
uses the linear model we deployed in the training pipeline above to generate predictions.

In [None]:
from zenml.services import BaseService

@step(enable_cache=False)
def linear_model_service_loader() -> BaseService:
    """Return the running prediction service for the deployed linear model.

    Queries the active stack's model deployer for a running server created by
    the ``mlflow_linear_model_deployer`` step of ``churn_training_pipeline``.

    Raises:
        RuntimeError: if the active stack has no model deployer, or if no
            running server matches (e.g. the training pipeline has not run).
    """
    model_deployer = Client().active_stack.model_deployer
    if model_deployer is None:
        raise RuntimeError("The active ZenML stack has no model deployer.")

    services = model_deployer.find_model_server(
        pipeline_name="churn_training_pipeline",
        # Must match the name configured on the linear deployer step instance.
        pipeline_step_name="mlflow_linear_model_deployer",
        running=True,
    )
    if not services:
        raise RuntimeError(
            "No running MLflow prediction service found for step "
            "'mlflow_linear_model_deployer' of pipeline 'churn_training_pipeline'. "
            "Run the training pipeline first."
        )
    # NOTE(review): if several servers match, the first one is used; confirm
    # that this is the intended (e.g. most recent) deployment.
    return services[0]

In [None]:
from zenml.steps import Output

@step
def predictor(
    service: BaseService,
    features_table: pd.DataFrame,
) -> Output(predictions=list):
    """Run an inference request against a prediction service.

    Args:
        service: Prediction service to query; it is started first if needed.
        features_table: Featurized rows to score, sent to the service as a
            NumPy array.

    Returns:
        ``predictions``: a one-element list wrapping the list of predictions
        returned by the service.
    """
    service.start(timeout=10)  # should be a NOP if already started
    # Convert once and reuse. Only the shape is logged: printing the whole
    # matrix floods the step logs and copies customer data into them.
    model_input = features_table.to_numpy()
    print(f"Input shape is: {model_input.shape}")
    prediction = service.predict(model_input)
    prediction_list = prediction.tolist()
    print(f"Prediction is: {[prediction_list]}")
    return [prediction_list]

In [None]:
@pipeline(enable_cache=False)
def churn_inference_pipeline(
    data_loader,
    log_featurize,
    linear_model_service_loader,
    predictor,
):
    """Wire the churn inference DAG: load and featurize, look up the service, predict.

    Each argument is a step instance; the parameter names are the keywords
    used when this pipeline is instantiated.
    """
    # Same featurization as training, so the model sees matching columns.
    features = log_featurize(data_loader())
    prediction_service = linear_model_service_loader()
    predictor(prediction_service, features)

In [None]:
# Bind a concrete step instance to each slot of the inference pipeline, then run it.
inference_pipeline = churn_inference_pipeline(
    data_loader=data_loader(),
    log_featurize=log_featurize(),
    linear_model_service_loader=linear_model_service_loader(),
    predictor=predictor(),
)
inference_pipeline.run()

Lastly, remember we added MLflow experiment tracking (`mlflow.sklearn.autolog()`) to our training steps before? That single call, one in each training step, automatically configured and initialized MLflow and logged all hyperparameters and metrics there.

Let's start up the MLflow UI and check it out!

In [None]:
from zenml.environment import Environment
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri


def open_mlflow_ui(port: int = 4997) -> None:
    """Launch the MLflow UI on the tracking store used by ZenML's MLflow integration.

    Args:
        port: Port the MLflow UI server listens on.

    In Google Colab, an ngrok tunnel to ``port`` is opened first, and its
    public URL is printed for use instead of the local address.

    Note: ``mlflow ui`` starts a long-running server and ``!`` waits for the
    command to exit, so this call is expected to block (keeping the cell busy)
    until the kernel is interrupted.
    """
    if Environment.in_google_colab():
        from pyngrok import ngrok

        public_url = ngrok.connect(port)
        print(f"\x1b[31mIn Colab, use this URL instead: {public_url}!\x1b[0m")

    # IPython shell escape; `{...}` is interpolated with Python values before
    # the command runs, pointing the UI at ZenML's MLflow tracking URI.
    !mlflow ui --backend-store-uri="{get_tracking_uri()}" --port={port}


# Blocks this cell while the UI server runs; interrupt the kernel to stop it.
open_mlflow_ui()