In [None]:
# Extra flag interpolated into the pip install commands of this notebook;
# "--user" makes pip install into the per-user site-packages.
USER_FLAG = "--user"

In [None]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0
!pip3 install {USER_FLAG} kfp==1.8.9

In [None]:
import os

# Restart the kernel automatically after the installs.
# Setting the IS_TESTING environment variable skips the restart.
if not os.environ.get("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [None]:
# Sanity check: import kfp in a python3 subprocess and print its version.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

In [None]:
# Sanity check: list installed packages whose name contains "aiplatform".
!pip list | grep aiplatform

In [None]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

In [None]:
# Disabled manual fallback: the triple quotes turn the code below into a plain
# string expression, so it is never executed and PROJECT_ID is left unchanged.
'''
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}
'''

In [None]:
# Cloud Storage location derived from the project ID; used as the base of PIPELINE_ROOT.
BUCKET_NAME = f"gs://{PROJECT_ID}-pipeline"
BUCKET_NAME

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath

from google.cloud import aiplatform

# We'll use this namespace for metadata querying
from google.cloud import aiplatform_v1

In [None]:
import os

# Directory appended to PATH so tools installed with `pip install --user` can be found.
LOCAL_BIN = "/home/jupyter/.local/bin"

# PATH keeps the value seen before this cell modified the environment.
PATH = os.environ["PATH"]
# Only append when missing, so re-running the cell does not keep growing PATH.
if LOCAL_BIN not in PATH.split(os.pathsep):
    os.environ["PATH"] = f"{PATH}{os.pathsep}{LOCAL_BIN}"

REGION = "us-central1"

# Folder inside the bucket that is passed to the pipeline as its root.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

In [None]:
# First component in the pipeline: fetch the data from BigQuery and export it as CSV.
@component(
    # NOTE(review): db-dtypes is required by current google-cloud-bigquery releases
    # for to_dataframe(); the unpinned install pulls such a release - confirm.
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],
    base_image="python:3.9",
    output_component_file="dataset_creating.yaml"
)
def get_data(
    bq_table: str,
    output_data_path: OutputPath("Dataset")
):
    """Read a whole BigQuery table and write it, shuffled, to a CSV file.

    Args:
        bq_table: Table identifier parsed by ``TableReference.from_string``.
        output_data_path: Path of the output dataset file the CSV is written to.
    """
    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = bigquery.TableReference.from_string(bq_table)
    rows = client.list_rows(table_ref)
    dataframe = rows.to_dataframe(create_bqstorage_client=True)

    # Shuffle all rows with a fixed seed so the export is reproducible.
    shuffled = dataframe.sample(frac=1, random_state=2)

    # index=False: otherwise the shuffled row index is written as an extra unnamed
    # column, which a later pd.read_csv would load back as if it were a feature.
    shuffled.to_csv(output_data_path, index=False)

In [None]:
# Second component in the pipeline: train the classification model using a decision tree.
@component(
    # "sklearn" is only the deprecated PyPI alias and no longer installs;
    # the actual distribution is named scikit-learn.
    packages_to_install=["scikit-learn", "pandas", "joblib"],
    base_image="python:3.9",
    output_component_file="model_training.yaml",
)
def training_classmod(
    dataset: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model]
):
    """Train a decision-tree classifier on the exported CSV and save it with joblib.

    The target is ``Attrition_Flag`` (1 for "Existing Customer", 0 otherwise).
    Categorical columns are one-hot encoded; every other remaining column is used
    as a feature unchanged. Accuracy on a stratified 20% hold-out split is logged
    as a percentage.

    Args:
        dataset: Input artifact whose ``path`` points at the CSV to train on.
        metrics: Output artifact receiving the "accuracy" and "model" entries.
        model: Output artifact; the estimator is dumped to ``model.path + ".joblib"``.
    """
    from joblib import dump
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeClassifier
    import pandas as pd

    target_col = 'Attrition_Flag'
    # Identifier columns: removed so the model cannot memorise customer IDs.
    cols_drop = ['CLIENTNUM']
    cols_categorical = ['Gender','Dependent_count', 'Education_Level', 'Marital_Status','Income_Category','Card_Category']

    data = pd.read_csv(dataset.path)

    # A CSV written together with its index comes back with an "Unnamed: N" column;
    # drop it along with the identifier columns (errors="ignore" tolerates absence).
    stray_index_cols = [col for col in data.columns if str(col).startswith("Unnamed:")]
    data = data.drop(columns=cols_drop + stray_index_cols, errors="ignore")

    data[target_col] = (data[target_col] == "Existing Customer").astype(int)
    data_encoded = pd.get_dummies(data, columns=cols_categorical)

    X = data_encoded.drop(columns=[target_col])
    y = data_encoded[target_col]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=100, stratify=y
    )

    # Fixed seed so repeated pipeline runs on the same data give the same tree.
    model_classifier = DecisionTreeClassifier(random_state=100)
    model_classifier.fit(X_train, y_train)

    score = model_classifier.score(X_test, y_test)
    print('accuracy is:',score)

    metrics.log_metric("accuracy",(score * 100.0))
    metrics.log_metric("model", "Decision tree")

    # NOTE(review): presumably the ".joblib" suffix yields the file name the
    # prebuilt sklearn serving container looks for - confirm.
    dump(model_classifier, model.path + ".joblib")

In [None]:
# Third component in the pipeline: upload the trained model to Vertex AI and deploy it.
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.9",
    output_component_file="model_deployment.yaml",
)
def model_deployment(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the trained model artifact and deploy it to an endpoint.

    Args:
        model: Input model artifact; the folder containing it is uploaded.
        project: Project passed to ``aiplatform.init``.
        region: Location passed to ``aiplatform.init``.
        vertex_endpoint: Output artifact whose ``uri`` is set to the endpoint resource name.
        vertex_model: Output artifact whose ``uri`` is set to the uploaded model resource name.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    # Upload the folder that holds the artifact: remove only the final path segment.
    # A blanket str.replace("model", "") would also strip "model" from any bucket,
    # folder or job name earlier in the URI and point at a non-existent location.
    artifact_dir = model.uri.rstrip("/").rsplit("/", 1)[0] + "/"

    uploaded_model = aiplatform.Model.upload(
        display_name="custom-model-pipeline",
        artifact_uri=artifact_dir,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
    )
    endpoint = uploaded_model.deploy(machine_type="n1-standard-4")

    # Save data to the output params
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = uploaded_model.resource_name

In [None]:
# `dsl.pipeline` is used instead of the bare imported `pipeline` decorator because
# the function below is itself named `pipeline`: once this cell has run, the bare
# name refers to the function and re-running the cell would fail.
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="custom-pipeline",
)
def pipeline(
    bq_table: str = "",
    output_data_path: str = "data.csv",
    project: str = PROJECT_ID,
    region: str = REGION
):
    """Chain the three components: export data, train the model, deploy it.

    Args:
        bq_table: BigQuery table forwarded to ``get_data``.
        output_data_path: Not used by the pipeline body; kept so the parameter list is unchanged.
        project: Project forwarded to ``model_deployment``.
        region: Region forwarded to ``model_deployment``.
    """
    # Keyword arguments make the wiring between components explicit.
    dataset_task = get_data(bq_table=bq_table)

    model_task = training_classmod(dataset=dataset_task.output)

    model_deployment(
        model=model_task.outputs["model"],
        project=project,
        region=region,
    )

In [None]:
# Compile the pipeline function into a JSON file at the given package path.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="custom-pipeline-classifier.json",
)

In [None]:
# Pipeline run built from the compiled JSON spec. Project and region are passed
# explicitly so the job runs where PIPELINE_ROOT points, instead of relying on
# whatever defaults the SDK picks up from the environment.
run1 = aiplatform.PipelineJob(
    display_name="vertex-ai-pipeline-custom-training",
    template_path="custom-pipeline-classifier.json",
    # NOTE(review): the job_id is fixed; presumably it must be changed before
    # submitting another run (e.g. "custom-pipeline-RF") - confirm.
    job_id="custom-pipeline-DT",
    parameter_values={"bq_table": "vertex-ai-123.credit_card_churn.data"},
    enable_caching=False,
    # An empty PROJECT_ID falls back to the SDK's default project resolution.
    project=PROJECT_ID or None,
    location=REGION,
)

In [None]:
# Submit the pipeline job defined above.
run1.submit()

In [None]:
#df = aiplatform.get_pipeline_df(pipeline="vertex-ai-pipeline-custom-training")
#df