## Intro
This lab demonstrates how to launch a Kubeflow pipeline using code stored in Cloud Source Repositories. In this lab we will:
- Create two Cloud Source Repositories, one for the pipeline code and one for the training code. Keeping them separate modularizes the code and makes it reusable across different scenarios.
- Create a pipeline that reads from BigQuery, creates a Vertex AI dataset, generates data statistics with the TensorFlow Data Validation (TFDV) library, and runs a custom training job.
- Send an email notification when the pipeline finishes.

We'll use the following Google Cloud services, which must be enabled in your project:
- Vertex AI
- BigQuery
- Google Cloud Storage
- Google Cloud Artifact Registry
- Google Cloud Source Repositories
- Dataflow
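
The services above can be enabled from the notebook with `gcloud services enable`. A minimal sketch that only assembles that command; the mapping from service names to API identifiers (for example `aiplatform.googleapis.com`) is our assumption and should be checked in the API Library, and `my-project` is a placeholder.

```python
import shlex
from typing import List

# Assumed mapping from the services listed above to their API identifiers;
# double-check them in the API Library before relying on this.
SERVICE_APIS = {
    "Vertex AI": "aiplatform.googleapis.com",
    "BigQuery": "bigquery.googleapis.com",
    "Google Cloud Storage": "storage.googleapis.com",
    "Google Cloud Artifact Registry": "artifactregistry.googleapis.com",
    "Google Cloud Source Repositories": "sourcerepo.googleapis.com",
    "Dataflow": "dataflow.googleapis.com",
}


def enable_services_command(project_id: str) -> List[str]:
    """Build (but do not run) one `gcloud services enable` call for every API."""
    return ["gcloud", "services", "enable", *SERVICE_APIS.values(),
            f"--project={project_id}"]


if __name__ == "__main__":
    # In the notebook you could run the printed command with a `!` prefix
    print(shlex.join(enable_services_command("my-project")))
```

Building a single command keeps the enablement step in one call instead of one call per service.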

You'll also need the following roles:
- Vertex AI Admin
- BigQuery Admin
- Write access to Cloud Storage objects in your bucket
- Dataflow Admin
- Dataflow Worker
- Artifact Registry Admin

Network:
- A firewall rule that allows TCP ports 12345-12346 between Dataflow workers

## Setup
Install libraries.

In [None]:
!pip install --user google-cloud-aiplatform --upgrade -q
!pip install --user kfp --upgrade -q
!pip install --user google-cloud-pipeline-components --upgrade -q
!pip install --user tensorflow_data_validation --upgrade -q

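Because the packages are installed with `--user`, it can help to confirm which versions the kernel actually sees (restart the kernel first if any of them were already imported). A minimal sketch using only the standard library's `importlib.metadata`; the distribution names listed are assumed to match the pip packages installed above, and `installed_versions` is a hypothetical helper.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional

# Assumed distribution names for the packages installed above
PACKAGES = [
    "google-cloud-aiplatform",
    "kfp",
    "google-cloud-pipeline-components",
    "tensorflow-data-validation",
]


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if missing."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(PACKAGES).items():
        print(f"{name}: {version or 'NOT INSTALLED'}")
```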
## Create the source repositories

In [None]:
!gcloud source repos create my-kfp-library

In [None]:
!gcloud source repos clone my-kfp-library

In [None]:
!gcloud source repos create my-training-code

In [None]:
!gcloud source repos clone my-training-code

## Create the training code and build the training container
We will build a container image with the code that trains a scikit-learn classifier and push it to Google Cloud (the `IMAGE_URI` defined below points at a `gcr.io` registry path). The training step of the Kubeflow pipeline references this image.

In [None]:
%%writefile my-training-code/Dockerfile
FROM gcr.io/deeplearning-platform-release/sklearn-cpu.0-23
WORKDIR /

COPY trainer /trainer

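# scikit-learn is the current PyPI name (the `sklearn` alias is deprecated);
# db-dtypes is needed by recent google-cloud-bigquery releases for to_dataframe()
RUN pip install scikit-learn google-cloud-bigquery db-dtypes joblib pandas google-cloud-storage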

ENTRYPOINT ["python","-m","trainer.train"]

In [None]:
%mkdir my-training-code/trainer

In [None]:
%%writefile my-training-code/trainer/train.py
from google.cloud import bigquery
from google.cloud import storage
from joblib import dump
from sklearn.tree import DecisionTreeClassifier

import os

bqclient = bigquery.Client()
storage_client = storage.Client()

def download_table(bq_table_uri: str):
    """Download a BigQuery table (optionally prefixed with bq://) into a DataFrame."""
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix):]
    table = bigquery.TableReference.from_string(bq_table_uri)
    rows = bqclient.list_rows(table)
    return rows.to_dataframe(create_bqstorage_client=False)

# Vertex AI sets these environment variables when training on a managed dataset;
# they point at the BigQuery tables that hold the training and test splits
training_data_uri = os.environ["AIP_TRAINING_DATA_URI"]
test_data_uri = os.environ["AIP_TEST_DATA_URI"]

# Download both splits into pandas DataFrames and separate the "Class" label
df = download_table(training_data_uri)
test_df = download_table(test_data_uri)
labels = df.pop("Class").tolist()
data = df.values.tolist()
test_labels = test_df.pop("Class").tolist()
test_data = test_df.values.tolist()

# Define and train the scikit-learn model, then report mean accuracy on the test split
skmodel = DecisionTreeClassifier()
skmodel.fit(data, labels)
score = skmodel.score(test_data, test_labels)
print("accuracy is:", score)

dump(skmodel, "model.joblib")

# Upload the saved model file to Cloud Storage. GCS_BUCKET is passed in by the
# pipeline definition (read here but not used below); the model is written
# under AIP_MODEL_DIR, which Vertex AI sets for the training job
bucket = os.environ["GCS_BUCKET"]
model_directory = os.environ["AIP_MODEL_DIR"]
print("AIP_MODEL_DIR", model_directory)
storage_path = os.path.join(model_directory, "model.joblib")
blob = storage.blob.Blob.from_string(storage_path, client=storage_client)
blob.upload_from_filename("model.joblib")

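The `download_table` helper strips an optional `bq://` prefix before passing the rest to `bigquery.TableReference.from_string`, which expects `project.dataset.table`. A stdlib sketch of that parsing step, using a hypothetical `parse_bq_uri` helper that also rejects malformed URIs early; it assumes plain project IDs and does not handle domain-scoped IDs such as `example.com:my-project`.

```python
from typing import Tuple


def parse_bq_uri(bq_table_uri: str) -> Tuple[str, str, str]:
    """Split 'bq://project.dataset.table' (prefix optional) into its three parts."""
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix):]
    parts = bq_table_uri.split(".")
    # Exactly three non-empty components are expected
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected project.dataset.table, got {bq_table_uri!r}")
    project, dataset, table = parts
    return project, dataset, table


if __name__ == "__main__":
    print(parse_bq_uri("bq://aju-dev-demos.beans.beans1"))
    # ('aju-dev-demos', 'beans', 'beans1')
```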
In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}
# The pipeline script expects the bucket formatted as gs://my-bucket
BUCKET_ID = "gs://[your-bucket-id]"  # @param {type:"string"}
IMAGE_URI = f"gcr.io/{PROJECT_ID}/scikit:v1"

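The pipeline script expects the bucket as `gs://my-bucket` and derives the bare bucket name by dropping the first five characters, so a value without the `gs://` scheme (or with a trailing slash) yields a wrong name. A small hypothetical validator, `bucket_name_from_uri`, that makes this assumption explicit before the bucket is passed to the pipeline:

```python
def bucket_name_from_uri(bucket_uri: str) -> str:
    """Return 'my-bucket' for 'gs://my-bucket' (a trailing slash is tolerated)."""
    prefix = "gs://"
    if not bucket_uri.startswith(prefix):
        raise ValueError(f"expected gs://my-bucket, got {bucket_uri!r}")
    name = bucket_uri[len(prefix):].rstrip("/")
    # Reject empty names and object paths such as gs://my-bucket/some/folder
    if not name or "/" in name:
        raise ValueError(f"expected a bucket, not an object path: {bucket_uri!r}")
    return name


if __name__ == "__main__":
    # In the notebook: bucket_name_from_uri(BUCKET_ID)
    print(bucket_name_from_uri("gs://my-bucket"))  # my-bucket
```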
In [None]:
!docker build ./my-training-code/ -t $IMAGE_URI

In [None]:
!docker push $IMAGE_URI

## Build the pipeline

In [None]:
%mkdir my-kfp-library/pipelines
%mkdir my-kfp-library/components

In [None]:
!touch my-kfp-library/__init__.py
!touch my-kfp-library/pipelines/__init__.py
!touch my-kfp-library/components/__init__.py

In [None]:
!pip install --user tensorflow_data_validation
!pip install --user pyparsing

### Create components
Here we will create an empty component and a TensorFlow Data Validation (TFDV) component that computes statistics about the input data.

**Empty component**

In [None]:
%%writefile my-kfp-library/components/empty_component.py
from kfp.v2.dsl import component
@component()
def empty_component():
    print("this is a test component")

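**Generate Statistics component.** This component runs TFDV locally inside its own container: it downloads the BigQuery table into a pandas DataFrame, computes statistics, and writes them to Cloud Storage.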

In [None]:
%%writefile my-kfp-library/components/generate_statistics_component.py
from kfp.v2.dsl import component, Output, Artifact

# Runs on the default machine type. For custom machine types, see
# https://cloud.google.com/vertex-ai/docs/pipelines/machine-types
@component(base_image="tensorflow/tfx:1.8.0",
           packages_to_install=["pandas", "google-cloud-bigquery", "google-cloud-storage"])
def generate_statistics(bq_source: str,
                        bucket: str,
                        job_id: str,
                        project_id: str,
                        statistics: Output[Artifact]):
    # Imports live inside the function because KFP runs it in its own container
    from google.cloud import bigquery
    import tensorflow_data_validation as tfdv

    bqclient = bigquery.Client(project=project_id)

    # TFDV writes the statistics proto in text format to this Cloud Storage path
    output_path = f"{bucket}/{job_id}/statistics/stats.pb"

    def remove_prefix(cloud_uri, prefix):
        """Strip a scheme prefix such as "bq://" or "gs://" if present."""
        if cloud_uri.startswith(prefix):
            cloud_uri = cloud_uri[len(prefix):]
        return cloud_uri

    def download_table(bq_table_uri: str):
        bq_table_uri = remove_prefix(bq_table_uri, "bq://")
        table = bigquery.TableReference.from_string(bq_table_uri)
        rows = bqclient.list_rows(table)
        return rows.to_dataframe(create_bqstorage_client=False)

    print("generating statistics")

    df = download_table(bq_source)
    stats = tfdv.generate_statistics_from_dataframe(df)
    tfdv.write_stats_text(stats, output_path)

    # Point the output artifact at the statistics file
    statistics.uri = output_path

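For intuition about what the component produces: TFDV records per-feature summaries such as the number of missing values and, for numeric columns, the mean, standard deviation, min and max. The sketch below is a simplified stand-in written in plain Python, not TFDV's implementation; the function name `numeric_column_stats` is hypothetical and it uses the population standard deviation.

```python
import math
from typing import Dict, List, Optional, Union


def numeric_column_stats(values: List[Optional[float]]) -> Dict[str, Union[int, float]]:
    """Count, missing count, mean, std dev, min and max for one numeric column."""
    # Treat None and NaN as missing values
    present = [v for v in values if v is not None and not math.isnan(v)]
    stats: Dict[str, Union[int, float]] = {
        "count": len(values),
        "num_missing": len(values) - len(present),
    }
    if present:
        mean = sum(present) / len(present)
        variance = sum((v - mean) ** 2 for v in present) / len(present)
        stats.update(mean=mean, std_dev=math.sqrt(variance),
                     min=min(present), max=max(present))
    return stats


if __name__ == "__main__":
    print(numeric_column_stats([1.0, 2.0, None, 3.0]))
```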
In [None]:
%%writefile my-kfp-library/pipelines/bq_preprocess_train_pipeline.py

import argparse

def build_pipeline(args):

    import kfp
    import kfp.dsl as dsl
    from kfp.v2 import compiler
    from kfp.v2.dsl import pipeline
    from google.cloud import aiplatform
    import google_cloud_pipeline_components as gcpc
    from google_cloud_pipeline_components import aiplatform as gcc_aip
    from google_cloud_pipeline_components.experimental import vertex_notification_email
    from datetime import datetime
    # Resolved from my-kfp-library, which must be on PYTHONPATH
    from components.generate_statistics_component import generate_statistics
    from components.empty_component import empty_component

    print("kfp version:", kfp.__version__)
    print("gcpc version:", gcpc.__version__)

    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

    # args.bucket is formatted as gs://my-bucket; strip "gs://" to get the bare name
    bucket_name = args.bucket[len("gs://"):]

    pipeline_root = f"{args.bucket}/{args.pipeline_root}"

    @pipeline(name=args.pipeline_name, pipeline_root=pipeline_root)
    def custom_train_pipeline(
        bq_source: str = args.bq_source,
        bucket: str = args.bucket,
        project: str = args.project_id,
        job_id: str = args.job_id,
        gcp_region: str = "us-central1"
    ):
        # Emails the recipients when the pipeline exits, whether it succeeds or fails
        notify_email_task = vertex_notification_email.VertexNotificationEmailOp(
            recipients=args.recipients)

        with dsl.ExitHandler(notify_email_task):

            dataset_create_op = gcc_aip.TabularDatasetCreateOp(
                display_name="tabular-beans-dataset",
                bq_source=bq_source,
                project=project,
                location=gcp_region
            )

            empty_component_op = empty_component()

            # Statistics are written to {bucket}/{job_id}/statistics/stats.pb
            generate_statistics_op = generate_statistics(bq_source=bq_source,
                                                         bucket=bucket,
                                                         job_id=job_id,
                                                         project_id=project)

            training_op = gcc_aip.CustomContainerTrainingJobRunOp(
                display_name="pipeline-beans-custom-train",
                container_uri=args.training_container_uri,
                project=project,
                location=gcp_region,
                dataset=dataset_create_op.outputs["dataset"],
                staging_bucket=bucket,
                training_fraction_split=0.8,
                validation_fraction_split=0.1,
                test_fraction_split=0.1,
                bigquery_destination=args.bq_dest,
                model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
                model_display_name="scikit-beans-model-pipeline",
                environment_variables={"GCS_BUCKET": bucket_name},
                machine_type="n1-standard-4",
            )

    compiler.Compiler().compile(pipeline_func=custom_train_pipeline,
                                package_path="custom_train_pipeline.json")

    pipeline_job = aiplatform.PipelineJob(
        display_name="custom-train-pipeline",
        template_path="custom_train_pipeline.json",
        job_id=f"custom-train-pipeline-{TIMESTAMP}",
        project=args.project_id,
        location="us-central1",
        enable_caching=True
    )
    pipeline_job.submit()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket',
                        required=True,
                        help='GCS bucket formatted as gs://my-bucket')
    parser.add_argument('--pipeline-root',
                        required=True,
                        help='pipeline root directory, relative to the bucket')
    parser.add_argument('--pipeline-name',
                        required=True,
                        help='name of the pipeline')
    parser.add_argument('--project-id',
                        required=True,
                        help='Google Cloud project ID')
    parser.add_argument('--bq-source',
                        required=True,
                        help='source BigQuery table, formatted as bq://project.dataset.table')
    parser.add_argument('--bq-dest',
                        required=True,
                        help='BigQuery destination for the dataset splits, e.g. bq://project')
    parser.add_argument('--training-container-uri',
                        required=True,
                        help='URI of the training container image')
    parser.add_argument('--recipients', nargs='+',
                        required=True,
                        help='email recipients notified when the pipeline exits')
    parser.add_argument('--job-id',
                        required=True,
                        help='job ID, used as the statistics output folder')

    args = parser.parse_args()
    build_pipeline(args)

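The training step asks Vertex AI to split the managed dataset 80/10/10 into training, validation and test data (`training_fraction_split`, `validation_fraction_split`, `test_fraction_split`). A rough, hypothetical helper for estimating the resulting row counts; Vertex AI assigns rows itself, so actual counts can differ slightly, and both the sum-to-one check and the rounding rule (the remainder goes to training) are our own choices.

```python
from typing import Dict


def expected_split_sizes(num_rows: int, training: float = 0.8,
                         validation: float = 0.1, test: float = 0.1) -> Dict[str, int]:
    """Approximate rows per split; the remainder after rounding goes to training."""
    if abs(training + validation + test - 1.0) > 1e-9:
        raise ValueError("split fractions should sum to 1")
    val_rows = int(num_rows * validation)
    test_rows = int(num_rows * test)
    return {"training": num_rows - val_rows - test_rows,
            "validation": val_rows,
            "test": test_rows}


if __name__ == "__main__":
    print(expected_split_sizes(1000))  # {'training': 800, 'validation': 100, 'test': 100}
```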
In [None]:
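import os
# Add my-kfp-library to PYTHONPATH so the pipeline script can import `components`
os.environ["PYTHONPATH"] = os.pathsep.join(
    p for p in [os.environ.get("PYTHONPATH"), os.path.abspath("my-kfp-library")] if p)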

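The `!python` command starts a new interpreter, which finds the `components` package only through the `PYTHONPATH` it inherits from the notebook process. A sketch to check this before submitting; `child_can_import` is a hypothetical helper, and the usage line assumes the notebook runs from the directory that contains `my-kfp-library` (importing the component modules also requires `kfp` to be installed).

```python
import os
import subprocess
import sys


def child_can_import(module: str, extra_path: str) -> bool:
    """Start a fresh interpreter with extra_path prepended to PYTHONPATH and try an import."""
    env = dict(os.environ)
    existing = env.get("PYTHONPATH")
    env["PYTHONPATH"] = os.pathsep.join([extra_path, existing]) if existing else extra_path
    result = subprocess.run([sys.executable, "-c", f"import {module}"],
                            env=env, capture_output=True)
    return result.returncode == 0


if __name__ == "__main__":
    print(child_can_import("components.empty_component",
                           os.path.abspath("my-kfp-library")))
```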
In [None]:
!python my-kfp-library/pipelines/bq_preprocess_train_pipeline.py \
--bucket $BUCKET_ID \
--pipeline-root my-pipeline-root \
--job-id my-beans-pipeline-1234 \
--project-id $PROJECT_ID \
--pipeline-name my-first-pipeline \
--bq-source bq://aju-dev-demos.beans.beans1 \
--bq-dest bq://$PROJECT_ID \
--training-container-uri $IMAGE_URI \
--recipients myemail@google.com 
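
Because `--recipients` is declared with `nargs='+'`, you can list several addresses after the flag, and argparse converts dashed flags such as `--job-id` into attributes like `args.job_id`. A stripped-down parser showing both behaviors; the example addresses are placeholders.

```python
import argparse

# A reduced version of the pipeline script's parser, for illustration only
parser = argparse.ArgumentParser()
parser.add_argument("--recipients", nargs="+", required=True,
                    help="email recipients notified when the pipeline exits")
parser.add_argument("--job-id", required=True)

args = parser.parse_args(["--recipients", "a@example.com", "b@example.com",
                          "--job-id", "my-beans-pipeline-1234"])
print(args.recipients)  # ['a@example.com', 'b@example.com']
print(args.job_id)      # my-beans-pipeline-1234
```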

## Validate outputs

In [None]:
import tensorflow_data_validation as tfdv
# The statistics component writes to {bucket}/{job_id}/statistics/stats.pb
stats_file_loc = f"{BUCKET_ID}/my-beans-pipeline-1234/statistics/stats.pb"
stats = tfdv.load_stats_text(stats_file_loc)
tfdv.visualize_statistics(stats)

## Push code to the cloud repo

In [None]:
%%bash
cd my-kfp-library
git add .
git commit -m "initial commit"
git push origin master

In [None]:
%%bash
cd my-training-code
git add .
git commit -m "initial commit"
git push origin master