## Overview

Kubeflow is a popular open-source framework that makes it easy to develop and deploy machine learning pipelines on Kubernetes. In this tutorial we'll create a three-step Kubeflow Pipeline that reads data from Vantage, trains a model, and then deploys the model to Vantage. We will then use that model in a second pipeline that scores new data from a table.

## Prerequisites

* GCP account
* Vantage Express installed on GCP with BYOM installed

## Setting up Vantage and loading data

1. Set up a Vantage instance - Follow the [Run Vantage Express on GCP](https://quickstarts.teradata.com/vantage.express.gcp.html) how-to to set up Vantage. Make sure to follow the instructions to open the VM up to the Internet.

2. Download sample data - we'll use the Boston Housing dataset which can be downloaded from [Kaggle](https://www.kaggle.com/datasets/vikrishnan/boston-house-prices?resource=download). Upload the dataset into a Cloud Storage bucket.

3. Load training data to Vantage - we'll create a table called `housing`. The example below uses a table in the `mldb` database. Make sure you replace `LOCATION('/gs/storage.googleapis.com/vantage_express/boston_housing.csv')` with a location that points to the CSV file in your bucket.

    ``` SQL
    CREATE FOREIGN TABLE mldb.housing_foreign
        USING ( LOCATION('/gs/storage.googleapis.com/vantage_express/boston_housing.csv') );

    CREATE MULTISET TABLE
        mldb.housing(CRIM, ZN, INDUS, CHAS, NOX, RM, AGE, DIS, RAD, TAX, PTRATIO, B, LSTAT, MEDV)
    AS (
        SELECT CRIM, ZN, INDUS, CHAS, NOX, RM, AGE, DIS, RAD, TAX, PTRATIO, B, LSTAT, MEDV FROM mldb.housing_foreign
    ) WITH DATA 
    NO PRIMARY INDEX;
    ```

4. Load sample input data - For this tutorial we need a table with some new data that we want to score with our model, plus a `demo_models` table that will store the trained model. Use `teradatasql` to execute the following SQL on your Vantage instance. Replace the IP address with the IP address of your Vantage instance.

    ``` Python
    import teradatasql

    with teradatasql.connect (host="34.71.35.124", user="mldb", password="mldb") as con:
        with con.cursor () as cur:
            cur.execute ("CREATE SET TABLE demo_models (model_id VARCHAR (30), model BLOB) PRIMARY INDEX (model_id);")
            cur.execute ("CREATE SET TABLE test_housing (ID INTEGER, CRIM FLOAT, ZN FLOAT,INDUS FLOAT,CHAS INTEGER,NOX FLOAT,RM FLOAT, AGE FLOAT,DIS FLOAT, RAD INTEGER,TAX INTEGER,PTRATIO FLOAT,B FLOAT,LSTAT FLOAT) PRIMARY INDEX (CRIM);")
            cur.execute ("INSERT INTO test_housing (ID, CRIM, ZN, INDUS, CHAS, NOX, RM, AGE, DIS, RAD, TAX, PTRATIO, B, LSTAT) VALUES (1,.02,0.0,7.07,0,.46,6.4,78.9,4.9,2,242,17.8,396.9,9.14);")
    ```
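
Step 4 inserts a single row with literal values. To load several rows, a parameterized batch insert avoids building SQL strings by hand. This is a minimal sketch that assumes a DB-API cursor accepting `?` parameter markers and `executemany`, which `teradatasql` provides. `insert_rows` is a hypothetical helper. The example runs against Python's built-in `sqlite3` only so that it can be tried without a Vantage instance.

```python
import sqlite3
from typing import Any, Sequence


def insert_rows(cur: Any, table: str, columns: Sequence[str], rows: Sequence[Sequence[Any]]) -> None:
    """Insert rows using qmark (?) parameter markers (hypothetical helper).

    `table` and `columns` are interpolated into the SQL text, so they must be
    trusted identifiers; only the row values are sent as parameters.
    """
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    cur.executemany(sql, rows)


COLUMNS = ["ID", "CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE",
           "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT"]
# The sample row from the INSERT statement above; append more tuples to load more rows.
ROWS = [(1, .02, 0.0, 7.07, 0, .46, 6.4, 78.9, 4.9, 2, 242, 17.8, 396.9, 9.14)]

# Local stand-in for a Vantage connection: an in-memory SQLite database.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute(f"CREATE TABLE test_housing ({', '.join(COLUMNS)})")
insert_rows(cur, "test_housing", COLUMNS, ROWS)
print(cur.execute("SELECT COUNT(*) FROM test_housing").fetchone()[0])  # 1
```

To run the same helper against Vantage, pass it the cursor from `teradatasql.connect(...)` instead of the SQLite cursor.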

## Set up Kubeflow Pipelines on Google Cloud

To create a Kubeflow Pipelines instance:

1. In the Google Cloud Console, select `AI Platform Pipelines` and click `New Instance`. This redirects you to the `Kubeflow Pipeline` page.
2. On the `Kubeflow Pipeline` page, click `Configure`. This opens the `Deploy Kubernetes Pipelines` page.
3. Select the region you want to use. You can leave `Network and Subnetwork` at their defaults.
4. Click `Create New Cluster` (you don't need to check the `allow access to cloud api` box). This creates a Kubernetes cluster with three VMs in the region you selected.
5. When the cluster has been created, click `Deploy` at the very bottom of the page. For this example you can leave all the other fields at their defaults and leave the optional boxes unchecked. If `Deploy` fails because the resources are not available yet, wait for the cluster to finish being created and try again.

Test that your Kubeflow Pipeline is up and running by creating a Kubeflow client. Go to `AI Platform Pipelines`, click `Settings` for your instance, and copy the code to connect to the client. Replace the code below with the code you copied from the `Pipelines Settings`.

```python
import kfp
client = kfp.Client(host='https://2475fa1cdaee6bd-dot-us-central1.pipelines.googleusercontent.com')
```

## Create the first component

Now we are ready to create the components in the pipeline.
In this example we will create the following three components:
* `read_data_from_vantage`
    * input: IP address (`ipaddr`) of the VM hosting Vantage
    * output: CSV file with the data for training and testing
* `train_model`
    * input: CSV file with the data for training and testing
    * output: file containing the model
    * output: Metrics artifact with the model's performance
* `deploy_model`
    * input: file containing the model

First, import the Kubeflow Pipelines DSL modules.

```python
import kfp.v2.dsl as dsl
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Model,
    Metrics,
)
```

The first component reads data from Vantage. Make sure you have set up Vantage Express on GCP as described above, including opening a firewall to the VM so you can access Vantage from the Internet.

The component connects to Vantage using the `ipaddr` passed as an input parameter, reads the rows from the `mldb.housing` table, and then writes the data to an `Output[Dataset]`, which is essentially a temporary file used to pass data between components (see more about passing data between components [here](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/python-function-components/#passing-artifacts-by-file)).

The component uses the `teradatasql` driver. Each component runs in its own container on Kubernetes, so all imports must be done inside the component function. We have created a base image with `teradatasql` already installed. When you pass `base_image='teradata/python-teradatasql'`, the component uses that image to create its container.

```python
@component(base_image='teradata/python-teradatasql')
def read_data_from_vantage(
    ipaddr: str,
    output_file: Output[Dataset]
):
    import teradatasql

    with teradatasql.connect(host=ipaddr, user="mldb", password="mldb") as con:
        with con.cursor() as cur:
            sFileName = output_file.path
            # The {fn teradata_write_csv(...)} escape makes teradatasql export the
            # query's result set to a CSV file at the artifact's path.
            cur.execute("{fn teradata_write_csv(" + sFileName + ")}select * from mldb.housing order by 1")
```

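Conceptually, an artifact such as `Output[Dataset]` gives the component a local file path. The producing component writes to `.path`, and Kubeflow makes that content available at the consuming component's `Input[Dataset].path`. The sketch below mimics that contract locally with a hypothetical `LocalArtifact` stand-in, which is not a Kubeflow class. This can be handy for testing a component's logic without a cluster. The CSV values are made up for illustration.

```python
import csv
import os
import tempfile
from dataclasses import dataclass, field


@dataclass
class LocalArtifact:
    """Hypothetical stand-in for a KFP artifact: a file path plus metadata."""
    path: str
    metadata: dict = field(default_factory=dict)


def write_dataset(output_file: LocalArtifact, header, rows) -> None:
    # Producer side: write a CSV with a header row to the artifact's path.
    with open(output_file.path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


def read_dataset(input_file: LocalArtifact) -> list:
    # Consumer side: read the same file back as a list of dicts (values are strings).
    with open(input_file.path, newline="") as f:
        return list(csv.DictReader(f))


tmpdir = tempfile.mkdtemp()
artifact = LocalArtifact(path=os.path.join(tmpdir, "housing.csv"))
write_dataset(artifact, ["CRIM", "ZN", "MEDV"], [[1.0, 2.0, 3.0]])
records = read_dataset(artifact)
print(records)
```
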
## Create the train model component

Next we'll create a component to train a model with the training data.

The input to this component is the file produced by the previous component. The outputs are the trained model, serialized with `joblib.dump`, and a `Metrics` artifact with the model's performance on a held-out test split.

The component runs on the `teradata/python-sklearn2pmml` base image and uses scikit-learn, pandas, and sklearn-pandas, so we pass `packages_to_install=['pandas==1.3.5','scikit-learn','sklearn-pandas==1.5.0']`. This tells Kubeflow to install the packages when the container is created.

```python
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn','sklearn-pandas==1.5.0'])
def train_model(
    input_file: Input[Dataset],
    output_model: Output[Model],
    output_metrics: Output[Metrics]
):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.preprocessing import StandardScaler
    from sklearn import metrics
    from sklearn_pandas import DataFrameMapper
    import joblib
    from sklearn2pmml.pipeline import PMMLPipeline

    df = pd.read_csv(input_file.path)

    # Hold out a third of the rows to evaluate the model.
    train, test = train_test_split(df, test_size=.33)
    train = train.apply(pd.to_numeric, errors='ignore')
    test = test.apply(pd.to_numeric, errors='ignore')

    features = train.columns.drop('MEDV')
    target = 'MEDV'

    # Scale the 13 feature columns, then fit a random forest regressor.
    pipeline = PMMLPipeline([
        ("mapping", DataFrameMapper([
            (['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'], StandardScaler())
        ])),
        ("rfc", RandomForestRegressor(n_estimators=100, random_state=0))
    ])

    pipeline.fit(train[features], train[target])
    y_pred = pipeline.predict(test[features])

    # This is the mean squared error (lower is better), logged under the name 'accuracy'.
    metric_accuracy = metrics.mean_squared_error(test[target], y_pred)
    output_metrics.log_metric('accuracy', metric_accuracy)
    output_model.metadata['accuracy'] = metric_accuracy

    # Serialize the fitted pipeline to the model artifact's path.
    joblib.dump(pipeline, output_model.path)
```

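To iterate on the training logic before packaging it as a component, you can run the same steps locally. This is a minimal sketch that assumes NumPy and scikit-learn are installed. Synthetic data stands in for the `housing` table, and a plain scikit-learn `Pipeline` replaces `PMMLPipeline` and `DataFrameMapper`. It therefore illustrates the split, scale, fit, and evaluate steps rather than reproducing the component exactly.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the housing table: 200 rows, 13 features, one target.
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 13))
y = 3.0 * X[:, 5] - 2.0 * X[:, 12] + rng.normal(scale=0.5, size=200)

# Same one-third hold-out as the component.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=0)

# Plain scikit-learn Pipeline in place of PMMLPipeline + DataFrameMapper.
pipeline = Pipeline([
    ("scale", StandardScaler()),
    ("rf", RandomForestRegressor(n_estimators=100, random_state=0)),
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

# The metric the component logs under the name 'accuracy': mean squared error.
mse = mean_squared_error(y_test, y_pred)
print(f"MSE on held-out split: {mse:.3f}")
```
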
## Create component to deploy model

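The next component loads the trained model and converts it to PMML with `sklearn2pmml`. It then saves the model to the `demo_models` table in Vantage with the `teradataml` BYOM (Bring Your Own Model) API, replacing any existing model with the same ID.

The `Output[Metrics]` artifact in `train_model` allows us to visualize the model's performance. The `output_metrics.log_metric` call records the metric so that it appears in the pipeline's run details.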

```python
@component(base_image='teradata/python-sklearn2pmml')
def deploy_model(
    input_model: Input[Model],
):
    import teradataml as tdml
    from teradataml import create_context
    import joblib
    from sklearn2pmml.pipeline import PMMLPipeline
    from sklearn2pmml import sklearn2pmml

    # Replace the host with the IP address of your Vantage instance.
    eng = create_context(host="104.197.18.6", username='mldb', password='mldb')

    pipeline = joblib.load(input_model.path)

    # Convert the fitted pipeline to PMML so that Vantage BYOM can score with it.
    sklearn2pmml(pipeline, "test_local.pmml", with_repr=True)

    model_id = 'housing_rf'
    model_file = 'test_local.pmml'
    table_name = 'demo_models'

    # Indicate the database where BYOM is installed.
    tdml.configure.byom_install_location = "mldb"

    try:
        res = tdml.save_byom(model_id=model_id, model_file=model_file, table_name=table_name)
    except Exception as e:
        # If the model already exists (error TDML_2200), delete it and save it again.
        if 'TDML_2200' in str(e.args):
            res = tdml.delete_byom(model_id=model_id, table_name=table_name)
            res = tdml.save_byom(model_id=model_id, model_file=model_file, table_name=table_name)
        else:
            raise
```

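The `try`/`except` in `deploy_model` implements a save-or-replace step. If saving fails because a model with the same ID already exists, which the tutorial detects with the `TDML_2200` error code, the component deletes the old model and saves again. The following sketch shows that control flow in isolation. The `save` and `delete` callables stand in for `tdml.save_byom` and `tdml.delete_byom`. `save_or_replace` and the in-memory fake store are hypothetical and are used only to exercise the logic.

```python
from typing import Callable


def save_or_replace(save: Callable[[], None], delete: Callable[[], None],
                    conflict_code: str = "TDML_2200") -> None:
    """Save; on a 'model already exists' error, delete and save again (hypothetical helper)."""
    try:
        save()
    except Exception as e:
        # Use `in` rather than `str.find(...) >= 1`: find() returns 0 when the code
        # appears at the very start of the message, which `>= 1` would miss.
        if conflict_code in str(e):
            delete()
            save()
        else:
            raise


# In-memory fake of the model table, used only to exercise the control flow.
store = {"housing_rf": "old model"}


def fake_save(model_id="housing_rf", model="new model"):
    if model_id in store:
        raise RuntimeError(f"TDML_2200: model {model_id} already exists")
    store[model_id] = model


def fake_delete(model_id="housing_rf"):
    del store[model_id]


save_or_replace(fake_save, fake_delete)
print(store)  # {'housing_rf': 'new model'}
```

Errors that do not carry the conflict code are re-raised unchanged, so unrelated failures still stop the component.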
## Create component to use the deployed model to score new data

The next component uses the `teradataml` library to retrieve the saved model and score the rows in a table with new data. The component uses the `PMMLPredict` function to score the new data. The returned object's `result` attribute is a `teradataml` DataFrame that can be converted to a pandas DataFrame with `result.result.to_pandas()`.

```python
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn'])
def test_model():
    import teradataml as tdml
    from teradataml import create_context

    # Replace the host with the IP address of your Vantage instance.
    eng = create_context(host="104.197.18.6", username='mldb', password='mldb')

    # Indicate the database that BYOM is using.
    tdml.configure.byom_install_location = "mldb"

    tdf_test = tdml.DataFrame('test_housing')

    modeldata = tdml.retrieve_byom("housing_rf", table_name="demo_models")

    result = tdml.PMMLPredict(
        modeldata=modeldata,
        newdata=tdf_test,
        accumulate=['ID']
    )
    # Print the predictions so that they appear in the component's logs.
    print(result.result.to_pandas())
```

## Create a function for executing the pipeline

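Finally, we'll define the pipeline function that connects the components. `train_model` consumes the dataset produced by `read_data_from_vantage`, and `deploy_model` consumes the `output_model` artifact from `train_model`. Because `test_model` takes no inputs from `deploy_model`, `.after(_deploy_model_op)` makes it wait until the model has been deployed.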

```python
@dsl.pipeline(
    name='test-vantage-pipeline',
    description='An example test pipeline that connects to Vantage.',
)
def run_vantage_pipeline(
    ipaddr: str
):
    data_file = read_data_from_vantage(ipaddr).output
    test_model_data = train_model(data_file)
    _deploy_model_op = deploy_model(test_model_data.outputs['output_model'])
    test_model().after(_deploy_model_op)
```

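Kubeflow orders the tasks in a run by their data dependencies plus any explicit `.after()` calls. As a conceptual model only, and not a description of Kubeflow's scheduler, the dependencies in `run_vantage_pipeline` can be written as a graph and ordered with Python's standard `graphlib` module (Python 3.9+). In this model, without the `.after()` edge, `test_model` has no upstream dependency and is ready to start at the same time as `read_data_from_vantage`.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it must wait for.
with_after = {
    "read_data_from_vantage": set(),
    "train_model": {"read_data_from_vantage"},  # consumes the Dataset output
    "deploy_model": {"train_model"},            # consumes output_model
    "test_model": {"deploy_model"},             # explicit .after(_deploy_model_op)
}
# Same graph with the .after() edge removed.
without_after = {**with_after, "test_model": set()}


def first_ready(graph):
    """Return the tasks with no unmet dependencies at the start of a run."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    return set(ts.get_ready())


# ['read_data_from_vantage', 'train_model', 'deploy_model', 'test_model']
print(list(TopologicalSorter(with_after).static_order()))
print(first_ready(with_after))     # {'read_data_from_vantage'}
print(first_ready(without_after))  # contains both read_data_from_vantage and test_model
```
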

Now use the Kubeflow client to execute the pipeline. Set the `ipaddr` argument to the IP address of the VM hosting Vantage.
When the run starts, two links should appear: `Experiment details` and `Run details`. Click `Run details` to follow the progress of your pipeline.


```python
arguments = {'ipaddr': "104.197.18.6"}
client.create_run_from_pipeline_func(
    run_vantage_pipeline,
    arguments=arguments,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)
```


When the pipeline has finished running, each component in the graph should have a green check mark. You can click on each component to see more details. If you click on the `train_model` component and then click on `Visualizations` in the side window, you will see the metric logged as `accuracy`. This is the mean squared error on the test split, so lower is better. You can learn more about other metrics and visualizations you can produce with the Metrics artifact [here](https://www.kubeflow.org/docs/components/pipelines/sdk/output-viewer/#introduction).

## Create a new pipeline to score new data

This pipeline has a single component, `score_new_data`. It uses the `teradatasql` driver to execute a SQL query that retrieves the model from the `demo_models` table, scores the rows in the `test_housing` table with `PMMLPredict`, and stores the results in a new table.

```python
@component(base_image='teradata/python-sklearn2pmml', packages_to_install=['pandas==1.3.5','scikit-learn'])
def score_new_data(
    model_name: str,
    model_table: str,
    data_table: str,
    prediction_table: str
):
    import teradataml as tdml
    import teradatasql

    tdml.configure.byom_install_location = "mldb"

    # Replace the host with the IP address of your Vantage instance.
    with teradatasql.connect(host="34.71.35.124", user="mldb", password="mldb") as con:
        with con.cursor() as cur:
            # Only prediction_table is used; the model ID, model table, and data table are hard-coded.
            cur.execute("CREATE TABLE " + prediction_table + " AS (SELECT * FROM mldb.PMMLPredict ( ON test_housing ON (SELECT * FROM demo_models where model_id='housing_rf') DIMENSION USING Accumulate ('ID')) AS td ) WITH DATA;")
```

The `run_new_data_score` pipeline takes the following parameters:
- `model_name`: ID of the model
- `model_table`: the name of the table storing the model
- `data_table`: the name of the table with new data to score
- `prediction_table`: the name of the table to store the scoring results

When the pipeline is executed, the dashboard provides fields to enter the values you want to use. Note that `score_new_data` only uses `prediction_table`. The model ID (`housing_rf`), model table (`demo_models`), and data table (`test_housing`) are hard-coded in its SQL with the values used in this tutorial, to avoid errors.

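If you want `score_new_data` to honor all four pipeline parameters, one option is to build the query from them. The following is a minimal sketch with a hypothetical `build_score_sql` helper. Table names cannot be passed as parameter markers, so the helper validates them against a conservative pattern. It also doubles single quotes in the model ID before embedding it as a string literal. The query shape is copied from the component above, and `housing_predictions` in the usage line is a placeholder name.

```python
import re

# Conservative pattern for (optionally database-qualified) table names.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")


def _check_identifier(name: str) -> str:
    # Reject anything that is not a plain (optionally qualified) identifier.
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe table name: {name!r}")
    return name


def build_score_sql(model_name: str, model_table: str,
                    data_table: str, prediction_table: str) -> str:
    """Build the CREATE TABLE ... PMMLPredict statement (hypothetical helper)."""
    model_literal = model_name.replace("'", "''")  # escape quotes in the string literal
    return (
        f"CREATE TABLE {_check_identifier(prediction_table)} AS ("
        f"SELECT * FROM mldb.PMMLPredict ("
        f"ON {_check_identifier(data_table)} "
        f"ON (SELECT * FROM {_check_identifier(model_table)} WHERE model_id = '{model_literal}') DIMENSION "
        f"USING Accumulate ('ID')"
        f") AS td"
        f") WITH DATA;"
    )


print(build_score_sql("housing_rf", "demo_models", "test_housing", "housing_predictions"))
```

Inside the component, the hard-coded string would then be replaced by `cur.execute(build_score_sql(model_name, model_table, data_table, prediction_table))`. The helper would be defined inside the component function, because each component runs in its own container.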
```python
@dsl.pipeline(
    name='new-data-pipeline',
    description='An example of a component that scores new data with a saved model.',
)
def run_new_data_score(
    model_name: str,
    model_table: str,
    data_table: str,
    prediction_table: str
):
    score_new_data(model_name, model_table, data_table, prediction_table)
```

To compile the pipeline, run the following code. It saves the compiled package as a YAML file. Download the YAML file to your local computer.

To run the pipeline in Kubeflow, go to `Pipelines` and click `Upload pipeline` in the top right-hand corner. Fill out the fields and upload the YAML file. Then go to `Runs`, click `Create run`, and choose the pipeline you just created.

```python
kfp.compiler.Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func=run_new_data_score,
    package_path='score_new_data_pipeline_sql.yaml')
```

## Run the pipelines in Vertex AI pipelines

You can also run your Kubeflow pipelines in `Google Vertex AI Pipelines`.

Compile the pipeline with the following code. It saves the compiled package as a JSON file.

Go to `Vertex AI Pipelines` and click `Create run`. Choose the JSON file, change the pipeline name and run name if you like, and click `Continue`. You'll need a Cloud Storage bucket: enter the path to a Cloud Storage bucket and the pipeline parameters, and click `Submit`.

One of the nice benefits of running the pipeline in Vertex AI is that it is serverless: Vertex AI takes care of creating and deleting the resources that run the pipeline, so you don't manage a cluster yourself.

```python
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=run_new_data_score,
    package_path='score_new_data_pipeline_sql.json')
```

You can also compile the `run_vantage_pipeline` pipeline above, which loads the data, trains the model, and deploys it, and run that pipeline in Vertex AI. Below is a slightly modified version that uses the same components we created at the beginning of the notebook but omits the `test_model` step.

```python
@dsl.pipeline(
    name='run-vantage-pipeline',
    description='An example test pipeline that connects to Vantage.',
)
def run_vantage_pipeline_vertex(
    ipaddr: str
):
    data_file = read_data_from_vantage(ipaddr).output
    test_model_data = train_model(data_file)
    deploy_model(test_model_data.outputs['output_model'])
```


Compile the pipeline with the following code and follow the same instructions for creating a run in Vertex AI Pipelines.

```python
compiler.Compiler().compile(pipeline_func=run_vantage_pipeline_vertex,
    package_path='train_housing_pipeline.json')
```

## Cleanup

To stop incurring charges, delete the Vantage Express VM and the Kubeflow Pipelines instance. Go back to the list of Kubeflow Pipelines instances in `AI Platform Pipelines`, select the instance you want to delete, and click `Delete`. Make sure to check the box to delete the Kubernetes cluster as well; this deletes the cluster's three Compute Engine VMs. Delete the Vantage Express VM by going to the list of Compute Engine instances, selecting the instance with Vantage Express, and clicking `Delete`.