In [None]:
!python3 -m pip install kfp --upgrade --user 

In [None]:
import kfp 

import kfp.dsl as dsl 

from kfp import compiler 

from kfp import components 

from kfp.aws import use_aws_secret 

 

# Container image shared by every component in this notebook (passed as base_image
# to each @dsl.python_component and components.func_to_container_op call below).
# NOTE(review): none of the components import tensorflow; a plain Python image may
# suffice — confirm before changing.
BASE_IMAGE = 'tensorflow/tensorflow:2.0.0b0-py3' 

In [None]:
@dsl.python_component(
    name='data_process_op',
    description='process data',
    base_image=BASE_IMAGE  # you can define the base image here, or when you build in the next step.
)
def process_data(glue_job_name: str, region: str ) -> str:
    """Start an AWS Glue job run and block until it finishes.

    Args:
        glue_job_name: Name of the Glue job to start.
        region: AWS region used for the Glue client.

    Returns:
        A string containing the Glue job run id. The pipeline passes it to the
        training step as ``previous_output`` so training runs after this step.

    Raises:
        RuntimeError: if the job run ends in a terminal non-success state.
    """
    # Imports live inside the function because func_to_container_op ships only
    # this function's source code to the component container.
    import time

    import boto3

    print('start data processing')

    # kick off the Glue Job to process data
    client = boto3.client('glue', region_name=region)
    run_id = client.start_job_run(JobName=glue_job_name)['JobRunId']

    # Poll until the run reaches a terminal state. Without the failure check a
    # FAILED/STOPPED/TIMEOUT/ERROR run would never reach SUCCEEDED and this
    # step would poll forever.
    failed_states = {'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED'}
    job_state = "RUNNING"
    while job_state != "SUCCEEDED":
        time.sleep(60)
        status = client.get_job_run(JobName=glue_job_name, RunId=run_id)
        job_state = status['JobRun']['JobRunState']
        if job_state in failed_states:
            raise RuntimeError(
                f"Glue job {glue_job_name!r} run {run_id} ended in state {job_state}")

    print('data processing completed')
    return f"GLUE job id: {run_id}"

  

# Turn process_data into a KFP component factory that runs on BASE_IMAGE with
# boto3 available; the pipeline calls process_data_op(...) to create a task.
process_data_op = components.func_to_container_op(
    process_data,
    packages_to_install=['boto3'],
    base_image=BASE_IMAGE,
)

In [None]:
@dsl.python_component(
    name='model_training_op',
    description='model training step',
    base_image=BASE_IMAGE  # you can define the base image here, or when you build in the next step.
)
def train_model(bucket: str, key: str, region: str, previous_output: str ) -> str :
    """Train a random-forest churn classifier and register it in MLflow.

    Args:
        bucket: S3 bucket holding the processed churn data.
        key: S3 key prefix under which the processed CSV object(s) live.
        region: AWS region used for the S3 client.
        previous_output: Output of the data-processing step. Not read here;
            taking it as an input makes this step run after that step.

    Returns:
        A string containing the MLflow run id of the training run.

    Raises:
        ValueError: if no data object exists under ``bucket``/``key``, or the
            MLflow tracking server URL has not been configured.
    """
    # Imports live inside the function because func_to_container_op ships only
    # this function's source code to the component container.
    import os

    import boto3
    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    s3 = boto3.client('s3', region_name=region)
    response = s3.list_objects(Bucket=bucket, Prefix=key)

    # 'Contents' is absent when nothing matches the prefix; zero-byte "folder"
    # placeholder keys (ending in '/') are not data files.
    data_keys = [obj['Key'] for obj in response.get('Contents', [])
                 if not obj['Key'].endswith('/')]
    if not data_keys:
        raise ValueError(f"No data objects found under s3://{bucket}/{key}")
    # NOTE(review): only the first object is used; if the Glue job writes several
    # part files the remaining ones are ignored.
    s3.download_file(bucket, data_keys[0], "churn.csv")

    churn_data = pd.read_csv('churn.csv')

    # Split the dataset into training (80%) and testing (20%); seeded like the
    # classifier below so reruns train on the same rows.
    churn_train, churn_test = train_test_split(churn_data, test_size=0.2, random_state=0)

    churn_train_X = churn_train.loc[:, churn_train.columns != 'exited']
    churn_train_y = churn_train['exited']

    churn_test_X = churn_test.loc[:, churn_test.columns != 'exited']
    churn_test_y = churn_test['exited']

    # Replace the placeholder with your MLflow tracking server URL, or set
    # MLFLOW_TRACKING_URI in the component's environment.
    tracking_uri = os.environ.get('MLFLOW_TRACKING_URI', '<<your mlflow tracking server url>>')
    if tracking_uri.startswith('<<'):
        raise ValueError('Configure the MLflow tracking server URL: edit tracking_uri '
                         'in train_model or set MLFLOW_TRACKING_URI.')
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment('Churn Experiment 3')

    # Enable autologging before fit() so the training call is captured.
    mlflow.sklearn.autolog()
    with mlflow.start_run(run_name="churn_run_2") as run:
        bank_churn_clf = RandomForestClassifier(max_depth=2, random_state=0)
        bank_churn_clf.fit(churn_train_X, churn_train_y)
        # Record accuracy on the held-out 20% so the split is actually used.
        mlflow.log_metric('test_accuracy', bank_churn_clf.score(churn_test_X, churn_test_y))
        mlflow.sklearn.log_model(sk_model=bank_churn_clf, artifact_path="sklearn-model", registered_model_name="churn-model")

    print(f"MLflow run id: {run.info.run_id}")
    return f"MLflow run id: {run.info.run_id}"

     

# Turn train_model into a KFP component factory running on BASE_IMAGE; the
# listed packages cover the S3 download, MLflow tracking and sklearn training.
train_model_op = components.func_to_container_op(
    train_model,
    packages_to_install=['boto3', 'mlflow', 'scikit-learn', 'matplotlib'],
    base_image=BASE_IMAGE,
)

 

In [None]:
@dsl.python_component(
    name='model_download_op',
    description='model download step',
    base_image=BASE_IMAGE  # you can define the base image here, or when you build in the next step.
)
def download_model(model_version: int, previous_output: str ) -> str :
    """Fetch a registered churn model from MLflow and publish it to S3.

    The registered model is loaded from MLflow and re-saved locally. Its
    ``model.pkl`` is copied to ``skserver_<model_name>_release/model.joblib``,
    and that release directory is uploaded under
    ``mlflow-models/<model_name>_release`` in the target bucket.

    Args:
        model_version: Version of the registered model "churn-model" to fetch.
        previous_output: Output of the training step. Not read here; taking it
            as an input makes this step run after training.

    Returns:
        A string naming the target bucket and key prefix.

    Raises:
        ValueError: if the MLflow tracking server URL has not been configured.
    """
    # Imports live inside the function because func_to_container_op ships only
    # this function's source code to the component container.
    import os
    import shutil

    import boto3
    import mlflow
    import mlflow.sklearn

    model_name = "churn-model"

    # Replace the placeholder with your MLflow tracking server URL, or set
    # MLFLOW_TRACKING_URI in the component's environment.
    tracking_uri = os.environ.get('MLFLOW_TRACKING_URI', '<<your mlflow tracking server>>')
    if tracking_uri.startswith('<<'):
        raise ValueError('Configure the MLflow tracking server URL: edit tracking_uri '
                         'in download_model or set MLFLOW_TRACKING_URI.')
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment('Churn Experiment 3')

    sk_model = mlflow.sklearn.load_model(f"models:/{model_name}/{model_version}")

    local_model_dir = f"{model_name}_{model_version}"
    mlflow.sklearn.save_model(sk_model, local_model_dir)

    release_dir = f"skserver_{model_name}_release"
    os.makedirs(release_dir, exist_ok=True)
    # NOTE(review): the name model.joblib presumably matches what the serving side
    # loads — confirm against bank_churn_deployment.yaml.
    shutil.copyfile(os.path.join(local_model_dir, "model.pkl"),
                    os.path.join(release_dir, "model.joblib"))

    targetbucket = "model-deployment-<your initials>"
    prefix = f"mlflow-models/{model_name}_release"

    def upload_objects(src_path, bucketname, key_prefix):
        """Upload every file under directory src_path to bucketname/key_prefix/...,
        preserving relative sub-paths and skipping hidden directories."""
        my_bucket = boto3.resource('s3').Bucket(bucketname)
        for path, dirs, files in os.walk(src_path):
            # Prune hidden directories so os.walk does not descend into them.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            rel_dir = os.path.relpath(path, src_path).replace(os.sep, "/")
            key_dir = key_prefix if rel_dir == "." else f"{key_prefix}/{rel_dir}"
            for file in files:
                my_bucket.upload_file(os.path.join(path, file), f"{key_dir}/{file}")

    # Walk the release *directory*: os.walk() on a plain file path yields nothing,
    # so passing the model.joblib path itself would upload nothing.
    upload_objects(release_dir, targetbucket, prefix)

    print(f"target bucket: {targetbucket}, prefix: {prefix} ")
    return f"target bucket: {targetbucket}, prefix: {prefix} "

 

# Turn download_model into a KFP component factory running on BASE_IMAGE with
# boto3, mlflow and scikit-learn available.
model_download_op = components.func_to_container_op(
    download_model,
    packages_to_install=['boto3', 'mlflow', 'scikit-learn'],
    base_image=BASE_IMAGE,
)

 

In [None]:
@dsl.pipeline(
  name='bank churn pipeline',
  description='Train bank churn model'
)
def preprocess_train_deploy(
        bucket: str = 'datalake-demo-dyping',
        glue_job_name: str = 'customer-churn-processing',
        region: str = 'us-west-1',  # replace with your AWS region
        tag: str = '4',
        model: str = 'bank_churn_model',
        model_version: int = 1,
):
    """Glue preprocessing -> model training -> model publish -> Seldon deploy.

    Each step takes the previous step's output as an input, which orders them.
    The deploy ResourceOp is ordered explicitly with ``.after()``.

    Args:
        bucket: S3 bucket holding the processed training data.
        glue_job_name: Glue job that preprocesses the data.
        region: AWS region passed to the Glue and S3 clients.
        tag: Currently unused.
        model: Currently unused.
        model_version: Registered MLflow model version to publish.

    The deployment manifest is read from ``bank_churn_deployment.yaml`` in the
    current working directory when this function runs (i.e. at compile time).
    """
    # Imported here so the pipeline definition does not depend on a module-level
    # import; this body executes in the notebook kernel during compilation.
    import yaml

    process_data_task = process_data_op(glue_job_name, region).apply(
        use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'us-west-1'))

    model_training_task = train_model_op(
        bucket, 'ml-customer-churn/data/', region, process_data_task.output).apply(use_aws_secret())

    model_download_task = model_download_op(
        model_version, model_training_task.output).apply(use_aws_secret())

    # safe_load: yaml.load() without an explicit Loader is rejected by PyYAML >= 6
    # and can construct arbitrary Python objects. `with` closes the file handle.
    with open("bank_churn_deployment.yaml") as manifest_file:
        seldon_config = yaml.safe_load(manifest_file)

    deploy_op = dsl.ResourceOp(
        name="seldondeploy",
        k8s_resource=seldon_config,
        action = "apply",
        attribute_outputs={"name": "{.metadata.name}"})

    deploy_op.after(model_download_task)

In [None]:
import kfp.compiler as compiler

# Compile the pipeline definition into a package file; the next cell submits it.
pipeline_filename = 'bank_churn_pipeline.tar.gz'
compiler.Compiler().compile(
    pipeline_func=preprocess_train_deploy,
    package_path=pipeline_filename,
)

In [None]:
client = kfp.Client()
experiment = client.create_experiment(name='data_experiment', namespace='admin')

# Submit the compiled package as a run; only model_version is overridden,
# every other pipeline parameter keeps its default.
pipeline_func = preprocess_train_deploy
run_name = f"{pipeline_func.__name__}_run"
arguments = {'model_version': 1}
run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    pipeline_package_path=pipeline_filename,
    params=arguments,
)