In [1]:
# !pip install jupyter_contrib_nbextensions
# !jupyter contrib nbextension install --user
# from jedi import settings
# settings.case_insensitive_completion = True

In [2]:
# # Install ai platform and kfp
# USER_FLAG = "--user"
# !pip3 install {USER_FLAG} google-cloud-aiplatform==1.3.0 --upgrade
# !pip3 install {USER_FLAG} kfp --upgrade
# !pip install google_cloud_pipeline_components

In [3]:
# !pip install kfp --upgrade

In [4]:
# !gcloud services enable compute.googleapis.com         \
#                        containerregistry.googleapis.com  \
#                        aiplatform.googleapis.com  \
#                        cloudbuild.googleapis.com \
#                        cloudfunctions.googleapis.com

In [5]:
from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

In [6]:
# pip flag interpolated as {USER_FLAG} by the commented-out install cell above.
USER_FLAG = '--user'
# If the session is not authenticated, run `!gcloud auth login` first.

In [7]:
# Get projet name
shell_output=!gcloud config get-value project 2> /dev/null
PROJECT_ID=shell_output[0]
PROJECT_ID

'gpa-poc-001'

In [8]:
# GCS location (bucket + folder) that holds the churn artifacts.
BUCKET_NAME = "gs://gpa-churn/artifacts"

# Folder passed as `pipeline_root` to the pipeline definition further down.
# Nothing is created here; this only builds the URI string.
PIPELINE_ROOT = BUCKET_NAME + "/pipeline-vertexai/"
PIPELINE_ROOT

'gs://gpa-churn/artifacts/pipeline-vertexai/'

In [9]:
# Location passed to the PipelineJob created at the end of the notebook.
REGION = 'southamerica-east1'
REGION

'southamerica-east1'

---

## Creating pipeline components

In [10]:
@component(
    base_image="gcr.io/gpa-poc-001/churn-base-image-src-xgb@sha256:61db16ec13bba7d8023fff61329c6c28a7eb119f8f837fce4c09258776c16727",
    output_component_file="get_prediction_data.yaml"
)
def get_prediction_data(
    Xpred_: Output[Dataset],
    cod_cliente_: Output[Dataset],
    df_date_: Output[Dataset],
    path:str='gs://gpa-churn/data/processed/input/'
    ):
    """Read every input file under a GCS folder and emit the prediction datasets.

    All objects found under ``path`` are read as parquet, concatenated and
    de-duplicated. Each output is written to ``<artifact path>.parquet``.

    Args:
        Xpred_: Output artifact; receives every column except ``cod_cliente``
            and ``target``.
        cod_cliente_: Output artifact; receives the ``cod_cliente`` column.
        df_date_: Output artifact; receives the ``date`` column.
        path: ``gs://<bucket>/<folder>/`` URI of the folder holding the input
            files. A missing trailing slash is tolerated.

    Raises:
        FileNotFoundError: If no object is found under ``path``.
    """
    import pandas as pd
    from google.cloud import storage

    # extracting bucket and path info from path
    #-------------------------------------------------------
    parts = path.split('/')
    bucket = parts[2]
    # Empty segments are dropped so the folder is parsed the same way with or
    # without a trailing slash.
    path_ref = '/'.join(part for part in parts[3:] if part)
    # The trailing slash keeps sibling folders sharing the same name prefix
    # (e.g. "input_old/") out of the listing.
    prefix = f'{path_ref}/' if path_ref else ''

    # reading dataframes in path folder
    #-------------------------------------------------------
    storage_client = storage.Client()
    # Only the folder itself is listed, and zero-byte "folder placeholder"
    # objects (names ending in "/") are skipped explicitly instead of blindly
    # dropping the first listed object.
    # NOTE(review): if the folder can contain non-parquet files, filter them here.
    obj_list = [
        blob.name
        for blob in storage_client.list_blobs(bucket, prefix=prefix)
        if not blob.name.endswith('/')
    ]
    if not obj_list:
        raise FileNotFoundError(f'No input files found under {path}')

    df_list = []
    for obj in obj_list:
        df_list.append(pd.read_parquet(f'gs://{bucket}/{obj}'))
        print(f'added gs://{bucket}/{obj}')

    # concatenating df_list and saving cod_client column in an independent df
    #-------------------------------------------------------
    df = pd.concat(df_list, axis=0)
    df = df.drop_duplicates().reset_index(drop=True)
    cod_cliente = df[['cod_cliente']].copy()
    df_date = df[['date']].copy()

    # selecting valid columns
    #-------------------------------------------------------
    # The client id and the target (when present) are excluded; `date` stays
    # in the feature frame.
    excluded = {'cod_cliente', 'target'}
    features = [column for column in df.columns if column not in excluded]
    Xpred = df[features]
    print('Successfully read prediction data.')
    print('shapes:')
    print(f'Xpred:{Xpred.shape}')

    # saving output datasets in pipeline
    #-------------------------------------------------------
    Xpred.to_parquet(Xpred_.path + '.parquet', index=False, compression='gzip')
    cod_cliente.to_parquet(cod_cliente_.path + '.parquet', index=False, compression='gzip')
    df_date.to_parquet(df_date_.path + '.parquet', index=False, compression='gzip')

In [11]:
@component(
    base_image="gcr.io/gpa-poc-001/churn-base-image-src-xgb@sha256:61db16ec13bba7d8023fff61329c6c28a7eb119f8f837fce4c09258776c16727",
    output_component_file="load_artifacts.yaml"
)
def load_artifacts(
    fe_pipeline_: Output[Model],
    fs_pipeline_: Output[Model],
    model_: Output[Model],
    path:str='gs://gpa-churn/artifacts/training_pipeline/production/'
    ):
    """Copy the production artifacts from GCS into the pipeline outputs.

    Downloads ``fe_pipeline.joblib``, ``fs_pipeline.joblib`` and ``model.bst``
    from the folder given by ``path`` and re-saves them as pipeline artifacts.

    Args:
        fe_pipeline_: Output artifact; ``fe_pipeline.joblib`` is re-dumped to
            ``<artifact path>.joblib``.
        fs_pipeline_: Output artifact; ``fs_pipeline.joblib`` is re-dumped to
            ``<artifact path>.joblib``.
        model_: Output artifact; the XGBoost booster is saved to
            ``<artifact path>.bst`` and tagged with ``framework='xgb'``.
        path: ``gs://<bucket>/<folder>/`` URI of the folder holding the
            artifacts. A missing trailing slash is tolerated.
    """
    import sys
    import joblib
    import xgboost as xgb
    from io import BytesIO
    from google.cloud import storage

    sys.path.append('/usr/app/')
    sys.path.append('/usr/app/src')
    # Presumably needed so joblib can resolve the project classes stored in
    # the pickled pipelines - TODO confirm before removing.
    import src.pipeline_modules as pipeline_modules

    # extracting bucket and path info from prefix
    #-------------------------------------------------------
    parts = path.split('/')
    bucket_name = parts[2]
    # Empty segments are dropped so the folder is parsed the same way with or
    # without a trailing slash.
    path_ref = '/'.join(part for part in parts[3:] if part)

    def blob_name(file_name):
        # Object name of `file_name` inside the artifacts folder.
        return f'{path_ref}/{file_name}' if path_ref else file_name

    # creating storage access point
    #-------------------------------------------------------
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # reading pipeline artifacts and saving them within pipeline
    #-------------------------------------------------------
    # Each artifact name is bound to its own output explicitly, so the pairing
    # does not depend on two lists staying in the same order.
    pipeline_outputs = {
        'fe_pipeline': fe_pipeline_,
        'fs_pipeline': fs_pipeline_,
    }
    for pipe, artifact in pipeline_outputs.items():
        art_obj = BytesIO()
        bucket.blob(blob_name(f'{pipe}.joblib')).download_to_file(art_obj)
        # The download leaves the cursor at the end of the written bytes;
        # rewind so joblib reads the payload from the start.
        art_obj.seek(0)
        # NOTE: joblib.load unpickles arbitrary objects - only point `path`
        # at a trusted bucket.
        fitted_pipeline = joblib.load(art_obj)
        with open(artifact.path + '.joblib', 'wb') as file:
            joblib.dump(fitted_pipeline, file)

    # reading model artifact
    #-------------------------------------------------------
    model_file = 'model.bst'
    bucket.blob(blob_name(model_file)).download_to_filename(model_file)
    bst = xgb.Booster()
    bst.load_model(model_file)

    # saving model artifact within pipeline
    #-------------------------------------------------------
    model_.metadata['framework'] = 'xgb'
    bst.save_model(model_.path + '.bst')

In [12]:
@component(
    base_image="gcr.io/gpa-poc-001/churn-base-image-src-xgb@sha256:61db16ec13bba7d8023fff61329c6c28a7eb119f8f837fce4c09258776c16727",
    output_component_file="make_predictions.yaml"
)
def make_predictions(
    Xpred_: Input[Dataset],
    cod_cliente_: Input[Dataset],
    df_date_: Input[Dataset],
    model_: Input[Model],
    fe_pipeline_: Input[Model],
    fs_pipeline_: Input[Model],
    predictions_df_: Output[Dataset],
    bucket:str='gpa-churn',
    output_path:str='data/processed/batch_output/'
    ):
    """Score the prediction data and publish the result.

    Applies the feature-engineering and feature-selection pipelines to the
    input frame, predicts with the XGBoost booster, and writes the predictions
    both to the pipeline output and to
    ``gs://<bucket>/<output_path>predictions.parquet``.

    Args:
        Xpred_: Input dataset read from ``<artifact path>.parquet``.
        cod_cliente_: Input dataset providing the ``cod_cliente`` column.
        df_date_: Input dataset providing the ``date`` column.
        model_: Booster read from ``<artifact path>.bst``.
        fe_pipeline_: Pipeline read from ``<artifact path>.joblib``; applied first.
        fs_pipeline_: Pipeline read from ``<artifact path>.joblib``; applied second.
        predictions_df_: Output artifact; receives the predictions frame at
            ``<artifact path>.parquet``.
        bucket: Name of the destination bucket (without ``gs://``).
        output_path: Folder inside the bucket for ``predictions.parquet``.
            A missing trailing slash is tolerated.
    """
    import sys
    import joblib
    import pandas as pd
    import xgboost as xgb
    from datetime import datetime
    from google.cloud import storage

    sys.path.append('/usr/app/')
    sys.path.append('/usr/app/src')
    # Presumably needed so joblib can resolve the project classes stored in
    # the pickled pipelines - TODO confirm before removing.
    import src.utils as utils
    import src.pipeline_modules as pipeline_modules

    # loading artifacts
    #-------------------------------------------------------
    bst = xgb.Booster()
    bst.load_model(model_.path+'.bst')
    fe_pipeline = joblib.load(fe_pipeline_.path+'.joblib')
    fs_pipeline = joblib.load(fs_pipeline_.path+'.joblib')

    # reading input arguments
    #-------------------------------------------------------
    Xpred = pd.read_parquet(Xpred_.path+'.parquet')
    cod_cliente = pd.read_parquet(cod_cliente_.path+'.parquet')
    df_date = pd.read_parquet(df_date_.path+'.parquet')

    # applying pipelines
    #-------------------------------------------------------
    Xpred = fe_pipeline.transform(Xpred)
    Xpred = fs_pipeline.transform(Xpred)

    # making predictions
    #-------------------------------------------------------
    ypred = list(bst.predict(xgb.DMatrix(Xpred)))

    # building the predictions frame
    #-------------------------------------------------------
    predictions_df = pd.DataFrame()
    predictions_df['cod_cliente'] = cod_cliente['cod_cliente']
    predictions_df['churn_prediction'] = ypred
    predictions_df['reference_date'] = df_date['date']
    predictions_df['prediction_time'] = datetime.now().strftime(format='%Y-%m-%d %H:%M:%S')
    predictions_df['model_stage'] = 'poc'

    # saving predictions dataframe in pipeline and in output_path
    #-------------------------------------------------------
    predictions_df.to_parquet(predictions_df_.path+'.parquet', index=False, compression='gzip')

    # upload predictions output to cloud storage
    #-------------------------------------------------------
    local_file = 'predictions.parquet'
    predictions_df.to_parquet(local_file, index=False, compression='gzip')
    # Without this guard a folder such as "data/out" would produce the object
    # "data/outpredictions.parquet".
    if output_path and not output_path.endswith('/'):
        output_path = output_path + '/'
    storage_file = f'{output_path}predictions.parquet'
    storage_client = storage.Client()
    # Separate name so the `bucket` string parameter is not rebound to a
    # Bucket object.
    gcs_bucket = storage_client.get_bucket(bucket)
    gcs_bucket.blob(storage_file).upload_from_filename(local_file)

---

In [13]:
# Naming used by the pipeline definition and by the job submission below.
from datetime import datetime

# Current local time as a 14-digit YYYYmmddHHMMSS string.
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
# Pipeline name / job display name (no interpolation is needed here).
pipeline_label = 'pipeline-churn-batchprediction-'

In [14]:
@dsl.pipeline(
    name=pipeline_label,
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    bucket:str='gpa-churn',
    input_data_path:str='data/processed/input/',
    artifacts_path:str='artifacts/training_pipeline/production/',
    predictions_path:str='data/processed/batch_output/'
    ):
    """Batch-prediction pipeline: load data, load artifacts, then predict.

    Args:
        bucket: Bucket name (without ``gs://``) used by every step.
        input_data_path: Folder inside the bucket with the prediction input.
        artifacts_path: Folder inside the bucket with the trained artifacts.
        predictions_path: Folder inside the bucket that receives the predictions.
    """
    # The two loader steps do not consume each other's outputs.
    prediction_data_task = get_prediction_data(path=f'gs://{bucket}/{input_data_path}')
    artifacts_task = load_artifacts(path=f'gs://{bucket}/{artifacts_path}')

    data_outputs = prediction_data_task.outputs
    artifact_outputs = artifacts_task.outputs

    # Artifact inputs are passed positionally, in the order of the
    # make_predictions signature.
    make_predictions(
        data_outputs['Xpred_'],
        data_outputs['cod_cliente_'],
        data_outputs['df_date_'],
        artifact_outputs['model_'],
        artifact_outputs['fe_pipeline_'],
        artifact_outputs['fs_pipeline_'],
        bucket=bucket,
        output_path=predictions_path,
    )

In [15]:
# Compile the pipeline function into the JSON job spec that the PipelineJob
# below reads through `template_path`.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='ml_pipeline_batchprediction.json',
)



---

In [16]:
# Job definition built from the compiled spec; caching is switched off for this job.
start_pipeline = pipeline_jobs.PipelineJob(
    template_path="ml_pipeline_batchprediction.json",
    display_name=pipeline_label,
    location=REGION,
    enable_caching=False,
)

In [17]:
# Submit the job to Vertex AI; the output below shows its state being polled.
start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/437364709834/locations/southamerica-east1/pipelineJobs/pipeline-churn-batchprediction-20220607232504
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/437364709834/locations/southamerica-east1/pipelineJobs/pipeline-churn-batchprediction-20220607232504')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/southamerica-east1/pipelines/runs/pipeline-churn-batchprediction-20220607232504?project=437364709834
PipelineJob projects/437364709834/locations/southamerica-east1/pipelineJobs/pipeline-churn-batchprediction-20220607232504 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/437364709834/locations/southamerica-east1/pipelineJobs/pipeline-churn-batchprediction-20220607232504 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/437364709834/locations/southamerica-east1/pipelineJobs/pipeline-churn-batchprediction-202206