# Vertex Pipelines - A Serverless framework for MLOps

## Setup

In [1]:
# !pip install google-cloud-pipeline-components==0.2.0

In [1]:
import os
PROJECT_ID = ""

# Obtener el project ID
# Get your Google Cloud project ID from gcloud

if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  teco-prod-adam-dev-826c


In [2]:
# variables de entorno: PATH local, region de GCP y timestamp

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

REGION="us-central1" # disponibilidad completa de Vertex / Complete Vertex availability

from datetime import datetime

TIMESTAMP =datetime.now().strftime("%Y%m%d%H%M%S")

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


In [3]:
# parametros cloud (almacenamiento de objetos, outputs, etc)
# cloud environment parameters (object storage, outputs, etc)
STAGE_DATA_BUCKET = f'{PROJECT_ID}-chicago_taxi_stage'
PIPELINE_BUCKET = f'{PROJECT_ID}-chicago_taxi_pipelines'
PIPELINE_ROOT = f"gs://{PIPELINE_BUCKET}/pipeline_root/"

# configuracion de componentes de ml
# ML components configurations
SERVING_CONTAINER = 'us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest'
MACHINE_TYPE = 'n1-standard-16'

# configuracion de intentos de tuneo de hiperparametros
# configuration of hp tuning job trials
RF_HP_IMAGE = f'gcr.io/{PROJECT_ID}/rf_hp_job:v1'
LR_HP_IMAGE = f'gcr.io/{PROJECT_ID}/lr_hp_job:v1'
HP_TRAIN_MACHINE = "n1-standard-16"
HP_TRIALS = 3
PARALLEL_TRIALS = 3

# umbral de aceptabilidad para prediccion batch
# acceptability threshold for batch prediction
THRESHOLD = 0.7

# habilitar cache en el pipeline para ahorrar costos
# enable cache in pipeline execution to prevent costs
ENABLE_CACHE = True

# rutas de acceso a los datasets de train, val y test
# access paths for train, val and test datasets
TRAIN_DATA_PATH = 'chicago_taxi_train.csv'
VAL_DATA_PATH = 'chicago_taxi_val.csv'
TEST_DATA_PATH = 'chicago_taxi_test.csv'

In [4]:
# BigQuery: definiciones de variables, pueden ser facilmente reemplazadas para adaptarse a otros propositos
# BigQuery variable definitions: those can be easily changed to suit another purpose

BQ_DATASET_HISTORIC_NAME = 'chicago_taxi_historic_test'
BQ_DATASET_CURRENT_NAME = 'chicago_taxi_current_test'

BQ_HISTORIC_RAW = 'raw'
BQ_HISTORIC_STAGE = 'stage_ml'

BQ_CURRENT_RAW = 'raw'
BQ_CURRENT_STAGE = 'stage_ml'

BQ_CURRENT_RAW_URL = f"{PROJECT_ID}.{BQ_DATASET_CURRENT_NAME}.{BQ_CURRENT_RAW}"
BQ_CURRENT_STAGE_URL = f"{PROJECT_ID}.{BQ_DATASET_CURRENT_NAME}.{BQ_CURRENT_STAGE}"

BQ_HISTORIC_RAW_URL = f"{PROJECT_ID}.{BQ_DATASET_HISTORIC_NAME}.{BQ_HISTORIC_RAW}"
BQ_HISTORIC_STAGE_URL = f"{PROJECT_ID}.{BQ_DATASET_HISTORIC_NAME}.{BQ_HISTORIC_STAGE}"

### Librerias - Packages

In [5]:
import matplotlib.pyplot as plt
import pandas as pd

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath, ClassificationMetrics
from typing import NamedTuple

from google.cloud import aiplatform, bigquery

# We'll use this namespace for metadata querying
from google.cloud import aiplatform_v1

from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.experimental.custom_job.utils import create_custom_training_job_op_from_component

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)
    
    


Project ID:  teco-prod-adam-dev-826c


### Definicion de componentes - Component definition

Hay una carpeta llamada *components* donde se encuentra una notebook que genera los distintos componentes necesarios para el pipeline y sus correspondientes yaml. Dada las características de este pipeline, muchos componentes requieren armarse de manera customizada y no depender de los preconstruidos.

There's a folder named *components* with a notebook that generates the needed components for the pipeline and their corresponding yaml files. Given the complexity of this pipeline, many components needs to be custom built and not use the pre-built one.

In [6]:
best_model_evaluation = kfp.components.load_component_from_file('./components/best_model_evaluation.yaml')
best_model_hp_tuning = kfp.components.load_component_from_file('./components/best_model_hp_tuning.yaml')
bq_current_raw_to_stage = kfp.components.load_component_from_file('./components/bq_current_raw_to_stage.yaml')
bq_historic_raw_to_stage = kfp.components.load_component_from_file('./components/bq_historic_raw_to_stage.yaml')
get_chicago_data = kfp.components.load_component_from_file('./components/get_chicago_data.yaml')
model_evaluation = kfp.components.load_component_from_file('./components/model_evaluation.yaml')
train_best_model = kfp.components.load_component_from_file('./components/train_best_model.yaml')
train_lr_chicago = kfp.components.load_component_from_file('./components/train_lr_chicago.yaml')
train_rf_chicago = kfp.components.load_component_from_file('./components/train_rf_chicago.yaml')
upload_model_to_vertex_and_batch_prediction = kfp.components.load_component_from_file('./components/upload_model_to_vertex_and_batch_prediction.yaml')


El componente de entrenamiento tiene que ser convertido en jobs para que pueda funcionar dentro del contexto de la suite.

The training components need to be turned into jobs to be performed within the context of the suite.

In [7]:
# Definicion del job usando el componente custom previamente creado
# Job definition using previously created custom component

train_rf_chicago_op = create_custom_training_job_op_from_component(
    train_rf_chicago,
    machine_type=MACHINE_TYPE
)

train_best_model_op = create_custom_training_job_op_from_component(
    train_best_model,
    machine_type=MACHINE_TYPE
)

train_lr_chicago_op = create_custom_training_job_op_from_component(
    train_lr_chicago,
    machine_type=MACHINE_TYPE
)

### Pipeline

La definición del pipeline es esencialmente una función. Desde aquí se orquesta todo el proceso, pasándole los parámetros necesarios para que cada elemento del pipeline pueda performar. De manera similar a un orquestador, cada una de las funciones es un paso que tiene *inputs* y *outputs*, así como también algunos parámetros y valores que opcionalmente pueden salirse del flujo natural: esto resulta de mucha utilidad para almacenar métricas o valores a partir de los cuales se tomarán decisiones a lo largo del camino. 

Esto se evidencia especialmente en la predicción batch, que utiliza el objeto Condition a partir del cual, si el modelo entrenado cumple con un cierto standard (en este caso, la métrica F1) el trabajo se ejecuta, y en caso contrario, se detiene todo el proceso.

En cuanto a las métricas, estas pueden ser consultadas en distintas etapas del proceso, así como también visualizarse en la interfaz gráfica de Vertex.

The pipeline definition is essentially a function. From here the entire process is orchestrated, passing along needed parameters so each element can trigger. Each of the functions is a step with inputs and outputs, as well as some parameters and values that can optionally skip or exit the flow: this is very useful to store metrics or values upon which certain parts of the process are triggered or not.

This is specially notable in batch prediction job, based on the dsl.Condition object that executes jobs according to a certain criteria met. In this case, if the trained model has an F1 score above the threshold, the batch prediction gets triggered. 

In [8]:
### pipeline mio
@dsl.pipeline(name='chicago-taxi-pipeline',
                pipeline_root=PIPELINE_ROOT)
def pipeline(
    project_id: str = PROJECT_ID,
    gcp_region: str = REGION,
    stage_data_bucket: str = STAGE_DATA_BUCKET,
    pipelines_bucket: str = PIPELINE_BUCKET,
    pipeline_root: str = PIPELINE_ROOT,
    serving_container: str = SERVING_CONTAINER,
    machine_type: str = MACHINE_TYPE,
    trials: int = HP_TRIALS,
    parallel_trials: int = PARALLEL_TRIALS,
    rf_hp_image: str = RF_HP_IMAGE,
    lr_hp_image: str = LR_HP_IMAGE,
    hp_train_machine: str = HP_TRAIN_MACHINE,
    bq_current_raw_url: str = BQ_CURRENT_RAW_URL,
    bq_current_stage_url: str = BQ_CURRENT_STAGE_URL,
    bq_historic_raw_url: str = BQ_HISTORIC_RAW_URL,
    bq_historic_stage_url: str = BQ_HISTORIC_STAGE_URL,
    threshold: float = THRESHOLD,
    enable_cache: bool = ENABLE_CACHE
    
    
):
    
    bq_stage_ml = bq_historic_raw_to_stage(
        project = project_id,
        region = gcp_region,
        bq_historic_raw_url = bq_historic_raw_url,
        bq_historic_stage_url = bq_historic_stage_url
        
    )
    
    bq_current_ml = bq_current_raw_to_stage(
        project = project_id,
        region = gcp_region,
        bq_current_raw_url = bq_current_raw_url,
        bq_current_stage_url = bq_current_stage_url,
        stage_data_bucket = stage_data_bucket
        
    )
    
    dataframe = get_chicago_data(project = project_id,
                                 region = gcp_region,
                                 bq_source_url = bq_stage_ml.output,
                                 stage_data_bucket = stage_data_bucket)
    
    train_lr_op = train_lr_chicago_op(dataframe.outputs['dataset_train'],
                                         project = project_id,
                                         location = gcp_region)
    
    
    train_rf_op = train_rf_chicago_op(dataframe.outputs['dataset_train'],
                                         project = project_id,
                                         location = gcp_region)
    
    model_selection = model_evaluation(
        val_set = dataframe.outputs['dataset_val'],
        lr_chicago_model = train_lr_op.outputs['model'],
        rf_chicago_model = train_rf_op.outputs['model'],
    
    )
    
    hp_search = best_model_hp_tuning(
        project = project_id,
        region = gcp_region,
        stage_data_bucket = stage_data_bucket,
        timestamp = dataframe.outputs['timestamp'],
        winning_model_name = model_selection.outputs['winning_model_name'],
        trials = trials,
        parallel_trials = parallel_trials,
        rf_hp_image = rf_hp_image,
        lr_hp_image = lr_hp_image,
        hp_train_machine = hp_train_machine
    )
    
    best_model = train_best_model_op(
        dataset_train = dataframe.outputs['dataset_train'], 
        dataset_val = dataframe.outputs['dataset_val'],
        project = project_id,
        location = gcp_region,
        winning_model_name = model_selection.outputs['winning_model_name'],
        parameters = hp_search.outputs['model_spec']
    )
    
    best_model_eval_decision = best_model_evaluation(
        test_set = dataframe.outputs['dataset_test'],
        winning_model_name = model_selection.outputs['winning_model_name'],
        best_model = best_model.outputs['model'],
        threshold = threshold
    )
    
    with dsl.Condition(
        best_model_eval_decision.outputs['dep_decision']=='true',
        name = 'predict_decision'
    ):
        predict_op = upload_model_to_vertex_and_batch_prediction(
            project = project_id,
            region = gcp_region,
            serving_container = serving_container,
            trained_model = best_model.outputs['model'],
            winning_model_name = model_selection.outputs['winning_model_name'],
            gcs_predict_source = bq_current_ml.outputs['gcs_predict_source'],
            gcs_predict_dest = f'gs://{stage_data_bucket}'
        )
    
    

El compilador arroja como resultado un template en formato json que puede ser reutilizado en otras ejecuciones.

The compiler creates a template in json format that can be reused in other executions.

In [9]:
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='chicago-taxi-pipeline.json')



El ID tiene que ser único, y es en esta parte donde se puede habilitar el caché para reducir costos. Por default es *true*.

The ID must be unique, and here the cache can be enabled to save costs. The default value is *true*.

In [10]:
pipeline_job = aiplatform.PipelineJob(
    display_name="chicago-taxi-pipeline",
    template_path="chicago-taxi-pipeline.json",
    job_id="chicago-taxi-pipeline-{0}".format(TIMESTAMP),
    enable_caching=ENABLE_CACHE,
)

In [11]:
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/1085281337041/locations/us-central1/pipelineJobs/chicago-taxi-pipeline-20220315175301
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/1085281337041/locations/us-central1/pipelineJobs/chicago-taxi-pipeline-20220315175301')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chicago-taxi-pipeline-20220315175301?project=1085281337041
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/1085281337041/locations/us-central1/pipelineJobs/chicago-taxi-pipeline-20220315175301 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/1085281337041/locations/u

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [best-model-hp-tuning].; Job (project_id = teco-prod-adam-dev-826c, job_id = 4294688020046544896) is failed due to the above error.; Failed to handle the job: {project_number = 1085281337041, job_id = 4294688020046544896}"


In [None]:
# "google.api_core.exceptions.Forbidden: 403 GET https://storage.googleapis.com/storage/v1/b/chicago_taxi_stage?projection=noAcl&prettyPrint=false: service-1085281337041@gcp-sa-aiplatform-cc.iam.gserviceaccount.com does not have storage.buckets.get access to the Google Cloud Storage bucket.
"

In [None]:
# gcloud services enable compute.googleapis.com         \
#                        containerregistry.googleapis.com  \
#                        aiplatform.googleapis.com  \
#                        cloudbuild.googleapis.com \
#                        cloudfunctions.googleapis.com

In [1]:
!gcloud projects describe $GOOGLE_CLOUD_PROJECT

[1;31mERROR:[0m (gcloud.projects.describe) argument PROJECT_ID_OR_NUMBER: Must be specified.
Usage: gcloud projects describe PROJECT_ID_OR_NUMBER [optional flags]
  optional flags may be  --help

For detailed information on this command and its flags, run:
  gcloud projects describe --help


In [2]:
!echo $GOOGLE_CLOUD_PROJECT




In [13]:
from google.cloud import storage
bqclient = bigquery.Client()
storage_client = storage.Client()

In [14]:
STAGE_DATA_BUCKET = 'teco-prod-adam-dev-826c-chicago_taxi_stage'

In [15]:
TRAIN_DATA_PATH = 'data/chicago_taxi_train.csv'

In [16]:
gcsclient = storage.Client() # tal vez vaya stage_data_bucket
bucket = gcsclient.get_bucket(STAGE_DATA_BUCKET)
blob = bucket.blob(TRAIN_DATA_PATH)
blob.download_to_filename(TRAIN_DATA_PATH)

FileNotFoundError: [Errno 2] No such file or directory: 'data/chicago_taxi_train.csv'

In [19]:
blob.download_to_filename('test.csv')

FileNotFoundError: [Errno 2] No such file or directory: 'hola/test.csv'