## Install Vertex Pipelines SDK & Kubeflow

In [7]:
# Restart the kernel so that newly installed packages are picked up.
# NOTE(review): this cell sits above the install cell; it is meant to be
# run once the installs have finished.
import os

# Any non-empty IS_TESTING value disables the restart.
if not os.environ.get("IS_TESTING"):
    # Imported lazily: IPython is only needed on the restart path.
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
# Install ai platform and kfp
!pip3 install google-cloud-aiplatform==1.3.0 --upgrade --user
!pip3 install kfp --upgrade --user
!pip install google_cloud_pipeline_components --user

## Set up Global Variables

In [1]:
import os

# Directory added to PATH so that console scripts installed with
# `pip install --user` can be found.
LOCAL_BIN = "/home/jupyter/.local/bin"

# PATH as it was before this cell touched it (name kept from the original cell).
PATH = os.environ.get("PATH", "")

# Append only when missing, so re-running the cell does not keep growing PATH.
if LOCAL_BIN not in PATH.split(os.pathsep):
    os.environ["PATH"] = f"{PATH}{os.pathsep}{LOCAL_BIN}" if PATH else LOCAL_BIN
print(f"env: PATH={os.environ['PATH']}")

# Region used for the pipeline job and for the deployed model.
# NOTE(review): the original hint claimed us-central1 is always the cheapest
# region - pricing varies by service, confirm before relying on it.
REGION = "us-central1"

# Existing GCS bucket used for pipeline artifacts.
BUCKET_NAME = "gs://cloud-ai-platform-837ad179-119c-4d56-a742-82ec7aaa2cae"

# Root folder inside the bucket where pipeline runs write their artifacts
# (this only builds the path; it does not create the bucket).
PIPELINE_ROOT = f"{BUCKET_NAME}/pipelines/"

# Source dataset (CSV on GCS).
DATA_PATH = "gs://bq-experiments-datasets/fraud/synthetic-fraud.csv"

env: PATH=/opt/conda/bin:/opt/conda/condabin:/opt/conda/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/jupyter/.local/bin


### Import Libraries

In [2]:
from typing import NamedTuple
import typing
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip
from sklearn.ensemble import RandomForestClassifier
import pickle 
import pandas as pd
import numpy as np

### Create pipeline

We create 4 components:  
- Load data   
- Train a model
- Evaluate the model
- Deploy the model

In [None]:
# Requirement to read data from GCS Bucket
!pip install fsspec
!pip install gcsfs



In [3]:
# Load the whole fraud CSV from GCS into the notebook for a visual check.
# The recorded output below shows row labels up to 6362618, so this is a
# multi-million-row read; `df` is not used by any later cell shown here.
df = pd.read_csv(DATA_PATH)
# Bare last expression: Jupyter renders the frame (pandas truncates the
# display to the first and last rows).
df

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.00,160296.36,M1979787155,0.00,0.00,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.00,19384.72,M2044282225,0.00,0.00,0,0
2,1,TRANSFER,181.00,C1305486145,181.00,0.00,C553264065,0.00,0.00,1,0
3,1,CASH_OUT,181.00,C840083671,181.00,0.00,C38997010,21182.00,0.00,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.00,29885.86,M1230701703,0.00,0.00,0,0
...,...,...,...,...,...,...,...,...,...,...,...
6362615,743,CASH_OUT,339682.13,C786484425,339682.13,0.00,C776919290,0.00,339682.13,1,0
6362616,743,TRANSFER,6311409.28,C1529008245,6311409.28,0.00,C1881841831,0.00,0.00,1,0
6362617,743,CASH_OUT,6311409.28,C1162922333,6311409.28,0.00,C1365125890,68488.84,6379898.11,1,0
6362618,743,TRANSFER,850002.52,C1685995037,850002.52,0.00,C2080388513,0.00,0.00,1,0


## Create Read Component

In [4]:
@component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn==1.0.2","fsspec", "gcsfs"],
    base_image="python:3.9",
    output_component_file="get_fraud_data.yaml"
)
def get_fraud_data(
    url: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    """Read the fraud CSV, prepare features and write train/test CSV artifacts.

    Args:
        url: Location of the source CSV (anything ``pandas.read_csv`` accepts).
        dataset_train: Output artifact; the first 70% of rows is written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the remaining 30% of rows is written to
            ``dataset_test.path + ".csv"``.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split as tts

    print("reading the csv")
    df_fraud = pd.read_csv(url)

    print("dropping columns and getting dummies")
    # Identifier columns and the rule-based flag are not used as features.
    df_fraud = df_fraud.drop(columns=['nameOrig', 'nameDest', 'isFlaggedFraud'])
    # 'Unnamed: 0' is the row index that was saved into the CSV. With an
    # unshuffled split every test row has an index larger than any training
    # row, so it must not reach the model as a feature. It is dropped only
    # when present.
    df_fraud = df_fraud.drop(columns=['Unnamed: 0'], errors='ignore')
    # One-hot encode the transaction type.
    df_fraud = pd.concat(
        [df_fraud.drop(columns='type'), pd.get_dummies(df_fraud['type'])],
        axis=1,
    )

    print("creating test train split")
    # shuffle=False keeps the file's row order: the first 70% of rows become
    # the training set and the last 30% the test set.
    train, test = tts(df_fraud, test_size = 0.3,random_state=42, shuffle=False)

    print("creating csvs")
    # Downstream components read these files back as `<artifact path>.csv`.
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')

## Train the Model

In [5]:
@component(
    packages_to_install = ["pandas", "scikit-learn==1.0.2"], 
    base_image="python:3.9",
)
def train_fraud_model(
    dataset:  Input[Dataset],
    model: Output[Model], 
):
    """Fit a random forest on the training CSV and pickle it as the model artifact.

    Args:
        dataset: Input artifact; the training data is read from
            ``dataset.path + ".csv"`` and must contain an ``isFraud`` column.
        model: Output artifact; the fitted estimator is pickled to
            ``model.path + ".pkl"`` and ``metadata["framework"]`` is set to "RF".
    """
    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    import pickle

    print("reading the dataset")
    data = pd.read_csv(dataset.path+".csv")

    print("getting the labels")
    y_train = data["isFraud"]

    print("dropping the targets")
    X_train = data.drop('isFraud', axis=1)

    print("creating the model")
    # A fixed seed makes the fitted forest, and therefore the accuracy used
    # by the deployment gate, reproducible from run to run.
    model_rf = RandomForestClassifier(n_estimators=120, random_state=42)

    print("Fitting the model")
    model_rf.fit(X_train,y_train)
    print("Finished fitting the model")

    print("Creating PKL")
    model.metadata["framework"] = "RF"
    # The evaluation component reloads the model from `<artifact path>.pkl`.
    file_name = model.path + ".pkl"
    with open(file_name, 'wb') as file:
        pickle.dump(model_rf, file)

## Evaluate the Model

In [6]:
@component(
    packages_to_install = ["pandas", "scikit-learn==1.0.2"], 
    base_image="python:3.9",
)
def fraud_evaluation(
    test_set:  Input[Dataset],
    rf_fraud_model: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple("output", [("deploy", str)]):
    """Score the trained model on the test set and decide whether to deploy it.

    Args:
        test_set: Input artifact; test data is read from
            ``test_set.path + ".csv"`` and must contain an ``isFraud`` column.
        rf_fraud_model: Input artifact; the pickled model is read from
            ``rf_fraud_model.path + ".pkl"``. Its ``metadata["accuracy"]`` is
            set to the measured accuracy.
        thresholds_dict_str: JSON object whose ``"roc"`` entry is the minimum
            value the model must reach to be deployed.
        metrics: Output artifact receiving the ROC curve and confusion matrix.
        kpi: Output artifact receiving the scalar ``accuracy`` metric.

    Returns:
        A one-field tuple ``(deploy,)`` where ``deploy`` is the string
        ``"true"`` when accuracy >= threshold and ``"false"`` otherwise.
    """
    import json
    import pickle

    import pandas as pd
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score

    def threshold_check(val1, val2):
        # Strings rather than booleans: the pipeline condition compares the
        # output against the literal "true".
        return "true" if val1 >= val2 else "false"

    print("Getting test data")
    data = pd.read_csv(test_set.path+".csv")

    print("restoring the model")
    file_name = rf_fraud_model.path + ".pkl"
    # NOTE: pickle.load executes code embedded in the file; this is only
    # acceptable because the file is produced by the training component.
    with open(file_name, 'rb') as file:
        model = pickle.load(file)
    print("model restored")

    print("prepping the data")
    y_test = data["isFraud"]
    X_test = data.drop('isFraud', axis=1)

    print("running prediction")
    y_pred = model.predict(X_test)

    print("Making test predictions")
    # Probability of the positive class (column 1) for the ROC curve.
    y_scores = model.predict_proba(X_test)[:, 1]

    print("Getting ROC")
    fpr, tpr, thresholds = roc_curve(
        y_true=y_test.to_numpy(), y_score=y_scores, pos_label=1
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    print("Getting confusion matrix")
    metrics.log_confusion_matrix(
        ["False", "True"],
        confusion_matrix(y_test, y_pred).tolist(),
    )

    print("Checking Accuracy")
    accuracy = accuracy_score(y_test, y_pred)
    thresholds_dict = json.loads(thresholds_dict_str)
    rf_fraud_model.metadata["accuracy"] = float(accuracy)
    kpi.log_metric("accuracy", float(accuracy))
    # The threshold must be parsed as a float: int(0.8) is 0, which would let
    # every model through the gate.
    # NOTE(review): the key is named "roc" but the value is compared against
    # accuracy - confirm which metric is intended.
    deploy = threshold_check(float(accuracy), float(thresholds_dict['roc']))
    return (deploy,)
    


## Deploy the Model

In [17]:
@component(
    packages_to_install=["google-cloud-aiplatform", "scikit-learn",  "kfp"],
    base_image="python:3.9",
    output_component_file="model_fraud_coponent.yml"
)
def deploy_fraud_model(
    model: Input[Model],
    project: str,
    region: str,
    serving_container_image_uri : str, 
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Args:
        model: Input artifact produced by the training component.
        project: Google Cloud project used for the Vertex AI calls.
        region: Vertex AI location used for the endpoint and the model.
        serving_container_image_uri: Container image used to serve the model.
        vertex_endpoint: Output artifact; its ``uri`` is set to the resource
            name of the endpoint the model was deployed to.
        vertex_model: Output artifact; its ``uri`` is set to the resource name
            of the uploaded Vertex AI model.
    """
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    DISPLAY_NAME  = "financial-fraud-synthetic"
    MODEL_NAME = "financial-fraud"
    ENDPOINT_NAME = "financial-fraud-ep2"

    def create_endpoint():
        """Return the newest endpoint named ENDPOINT_NAME, creating one if none exists."""
        print('Creating endpoint')
        endpoints = aiplatform.Endpoint.list(
            filter='display_name="{}"'.format(ENDPOINT_NAME),
            order_by='create_time desc',
            project=project,
            location=region,
        )
        if len(endpoints) > 0:
            return endpoints[0]  # most recently created
        return aiplatform.Endpoint.create(
            display_name=ENDPOINT_NAME, project=project, location=region
        )

    # The helper has to return the endpoint; otherwise `endpoint` is None and
    # deploy() does not use the endpoint looked up above.
    endpoint = create_endpoint()
    print('Endpoint created')

    # Import a model programmatically
    print('Uploading model to endpoint')
    # The serving container is pointed at the directory that holds the
    # artifact, so only the final path segment is removed. str.replace would
    # strip every occurrence of "model" in the URI, including any in parent
    # directory names.
    artifact_dir = model.uri.rsplit("/", 1)[0] + "/"
    model_upload = aiplatform.Model.upload(
        display_name = DISPLAY_NAME,
        artifact_uri = artifact_dir,
        serving_container_image_uri =  serving_container_image_uri,
        serving_container_health_route=f"/v1/models/{MODEL_NAME}",
        serving_container_predict_route=f"/v1/models/{MODEL_NAME}:predict",
        serving_container_environment_variables={
            "MODEL_NAME": MODEL_NAME,
        },
    )
    # Model.deploy returns the Endpoint the model was deployed to.
    deployed_endpoint = model_upload.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        traffic_split={"0": 100},  # "0" refers to the model being deployed
        deployed_model_display_name=DISPLAY_NAME,
    )
    print('Model deployed')

    # Save data to the output params
    print('Saving model output params')
    vertex_endpoint.uri = deployed_endpoint.resource_name
    vertex_model.uri = model_upload.resource_name

## Create the Pipeline

In [18]:
@dsl.pipeline(
    # Pipeline name; used to determine the pipeline Context.
    name="pipeline-fraud",
    # Default artifact root; can be overridden when the job is submitted.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    url: str = DATA_PATH,
    project: str = 'bq-experiments-350102',
    region: str = REGION, 
    display_name: str = 'financial-fraud-synthetic',
    api_endpoint: str = REGION+"-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"roc":0.8}',
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-23:latest"
    ):
    # Wiring: load data -> train -> evaluate -> (conditionally) deploy.
    # `display_name` and `api_endpoint` are accepted but not consumed by any
    # step in this function.

    # Step 1: read the CSV and split it into train/test artifacts.
    ingest_task = get_fraud_data(url=url)

    # Step 2: fit the model on the training split.
    training_task = train_fraud_model(
        dataset=ingest_task.outputs["dataset_train"],
    )

    # Step 3: score the model on the test split against the threshold.
    evaluation_task = fraud_evaluation(
        test_set=ingest_task.outputs["dataset_test"],
        rf_fraud_model=training_task.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,
    )

    # Step 4: deploy only when the evaluation step emitted "true", i.e. the
    # model performance is at or above the threshold.
    passed_gate = evaluation_task.outputs["deploy"] == "true"
    with dsl.Condition(passed_gate, name="deploy-fraud-model"):
        deploy_fraud_model(
            model=training_task.outputs['model'],
            project=project,
            region=region,
            serving_container_image_uri=serving_container_image_uri,
        )
    

## Compile the Pipeline

In [19]:
# Compile the `pipeline` function into a job spec file, ml_fraud.json, which
# the PipelineJob in the next section loads as its template.
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='ml_fraud.json')



## Create a Run

In [20]:
# Build the Vertex AI pipeline job from the compiled spec.
# enable_caching=False is passed so that step results are not served from
# cache. No project is passed here.
# NOTE(review): presumably the project comes from the SDK/environment
# default - confirm it matches the intended project.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="fraud-pipeline",
    template_path="ml_fraud.json",
    enable_caching=False,
    location=REGION,
)

In [None]:
# Submit the job. The recorded output below shows the client repeatedly
# reporting the job state while the pipeline is running.
start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/402374189238/locations/us-central1/pipelineJobs/pipeline-fraud-20220825143425
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/402374189238/locations/us-central1/pipelineJobs/pipeline-fraud-20220825143425')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-fraud-20220825143425?project=402374189238
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/pipeline-fraud-20220825143425 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/pipeline-fraud-20220825143425 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/pipeline-fraud-20220825143425 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/pipeline