# MLOps for Spark MLlib with Vertex AI Pipelines - Part 2
In this notebook, we create a Vertex AI pipeline for MLOps with Spark MLlib, powered by Dataproc Serverless Spark.

### 1. Setup

In [1]:
import random
from pathlib import Path as path
from typing import NamedTuple
import os


from google.cloud import aiplatform as vertex_ai
from google_cloud_pipeline_components import aiplatform as vertex_ai_components
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Condition, Input,
                        Metrics, Output, component)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#### a. Project specifics

In [2]:
# Resolve the project-level identifiers used by every later cell.
# NOTE: the `!gcloud ...` lines are IPython shell captures; they only work inside
# a Jupyter/IPython kernel with the gcloud CLI installed and configured.
import os

# Start empty; both stay empty strings when IS_TESTING is set in the environment.
PROJECT_ID = ""
PROJECT_NBR = ""
# Random suffix reused below for the pipeline ID, the batch instance IDs and the
# managed dataset name. It changes on every execution of this cell.
UNIQUE_ID = random.randint(1, 10000)

# Look up the Google Cloud project ID and project number via gcloud
if not os.getenv("IS_TESTING"):
    # The capture returns a list of output lines; the first line is taken as the project ID.
    project_id_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = project_id_output[0]
    print("Project ID: ", PROJECT_ID)
    
    
    # The project number is looked up from the project ID resolved just above.
    project_nbr_output = !gcloud projects describe $PROJECT_ID --format='value(projectNumber)'
    PROJECT_NBR = project_nbr_output[0]
    print("Project Number: ", PROJECT_NBR)
    
# Account currently configured in gcloud; later cells pass it as the service account
# for the Dataproc batches and for the pipeline submission.
umsa_output = !gcloud config list account --format "value(core.account)"
UMSA_FQN = umsa_output[0]
print("UMSA FQN: ", UMSA_FQN)

# Runs unconditionally, so PROJECT_ID is still the empty string here when IS_TESTING is set.
!gcloud config set project $PROJECT_ID

Project ID:  s8s-spark-ml-mlops
Project Number:  974925525028
UMSA FQN:  s8s-lab-sa@s8s-spark-ml-mlops.iam.gserviceaccount.com
Updated property [core/project].


#### b. The pre-created resources

In [3]:
# Names and URIs of the resources this notebook expects to exist already.

# Region shared by every regional resource below
LOCATION = "us-central1"

# Local directory that receives the compiled pipeline definition
LOCAL_SCRATCH_DIR = path("docker-build-scratch")

# Cloud Storage buckets (code, data and model carry the gs:// scheme;
# the scratch bucket is kept as a bare bucket name)
CODE_BUCKET = f"gs://s8s_code_bucket-{PROJECT_NBR}"
DATA_BUCKET = f"gs://s8s_data_bucket-{PROJECT_NBR}"
MODEL_BUCKET = f"gs://s8s_model_bucket-{PROJECT_NBR}"
SCRATCH_BUCKET = f"s8s-spark-bucket-{PROJECT_NBR}"

# Networking and Spark history server
VPC_NM = f"s8s-vpc-{PROJECT_NBR}"
SUBNET_RESOURCE_URI = f"projects/{PROJECT_ID}/regions/{LOCATION}/subnetworks/spark-snet"
PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI = f"projects/{PROJECT_ID}/regions/{LOCATION}/clusters/s8s-sphs-{PROJECT_NBR}"

# Container image used as the custom runtime for the Spark batches
GCR_REPO_NM = f"s8s-spark-{PROJECT_NBR}"
DOCKER_IMAGE_NM = "dataproc_serverless_custom_runtime"
DOCKER_IMAGE_TAG = "1.0.3"
DOCKER_IMAGE_FQN = f"gcr.io/{PROJECT_ID}/{DOCKER_IMAGE_NM}:{DOCKER_IMAGE_TAG}"

# Echo the resolved values, one per line, for a quick visual check
print(
    f"LOCAL_SCRATCH_DIR = {LOCAL_SCRATCH_DIR}",
    f"UMSA_FQN = {UMSA_FQN}",
    f"CODE_BUCKET = {CODE_BUCKET}",
    f"DATA_BUCKET = {DATA_BUCKET}",
    f"MODEL_BUCKET = {MODEL_BUCKET}",
    f"SCRATCH_BUCKET = {SCRATCH_BUCKET}",
    f"LOCATION = {LOCATION}",
    f"VPC_NM = {VPC_NM}",
    f"SUBNET_RESOURCE_URI = {SUBNET_RESOURCE_URI}",
    f"PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI = {PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI}",
    f"GCR_REPO_NM = {GCR_REPO_NM}",
    f"DOCKER_IMAGE_FQN = {DOCKER_IMAGE_FQN}",
    sep="\n",
)

LOCAL_SCRATCH_DIR = docker-build-scratch
UMSA_FQN = s8s-lab-sa@s8s-spark-ml-mlops.iam.gserviceaccount.com
CODE_BUCKET = gs://s8s_code_bucket-974925525028
DATA_BUCKET = gs://s8s_data_bucket-974925525028
MODEL_BUCKET = gs://s8s_model_bucket-974925525028
SCRATCH_BUCKET = s8s-spark-bucket-974925525028
LOCATION = us-central1
VPC_NM = s8s-vpc-974925525028
SUBNET_RESOURCE_URI = projects/s8s-spark-ml-mlops/regions/us-central1/subnetworks/spark-snet
PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI = projects/s8s-spark-ml-mlops/regions/us-central1/clusters/s8s-sphs-974925525028
GCR_REPO_NM = s8s-spark-974925525028
DOCKER_IMAGE_FQN = gcr.io/s8s-spark-ml-mlops/dataproc_serverless_custom_runtime:1.0.2


#### c. Pipeline entity specific

In [4]:
# Identity of the pipeline and the locations of its compiled definition / run artifacts.
PIPELINE_NM = "pyspark-customer-churn-pipeline"
PIPELINE_ID = UNIQUE_ID

# Local JSON file the compiler writes to, and the GCS root handed to the pipeline job
PIPELINE_PACKAGE_SRC_LOCAL_PATH = f"{LOCAL_SCRATCH_DIR}/pipeline_{PIPELINE_ID}.json"
PIPELINE_ROOT_GCS_URI = f"{MODEL_BUCKET}/pipelines"

# Echo the resolved values
print(
    f"PIPELINE_ID = {PIPELINE_ID}",
    f"PIPELINE_NM = {PIPELINE_NM}",
    f"PIPELINE_PACKAGE_SRC_LOCAL_PATH = {PIPELINE_PACKAGE_SRC_LOCAL_PATH}",
    f"PIPELINE_ROOT_GCS_URI = {PIPELINE_ROOT_GCS_URI}",
    sep="\n",
)

PIPELINE_ID = 1544
PIPELINE_NM = pyspark-customer-churn-pipeline
PIPELINE_PACKAGE_SRC_LOCAL_PATH = docker-build-scratch/pipeline_1544.json
PIPELINE_ROOT_GCS_URI = gs://s8s_model_bucket-974925525028/pipelines


#### d. Pipeline stage agnostic

In [5]:
# Location of the PySpark scripts, plus the shared helper module that is
# shipped with every Spark batch (kept as a one-element list).
PY_SCRIPTS_FQP = f"{CODE_BUCKET}/pyspark"
PYSPARK_COMMON_UTILS_SCRIPT_FQP = [
    f"{PY_SCRIPTS_FQP}/common_utils.py",
]

# Echo the resolved values
print(
    f"PY_SCRIPTS_FQP = {PY_SCRIPTS_FQP}",
    f"PYSPARK_COMMON_UTILS_SCRIPT_FQP = {PYSPARK_COMMON_UTILS_SCRIPT_FQP}",
    sep="\n",
)

PY_SCRIPTS_FQP = gs://s8s_code_bucket-974925525028/pyspark
PYSPARK_COMMON_UTILS_SCRIPT_FQP = ['gs://s8s_code_bucket-974925525028/pyspark/common_utils.py']


#### d. Data preprocessing stage specific

In [6]:
# Settings for the data preprocessing Spark batch.
DATA_PREPROCESSING_BATCH_PREFIX = "customer-churn-data-preprocessing"
# Per-run batch ID: the prefix plus the random UNIQUE_ID suffix
DATA_PREPROCESSING_BATCH_INSTANCE_ID = f"{DATA_PREPROCESSING_BATCH_PREFIX}-{UNIQUE_ID}"

# Main script, raw input file and scratch location for the batch
DATA_PREPROCESSING_MAIN_PY_SCRIPT = f"{PY_SCRIPTS_FQP}/data_preprocessing.py"
DATA_PREPROCESSING_RAW_SOURCE_FQP = f"{DATA_BUCKET}/customer_churn_train_data.csv"
DATA_PREPROCESSING_SCRATCH_BUCKET = f"{SCRATCH_BUCKET}/{DATA_PREPROCESSING_BATCH_PREFIX}"

# Command-line arguments passed to the preprocessing script
DATA_PREPROCESSING_ARGS = [
    f"--appName={DATA_PREPROCESSING_BATCH_PREFIX}",
    f"--projectID={PROJECT_ID}",
    f"--rawDatasetBucketFQN={DATA_PREPROCESSING_RAW_SOURCE_FQP}",
    f"--sparkBigQueryScratchBucketUri={DATA_PREPROCESSING_SCRATCH_BUCKET}",
    f"--enableDataframeDisplay={True}",
]

# BigQuery table holding the preprocessed data, as a table name and as a bq:// URI
DATA_PROCESSING_BQ_SINK = f"{PROJECT_ID}.customer_churn_ds.customer_churn_training_data"
DATA_PROCESSING_BQ_SINK_URI = f"bq://{DATA_PROCESSING_BQ_SINK}"

# Echo the resolved values
print(
    f"DATA_PREPROCESSING_BATCH_PREFIX = {DATA_PREPROCESSING_BATCH_PREFIX}",
    f"DATA_PREPROCESSING_BATCH_INSTANCE_ID = {DATA_PREPROCESSING_BATCH_INSTANCE_ID}",
    f"DATA_PREPROCESSING_MAIN_PY_SCRIPT = {DATA_PREPROCESSING_MAIN_PY_SCRIPT}",
    f"DATA_PREPROCESSING_RAW_SOURCE_FQP = {DATA_PREPROCESSING_RAW_SOURCE_FQP}",
    f"DATA_PREPROCESSING_SCRATCH_BUCKET = {DATA_PREPROCESSING_SCRATCH_BUCKET}",
    f"DATA_PREPROCESSING_ARGS = {DATA_PREPROCESSING_ARGS}",
    f"DATA_PROCESSING_BQ_SINK = {DATA_PROCESSING_BQ_SINK}",
    f"DATA_PROCESSING_BQ_SINK_URI = {DATA_PROCESSING_BQ_SINK_URI}",
    sep="\n",
)

DATA_PREPROCESSING_BATCH_PREFIX = customer-churn-data-preprocessing
DATA_PREPROCESSING_BATCH_INSTANCE_ID = customer-churn-data-preprocessing-1544
DATA_PREPROCESSING_MAIN_PY_SCRIPT = gs://s8s_code_bucket-974925525028/pyspark/data_preprocessing.py
DATA_PREPROCESSING_RAW_SOURCE_FQP = gs://s8s_data_bucket-974925525028/customer_churn_train_data.csv
DATA_PREPROCESSING_SCRATCH_BUCKET = s8s-spark-bucket-974925525028/customer-churn-data-preprocessing
DATA_PREPROCESSING_ARGS = ['--appName=customer-churn-data-preprocessing', '--projectID=s8s-spark-ml-mlops', '--rawDatasetBucketFQN=gs://s8s_data_bucket-974925525028/customer_churn_train_data.csv', '--sparkBigQueryScratchBucketUri=s8s-spark-bucket-974925525028/customer-churn-data-preprocessing', '--enableDataframeDisplay=True']
DATA_PROCESSING_BQ_SINK = s8s-spark-ml-mlops.customer_churn_ds.customer_churn_training_data
DATA_PROCESSING_BQ_SINK_URI = bq://s8s-spark-ml-mlops.customer_churn_ds.customer_churn_training_data


#### e. Dataset registration specific

In [7]:
# Display name for the managed tabular dataset created by the pipeline;
# suffixed with UNIQUE_ID so the name differs from run to run.
MANAGED_DATASET_NM = f"Customer_Churn_Model_Training_Base-{UNIQUE_ID}"

#### f. Model specific

In [8]:
# Settings for the model training Spark batch.
MODEL_TRAINING_BATCH_PREFIX = "customer-churn-model-training"
# Per-run batch ID: the prefix plus the random UNIQUE_ID suffix
MODEL_TRAINING_BATCH_INSTANCE_ID = f"{MODEL_TRAINING_BATCH_PREFIX}-{UNIQUE_ID}"

# Main script, input table (the preprocessing sink) and scratch location
MODEL_TRAINING_MAIN_PY_SCRIPT = f"{PY_SCRIPTS_FQP}/model_training.py"
MODEL_TRAINING_RAW_SOURCE_FQP = f"{DATA_PROCESSING_BQ_SINK}"
MODEL_TRAINING_SCRATCH_BUCKET = f"{SCRATCH_BUCKET}/{MODEL_TRAINING_BATCH_PREFIX}"

# Output locations: model bucket path and the BigQuery result tables
MODEL_BUCKET_URI = f"{MODEL_BUCKET}/{MODEL_TRAINING_BATCH_PREFIX}"
MODEL_TEST_RESULTS_TABLE_FQN = f"{PROJECT_ID}.customer_churn_ds.customer_churn_test_predictions"
MODEL_METRICS_TABLE_FQN = f"{PROJECT_ID}.customer_churn_ds.customer_churn_model_metrics"
MODEL_FEATURE_IMP_TABLE_FQN = f"{PROJECT_ID}.customer_churn_ds.customer_churn_model_feature_importance"

# Command-line arguments passed to the training script
MODEL_TRAINING_ARGS = [
    f"--appName={MODEL_TRAINING_BATCH_PREFIX}",
    f"--projectID={PROJECT_ID}",
    f"--bigQuerySourceTableFQN={MODEL_TRAINING_RAW_SOURCE_FQP}",
    f"--bigQueryModelTestResultsTableFQN={MODEL_TEST_RESULTS_TABLE_FQN}",
    f"--sparkBigQueryScratchBucketUri={MODEL_TRAINING_SCRATCH_BUCKET}",
    f"--sparkMlModelBucketUri={MODEL_BUCKET_URI}",
    f"--bigQueryModelMetricsTableFQN={MODEL_METRICS_TABLE_FQN}",
    f"--bigQueryFeatureImportanceTableFQN={MODEL_FEATURE_IMP_TABLE_FQN}",
    f"--enableDataframeDisplay={True}",
]

# Echo the resolved values
print(
    f"MODEL_TRAINING_BATCH_PREFIX = {MODEL_TRAINING_BATCH_PREFIX}",
    f"MODEL_TRAINING_BATCH_INSTANCE_ID = {MODEL_TRAINING_BATCH_INSTANCE_ID}",
    f"MODEL_TRAINING_MAIN_PY_SCRIPT = {MODEL_TRAINING_MAIN_PY_SCRIPT}",
    f"MODEL_TRAINING_RAW_SOURCE_FQP = {MODEL_TRAINING_RAW_SOURCE_FQP}",
    f"MODEL_TRAINING_SCRATCH_BUCKET = {MODEL_TRAINING_SCRATCH_BUCKET}",
    f"MODEL_TEST_RESULTS_TABLE_FQN = {MODEL_TEST_RESULTS_TABLE_FQN}",
    f"MODEL_BUCKET_URI = {MODEL_BUCKET_URI}",
    f"MODEL_METRICS_TABLE_FQN = {MODEL_METRICS_TABLE_FQN}",
    f"MODEL_FEATURE_IMP_TABLE_FQN = {MODEL_FEATURE_IMP_TABLE_FQN}",
    f"MODEL_TRAINING_ARGS = {MODEL_TRAINING_ARGS}",
    sep="\n",
)

MODEL_TRAINING_BATCH_PREFIX = customer-churn-model-training
MODEL_TRAINING_BATCH_INSTANCE_ID = customer-churn-model-training-1544
MODEL_TRAINING_MAIN_PY_SCRIPT = gs://s8s_code_bucket-974925525028/pyspark/model_training.py
MODEL_TRAINING_RAW_SOURCE_FQP = s8s-spark-ml-mlops.customer_churn_ds.customer_churn_training_data
MODEL_TRAINING_SCRATCH_BUCKET = s8s-spark-bucket-974925525028/customer-churn-model-training
MODEL_TEST_RESULTS_TABLE_FQN = s8s-spark-ml-mlops.customer_churn_ds.customer_churn_test_predictions
MODEL_BUCKET_URI = gs://s8s_model_bucket-974925525028/customer-churn-model-training
MODEL_METRICS_TABLE_FQN = s8s-spark-ml-mlops.customer_churn_ds.customer_churn_model_metrics
MODEL_FEATURE_IMP_TABLE_FQN = s8s-spark-ml-mlops.customer_churn_ds.customer_churn_model_feature_importance
MODEL_TRAINING_ARGS = ['--appName=customer-churn-model-training', '--projectID=s8s-spark-ml-mlops', '--bigQuerySourceTableFQN=s8s-spark-ml-mlops.customer_churn_ds.customer_churn_training_data', '--bigQuery

### 2. Initialize Vertex AI SDK for Python

In [9]:
# Initialise the Vertex AI SDK defaults for this session.
# SCRATCH_BUCKET is defined as a bare bucket name (it is also handed to the Spark
# jobs in that form), whereas aiplatform.init() documents staging_bucket as a
# gs:// URI, so the scheme is added here when it is missing.
vertex_ai.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=(
        SCRATCH_BUCKET
        if SCRATCH_BUCKET.startswith("gs://")
        else f"gs://{SCRATCH_BUCKET}"
    ),
)

### 3. Define Vertex AI Pipeline

In [10]:
# Echo the values that become the pipeline's default parameters,
# labelled with the parameter names used in the pipeline definition.
print(
    f"common_utils_py_fqn = {PYSPARK_COMMON_UTILS_SCRIPT_FQP}",
    f"deps_bucket_fqn  = {PY_SCRIPTS_FQP}",
    f"project_id  = {PROJECT_ID}",
    f"location = {LOCATION}",
    f"subnetwork_uri = {SUBNET_RESOURCE_URI}",
    f"spark_phs_nm = {PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI}",
    f"container_image = {DOCKER_IMAGE_FQN}",
    f"service_account = {UMSA_FQN}",
    sep="\n",
)

# Preprocessing-specific values
print(
    f"data_preprocessing_batch_id = {DATA_PREPROCESSING_BATCH_INSTANCE_ID}",
    f"data_preprocessing_main_py_fqn = {DATA_PREPROCESSING_MAIN_PY_SCRIPT}",
    f"data_preprocessing_args = {DATA_PREPROCESSING_ARGS}",
    sep="\n",
)

common_utils_py_fqn = ['gs://s8s_code_bucket-974925525028/pyspark/common_utils.py']
deps_bucket_fqn  = gs://s8s_code_bucket-974925525028/pyspark
project_id  = s8s-spark-ml-mlops
location = us-central1
subnetwork_uri = projects/s8s-spark-ml-mlops/regions/us-central1/subnetworks/spark-snet
spark_phs_nm = projects/s8s-spark-ml-mlops/regions/us-central1/clusters/s8s-sphs-974925525028
container_image = gcr.io/s8s-spark-ml-mlops/dataproc_serverless_custom_runtime:1.0.2
service_account = s8s-lab-sa@s8s-spark-ml-mlops.iam.gserviceaccount.com
data_preprocessing_batch_id = customer-churn-data-preprocessing-1544
data_preprocessing_main_py_fqn = gs://s8s_code_bucket-974925525028/pyspark/data_preprocessing.py
data_preprocessing_args = ['--appName=customer-churn-data-preprocessing', '--projectID=s8s-spark-ml-mlops', '--rawDatasetBucketFQN=gs://s8s_data_bucket-974925525028/customer_churn_train_data.csv', '--sparkBigQueryScratchBucketUri=s8s-spark-bucket-974925525028/customer-churn-data-preprocessing'

In [11]:
@dsl.pipeline(
    name=PIPELINE_NM,
    description="A SparkMLlib MLOps Vertex pipeline")
def fn_SparkMlopsPipeline(
    project_id: str = PROJECT_ID,
    location: str = LOCATION,
    service_account: str = UMSA_FQN,
    subnetwork_uri: str = SUBNET_RESOURCE_URI,
    spark_phs_nm: str = PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI,
    container_image: str = DOCKER_IMAGE_FQN,
    common_utils_py_fqn: list = PYSPARK_COMMON_UTILS_SCRIPT_FQP,
    data_preprocessing_pyspark_batch_id: str = DATA_PREPROCESSING_BATCH_INSTANCE_ID,
    data_preprocessing_main_py_fqn: str = DATA_PREPROCESSING_MAIN_PY_SCRIPT,
    data_preprocessing_args: list = DATA_PREPROCESSING_ARGS,
    managed_dataset_src_uri: str = DATA_PROCESSING_BQ_SINK_URI,
    managed_dataset_nm: str = MANAGED_DATASET_NM,
    # Use the per-run instance ID (prefix + UNIQUE_ID), matching the preprocessing
    # step; the bare prefix would be reused on every run and collide with an
    # earlier batch of the same ID.
    model_training_pyspark_batch_id: str = MODEL_TRAINING_BATCH_INSTANCE_ID,
    model_training_main_py_fqn: str = MODEL_TRAINING_MAIN_PY_SCRIPT,
    model_training_args: list = MODEL_TRAINING_ARGS,
):
    """Define the Spark MLlib MLOps pipeline.

    The pipeline has three steps:
      1. "Pre-processing": a PySpark batch that runs the preprocessing script.
      2. "Dataset registration": creates a tabular managed dataset from the
         BigQuery source; scheduled after step 1.
      3. "Model training": a PySpark batch that runs the training script;
         scheduled after step 1 (it does not wait for step 2).

    Args:
        project_id: Project in which the batches and the dataset are created.
        location: Region used for the batches and the dataset.
        service_account: Service account the Spark batches run as.
        subnetwork_uri: Subnetwork the Spark batches are attached to.
        spark_phs_nm: Spark history server cluster resource for the batches.
        container_image: Custom container image for the Spark batches.
        common_utils_py_fqn: Extra Python files shipped with both batches.
        data_preprocessing_pyspark_batch_id: Batch ID of the preprocessing batch.
        data_preprocessing_main_py_fqn: Main script of the preprocessing batch.
        data_preprocessing_args: Arguments for the preprocessing script.
        managed_dataset_src_uri: BigQuery source of the managed dataset.
        managed_dataset_nm: Display name of the managed dataset.
        model_training_pyspark_batch_id: Batch ID of the training batch.
        model_training_main_py_fqn: Main script of the training batch.
        model_training_args: Arguments for the training script.
    """
    from google_cloud_pipeline_components.experimental.dataproc import \
        DataprocPySparkBatchOp

    # Step 1. PRE-PROCESS DATA in PREP FOR MODEL TRAINING
    # ....................................................................
    dataPreprocessingStep = DataprocPySparkBatchOp(
        project = project_id,
        location = location,
        container_image = container_image,
        subnetwork_uri = subnetwork_uri,
        spark_history_dataproc_cluster = spark_phs_nm,
        service_account = service_account,
        batch_id = data_preprocessing_pyspark_batch_id,
        main_python_file_uri = data_preprocessing_main_py_fqn,
        python_file_uris = common_utils_py_fqn,
        args = data_preprocessing_args
    ).set_display_name("Pre-processing")

    # Step 2. REGISTER PRE-PROCESSED DATA AS MANAGED DATASET
    # ....................................................................
    createManagedDatasetStep = vertex_ai_components.TabularDatasetCreateOp(
        display_name= managed_dataset_nm,
        bq_source=managed_dataset_src_uri,
        project=project_id,
        location=location,
    ).after(dataPreprocessingStep).set_display_name("Dataset registration")

    # Step 3. TRAIN MODEL
    # ....................................................................
    trainSparkMLModelStep = DataprocPySparkBatchOp(
        project = project_id,
        location = location,
        container_image = container_image,
        subnetwork_uri = subnetwork_uri,
        spark_history_dataproc_cluster = spark_phs_nm,
        service_account = service_account,
        batch_id = model_training_pyspark_batch_id,
        main_python_file_uri = model_training_main_py_fqn,
        python_file_uris = common_utils_py_fqn,
        args = model_training_args
    ).after(dataPreprocessingStep).set_display_name("Model training")

    # Step 4. LOG MODEL METRICS INTO MODEL METASTORE
    # ....................................................................
    #TODO
    

    

### 4. Compile the Vertex AI Pipeline into a JSON file

In [12]:
# The compiler writes the pipeline definition to a local JSON file; make sure the
# target directory exists first so the write does not fail on a fresh checkout.
path(PIPELINE_PACKAGE_SRC_LOCAL_PATH).parent.mkdir(parents=True, exist_ok=True)

compiler.Compiler().compile(
    pipeline_func=fn_SparkMlopsPipeline,
    package_path=PIPELINE_PACKAGE_SRC_LOCAL_PATH,
)



### 5. Submit the Pipeline for execution

In [13]:
# Build the pipeline job from the compiled JSON template.
# Caching is switched off here; set enable_caching=True to turn it on.
pipeline = vertex_ai.PipelineJob(
    template_path=PIPELINE_PACKAGE_SRC_LOCAL_PATH,
    pipeline_root=PIPELINE_ROOT_GCS_URI,
    display_name=PIPELINE_NM,
    enable_caching=False,
)

# Submit the job, running it as the service account resolved during setup
pipeline.submit(service_account=UMSA_FQN)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/974925525028/locations/us-central1/pipelineJobs/pyspark-customer-churn-pipeline-20220726225150
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/974925525028/locations/us-central1/pipelineJobs/pyspark-customer-churn-pipeline-20220726225150')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pyspark-customer-churn-pipeline-20220726225150?project=974925525028
