# An end-to-end Vertex Batch Prediction Pipeline Demonstration

Finally, check that you have correctly installed the packages. The KFP SDK version should be >=1.6:

In [1]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.2


In [1]:
import os
from functools import partial

import kfp
import pprint
import yaml
from jinja2 import Template
from kfp.v2 import dsl
from kfp.v2.compiler import compiler
from kfp.v2.dsl import Dataset
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform, storage

import pandas as pd

In [2]:
project_id='petcircle-science-playground'
project_number='734227425472'

In [3]:
af_registry_location='us-central1'
af_registry_name='mlops-vertex-kit'

In [4]:
components_dir='../components/'

In [14]:
def _load_custom_component(project_id: str,
                           af_registry_location: str,
                           af_registry_name: str,
                           components_dir: str,
                           component_name: str):
  component_path = os.path.join(components_dir,
                                component_name,
                                'component.yaml.jinja')
  with open(component_path, 'r') as f:
    component_text = Template(f.read()).render(
      project_id=project_id,
      af_registry_location=af_registry_location,
      af_registry_name=af_registry_name)

  return kfp.components.load_component_from_text(component_text)

load_custom_component = partial(_load_custom_component,
                                project_id=project_id,
                                af_registry_location=af_registry_location,
                                af_registry_name=af_registry_name,
                                components_dir=components_dir)

In [15]:
preprocess_op = load_custom_component(component_name='data_preprocess')
batch_prediction_op = load_custom_component(component_name='batch_prediction')

Then define the pipeline using the following function:

In [7]:
pipeline_region='us-central1'
pipeline_root='gs://vertex_pipeline_demo_root_hy/pipeline_root'

In [8]:
data_region='us-central1'
input_dataset_uri='bq://petcircle-science-playground.datalake.review_product_2013_2022'
gcs_data_output_folder='gs://vertex_pipeline_demo_root_hy/datasets/prediction'
gcs_result_folder='gs://vertex_pipeline_demo_root_hy/prediction'
prediction_results_uri='gs://vertex_pipeline_demo_root_hy/prediction'

data_pipeline_root='gs://vertex_pipeline_demo_root_hy/compute_root'

batch_prediction_image_uri=f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/batch_prediction:latest'
custom_job_service_account=f'{project_number}-compute@developer.gserviceaccount.com'

### New

@dsl.pipeline(name='batch-prediction-pipeline-template')
def pipeline(project_id: str,
             data_region: str,
             gcs_data_output_folder: str,
             data_pipeline_root: str,
             gcs_result_folder: str,
             model_resource_name: str = '',
             endpoint_resource_name: str = '',
             machine_type: str = "n1-standard-4",
             accelerator_count: int = 0,
             accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',
             starting_replica_count: int = 1,
             max_replica_count: int = 2,
             task_type: str = 'training'):

    preprocess_task = preprocess_op(
      project_id=project_id,
      data_region=data_region,
      gcs_output_folder=gcs_data_output_folder,
      gcs_output_format="CSV",
      task_type=task_type)
    
    #output_dataset = "gs://cloud-ai-platform-8d31bd26-da45-4554-9d98-d4267cffac95/X_test.jsonl"
    output_dataset = preprocess_task.outputs['output_dataset']

    batch_prediction_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      gcs_result_folder=gcs_result_folder,
      instances_format='jsonl',
      predictions_format='jsonl',
      input_dataset=output_dataset,
      model_resource_name=model_resource_name,
      endpoint_resource_name=endpoint_resource_name,
      machine_type=machine_type,
      accelerator_type=accelerator_type,
      accelerator_count=accelerator_count,
      starting_replica_count=starting_replica_count,
      max_replica_count=max_replica_count)

### Compile and run the end-to-end ML pipeline
With our full pipeline defined, it's time to compile it:

In [17]:
compiler.Compiler().compile(
    pipeline_func=pipeline, 
    package_path="batch_prediction_pipeline_job.json"
)

Next, instantiate an API client:

In [18]:
api_client = AIPlatformClient(
    project_id=project_id,
    region=pipeline_region)

In [20]:
ENDPOINT_ID="3122164422139707392"
PROJECT_ID="petcircle-science-playground"
task_type='batch_prediction'
model_resource_name="projects/734227425472/locations/us-central1/models/1841659986292244480"

pipeline_params = {
    'project_id': project_id,
    'data_region': data_region,
    'gcs_data_output_folder': gcs_data_output_folder,
    'gcs_result_folder': gcs_result_folder,
    'data_pipeline_root': data_pipeline_root,
    'model_resource_name': model_resource_name,
    'task_type': task_type
}

response = api_client.create_run_from_job_spec(
    job_spec_path="batch_prediction_pipeline_job.json", 
    pipeline_root=pipeline_root,
    parameter_values=pipeline_params,
    enable_caching=False)

### Inspect results

In [21]:
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

In [26]:
uri = 'gs://vertex_pipeline_demo_root_hy/prediction/prediction-model-1646273888729-2022_03_07T14_45_47_642Z/prediction.results-00000-of-00006'
pattern = 'prediction-model-1646774215078-2022_03_08T18_17_45_077Z/prediction.results'
bucket_name = 'vertex_pipeline_demo_root_hy'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
predictions = pd.DataFrame([])

if not os.path.exists('prediction'):
    os.mkdir('prediction')

for blob in bucket.list_blobs():
    if pattern in blob.name:
        local_path = os.path.join('prediction', blob.name.split('/')[-1])
        blob.download_to_filename(local_path)
        predictions = predictions.append(pd.read_json(path_or_buf=local_path, lines=True), ignore_index=True)
        
predictions['reviewtext'] = predictions['instance'].apply(lambda x: x['reviewtext'])
predictions['1'], predictions['0'] = zip(*predictions['prediction'].apply(lambda x: x['confidences']))
predictions = predictions[['reviewtext', '1', '0']]

In [27]:
predictions

Unnamed: 0,reviewtext,1,0
0,I didn't like it. The plastic bag got filthy a...,0.135712,0.864288
1,My puppy cavoodle ate threw the end within 5mins,0.002117,0.997883
2,My cavoodle destroyed this within the hour .....,0.008663,0.991337
3,Very happy. Fast delivery.Good service.thanks ...,0.998432,0.001568
4,"Dogs loved them, have two boxer crosses. Rewar...",0.970375,0.029625
5,I like this litter as it's recycled paper and ...,0.041058,0.958942
6,"Lightweight, good quality",0.973138,0.026862
7,My two cats are four years old and still behav...,0.390441,0.609559
8,"My dog wasn't happy eating meals, but he has b...",0.996557,0.003443
9,Excellent price and delivery was prompt.,0.997529,0.002471


### Just kickoff the batch job manually

In [10]:
ENDPOINT_ID="3122164422139707392"
PROJECT_ID="petcircle-science-playground"

endpoint_resource_name = 'projects/'+PROJECT_ID+'/locations/us-central1/endpoints/'+ENDPOINT_ID

def _get_endpoint(resource_name: str) -> aiplatform.Endpoint:
    return aiplatform.Endpoint(resource_name)


def _get_model(resource_name: str) -> aiplatform.Model:
    return aiplatform.Model(resource_name)


def _get_model_from_endpoint(endpoint: aiplatform.Endpoint) -> aiplatform.Model:
    current_deployed_model_id = None

    traffic_split = endpoint.gca_resource.traffic_split
    for key in traffic_split:
        if traffic_split[key] == 100:
            current_deployed_model_id = key
            break

        if current_deployed_model_id:
            for deployed_model in endpoint.gca_resource.deployed_models:
                if deployed_model.id == current_deployed_model_id:
                    return aiplatform.Model(deployed_model.model)

In [11]:
model = _get_model_from_endpoint(_get_endpoint(endpoint_resource_name))

In [25]:
endpoint = _get_endpoint(endpoint_resource_name)

In [31]:
endpoint.predict([{'reviewtext': 'pet circle is not recommended'}])

Prediction(predictions=[{'confidences': [0.0970509946346283, 0.9029490053653717], 'displayNames': ['1', '0']}], deployed_model_id='87705843524435968', explanations=None)

In [39]:
endpoint.gca_resource.traffic_split, endpoint.gca_resource.deployed_models

({'87705843524435968': 100},
 [id: "87705843524435968"
 model: "projects/734227425472/locations/us-central1/models/1841659986292244480"
 display_name: "model-1646273888729"
 create_time {
   seconds: 1646274579
   nanos: 116293000
 }
 dedicated_resources {
   machine_spec {
     machine_type: "n1-standard-4"
   }
   min_replica_count: 1
   max_replica_count: 1
 }
 ])

In [3]:
model = aiplatform.Model("projects/734227425472/locations/us-central1/models/1841659986292244480")

In [4]:
model.container_spec.__getattr__

<bound method Message.__getattr__ of image_uri: "us-central1-docker.pkg.dev/petcircle-science-playground/mlops-vertex-kit/serving:latest"
env {
  name: "TRAINING_DATA_SCHEMA"
  value: "reviewtext:string;Class:int"
}
env {
  name: "MODEL_FILENAME"
  value: "model.h5"
}
predict_route: "/predict"
health_route: "/health"
>

In [2]:
from google.cloud.aiplatform_v1.types.env_var import EnvVar

In [5]:
new_env = EnvVar(name="TRAINING_DATA_CSR", value="blablablabla")
model.container_spec.__getattr__("env").append(new_env)
model.container_spec.__getattr__

<bound method Message.__getattr__ of image_uri: "us-central1-docker.pkg.dev/petcircle-science-playground/mlops-vertex-kit/serving:latest"
env {
  name: "TRAINING_DATA_SCHEMA"
  value: "reviewtext:string;Class:int"
}
env {
  name: "MODEL_FILENAME"
  value: "model.h5"
}
env {
  name: "TRAINING_DATA_CSR"
  value: "blablablabla"
}
predict_route: "/predict"
health_route: "/health"
>

In [9]:
def download_from_gcs(gcs_file_path: str, local_file_path: str):
    client = storage.Client()
    with open(local_file_path, 'wb') as f:
        client.download_blob_to_file(gcs_file_path, f)

def get_latest_model_uri(data_pipeline_root: str, latest_model_filename: str):
    latest_model_path = os.path.join(data_pipeline_root, latest_model_filename)
    download_from_gcs(latest_model_path, latest_model_filename)
    with open(latest_model_filename, 'r') as f:
        latest_model_uri = json.load(f)['model_uri']

    logging.info(f'Latest model uri: {latest_model_uri}')

    return latest_model_uri

In [10]:
data_pipeline_root = 'gs://petcircle-ai-pipeline-storage-dev/basket-prediction/compute_root'
get_latest_model_uri(data_pipeline_root, 'latest_model.json')

Forbidden: 403 GET https://storage.googleapis.com/download/storage/v1/b/petcircle-ai-pipeline-storage-dev/o/basket-prediction%2Fcompute_root%2Flatest_model.json?alt=media: 734227425472-compute@developer.gserviceaccount.com does not have storage.objects.get access to the Google Cloud Storage object.: ('Request failed with status code', 403, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

In [17]:
#input_dataset='gs://cloud-ai-platform-8d31bd26-da45-4554-9d98-d4267cffac95/X_test.jsonl'
#input_dataset='gs://vertex_pipeline_demo_root_hy/datasets/training/X_test.jsonl'
#input_dataset='gs://vertex_pipeline_demo_root_hy/datasets/training/X_test.csv'
input_dataset='gs://vertex_pipeline_demo_root_hy/datasets/training/X_test_text.jsonl'
gcs_result_folder='gs://vertex_pipeline_demo_root_hy/prediction'
bigquery_destination_prefix='petcircle-science-playground'

model.batch_predict(job_display_name='batch-prediction',
      gcs_source=input_dataset,
      #gcs_destination_prefix=gcs_result_folder,
    #predictions_format='jsonl',
    bigquery_destination_prefix=bigquery_destination_prefix,
      instances_format='jsonl',
       machine_type='n1-standard-4')

INFO:google.cloud.aiplatform.jobs:Creating BatchPredictionJob


InvalidArgument: 400 Bigquery format must be used as input and output simultaneously. Input format(jsonl) and output format(bigquery).