In [None]:
%pip install -r requirements.txt

In [None]:
import kfp
from google.cloud import aiplatform as aip
import google_cloud_pipeline_components.v1.bigquery as bqop

# Set your environment variables

In [None]:
# --- Values to customise for your environment ---
PREFIX = "your-prefix"
PROJECT_ID = "your-project-id"
LOCATION = "US"            # passed as the `location` of the BigQuery pipeline steps
REGION = "us-central1"     # passed as the `location` of the Vertex AI resources

# --- Display names / identifiers used throughout the notebook ---
PIPELINE_NAME = "bqml-vertex-pipeline"
MODEL_NAME = "bqml-model"
EXPERIMENT_NAME = "bqml-experiment"
ENDPOINT_DISPLAY_NAME = "bqml-endpoint"

# --- Values derived from the settings above ---
SERVICE_ACCOUNT = "vertex-sa@" + PROJECT_ID + ".iam.gserviceaccount.com"
PIPELINE_ROOT = "gs://" + PREFIX + "-data"
# Dataset name is the prefix with dashes turned into underscores, plus "_data".
DATASET = f"{PREFIX.replace('-', '_')}_data"

# Vertex Pipeline Definition

In the following code block we define our Vertex AI pipeline. It is made up of four main steps:
1. Create a BigQuery dataset which will contain the BQ ML models
2. Create the features table by running the query in `sql/features.sql`
3. Train the BQ ML model, in this case a logistic regression
4. Evaluate the BQ ML model with the standard evaluation metrics

The pipeline takes as input the following variables:
- ```model_name```: the display name of the BQ ML model
- ```split_fraction```: the fraction of the data (e.g. `0.1` for 10%) that will be used as evaluation dataset
- ```evaluate_job_conf```: BigQuery job configuration dict defining where to store the evaluation metrics
- ```dataset```: name of dataset where the artifacts will be stored
- ```project_id```: the project id where the GCP resources will be created
- ```location```: BigQuery location

In [None]:
# Load the two SQL templates used by the pipeline steps. Both contain
# str.format placeholders that are filled in inside the pipeline definition.
with open("sql/train.sql") as train_sql_file, open("sql/features.sql") as features_sql_file:
    train_query = train_sql_file.read()
    features_query = features_sql_file.read()


# BQ ML training pipeline: create the dataset, build the features table,
# train a logistic regression model and evaluate it.
#
# Parameters:
#   model_name:        name substituted into the training query for the model.
#   split_fraction:    value substituted into the training query for the
#                      train/evaluation split.
#   evaluate_job_conf: BigQuery job configuration passed to the evaluation step.
#   dataset:           dataset created by the first step and substituted into
#                      the features and training queries.
#   project_id:        project passed to every BigQuery step.
#   location:          location passed to every BigQuery step.
#
# NOTE: documented with comments rather than a docstring because KFP may copy
# a docstring into the compiled pipeline spec.
@kfp.dsl.pipeline(name='bqml-pipeline', pipeline_root=PIPELINE_ROOT)
def pipeline(
        model_name: str,
        split_fraction: float,
        evaluate_job_conf: dict, 
        dataset: str = DATASET,
        project_id: str = PROJECT_ID,
        location: str = LOCATION,
        ):

    # Step 1: make sure the dataset exists.
    dataset_task = bqop.BigqueryQueryJobOp(
        project=project_id,
        location=location,
        query=f'CREATE SCHEMA IF NOT EXISTS {dataset}',
    )

    # Step 2: build the features table once the dataset is available.
    features_sql = features_query.format(dataset=dataset, project_id=project_id)
    features_task = bqop.BigqueryQueryJobOp(
        project=project_id,
        location=location,
        query=features_sql,
    )
    features_task.after(dataset_task)

    # Step 3: train the logistic regression model on the features table.
    training_sql = train_query.format(
        model_type='LOGISTIC_REG',
        project_id=project_id,
        dataset=dataset,
        model_name=model_name,
        split_fraction=split_fraction,
    )
    training_task = bqop.BigqueryCreateModelJobOp(
        project=project_id,
        location=location,
        query=training_sql,
    )
    training_task.after(features_task)

    # Step 4: evaluate the trained model; the job configuration decides where
    # the evaluation metrics are written.
    evaluation_task = bqop.BigqueryEvaluateModelJobOp(
        project=project_id,
        location=location,
        model=training_task.outputs["model"],
        job_configuration_query=evaluate_job_conf,
    )
    evaluation_task.after(training_task)


# Compile the pipeline function into its JSON description file, which is the
# template later referenced when submitting pipeline jobs.
pipeline_compiler = kfp.v2.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path=f'{PIPELINE_NAME}.json',
)

# Create Experiment

We will create an experiment in order to keep track of our trainings and tasks on a specific issue or problem.

In [None]:
# Fetch the experiment if it already exists, otherwise create it; the pipeline
# runs submitted below are attached to it.
my_experiment = aip.Experiment.get_or_create(
    project=PROJECT_ID,
    location=REGION,
    experiment_name=EXPERIMENT_NAME,
    description='This is a new experiment to keep track of bqml trainings',
)

# Running the same training pipeline with different parameters

One of the main tasks during the training phase is to compare different models or to try the same model with different inputs. We can leverage the power of Vertex Pipelines in order to submit the same steps with different training parameters. Thanks to the experiments artifact it is possible to easily keep track of all the tests that have been done. This simplifies the process to select the best model to deploy.

In this demo case, we will run the same training pipeline while changing the data split percentage between training and test data.

In [None]:
# Base BigQuery job configuration needed to persist the evaluation metrics.
# The destination `tableId` is run-specific and is filled in per run below.
job_configuration_query = {
    "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET},
    "writeDisposition": "WRITE_TRUNCATE",
}

for split_fraction in [0.1, 0.2]:
    # round() instead of int(): int() truncates, so a value such as 0.29
    # would be labelled 28 (0.29 * 100 == 28.999999999999996).
    split_percent = round(split_fraction * 100)
    run_model_name = f'{MODEL_NAME}-fraction-{split_percent}'

    # Build a fresh configuration for every run instead of mutating one
    # shared dict, so no two jobs can ever observe each other's table id.
    evaluate_job_conf = {
        **job_configuration_query,
        "destinationTable": {
            **job_configuration_query["destinationTable"],
            "tableId": f'{run_model_name}-eval_table',
        },
    }

    # Named `pipeline_job` (not `pipeline`) so the `pipeline` function defined
    # in the compile cell is not overwritten and that cell stays re-runnable.
    pipeline_job = aip.PipelineJob(
        display_name=PIPELINE_NAME,
        template_path=f'{PIPELINE_NAME}.json',
        pipeline_root=PIPELINE_ROOT,
        parameter_values={
            'split_fraction': split_fraction,
            'model_name': run_model_name,
            'evaluate_job_conf': evaluate_job_conf,
        },
        enable_caching=True,
        # Explicit project/location, consistent with the experiment and
        # endpoint calls, instead of relying on ambient SDK defaults.
        project=PROJECT_ID,
        location=REGION,
    )

    pipeline_job.submit(service_account=SERVICE_ACCOUNT, experiment=my_experiment)

# Deploy the model to an endpoint

Thanks to the integration with Vertex AI Endpoints, it is straightforward to create a live endpoint to serve the model of our choice.

In [None]:
# Get the model trained with the 0.1 split from the Model Registry.
# The id is derived from MODEL_NAME so it follows the naming used when the
# pipeline runs were submitted ('<MODEL_NAME>-fraction-<percent>') instead of
# a hard-coded name from a different configuration.
# NOTE(review): assumes sql/train.sql registers the model in Vertex AI under
# the pipeline's `model_name` - confirm against the SQL template.
model = aip.Model(
    model_name=f'{MODEL_NAME}-fraction-10',
    project=PROJECT_ID,
    location=REGION,
)

# Create the Vertex Endpoint where the ML model will be deployed.
endpoint = aip.Endpoint.create(
    display_name=ENDPOINT_DISPLAY_NAME,
    project=PROJECT_ID,
    location=REGION,
)

In [None]:
# Deploy the BQ ML model to the Vertex Endpoint.
# Have a coffee - this step can take up to 10-15 minutes to finish.
model.deploy(
    deployed_model_display_name='bqml-deployed-model',
    endpoint=endpoint,
)

: 

In [None]:
# A single new instance to send to the endpoint for an online prediction.
inference_test = dict(
    postal_code='97700-000',
    number_of_successful_orders=0,
    city='Santiago',
    sum_previous_orders=1,
    number_of_unsuccessful_orders=0,
    day_of_week='WEEKDAY',
    traffic_source='Facebook',
    browser='Firefox',
    hour_of_day=20,
)

In [None]:
# Send the test instance to the deployed endpoint; predict() takes a list of
# instances, so the single dict is wrapped in a list.
my_prediction = endpoint.predict(instances=[inference_test])

# Last expression of the cell, so the prediction is displayed.
my_prediction