### Building a Kubeflow Pipeline!

 - Step 1: Create a custom component for creating training/validation data

In [1]:
!mkdir ./pipeline
!mkdir ./pipeline/components
!mkdir ./pipeline/components/bq2parquet

#### Steps for creating a custom component
- Write the task code
- Create the Dockerfile
- Build and push the image
- Create the component.yaml

#### Step 1: Write the task code

In [12]:
%%writefile ./pipeline/components/bq2parquet/main.py

import argparse
import os

from google.cloud import bigquery
from google.cloud.bigquery.job import ExtractJobConfig


DATASET = "credit_card_fraud"
TRAIN_TABLE = "train"
VALID_TABLE = "validation"

# Fully-qualified BigQuery table holding the raw (unsplit) transactions.
SOURCE_TABLE = "kubeflow-1-0-2.credit_card_fraud.data"

# Each row is hashed into one of 10 buckets; buckets below this threshold go
# to the training table, the rest to the validation table (an 80/20 split).
TRAIN_BUCKETS = 8

# Single template shared by both split tables so that the feature list and its
# NULL defaults cannot drift between training and validation. Only the target
# table and the bucket predicate differ between the two generated queries.
_SPLIT_SQL_TEMPLATE = """ CREATE OR REPLACE TABLE {dataset}.{table} AS
WITH features_table AS (
    SELECT IFNULL(TransactionDT, 0) AS TransactionDT,
    IFNULL(TransactionAmt, 0.0) AS TransactionAmt,
    IFNULL(card1,0) AS card1, IFNULL(card2,0.0) AS card2,
    IFNULL(card3,0.0) AS card3, IFNULL(C1,0.0) AS C1, IFNULL(C2,0.0) AS C2,
    IFNULL(C11,0.0) AS C11, IFNULL(C12,0.0) AS C12, IFNULL(C13,0.0) AS C13,
    IFNULL(C14,0.0) AS C14, IFNULL(D8,0.0) AS D8, IFNULL(V45,0.0) AS V45,
    IFNULL(V87,0.0) AS V87, IFNULL(V258,0.0) AS V258,
    IFNULL(card6, "Unknown") AS card6, IFNULL(ProductCD, "Unknown") AS ProductCD,
    IFNULL(P_emaildomain, "Unknown") AS emaildomain, isFraud
    FROM `{source}`
    WHERE isFraud IS NOT NULL)

SELECT * FROM features_table AS data
WHERE MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(data))), 10) {bucket_predicate}
"""

# Deterministic hash split: the same row always lands in the same table.
TRAIN_SQL = _SPLIT_SQL_TEMPLATE.format(
    dataset=DATASET,
    table=TRAIN_TABLE,
    source=SOURCE_TABLE,
    bucket_predicate=f"< {TRAIN_BUCKETS}",
)

VALID_SQL = _SPLIT_SQL_TEMPLATE.format(
    dataset=DATASET,
    table=VALID_TABLE,
    source=SOURCE_TABLE,
    bucket_predicate=f">= {TRAIN_BUCKETS}",
)


def export_table_to_gcs(dataset_ref,
                        source_table,
                        destination_uri,
                        client=None):
    """Export a BigQuery table to a Parquet file in GCS.

    Args:
        dataset_ref: BigQuery dataset object containing ``source_table``;
            only its ``table()`` method is used here.
        source_table: ID of the table inside ``dataset_ref`` to export.
        destination_uri: GCS URI (``gs://...``) of the Parquet file to write.
        client: ``bigquery.Client`` used to run the extract job. When omitted,
            a new client is created, so the function no longer relies on a
            module-level ``bq`` variable that exists only under ``__main__``.

    Waits for the extract job by calling ``result()`` before returning.
    """
    if client is None:
        client = bigquery.Client()

    table_ref = dataset_ref.table(source_table)

    config = ExtractJobConfig()
    # print_header only has an effect for CSV exports; kept as in the original.
    config.print_header = False
    config.destination_format = "PARQUET"
    # NOTE(review): BigQuery writes at most 1 GB to a single destination file;
    # larger tables need a wildcard URI such as ".../train-*.parquet".

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=config,
    )
    extract_job.result()


if __name__ == "__main__":
    # CLI entry point: build the train/validation tables in BigQuery and export
    # them as train.parquet / eval.parquet under --export_path.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--export_path",
        help="Path to export the train.parquet and eval.parquet files",
        required=True,
    )
    args = parser.parse_args()

    data_path = args.export_path
    train_export_path = os.path.join(data_path, "train.parquet")
    valid_export_path = os.path.join(data_path, "eval.parquet")

    # Module-level name kept as `bq`: it is the client used by the helpers.
    bq = bigquery.Client()

    # Client.dataset() is deprecated; build the Dataset from "project.dataset_id"
    # and reuse the DATASET constant instead of repeating the literal name.
    dataset_ref = bigquery.Dataset("{}.{}".format(bq.project, DATASET))

    # exists_ok makes re-runs idempotent without a bare `except:` that would
    # also hide real failures such as bad credentials or missing permissions.
    bq.create_dataset(dataset_ref, exists_ok=True)
    print("Dataset ready:", dataset_ref.dataset_id)

    print("Creating the training dataset...")
    bq.query(TRAIN_SQL).result()

    print("Creating the validation dataset...")
    bq.query(VALID_SQL).result()

    print("Exporting training dataset to GCS", train_export_path)
    export_table_to_gcs(dataset_ref,
                        TRAIN_TABLE,
                        train_export_path)

    print("Exporting validation dataset to GCS", valid_export_path)
    export_table_to_gcs(dataset_ref,
                        VALID_TABLE,
                        valid_export_path)


Overwriting ./pipeline/components/bq2parquet/main.py


#### Step 2: Create the Dockerfile

In [13]:
%%writefile ./pipeline/components/bq2parquet/Dockerfile

# main.py only needs the BigQuery client library, not the full Cloud SDK.
# A pinned slim Python image is far smaller and reproducible, and avoids
# PEP 668 "externally-managed-environment" errors from a system-wide pip install.
FROM python:3.9-slim

WORKDIR /code

# Install dependencies before copying the source so that this layer stays
# cached across code-only changes. The range keeps Client APIs used by main.py.
RUN pip install --no-cache-dir "google-cloud-bigquery>=2.6,<3"

COPY . /code

ENTRYPOINT ["python3", "./main.py"]

Overwriting ./pipeline/components/bq2parquet/Dockerfile


#### Step 3: Build and Push to GCR

In [14]:
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
IMAGE_NAME='bq2parquet'
IMAGE_TAG='latest'
IMAGE_URI='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, IMAGE_TAG)

In [15]:
!gcloud builds submit --tag $IMAGE_URI ./pipeline/components/bq2parquet

Creating temporary tarball archive of 3 file(s) totalling 4.3 KiB before compression.
Uploading tarball of [./pipeline/components/bq2parquet] to [gs://kubeflow-1-0-2_cloudbuild/source/1620468128.924015-e456456c29014da09c76da06ad40ba21.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/kubeflow-1-0-2/locations/global/builds/53299473-5e40-4724-93da-445a1ab9ecb0].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/53299473-5e40-4724-93da-445a1ab9ecb0?project=9118975290].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "53299473-5e40-4724-93da-445a1ab9ecb0"

FETCHSOURCE
Fetching storage object: gs://kubeflow-1-0-2_cloudbuild/source/1620468128.924015-e456456c29014da09c76da06ad40ba21.tgz#1620468129237953
Copying gs://kubeflow-1-0-2_cloudbuild/source/1620468128.924015-e456456c29014da09c76da06ad40ba21.tgz#1620468129237953...
/ [1 files][  1.5 KiB/  1.5 KiB]                                                
Operation

#### Step 4: Create the component.yaml
Make sure to change the project in the `gcr.io` image path below to your own project ID.

In [16]:
%%writefile ./pipeline/components/bq2parquet/component.yaml

name: bq2parquet
description: |
    This component creates the training and
    validation datasets as BigQuery tables and exports
    them to Parquet files in GCS.

inputs:
    - {name: Export Path, type: String, description: 'GCS directory path.'}

implementation:
    container:
        # Replace "kubeflow-1-0-2" with your own project ID; the tag matches
        # IMAGE_TAG used when the image was built and pushed.
        image: gcr.io/kubeflow-1-0-2/bq2parquet:latest
        args: [
          "--export_path", {inputValue: Export Path},
        ]

Overwriting ./pipeline/components/bq2parquet/component.yaml


#### Building the Kubeflow Pipeline (KFP)
Builds a Kubeflow pipeline that queries BigQuery, creates the train/validation tables, exports those tables to Parquet files in GCS, launches an AI Platform training job, and then deploys the model to AI Platform Prediction.

In [17]:
from os import path
import os 

import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook


# Local path of the compiled pipeline package; written by the compile cell.
PIPELINE_TAR = 'creditcard.tar.gz'
BQ2PARQUET_YAML = './pipeline/components/bq2parquet/component.yaml'

# Pre-built GCP components are fetched from the KFP repository, pinned to 0.2.5.
COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/'
component_store = comp.ComponentStore(
    local_search_paths=None,
    url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX],
)

# Component factories for AI Platform training and model deployment.
mlengine_train_op, mlengine_deploy_op = (
    component_store.load_component(component_name)
    for component_name in ('ml_engine/train', 'ml_engine/deploy')
)

@dsl.pipeline(name='Credit_card_fraud',
              description='Train and deploy ml model to predict credit card fraud')
def pipeline(project_id='',
             job_dir=''):
    """Export data from BigQuery, train an XGBoost model on CAIP, then deploy it.

    Args:
        project_id: GCP project ID. It is also used as the GCS bucket name for
            the exported data and to locate the trainer image in GCR.
        job_dir: GCS directory handed to the AI Platform training job; the
            deploy step reads the model from the training step's ``job_dir`` output.

    Pre-requisite: gcr.io/<project_id>/xgboost_fraud_trainer:latest must
    already exist in GCR.
    """
    trainer_image_uri = f'gcr.io/{project_id}/xgboost_fraud_trainer:latest'
    # Assumes a GCS bucket named after the project exists.
    export_path = f'gs://{project_id}/credit_card_fraud/data'

    # Step 1: build train/validation tables and export them as Parquet.
    bq2parquet_op = comp.load_component_from_file(BQ2PARQUET_YAML)
    bq2parquet = bq2parquet_op(export_path=export_path)

    # These names must match the files written by the bq2parquet main.py.
    train_data_path = os.path.join(export_path, "train.parquet")
    eval_data_path = os.path.join(export_path, "eval.parquet")

    train_args = [
        '--training_dataset_path', train_data_path,
        '--validation_dataset_path', eval_data_path,
        '--max_depth', '20',
        '--nohptune',
    ]

    # Step 2: train on AI Platform with the custom trainer image.
    train_model = mlengine_train_op(
        project_id=project_id,
        job_dir=job_dir,
        region='us-central1',
        master_image_uri=trainer_image_uri,
        args=train_args).set_display_name("CAIP Training Job - XGBoost")
    # Training consumes the Parquet files only by path, so KFP cannot infer the
    # dependency on the export step; declare the ordering explicitly.
    train_model.after(bq2parquet)

    # Step 3: deploy; ordered after training via train_model.outputs['job_dir'].
    mlengine_deploy_op(
        model_uri=train_model.outputs['job_dir'],
        project_id=project_id,
        model_id='cc_fraud_classifier',
        version_id='xgb',
        runtime_version='2.3',
        python_version='3.7',
        replace_existing_version='True').set_display_name("Deployed XGBoost Model to CAIP Predictions")

In [18]:
# Get host url from kfp settings
# client = kfp.Client(host='https://402de67bf16fcbda-dot-us-central1.pipelines.googleusercontent.com')

In [19]:
# Change to your managed Kubeflow endpoint, or set the KFP_HOST environment
# variable to override it without editing this cell.
HOST = os.environ.get(
    "KFP_HOST",
    "https://420de6789bf19486fcbda-dot-us-central1.pipelines.googleusercontent.com",
)
client = kfp.Client(host=HOST)
exp = client.create_experiment(name='credit_card_fraud')

In [20]:
# Compile the pipeline function into the package file uploaded by run_pipeline.
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_TAR)

In [21]:
import time 

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
BUCKET = PROJECT_ID
JOB_NAME = "JOB_{}".format(time.strftime("%Y%m%d_%H%M%S"))
JOB_DIR = "{}/{}".format(f'gs://{BUCKET}/credit_card_fraud/models', JOB_NAME)

run = client.run_pipeline(
                        experiment_id=exp.id, 
                        job_name='credit_card_fraud', 
                        pipeline_package_path='creditcard.tar.gz', 
                        params={
                                'project_id': PROJECT_ID,
                                'job_dir': JOB_DIR
                                },
                        )