### Install additional packages

Install the following packages required to execute this notebook. 

In [None]:
! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 'kfp<2' \
                                 'google-cloud-pipeline-components<2'

In [None]:
# ! pip install protobuf==3.20.0

### Check the package versions

Check the versions of the packages you installed.  The KFP SDK version should be >=1.8.

In [4]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.22
google_cloud_pipeline_components version: 1.0.44


In [5]:
PROJECT_ID = "project-id"  # @param {type:"string"}
REGION = "us-east1"

In [6]:
BUCKET_URI = f"gs://fcustom-ml-experiments"  # @param {type:"string"}

#### Service Account

In [7]:
shell_output = !gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

print("Service Account:", SERVICE_ACCOUNT)

Service Account: 662741782935-compute@developer.gserviceaccount.com


#### Set service account access for Vertex AI Pipelines

Run the following commands to grant service account access to read and write pipeline artifacts in the bucket that is created in the previous step -- only need to run these once per service account.

In [8]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://fmcc-custom-ml-experiments/
No changes made to gs://fmcc-custom-ml-experiments/


### Import libraries and define constants

In [9]:
from typing import NamedTuple, List

import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics,
                        Output, component, Dataset, Model, HTML, Markdown)

#### UUID

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a uuid for each instance session, and append it onto the name of resources you create in this tutorial.

In [10]:
import random
import string


# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


UUID = generate_uuid()
UUID

'nqygikzk'

#### Vertex AI constants

Setup up the following constants for Vertex AI Pipeline:
- `PIPELINE_NAME`: Set name for the Pipeline.
- `PIPELINE_ROOT`: Cloud Storage bucket path to store pipeline artifacts.

In [11]:
# set path for storing the pipeline artifacts
PIPELINE_NAME = "custom-ml-tabular-training"
PIPELINE_ROOT = "{}/pipeline_root/beans".format(BUCKET_URI)

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [12]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Define custom components

In [13]:
@dsl.component
def _get_date() -> str:
    """Returns the current date."""
    import datetime  # pylint: disable=g-import-not-at-top,import-outside-toplevel
    return datetime.datetime.today().strftime('%Y%m%d')

In [14]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
    packages_to_install=["scorecardpy==0.1.9.6"],
)
def credit_score_dataset(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    import pandas as pd
    import scorecardpy as sc

    import logging

    # load germancredit data
    data = sc.germancredit()

    # filter variable via missing rate, iv, identical value rate
    dt_s = sc.var_filter(data, y="creditability")

    # breaking dt into train and test
    train, test = sc.split_df(dt_s, 'creditability').values()
    
    # woe binning ------
    bins = sc.woebin(dt_s, y="creditability")
    # sc.woebin_plot(bins)

    # binning adjustment
    # # adjust breaks interactively
    # breaks_adj = sc.woebin_adj(dt_s, "creditability", bins) 
    # # or specify breaks manually
    breaks_adj = {
        'age.in.years': [26, 35, 40],
        'other.debtors.or.guarantors': ["none", "co-applicant%,%guarantor"]
    }
    bins_adj = sc.woebin(dt_s, y="creditability", breaks_list=breaks_adj)
    
    # converting train and test into woe values
    train_woe = sc.woebin_ply(train, bins_adj)
    test_woe = sc.woebin_ply(test, bins_adj)

    train_woe.to_csv(dataset_train.path, index=False)
    test_woe.to_csv(dataset_test.path, index=False)

In [15]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
    packages_to_install=["scorecardpy==0.1.9.6"],
)
def model_train(
    dataset: Input[Dataset],
    model: Output[Artifact],
):
    import pandas as pd
    import pickle
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression

    train_woe = pd.read_csv(dataset.path)
    y_train = train_woe.loc[:,'creditability']
    X_train = train_woe.loc[:,train_woe.columns != 'creditability']

    model_pipeline =  LogisticRegression(penalty='l1', C=0.9, solver='saga', n_jobs=-1, random_state=42)

    model_pipeline.fit(X_train, y_train)

    model.metadata["framework"] = "scikit-learn"
    model.metadata["containerSpec"] = {
        "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
    }

    file_name = model.path + "/model.pkl"
    import pathlib

    pathlib.Path(model.path).mkdir()
    with open(file_name, "wb") as file:
        pickle.dump(model_pipeline, file)

In [16]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
    packages_to_install=["scorecardpy==0.1.9.6"],
)
def model_evaluate_metric(
    test_set: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
    # metricsc: Output[ClassificationMetrics],
) -> dict:
    import pandas as pd
    import pickle
    from sklearn.metrics import (roc_curve,
                                 confusion_matrix,
                                 accuracy_score,
                                 precision_score,
                                 recall_score,
                                 f1_score,
                                 log_loss,
                                 roc_auc_score,
                                 average_precision_score)
    
    data = pd.read_csv(test_set.path)
    file_name = model.path + "/model.pkl"
    with open(file_name, "rb") as file:
        model_pipeline = pickle.load(file)
    
    X=data.drop(columns=['creditability'])
    y=data.creditability

    y_pred = model_pipeline.predict(X)

    y_scores = model_pipeline.predict_proba(X)[:, 1]
#     fpr, tpr, thresholds = roc_curve(y_true=y, y_score=y_scores, pos_label=True)
#     metricsc.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

#     metricsc.log_confusion_matrix(
#         ["good", "bad"],
#         confusion_matrix(y, y_pred).tolist(),
#     )
    
    metrics.log_metric('Framework', 'scikit-learn')
    metrics.log_metric('Threshold','0.5000')
    metrics.log_metric('Precision', precision_score(y, y_pred))
    metrics.log_metric('Recall', recall_score(y, y_pred))
    metrics.log_metric('Accuracy', accuracy_score(y, y_pred))
    metrics.log_metric('F1 score', f1_score(y, y_pred))
    metrics.log_metric('Log loss', log_loss(y, y_pred))
    metrics.log_metric('ROC AUC', roc_auc_score(y, y_scores))
    metrics.log_metric('ROC PR', average_precision_score(y, y_scores))
    
    output = {'auROC': roc_auc_score(y, y_pred)}
    print(output)

In [17]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
    packages_to_install=["scorecardpy==0.1.9.6"],
)
def model_evaluate_matrix(
    test_set: Input[Dataset],
    model: Input[Model],
    metricsc: Output[ClassificationMetrics],
) -> dict:
    import pandas as pd
    import pickle
    from sklearn.metrics import (roc_curve,
                                 confusion_matrix,
                                 accuracy_score,
                                )
    
    data = pd.read_csv(test_set.path)
    file_name = model.path + "/model.pkl"
    with open(file_name, "rb") as file:
        model_pipeline = pickle.load(file)
    
    X=data.drop(columns=['creditability'])
    y=data.creditability

    y_pred = model_pipeline.predict(X)

    y_scores = model_pipeline.predict_proba(X)[:, 1]
    fpr, tpr, thresholds = roc_curve(y_true=y, y_score=y_scores, pos_label=True)
    metricsc.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    metricsc.log_confusion_matrix(
        ["good", "bad"],
        confusion_matrix(y, y_pred).tolist(),
    )
     

## Define pipeline 

Define the pipeline for AutoML tabular classification using the components from `google_cloud_pipeline_components`.

In [18]:
from ast import literal_eval
@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def pipeline(
    project: str,
    location: str,
    UUID: str
):
    from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from google_cloud_pipeline_components.experimental.custom_job.utils import (
        create_custom_training_job_op_from_component,
    )
    
    data_op = credit_score_dataset()

    custom_job_distributed_training_op = create_custom_training_job_op_from_component(
        model_train,
        replica_count=1
    )

    model_train_op = custom_job_distributed_training_op(
        dataset=data_op.outputs["dataset_train"],
        project=project,
        location=location,
    ).after(data_op)

    model_evaluate_metric_op = model_evaluate_metric(
        test_set=data_op.outputs["dataset_test"],
        model=model_train_op.outputs["model"],
    ).after(model_train_op)
    
    # model_evaluate_matrix_op = model_evaluate_matrix(
    #     test_set=data_op.outputs["dataset_test"],
    #     model=model_train_op.outputs["model"],
    # ).after(model_train_op)
    
    # shapely parameters
    parameters = {"sampled_shapley_attribution": {"path_count": 10}}
    
    # Explanation metadata
    COLUMNS = ['purpose_woe', 'installment_rate_in_percentage_of_disposable_income_woe', 'status_of_existing_checking_account_woe', 'housing_woe', 'credit_history_woe', 'savings_account_and_bonds_woe', 'duration_in_month_woe', 'present_employment_since_woe', 'age_in_years_woe', 'other_debtors_or_guarantors_woe', 'other_installment_plans_woe', 'property_woe', 'credit_amount_woe']

    metadata = {
    "inputs":{
        "features": {"index_feature_mapping": COLUMNS, "encoding": "BAG_OF_FEATURES"}
    },
    "outputs":{"creditability": {}}}

    model_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name=f"german-credit-scroe-model-{UUID}",
        unmanaged_container_model=model_train_op.outputs["model"],
        explanation_parameters=parameters,
        explanation_metadata=metadata,
    ).after(model_train_op)

    endpoint_create_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name=f"german-credit-scroe-endpoint-{UUID}",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=f"german-credit-scroe-model-{UUID}",
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    ).after(model_upload_op)


## Compile the pipeline

Next, compile the pipeline to the specified json file.

In [19]:
from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="custom_classification_pipeline.json",
)



## Run the pipeline

Pass the input parameters required for the pipeline and run it. The defined pipeline takes the following parameters:
- `project`: Project-id where the pipeline is run.
- `location`: Region for setting the pipeline location.
- `UUID`: uuid for current instance session

In [20]:
parameters = {
    "project": PROJECT_ID,
    "location": REGION,
    "UUID": UUID,
}

In [21]:
# Configure the pipeline
job = aiplatform.PipelineJob(
    display_name=f"custom_classification_training_{UUID}",
    template_path="custom_classification_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=parameters,
    enable_caching=False,
)

Run the pipeline job. Click on the generated link to see your run in the Cloud Console.

In [22]:
UUID


'nqygikzk'

In [23]:
# Run the job
job.run(sync=False)

Creating PipelineJob
PipelineJob created. Resource name: projects/662741782935/locations/us-central1/pipelineJobs/custom-ml-tabular-training-20231002115217
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/662741782935/locations/us-central1/pipelineJobs/custom-ml-tabular-training-20231002115217')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-ml-tabular-training-20231002115217?project=662741782935
PipelineJob projects/662741782935/locations/us-central1/pipelineJobs/custom-ml-tabular-training-20231002115217 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/662741782935/locations/us-central1/pipelineJobs/custom-ml-tabular-training-20231002115217 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/662741782935/locations/us-central1/pipelineJobs/custom-ml-tabular-training-20231002115217 current state:
PipelineState.PIPELINE_STATE_RUNNING
Pipel