In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipelines_dataproc_tabular/google_cloud_pipeline_components_dataproc_tabular.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebooks/official/pipelines/google_cloud_pipelines_dataproc_tabular/google_cloud_pipeline_components_dataproc_tabular.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebooks/official/pipelines/google_cloud_pipelines_dataproc_tabular/google_cloud_pipeline_components_dataproc_tabular.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>                                                                                               
</table>

## Overview

This notebook shows how to build a Spark ML pipeline using Spark MLlib and DataprocPySparkBatchOp component to determine the customer eligibility for a loan from a banking company. In particular, the pipeline covers a Spark MLib pipeline, from data preprocessing to hyperparameter tuning of a random forest classifier which predicts the probability of a customer being eligible for a loan. 

### Dataset

The dataset is an preprocessed version of the [loan eligiability dataset](https://datasetsearch.research.google.com/search?src=2&query=Loan%20Eligible%20Dataset&docid=L2cvMTFsajJrM3EzcA%3D%3D).


### Objective

In this notebook, you will learn how to

*   Use the DataprocPySparkBatchOp to preprocess data and train an random forest model using Pyspark

*   Use the Spark serving image in order to deploy a Spark model on Vertex AI Endpoint

### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage
* Dataproc Serverless

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

**If you are using Colab or Vertex AI Workbench Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* The Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip3 install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

### Install additional packages

Install the following packages required to execute this notebook.

In [None]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

!pip3 install {USER_FLAG} --upgrade google-cloud-aiplatform kfp google-cloud-pipeline-components tensorflow --quiet --no-warn-conflicts

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable APIs](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleleapis.com,%20cloudbuild.googleapis.com,%20storage-api.googleapis.com,%20artifactregistry.googleapis.com&_ga)

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [None]:
import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set your project ID here.

In [None]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project]"  # @param {type:"string"}

In [None]:
! gcloud config set project $PROJECT_ID

#### Get your project number

Now that the project ID is set, you get your corresponding project number.

In [None]:
shell_output = ! gcloud projects list --filter="PROJECT_ID:'{PROJECT_ID}'" --format='value(PROJECT_NUMBER)'
PROJECT_NUMBER = shell_output[0]
print("Project Number:", PROJECT_NUMBER)

#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook.  Below are regions supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [None]:
REGION = "[your-region]"  # @param {type: "string"}

if REGION == "[your-region]":
    REGION = "us-central1"

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append it onto the name of resources you create in this tutorial.

In [None]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Authenticate your Google Cloud account

**If you are using Vertex AI Workbench Notebooks**, your environment is already
authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions
when prompted to authenticate your account via oAuth.

**Otherwise**, follow these steps:

1. In the Cloud Console, go to the [**Create service account key**
   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).

2. Click **Create service account**.

3. In the **Service account name** field, enter a name, and
   click **Create**.

4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type the following roles into the filter box, and select

  - Artifact Registry Administrator
  - Artifact Registry Repository Administrator
  - Cloud Build Editor
  - Compute Network Admin
  - Dataproc Administrator
  - Dataproc Worker
  - Service Account User
  - Storage Admin
  - Storage Object Admin
  - Vertex AI Administrator

5. Click *Create*. A JSON file that contains your key downloads to your
local environment.

6. Enter the path to your service account key as the
`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.

In [None]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Vertex AI Workbench, then don't execute this code
IS_COLAB = "google.colab" in sys.modules
if not os.path.exists("/opt/deeplearning/metadata/env_version") and not os.getenv(
    "DL_ANACONDA_HOME"
):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you submit a Spark job using the Cloud SDK, you need a bucket to read the python module associated and the data required to it. 

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. We suggest that you [choose a region where Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions).

In [None]:
BUCKET_NAME = "[your-bucket-name]"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [None]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID + "-aip-" + TIMESTAMP
    BUCKET_URI = f"gs://{BUCKET_NAME}"

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al $BUCKET_URI

#### Service Account 

If you do not want to use your project's Compute Engine service account, set `SERVICE_ACCOUNT` to another service account ID.

In [None]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access

Run the following commands to grant your service account access to the bucket that you created in the previous step. You only need to run this step once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Enabling Private Google Access for Dataproc Serverless

In [None]:
SUBNETWORK = "default"  # @param {type:"string"}

In [None]:
!gcloud compute networks subnets list --regions=$REGION --filter=$SUBNETWORK

In [None]:
!gcloud compute networks subnets update $SUBNETWORK \
--region=$REGION \
--enable-private-ip-google-access

In [None]:
!gcloud compute networks subnets describe $SUBNETWORK \
--region=$REGION \
--format="get(privateIpGoogleAccess)"

### Create the Docker repository

You create a Docker repository in the Artefact Registry for the custom dataproc image we are going to create.

In [None]:
REPO_NAME = "loan-eligiability-spark-demo"

In [None]:
!gcloud artifacts repositories create $REPO_NAME \
    --repository-format=docker \
    --location=$REGION \
    --description="loan eligibility spark docker repository"

### Load preprocessing data

The notebook uses a preprocessed set of data you would read from the Vertex AI Feature Store. 

In [None]:
PUBLIC_DATA_URI = "gs://cloud-samples-data/vertex-ai/dataset-management/datasets/loan_eligibilty/data.csv"
FEATURES_TRAIN_URI = f"{BUCKET_URI}/data/features/snapshots/{TIMESTAMP}"

In [None]:
!gsutil cp -r $PUBLIC_DATA_URI $FEATURES_TRAIN_URI

### Import libraries

In [None]:
# General
import random
from pathlib import Path as path
from typing import NamedTuple

import tensorflow as tf
# ML Training
from google.cloud import aiplatform as vertex_ai
from google.cloud import aiplatform_v1beta1 as vertex_ai_beta
from google_cloud_pipeline_components import aiplatform as vertex_ai_components
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Condition, Input,
                        Metrics, Output, component)

### Set variables

In [None]:
ID = random.randint(1, 10000)

In [None]:
# Setup
SPARK_VERSION = "3.1.2"
SRC = path("src")
BUILD_PATH = path("build")
DELIVERABLES = path("deliverables")
DATA_PATH = path("data")
RUNTIME_IMAGE = "dataproc_serverless_custom_runtime"
IMAGE_TAG = "1.0.0"

# Pipeline
PIPELINE_NAME = "pyspark-loan-eligiability-pipeline"
PIPELINE_ROOT = f"{BUCKET_URI}/pipelines"
PIPELINE_PACKAGE_PATH = str(BUILD_PATH / f"pipeline_{TIMESTAMP}.json")
RUNTIME_CONTAINER_IMAGE = f"gcr.io/{PROJECT_ID}/{RUNTIME_IMAGE}:{IMAGE_TAG}"
ML_APPLICATION = "spark"
TASK = "classifier"
MODEL_TYPE = "rfor"
VERSION = "1.0.0"
MODEL_NAME = f"{ML_APPLICATION}-{TASK}-{MODEL_TYPE}-{VERSION}"

# Preprocessing
PREPROCESSING_BATCH_ID = f"data-preprocessing-{ID}"
PREPROCESSING_PYTHON_FILE_URI = f"{BUCKET_URI}/src/data_preprocessing.py"
PROCESSED_DATA_URI = f"{BUCKET_URI}/data/processed"
PREPROCESSING_ARGS = [
    "--train-data-path",
    FEATURES_TRAIN_URI,
    "--out-process-path",
    PROCESSED_DATA_URI,
]

# Dataset
DATASET_NAME = f"preprocessed-dataset-{ID}"
GCS_PREPROCESSED_URI = f"{PROCESSED_DATA_URI}/*/?.csv"

# Training
TRAINING_BATCH_ID = f"model-training-{ID}"
TRAINING_PYTHON_FILE_URI = f"{BUCKET_URI}/src/model_training.py"
MODEL_URI = f"{BUCKET_URI}/deliverables/model/rfor/{TIMESTAMP}/train_model"
METRICS_URI = f"{BUCKET_URI}/deliverables/metrics/rfor/{TIMESTAMP}/train_metrics.json"
TRAINING_ARGS = [
    "--train-path",
    PROCESSED_DATA_URI,
    "--model-path",
    MODEL_URI,
    "--metrics-path",
    METRICS_URI,
]

# Condition
AUPR_THRESHOLD = 0.5
AUPR_HYPERTUNE_CONDITION = "[AUPR_HYPERTUNE]"

# Hypertuning
HPT_TRAINING_BATCH_ID = f"hyper-tuning-{ID}"
HPT_PYTHON_FILE_URI = f"{BUCKET_URI}/src/hp_tuning.py"
HPT_MODEL_URI = f"{BUCKET_URI}/deliverables/model/rfor/{TIMESTAMP}/model"
HPT_METRICS_URI = f"{BUCKET_URI}/deliverables/metrics/rfor/{TIMESTAMP}/metrics.json"
HPT_ARGS = [
    "--train-path",
    PROCESSED_DATA_URI,
    "--model-path",
    HPT_MODEL_URI,
    "--metrics-path",
    HPT_METRICS_URI,
]

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
vertex_ai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

### Vertex AI constants

Setup up the following constants for Vertex AI:

- `API_ENDPOINT`: The Vertex AI API service endpoint for `ML Metadata` services.

In [None]:
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

# Vertex location root path for your dataset, model and endpoint resources
PARENT = "projects/" + PROJECT_ID + "/locations/" + REGION

### Set up clients

The Vertex  works as a client/server model. On your side (the Python script) you will create a client that sends requests and receives responses from the Vertex AI server.

You will use different clients in this tutorial for different steps in the workflow. So set them all up upfront.

- Metadata Service for creating recording, searching and analyzing artifacts and metadata.

In [None]:
# client options same for all services
client_options = {"api_endpoint": API_ENDPOINT}


def create_metadata_client():
    client = vertex_ai_beta.MetadataServiceClient(client_options=client_options)
    return client


clients = {}
clients["metadata"] = create_metadata_client()

for client in clients.items():
    print(client)

## PART I - Build the Vertex Pipeline to train and deploy a Spark model

In this case, the ML pipeline will have the following steps:

1.   Impute categorical and numerical variables with `DataprocPySparkBatchOp`
2.   Train an `RandomForestClassifier` with `DataprocPySparkBatchOp`
3.   Run a custom component in order to evaluate the model

If the model respects the performance condition, then

4.   Hypertune the `RandomForestClassifier` with `DataprocPySparkBatchOp`
5.   Register the model in the Vertex AI Model Registry


### Upload source code

In order to use the `DataprocPySparkBatchOp`, you need to upload the code to the cloud bucket.

In [None]:
!gsutil cp $SRC/__init__.py $BUCKET_URI/src/__init__.py
!gsutil cp $SRC/data_preprocessing.py $BUCKET_URI/src/data_preprocessing.py
!gsutil cp $SRC/model_training.py $BUCKET_URI/src/model_training.py
!gsutil cp $SRC/hp_tuning.py $BUCKET_URI/src/hp_tuning.py

### Build a custom dataproc serverless image

The `DataprocPySparkBatchOp` allows you to pass custom image you would like to use when the [provided Dataproc Serverless runtime versions](https://cloud.google.com/dataproc-serverless/docs/concepts/versions/spark-runtime-versions) does not respect your requirements. 

**Notice** that this step is generally optional. And we include it in this case for general awareness.

#### Define the Dataproc serverless custom runtime image

In [None]:
!mkdir -m 777 -p $BUILD_PATH

In [None]:
dataproc_serverless_custom_runtime_image = """
# Debian 11 is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini

# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "${SPARK_EXTRA_JARS_DIR}"

# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict

# (Optional) Install Conda packages.
#
# The following packages are installed in the default image, it is strongly
# recommended to include all of them.
#
# Use mamba to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda \
      cython \
      fastavro \
      fastparquet \
      gcsfs \
      google-cloud-bigquery-storage \
      google-cloud-bigquery[pandas] \
      google-cloud-dataproc \
      numpy \
      pandas \
      python \
      scikit-image \
      scikit-learn \
      scipy \
      mleap 

# (Required) Create the 'spark' group/user.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark
"""

with open(BUILD_PATH / "Dockerfile", "w") as f:
    f.write(dataproc_serverless_custom_runtime_image)
f.close()

#### Download the `spark-bigquery-with-dependencies` 

In [None]:
!gsutil cp gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar $BUILD_PATH
!wget -P $BUILD_PATH https://repo.anaconda.com/miniconda/Miniconda3-py39_4.10.3-Linux-x86_64.sh

#### Build the Dataproc serverless custom runtime using Google Cloud Build

In [None]:
!gcloud builds submit --tag $RUNTIME_CONTAINER_IMAGE $BUILD_PATH --machine-type=N1_HIGHCPU_32 --timeout=900s --verbosity=info 

### Build custom components for pipeline arguments

In order to pass job arguments, we decide to create some custom components for each step of the pipeline. Also we create a `register_model` component in order to register the PySpark model in Vertex AI Metadata. 

#### Data preprocessing custom component

In [None]:
@component(base_image="python:3.8-slim")
def build_preprocessing_args(train_data_path: str, processed_data_path: str) -> list:
    return [
        "--train-data-path",
        train_data_path,
        "--out-process-path",
        processed_data_path,
    ]

#### Model Training custom component

In [None]:
@component(base_image="python:3.8-slim")
def build_training_args(
    dataset_uri: Input[Artifact], train_path: str, model_path: str, metrics_path: str
) -> list:
    return [
        "--train-path",
        train_path,
        "--model-path",
        model_path,
        "--metrics-path",
        metrics_path,
    ]

#### Model Evaluation custom component

In [None]:
@component(
    base_image="python:3.8",
    packages_to_install=["numpy==1.21.2", "pandas==1.3.3", "scikit-learn==0.24.2"],
)
def evaluate_model(
    metrics_uri: str,
    metrics: Output[Metrics],
    plots: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("threshold_metric", float)]):

    # Libraries --------------------------------------------------------------------------------------------------------------------------
    import json

    import numpy as np
    from sklearn.metrics import confusion_matrix, roc_curve

    # Variables --------------------------------------------------------------------------------------------------------------------------
    metrics_path = metrics_uri.replace("gs://", "/gcs/")
    labels = ["not eligiable", "eligiable"]

    # Helpers --------------------------------------------------------------------------------------------------------------------------
    def calculate_roc(metrics, true, score):
        y_true_np = np.array(metrics[true])
        y_score_np = np.array(metrics[score])
        fpr, tpr, thresholds = roc_curve(
            y_true=y_true_np, y_score=y_score_np, pos_label=True
        )
        return fpr, tpr, thresholds

    def calculate_confusion_matrix(metrics, true, prediction):
        y_true_np = np.array(metrics[true])
        y_pred_np = np.array(metrics[prediction])
        c_matrix = confusion_matrix(y_true_np, y_pred_np)
        return c_matrix

    # Main -------------------------------------------------------------------------------------------------------------------------------
    with open(metrics_path, mode="r") as json_file:
        metrics_dict = json.load(json_file)

    area_roc = metrics_dict["test_area_roc"]
    area_prc = metrics_dict["test_area_prc"]
    acc = metrics_dict["test_accuracy"]
    f1 = metrics_dict["test_f1"]
    prec = metrics_dict["test_precision"]
    rec = metrics_dict["test_recall"]

    metrics.log_metric("Test_areaUnderROC", area_roc)
    metrics.log_metric("Test_areaUnderPRC", area_prc)
    metrics.log_metric("Test_Accuracy", acc)
    metrics.log_metric("Test_f1-score", f1)
    metrics.log_metric("Test_Precision", prec)
    metrics.log_metric("Test_Recall", rec)

    fpr, tpr, thresholds = calculate_roc(metrics_dict, "true", "score")
    c_matrix = calculate_confusion_matrix(metrics_dict, "true", "prediction")
    plots.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    plots.log_confusion_matrix(labels, c_matrix.tolist())

    component_outputs = NamedTuple(
        "Outputs",
        [
            ("threshold_metric", float),
        ],
    )

    return component_outputs(area_prc)

#### Model registration custom component

In [None]:
# TODO: Build a custom compiler using Spark docker image to compile the Mleap bundle


@component(base_image="python:3.8-slim")
def register_model(
    artifact_uri: str,
    model: Output[Artifact],
) -> NamedTuple("Outputs", [("uri", str)]):

    component_outputs = NamedTuple(
        "Outputs",
        [
            ("uri", str),
        ],
    )
    return component_outputs(artifact_uri)

### Define your workflow using Kubeflow Pipelines DSL package

Below you use the kfp.dsl package to build our pipeline. 


In [None]:
@dsl.pipeline(name=PIPELINE_NAME, description="A pipeline to train a PySpark model.")
def pipeline(
    preprocessing_batch_id: str = PREPROCESSING_BATCH_ID,
    preprocessing_main_python_file_uri: str = PREPROCESSING_PYTHON_FILE_URI,
    train_data_path: str = FEATURES_TRAIN_URI,
    preprocessed_data_path: str = PROCESSED_DATA_URI,
    dataset_name: str = DATASET_NAME,
    dataset_uri: str = GCS_PREPROCESSED_URI,
    training_batch_id: str = TRAINING_BATCH_ID,
    training_main_python_file_uri: str = TRAINING_PYTHON_FILE_URI,
    train_path: str = PROCESSED_DATA_URI,
    model_path: str = MODEL_URI,
    metrics_path: str = METRICS_URI,
    threshold: float = AUPR_THRESHOLD,
    hpt_batch_id: str = HPT_TRAINING_BATCH_ID,
    hpt_main_python_file_uri: str = HPT_PYTHON_FILE_URI,
    hpt_model_path: str = HPT_MODEL_URI,
    hpt_metrics_path: str = HPT_METRICS_URI,
    custom_container_image: str = RUNTIME_CONTAINER_IMAGE,
    model_name: str = MODEL_NAME,
    project_id: str = PROJECT_ID,
    location: str = REGION,
):

    from google_cloud_pipeline_components.experimental.dataproc import \
        DataprocPySparkBatchOp

    # build preprocessed data args
    build_preprocessing_args_op = build_preprocessing_args(
        train_data_path=train_data_path, processed_data_path=preprocessed_data_path
    ).set_display_name("Set preprocessing args")

    # preprocess data
    data_preprocessing_op = (
        DataprocPySparkBatchOp(
            project=project_id,
            location=location,
            container_image=custom_container_image,
            batch_id=preprocessing_batch_id,
            main_python_file_uri=preprocessing_main_python_file_uri,
            args=build_preprocessing_args_op.output,
        )
        .after(build_preprocessing_args_op)
        .set_display_name("Preprocess data")
    )

    # create dataset
    create_dataset_op = (
        vertex_ai_components.TabularDatasetCreateOp(
            display_name=dataset_name,
            gcs_source=dataset_uri,
            project=project_id,
            location=location,
        )
        .after(data_preprocessing_op)
        .set_display_name("Create dataset")
    )

    # build training data args
    build_training_args_op = (
        build_training_args(
            dataset_uri=create_dataset_op.output,
            train_path=train_path,
            model_path=model_path,
            metrics_path=metrics_path,
        )
        .after(create_dataset_op)
        .set_display_name("Set training args")
    )

    # training model
    model_traning_op = (
        DataprocPySparkBatchOp(
            project=project_id,
            location=location,
            container_image=custom_container_image,
            batch_id=training_batch_id,
            main_python_file_uri=training_main_python_file_uri,
            args=build_training_args_op.output,
        )
        .after(build_training_args_op)
        .set_display_name("Train Spark MLlib model")
    )

    evaluate_model_op = (
        evaluate_model(metrics_uri=metrics_path)
        .after(model_traning_op)
        .set_display_name("Evaluate model")
    )

    # evaluate condition
    with Condition(
        evaluate_model_op.outputs["threshold_metric"] >= threshold,
        name=AUPR_HYPERTUNE_CONDITION,
    ):

        build_hpt_args_op = (
            build_training_args(
                dataset_uri=create_dataset_op.output,
                train_path=train_path,
                model_path=hpt_model_path,
                metrics_path=hpt_metrics_path,
            )
            .after(evaluate_model_op)
            .set_display_name("Set hyperparameter tuning args")
        )

        # hyperparameter tuning
        hyperparameter_tuning_op = (
            DataprocPySparkBatchOp(
                project=project_id,
                location=location,
                container_image=custom_container_image,
                batch_id=hpt_batch_id,
                main_python_file_uri=hpt_main_python_file_uri,
                args=build_hpt_args_op.output,
            )
            .after(model_traning_op)
            .set_display_name("Tune Spark MLlib model")
        )

        # upload model
        _ = (
            register_model(artifact_uri=hpt_model_path)
            .after(hyperparameter_tuning_op)
            .set_display_name("Register model")
        )

### Compile your pipeline into a JSON file

Now that you define the workflow of your pipeline, you compile the pipeline into a JSON format.

In [None]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE_PATH)

### Submit your pipeline run

Finally, you use the Vertex AI Python SDK to submit and run your pipeline.

It takes the following parameters which we set as default:

- `preprocessing_batch_id`: The ID of preprocessing batch job.
- `preprocessing_main_python_file_uri` : The bucket URI of the preprocessing python file.
- `train_data_path` : The bucket path of the raw data.
- `preprocessed_data_path` : The bucket URI of the preprocessed data.
- `dataset_name`: The Vertex AI Dataset name.
- `dataset_uri` : The bucket URI of the preprocessed data.
- `training_batch_id` : The ID of training batch job.
- `training_main_python_file_uri` : The bucket URI of the training python file.
- `train_path` : The bucket URI of the training data.
- `model_path` : The bucket URI of the trained model.
- `metrics_path`: The bucket URI of the training metrics.
- `threshold`: The value of the metric threshold.
- `hpt_batch_id` : The ID of hyperparameter tuning batch job.
- `hpt_main_python_file_uri` : The bucket URI of the hypertuning python file.
- `hpt_model_path` : The bucket URI of the trained model.
- `custom_container_image` : The name of the custom container.
- `model_name` : The name of the trained model.
- `project_id` : The project ID.
- `location` : The location.

In [None]:
pipeline = vertex_ai.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,  # True
)

pipeline.submit()

### View Dataproc serverless ML training pipeline results

Finally, you will view the artifact outputs of each task in the pipeline.

In [None]:
PROJECT_NUMBER = pipeline.gca_resource.name.split("/")[1]
print("PROJECT NUMBER: ", PROJECT_NUMBER)
print("\n\n")


def print_pipeline_output(job, output_task_name):
    JOB_ID = job.name
    print(JOB_ID)
    for _ in range(len(job.gca_resource.job_detail.task_details)):
        TASK_ID = job.gca_resource.job_detail.task_details[_].task_id
        EXECUTE_OUTPUT = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/executor_output.json"
        )
        GCP_RESOURCES = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/gcp_resources"
        )
        EVAL_METRICS = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/evaluation_metrics"
        )
        if tf.io.gfile.exists(EXECUTE_OUTPUT):
            ! gsutil cat $EXECUTE_OUTPUT
            return EXECUTE_OUTPUT
        elif tf.io.gfile.exists(GCP_RESOURCES):
            ! gsutil cat $GCP_RESOURCES
            return GCP_RESOURCES
        elif tf.io.gfile.exists(EVAL_METRICS):
            ! gsutil cat $EVAL_METRICS
            return EVAL_METRICS

    return None


print("dataproc-create-pyspark-batch")
artifacts = print_pipeline_output(pipeline, "dataproc-create-pyspark-batch")
print("\n\n")
print("dataproc-create-pyspark-batch-2")
artifacts = print_pipeline_output(pipeline, "dataproc-create-pyspark-batch-2")
print("\n\n")
print("dataproc-create-pyspark-batch-3")
artifacts = print_pipeline_output(pipeline, "dataproc-create-pyspark-batch-3")
print("\n\n")

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial with the following code.

In [None]:
# Delete artifacts
metadata_store_id = f"projects/{PROJECT_ID}/locations/{REGION}/metadataStores/default"
now = datetime.now().isoformat().replace(".", ":")[:-7]
filter = f'create_time <= "{now}"'
artifact_req = {
    "parent": metadata_store_id,
    "filter": filter,
}

result = clients["metadata"].purge_artifacts(artifact_req)

In [None]:
# Delete dataset
dataset_list = vertex_ai.TabularDataset.list()
dataset_resource_name = dataset_list[-1].resource_name
dataset = vertex_ai.TabularDataset(dataset_resource_name)
dataset.delete()

In [None]:
# delete pipeline
pipeline.delete()

In [None]:
! gcloud artifacts repositories delete $REPO_NAME --location=$REGION --quiet

In [None]:
! gsutil -m rm -r $BUCKET_URI