In [63]:
import kfp
import kfp.dsl as dsl
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from typing import NamedTuple
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Metrics,
    Artifact,
    Model,
    ClassificationMetrics
)
# GCS bucket URI; the pipeline definition below appends "/xgb-pl" to it to form
# its pipeline root and the default of its `bucket_name` parameter.
BUCKET_NAME = "gs://feature-store-mars21"

In [64]:
@component(
    # 'scikit-learn' is the real PyPI distribution name; the 'sklearn' alias is
    # a deprecated placeholder. fsspec/gcsfs are what pandas needs to read a
    # gs:// path. Packages the function never uses are no longer installed.
    packages_to_install=['pandas', 'numpy', 'scikit-learn', 'fsspec', 'gcsfs'],
    output_component_file='prepro.yaml'
)
def prepro(
    data_path: str,
    dta_out_x_train: Output[Dataset],
    dta_out_y_train: Output[Dataset],
    dta_out_x_test: Output[Dataset],
    dta_out_y_test: Output[Dataset]
):
    """Load the input CSV, engineer features and export train/test splits.

    The CSV at ``data_path`` is read with the dtypes listed in
    ``COLUMN_NAMES``. Rows with missing values are dropped, the first 100
    remaining rows are kept and shuffled, the ``approved`` column is split off
    as the label, and every categorical column is one-hot encoded. Features
    and labels are then split into train and test sets and each part is
    written as a header-less, index-less CSV to the path of the corresponding
    output artifact.

    Args:
        data_path: Location of the input CSV (anything ``pandas.read_csv``
            can open).
        dta_out_x_train: Output artifact receiving the training features.
        dta_out_y_train: Output artifact receiving the training labels.
        dta_out_x_test: Output artifact receiving the test features.
        dta_out_y_test: Output artifact receiving the test labels.
    """
    import collections

    import numpy as np
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.utils import shuffle

    ### Import data ###
    # Column name -> dtype used when parsing the CSV.
    COLUMN_NAMES = collections.OrderedDict({
        'as_of_year': np.int16,
        'agency_code': 'category',
        'loan_type': 'category',
        'property_type': 'category',
        'loan_purpose': 'category',
        'occupancy': np.int8,
        'loan_amt_thousands': np.float64,
        'preapproval': 'category',
        'county_code': np.float64,
        'applicant_income_thousands': np.float64,
        'purchaser_type': 'category',
        'hoepa_status': 'category',
        'lien_status': 'category',
        'population': np.float64,
        'ffiec_median_fam_income': np.float64,
        'tract_to_msa_income_pct': np.float64,
        'num_owner_occupied_units': np.float64,
        'num_1_to_4_family_units': np.float64,
        'approved': np.int8
    })

    data = pd.read_csv(data_path, index_col=False, dtype=COLUMN_NAMES)

    ### Feature engineering ###
    data = data.dropna()
    # Only the first 100 complete rows are used.
    # NOTE(review): presumably a cap to keep the demo run small - confirm
    # before training on real data.
    data = data[0:100]
    data = shuffle(data, random_state=2)

    labels = data['approved'].values
    data = data.drop(columns=['approved'])

    # One-hot encode every column that was parsed as a pandas category.
    dummy_columns = list(data.dtypes[data.dtypes == 'category'].index)
    data = pd.get_dummies(data, columns=dummy_columns)

    x, y = data.values, labels
    # Seed the split (same seed as the shuffle above) so that re-running the
    # component on the same input yields the same train/test partition.
    x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=2)

    ### Export data as artifact ###
    splits = (
        (x_train, dta_out_x_train),
        (y_train, dta_out_y_train),
        (x_test, dta_out_x_test),
        (y_test, dta_out_y_test),
    )
    for values, artifact in splits:
        pd.DataFrame(values).to_csv(artifact.path, index=False, header=False)


In [65]:
# 'scikit-learn' is the real PyPI distribution name ('sklearn' is a deprecated
# alias); the stdlib 'argparse' backport is not needed.
@component(packages_to_install=['pandas', 'xgboost', 'numpy', 'scikit-learn', 'fsspec', 'gcsfs'])
def train(
    dta_in_x_train: Input[Dataset],
    dta_in_y_train: Input[Dataset],
    dta_in_x_test: Input[Dataset],
    dta_in_y_test: Input[Dataset],
    model: Output[Model],
    metrics_conf: Output[ClassificationMetrics],
    metrics_para: Output[Metrics]
)  -> NamedTuple(
    'ModelPathOut',
    [
      ('path', str)
    ]):
    """Train an XGBoost classifier and export the model plus its metrics.

    The four input artifacts are read as header-less CSV files. A classifier
    is fitted on the training split while AUC is tracked on both the training
    and the test split. A confusion matrix and the final-round AUC, both
    computed on the held-out test split, are logged to the metric artifacts,
    and the model is saved as ``model.bst`` inside the ``model`` artifact
    directory.

    Args:
        dta_in_x_train: Training features (header-less CSV).
        dta_in_y_train: Training labels (header-less CSV, single column).
        dta_in_x_test: Test features (header-less CSV).
        dta_in_y_test: Test labels (header-less CSV, single column).
        model: Output artifact; its path is used as the model directory.
        metrics_conf: Output artifact receiving the confusion matrix.
        metrics_para: Output artifact receiving the scalar ``auc`` metric.

    Returns:
        A ``ModelPathOut`` named tuple whose ``path`` field is the model
        directory with the ``/gcs/`` prefix rewritten to ``gs://``.
    """
    import os
    from collections import namedtuple

    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import confusion_matrix

    ### Load data ###
    x_train = pd.read_csv(dta_in_x_train.path, header=None)
    x_test = pd.read_csv(dta_in_x_test.path, header=None)
    # Labels are stored as a single column; flatten them to 1-D arrays.
    y_train = pd.read_csv(dta_in_y_train.path, header=None).values.ravel()
    y_test = pd.read_csv(dta_in_y_test.path, header=None).values.ravel()

    ### Build model ###
    # eval_set order matters: evals_result() names the entries
    # 'validation_0' (train) and 'validation_1' (test).
    eval_set = [(x_train, y_train), (x_test, y_test)]
    # 'binary:logistic' is the classification objective; eval_metric is given
    # to the constructor because passing it to fit() is deprecated and has
    # been removed in recent xgboost releases.
    bst = xgb.XGBClassifier(objective='binary:logistic', eval_metric=["auc"])
    bst.fit(x_train, y_train, eval_set=eval_set)

    ### Create evaluation metrics ###
    # Confusion matrix on the held-out test split. `labels` pins the matrix
    # to 2x2 so it always matches the two logged categories, even if a class
    # is missing from the split.
    test_pred = bst.predict(x_test)
    metrics_conf.log_confusion_matrix(
        ["0", "1"],
        confusion_matrix(y_test, test_pred, labels=[0, 1]).tolist()
    )

    # AUC of the test split after the final boosting round.
    results = bst.evals_result()
    auc = results['validation_1']['auc'][-1]
    metrics_para.log_metric("auc", auc)

    ### Export model ###
    os.makedirs(model.path, exist_ok=True)
    bst.save_model(model.path+"/model.bst")

    output = namedtuple('ModelPathOut',
        ['path'])
    return output(model.path.replace('/gcs/', 'gs://'))

In [66]:
@component(packages_to_install=["google-cloud-aiplatform==1.4.0"])
def upload_model(
    display_name: str,
    serving_container_image_uri: str,
    model: Input[Model],
    project_location: str,
    project_id: str,
    description: str = None,
    labels: dict = None,
    sync: bool = True,
) -> str:
    """Upload a trained model artifact through the aiplatform SDK.

    Args:
        display_name: Display name given to the uploaded model.
        serving_container_image_uri: Container image used to serve the model.
        model: Input model artifact; the parent of its URI is uploaded.
        project_location: Location passed to ``Model.upload``.
        project_id: Project passed to ``Model.upload``.
        description: Optional model description.
        labels: Optional labels attached to the model.
        sync: Forwarded unchanged to ``Model.upload``.

    Returns:
        The ``resource_name`` of the uploaded model.
    """
    import logging
    # Import the package rather than its `Model` class so the KFP `Model`
    # artifact type used in the signature is not shadowed in this scope.
    from google.cloud import aiplatform

    # uri expects a folder containing the model binaries
    # NOTE(review): this strips the last path segment of model.uri, i.e. it
    # assumes the URI points at a file. The `train` component in this notebook
    # writes model.bst *inside* model.uri - confirm which folder is intended
    # before wiring this component to it.
    artifact_uri = model.uri.rsplit("/", 1)[0]

    logging.info("upload model...")
    # Keyword arguments keep the call independent of the SDK's positional
    # parameter order; the result gets its own name instead of rebinding the
    # `model` input artifact.
    uploaded_model = aiplatform.Model.upload(
        display_name=display_name,
        serving_container_image_uri=serving_container_image_uri,
        artifact_uri=artifact_uri,
        description=description,
        project=project_id,
        location=project_location,
        labels=labels,
        sync=sync,
    )

    logging.info(f"uploaded model {uploaded_model}")

    return uploaded_model.resource_name

In [70]:
from google_cloud_pipeline_components import aiplatform as gcc_aip

@dsl.pipeline(
  name='xgb-complex-v1',
  description='Complex XGB pipeline',
  pipeline_root=BUCKET_NAME+"/xgb-pl"
)
def pipeline(
    data_path: str,
    project_id: str,
    bucket_name: str=BUCKET_NAME+"/xgb-pl",
    endpoint_name: str = 'xgb pipeline endpoint complex'
):
    """Preprocess the data, train the model, then upload and deploy it.

    Args:
        data_path: Location of the input CSV handed to ``prepro``.
        project_id: Project used for the model upload and deploy steps.
        bucket_name: Not used by the active steps.
        endpoint_name: Only referenced by the commented-out endpoint step.
    """
    prepro_op = prepro(data_path=data_path)

    # Output keys must match the parameter names declared by `prepro`
    # ('dta_out_*'); arguments are passed by keyword so each split is bound
    # to the intended `train` input.
    train_op = train(
        dta_in_x_train=prepro_op.outputs['dta_out_x_train'],
        dta_in_y_train=prepro_op.outputs['dta_out_y_train'],
        dta_in_x_test=prepro_op.outputs['dta_out_x_test'],
        dta_in_y_test=prepro_op.outputs['dta_out_y_test']
    )
    train_op.set_cpu_limit('4')
    train_op.set_memory_limit('14Gi')
    #train_op.add_node_selector_constraint('cloud.google.com/gke-accelerator', 'nvidia-tesla-k80')
    #train_op.set_gpu_limit(1)

    ### Create endpoint
    #endpoint_create_op = gcc_aip.EndpointCreateOp(
    #    project=project_id,
    #    display_name=endpoint_name
    #).after(train_op)

    ### Use predefined component to upload model
    # `path` is the gs:// model directory returned by the train component.
    model_upload_op = gcc_aip.ModelUploadOp(
        project=project_id,
        display_name='modelxgb',
        artifact_uri=train_op.outputs["path"],
        serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest'
    ).after(train_op)

    deploy_op = gcc_aip.ModelDeployOp(
        project=project_id,
        model=model_upload_op.outputs["model"],
        machine_type="n1-standard-4"
    )

    #custom_model_deploy_op = gcc_aip.ModelDeployOp(
    #    project=project_id,
    #    endpoint=endpoint_create_op.outputs["endpoint"],
    #    model=model_upload_op.outputs["model"],
    #    deployed_model_display_name='abc',
    #    machine_type='n1-standard-4')

In [71]:
# Compile the `pipeline` function into the job spec file "pl.json".
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pl.json")


In [72]:
from google.cloud.aiplatform.pipeline_jobs import PipelineJob

# Runtime values for the pipeline's `project_id` and `data_path` parameters.
run_parameters = {
    'project_id': 'feature-store-mars21',
    'data_path': 'gs://mortgage_dataset_files/mortgage-small.csv',
}

# Job built from the compiled "pl.json" spec.
pl = PipelineJob(
    display_name='xgb-job',
    template_path="pl.json",
    location='us-central1',
    parameter_values=run_parameters,
)

pl.run(sync=False)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/202835066335/locations/us-central1/pipelineJobs/xgb-complex-v1-20211013123602
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/202835066335/locations/us-central1/pipelineJobs/xgb-complex-v1-20211013123602')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/xgb-complex-v1-20211013123602?project=202835066335
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/202835066335/locations/us-central1/pipelineJobs/xgb-complex-v1-20211013123602 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/202835066335/locations/us-central1/pipelineJobs/xgb-compl