# Credit Card Fraud Detection Pipeline

<img src="https://miro.medium.com/max/513/1*aeXlwnOS3DvVHiMVgBZbpQ.png?raw=1" alt="Vertex AI Logo" align="left" height="50" width="50" />

<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/2/2d/Tensorflow_logo.svg/1200px-Tensorflow_logo.svg.png?raw=1" alt="Vertex AI Logo" align="left" height="50" width="50" style="margin-left:20px"/>

## Install libraries

In [1]:
# USER_FLAG = "--user"

In [2]:
# !pip3 install {USER_FLAG} google-cloud-aiplatform==1.18.3 --upgrade
# !pip3 install {USER_FLAG} kfp==1.8.16 google-cloud-pipeline-components==1.0.26

In [3]:
# !python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
# !python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

## Import libraries

In [4]:
import google_cloud_pipeline_components.experimental.evaluation as gcc_evaluation
import json
import kfp
import logging
import os
import time
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, Input, Output, Model, Metrics, ClassificationMetrics
from google.cloud import aiplatform
from google_cloud_pipeline_components.experimental.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

## Set project ID and bucket

In [5]:
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("PROJECT_ID: ", PROJECT_ID)

PROJECT_ID:  aadev-end-to-end-pipeline-demo


In [6]:
BUCKET_NAME = os.path.join("gs://", "demo-kfp-pipelines")
print("BUCKET_NAME:", BUCKET_NAME)

BUCKET_NAME: gs://demo-kfp-pipelines


## Define constants

In [7]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

env: PATH=/Users/ayoad/Downloads/cvs-end-to-end-pipeline-demo/venv/bin:/usr/local/opt/openjdk@11/bin:/Users/ayoad/Downloads/google-cloud-sdk/bin:/opt/anaconda3/bin:/opt/anaconda3/condabin:/usr/local/git/git-google/bin:/usr/local/git/current/bin:/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin:/home/jupyter/.local/bin


In [8]:
PIPELINE_ROOT = os.path.join(BUCKET_NAME, "pipeline-root")
print("PIPELINE_ROOT:", PIPELINE_ROOT)

PIPELINE_ROOT: gs://demo-kfp-pipelines/pipeline-root


In [9]:
UUID = str(int(time.time()))
WORKDIR = os.path.join(PIPELINE_ROOT, UUID)
print(f'WORKDIR: {WORKDIR}')

WORKDIR: gs://demo-kfp-pipelines/pipeline-root/1668969721


In [10]:
EMAIL_NOTIFICATION_RECIPIENTS = ["ayoad@google.com"]

## Create component for model evaluation

In [11]:
@component()
def make_deployment_decision(project: str, staging_bucket: str, metric_name: str, threshold: float) -> NamedTuple("Outputs", [("dep_decision", str)]):
    import json
    import logging
    import os
    import re
    from google.cloud import storage
    
    # determine bucket name and prefix
    staging_bucket_components = staging_bucket.split("gs://")[-1].split("/")
    bucket_name = staging_bucket_components[0]
    prefix = os.path.join(*staging_bucket_components[1:])
    
    # determine evaluation metrics output file "executor_output.json" path
    storage_client = storage.Client(project=project)
    bucket = storage_client.get_bucket(bucket_name)
    blobs = storage_client.list_blobs(bucket, prefix=prefix)
    regex = re.compile(r'.*/model-evaluation-classification_.*/executor_output.json')
    matching_metrics_blobs = list(filter(regex.match, [blob.name for blob in blobs]))
    evaluation_metrics_blob_name = matching_metrics_blobs[0]
    
    # read blob and create results dict
    blob = bucket.blob(evaluation_metrics_blob_name)
    metrics_data_dict = json.loads(blob.download_as_bytes(client=storage_client))
    results_dict = metrics_data_dict["artifacts"]["evaluation_metrics"]["artifacts"][0]['metadata']
   
    # determine metric value
    metric_value = results_dict[metric_name]
    
    if metric_value > threshold:
        dep_decision = "true"
    else:
        dep_decision = "false"
        
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

## Build pipeline

In [12]:
@dsl.pipeline(name="credit-card-fraud-detection-training", pipeline_root=PIPELINE_ROOT)
def pipeline(
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    dataset_display_name: str = "credit_card_fraud_detection",
    bq_source: str = "bq://aadev-end-to-end-pipeline-demo.gcs_lake.credit_card_fraud_detection",
    training_display_name: str = "credit-card-fraud-detection-tf-train",
    staging_bucket: str = WORKDIR,
    container_uri: str = "us-central1-docker.pkg.dev/aadev-end-to-end-pipeline-demo/vertex-ai-pipeline-custom-training-jobs/credit_card_fraud_detection_tf:latest",
    target_column_name: str = "Class",
    class_names: list = ["0", "1"],
    key_columns: list = ["key"],
    bq_dest: str = f"bq://{PROJECT_ID}",
    model_display_name: str = "credit-card-fraud-detection-tf",
    model_serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
    training_machine_type: str = "n1-standard-4",
    batch_predict_display_name: str = "credit-card-fraud-detection-tf-batch-predict",
    batch_predict_machine_type: str = "n1-standard-4",
    deployment_metric: str = "auPrc",
    deployment_threshold: float = 0.95,
    aiplatform_host: str = "us-central1-aiplatform.googleapis.com",
    endpoint_display_name: str = "credit-card-fraud-detection-tf",
    dedicated_resources_min_replica_count: int = 1,
    dedicated_resources_max_replica_count: int = 1,
    dedicated_resources_machine_type: str = "n1-standard-4"
):
    notify_email_task = VertexNotificationEmailOp(recipients=EMAIL_NOTIFICATION_RECIPIENTS)

    with dsl.ExitHandler(notify_email_task):
        
        # create dataset
        dataset_create_task = gcc_aip.TabularDatasetCreateOp(
            project=project,
            location=gcp_region,
            display_name=dataset_display_name, 
            bq_source=bq_source
        )

        # determine test dataset file path
        test_dataset_file_path = os.path.join(WORKDIR, "batch-prediction", "input", "test_dataset.jsonl")
        
        # determine splitter output path
        splitter_output_path = os.path.join(WORKDIR, "batch-prediction", "splitter")
        
        # determine batch prediction output path
        batch_prediction_output_path = os.path.join(WORKDIR, "batch-prediction", "output")

        # create custon container training job
        training_task = gcc_aip.CustomContainerTrainingJobRunOp(
            display_name=training_display_name,
            container_uri=container_uri,
            project=project,
            location=gcp_region,
            dataset=dataset_create_task.outputs["dataset"],
            staging_bucket=staging_bucket,
            bigquery_destination=bq_dest,
            environment_variables={
                "TEST_DATASET_FILE_PATH": test_dataset_file_path,
                "PYTHONUNBUFFERED": "1",
                "CLASS_COLUMN": "Class",
                "REMOVE_INPUT_COLUMNS": json.dumps(["Time", "Row_Weight"])
            },
            model_serving_container_image_uri=model_serving_container_image_uri,
            model_display_name=model_display_name,
            machine_type=training_machine_type,
        )

        # remove target column from batch prediction input path
        target_remover_task = gcc_evaluation.TargetFieldDataRemoverOp(
            project=project,
            location=gcp_region,
            root_dir=splitter_output_path,
            gcs_source_uris=[test_dataset_file_path],
            instances_format="jsonl",
            target_field_name=target_column_name,
        ).after(training_task)

        # make batch prediction
        batch_prediction_task = gcc_aip.ModelBatchPredictOp(
            project=project,
            location=gcp_region,
            job_display_name=batch_predict_display_name,
            model=training_task.outputs["model"],
            gcs_source_uris=target_remover_task.outputs["gcs_output_directory"],
            instances_format="jsonl",
            predictions_format="jsonl",
            gcs_destination_output_uri_prefix=batch_prediction_output_path,
            machine_type=batch_predict_machine_type
        )

        # run evaluation based on prediction type and feature attribution component
        eval_task = gcc_evaluation.ModelEvaluationClassificationOp(
            project=project,
            location=gcp_region,
            root_dir=batch_prediction_output_path,
            classification_type="multiclass",
            predictions_format="jsonl",
            predictions_gcs_source=batch_prediction_task.outputs["gcs_output_directory"],
            ground_truth_format="jsonl",
            ground_truth_gcs_source=[test_dataset_file_path],
            target_field_name=target_column_name,
            class_labels=class_names,
            model=training_task.outputs["model"]
        )

        # get feature attributions
        feature_attribution_task = gcc_evaluation.ModelEvaluationFeatureAttributionOp(
            project=project,
            location=gcp_region,
            root_dir=batch_prediction_output_path,
            predictions_format="jsonl",
            predictions_gcs_source=batch_prediction_task.outputs["gcs_output_directory"],
        )

        # Import the evaluation results to the model resource
        model_import_evaluation_task = gcc_evaluation.ModelImportEvaluationOp(
            classification_metrics=eval_task.outputs["evaluation_metrics"],
            feature_attributions=feature_attribution_task.outputs["feature_attributions"],
            model=training_task.outputs["model"],
            dataset_type="jsonl",
        )

        conditional_dep_task = make_deployment_decision(
            project=project,
            staging_bucket=staging_bucket,
            metric_name=deployment_metric,
            threshold=deployment_threshold,
        ).after(eval_task)


        with dsl.Condition(conditional_dep_task.outputs["dep_decision"] == "true", name="deploy_decision"):

            endpoint_op = gcc_aip.EndpointCreateOp(
                project=project,
                location=gcp_region,
                display_name=endpoint_display_name,
            )

            gcc_aip.ModelDeployOp(
                model=training_task.outputs["model"],
                endpoint=endpoint_op.outputs["endpoint"],
                dedicated_resources_min_replica_count=dedicated_resources_min_replica_count,
                dedicated_resources_max_replica_count=dedicated_resources_max_replica_count,
                dedicated_resources_machine_type=dedicated_resources_machine_type,
            )

In [13]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="credit_card_fraud_detection_pipeline.json"
)



In [14]:
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="credit-card-fraud-detection-tf-pipeline-{}".format(UUID),
    template_path="credit_card_fraud_detection_pipeline.json",
    pipeline_root=WORKDIR,
    parameter_values=dict(
        project=PROJECT_ID,
        gcp_region="us-central1",
        dataset_display_name="credit_card_fraud_detection",
        bq_source="bq://aadev-end-to-end-pipeline-demo.gcs_lake.credit_card_fraud_detection",
        training_display_name="credit-card-fraud-detection-tf-train",
        staging_bucket=WORKDIR,
        container_uri="us-central1-docker.pkg.dev/aadev-end-to-end-pipeline-demo/vertex-ai-pipeline-custom-training-jobs/credit_card_fraud_detection_tf:latest",
        target_column_name="Class",
        class_names=["0", "1"],
        key_columns=["key"],
        bq_dest=os.path.join("bq://", PROJECT_ID),
        model_display_name="credit-card-fraud-detection-tf",
        model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
        training_machine_type="n1-standard-4",
        batch_predict_display_name="credit-card-fraud-detection-tf-batch-predict",
        batch_predict_machine_type="n1-standard-4",
        deployment_metric="auPrc",
        deployment_threshold=0.95,
        aiplatform_host="us-central1-aiplatform.googleapis.com",
        endpoint_display_name="credit-card-fraud-detection-tf",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        dedicated_resources_machine_type="n1-standard-4"
    ),
    enable_caching=True
)

## Submit pipeline job

In [15]:
ml_pipeline_job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/654648321406/locations/us-central1/pipelineJobs/credit-card-fraud-detection-training-20221120104203
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/654648321406/locations/us-central1/pipelineJobs/credit-card-fraud-detection-training-20221120104203')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/credit-card-fraud-detection-training-20221120104203?project=654648321406
