In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 03 - Vertex Pipelines using  Google Cloud Pipeline Components

## Overview 

This notebook shows how to use Kubeflow components to build a custom regression workflow on `Vertex AI Pipelines`.

You will build a pipeline in this notebook that looks like this:

<img src="img/pipelines-gcc.png" width="90%"/>





## Notebook Objective

In this notebook, you will learn to use `Vertex AI Pipelines` with **ONLY** `Google Cloud Pipeline Components` to build a `custom` tabular regression model. In the pipeline we will orchestrate data creation, data processing, model training and evaluation, and model deployment. We'll also see how to send payloads to the deployed endpoint and how to run batch prediction jobs.

This lab uses the following Google Cloud services and resources:

- `BigQuery`
- `Vertex AI Pipelines`
- `Google Cloud Pipeline Components`
- `Vertex AI Model`
- `Vertex AI Model Registry`
- `Vertex AI Metadata`
- `Vertex AI Endpoint`

The steps performed in this notebook include:

1. [Load Configuration settings from the setup notebook](#Load-Configuration-settings-from-the-setup-notebook)
1. [Vertex Pipelines Introduction](#Vertex-Pipelines-Introduction)
1. [Create a KFP Pipeline](#Create-a-KFP-Pipeline)
    1. [Create a dataset in BigQuery](#Step-1:-Create-a-dataset-in-BigQuery)
    1. [Transform the Data](#Step-2:-Transform-the-Data)
    1. [Train and Evaluate our custom Regression Model](#Step-3:-Train-and-Evaluate-our-custom-Regression-Model)
    1. [Upload Model to Vertex AI and Deploy Endpoint](#Step-4:-Upload-Model-to-Vertex-AI-and-Deploy-Endpoint)
1. [Compile the KFP Pipeline](#Compile-the-KFP-Pipeline)
1. [Execute the KFP Pipeline using Vertex AI Pipelines](#Execute-the-KFP-Pipeline-using-Vertex-AI-Pipelines)
1. [Inspect Experiments](#Inspect-Experiments)
1. [Online Predictions](#Online-Predictions-with-Deployed-Endpoint)
1. [Batch Predictions](#Batch-Predictions-with-Created-Model)


The Google Cloud Components are [documented here](https://google-cloud-pipeline-components.readthedocs.io/en/latest/google_cloud_pipeline_components.aiplatform.html#module-google_cloud_pipeline_components.aiplatform) 

### Dataset

In this workshop we'll use the **public dataset** [Auto MPG](https://archive.ics.uci.edu/ml/datasets/auto+mpg) for demonstration purposes. The data concerns city-cycle fuel consumption in miles per gallon, to be predicted in terms of 3 multivalued discrete and 5 continuous attributes. The objective will be to build a model to predict "MPG" (Miles per Gallon).

Check notebook  `01_exploratory_data_analysis.ipynb` for further details of the dataset

## Load Configuration settings from the setup notebook

In [None]:
# Settings produced by notebook 00_environment_setup.ipynb, exposed through `src.config`
from src.config import config

# Unpack the values this notebook needs into module-level constants.
PROJECT_ID, REGION, ID, BUCKET_NAME, GCS_DATA_URI, BQ_DATASET_URI = (
    config[key]
    for key in (
        'PROJECT_ID',
        'REGION',
        'ID',
        'BUCKET_NAME',
        'GCS_DATA_URI',
        'BQ_DATASET_URI',
    )
)

### Import libraries

In [None]:
# Misc
import os
import shutil
import logging
from datetime import datetime

# Import the Vertex AI Python SDK 
from google.cloud import aiplatform as aip
from google.cloud.aiplatform import pipeline_jobs
import google.auth
from google.cloud import storage
from google_cloud_pipeline_components import aiplatform as gcc_aip

# kfp sdk, to create the Vertex AI Pipelines
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline

from typing import NamedTuple

from kfp.v2.dsl import (Artifact, Dataset, Input, Model, Output, Metrics, ClassificationMetrics, component, OutputPath, InputPath)
from kfp.v2 import compiler


# TensorFlow model building libraries.
import tensorflow as tf


# Custom Modules
from src.helper import *

------------------------------------

## Create Custom Training Container

Train and deploy your model on Google Cloud's Vertex AI platform.

To train your fuel-efficiency regression model on Google Cloud, you will package your Python training scripts and write a Dockerfile that describes your ML model code, its dependencies, and how to execute it. You will build your custom container with Cloud Build, whose instructions are specified in `cloudbuild.yaml`, and publish your container to your Artifact Registry. This workflow gives you the opportunity to use the same container to run as part of a portable and scalable [Vertex Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) workflow. 


You will walk through creating the following project structure for your ML model code:
```
|--/container
   |--/trainer
      |--__init__.py
      |--model.py
      |--task.py
   |--Dockerfile
   |--cloudbuild.yaml
   |--requirements.txt
```

### Step 1: Write model.py training script

In [None]:
# Root folder of the custom training container sources.
MODEL_DIR = "container"

# `%%writefile` does not create missing directories, so make sure the package
# folder exists before the following cells write files into it.
os.makedirs(os.path.join(MODEL_DIR, "trainer"), exist_ok=True)

# Create the (empty) package marker listed in the project structure above;
# opening in append mode leaves an existing file untouched.
with open(os.path.join(MODEL_DIR, "trainer", "__init__.py"), "a"):
    pass

MODEL_DIR

In [None]:
%%writefile {MODEL_DIR}/trainer/model.py

from google.cloud import bigquery
from google.cloud import storage
import logging
import numpy as np
import pandas as pd
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Module-level API clients, created once when the module is imported.
# `bqclient` is used by download_table() to read BigQuery tables.
bqclient = bigquery.Client()
# NOTE(review): `storage_client` is not referenced by any function visible in this module.
storage_client = storage.Client()

def download_table(bq_table_uri: str):
    """Load every row of a BigQuery table into a pandas DataFrame.

    Args:
        bq_table_uri: Table identifier accepted by
            `bigquery.TableReference.from_string`, optionally prefixed with
            "bq://" (the prefix is stripped before parsing).

    Returns:
        The result of `to_dataframe` on the listed rows, fetched without the
        BigQuery Storage client.
    """
    scheme = "bq://"
    table_path = bq_table_uri[len(scheme):] if bq_table_uri.startswith(scheme) else bq_table_uri

    table_ref = bigquery.TableReference.from_string(table_path)
    row_iterator = bqclient.list_rows(table_ref)
    return row_iterator.to_dataframe(create_bqstorage_client=False)


def build_and_compile_model(norm):
    """Assemble and compile the regression network.

    Args:
        norm: Layer placed first in the stack (train_model passes an adapted
            Normalization layer).

    Returns:
        A compiled `keras.Sequential` model made of `norm`, two 64-unit ReLU
        Dense layers and a single-unit Dense output, using mean absolute error
        as loss and Adam with learning rate 0.001 as optimizer.
    """
    layer_stack = [
        norm,
        layers.Dense(64, activation='relu'),
        layers.Dense(64, activation='relu'),
        layers.Dense(1),
    ]
    regressor = keras.Sequential(layer_stack)
    regressor.compile(
        optimizer=tf.keras.optimizers.Adam(0.001),
        loss='mean_absolute_error',
    )
    return regressor

def transform_data(df):
    """Rename the raw Auto MPG columns, drop incomplete rows and one-hot encode the origin.

    Args:
        df: DataFrame using the short column names `mpg`, `cyl`, `dis`, `hp`,
            `weight`, `accel`, `year` and `origin`, where `origin` is coded as
            1 (USA), 2 (Europe) or 3 (Japan). Columns missing from the rename
            map are left as they are.

    Returns:
        A new DataFrame with the descriptive column names, rows containing
        missing values removed, and the `Origin` column replaced by the
        indicator columns `Europe`, `Japan` and `USA`. The input frame is not
        modified.
    """
    column_names = {
        'mpg': 'MPG',
        'cyl': 'Cylinders',
        'dis': 'Displacement',
        'hp': 'Horsepower',
        'weight': 'Weight',
        'accel': 'Acceleration',
        'year': 'Model Year',
        'origin': 'Origin',
    }
    origin_names = {1: 'USA', 2: 'Europe', 3: 'Japan'}

    # rename() without inplace returns a new frame, so the caller's data stays untouched.
    cleaned = df.rename(columns=column_names).dropna()

    # Fixed categories guarantee the same three indicator columns (in
    # alphabetical order) even when a data split lacks one of the regions;
    # otherwise train and validation features could end up with different widths.
    origin = pd.Categorical(
        cleaned['Origin'].map(origin_names),
        categories=sorted(origin_names.values()),
    )
    cleaned = cleaned.assign(Origin=origin)

    return pd.get_dummies(cleaned, columns=['Origin'], prefix='', prefix_sep='')
    

def train_model(params):
    """Train, evaluate and save the fuel-efficiency regression model.

    Args:
        params: Mapping with the keys
            'train-data-dir': BigQuery URI of the training table,
            'val-data-dir': BigQuery URI of the table used for the final evaluation,
            'model-dir': destination passed to `model.save`.

    Returns:
        None. The trained model is written to `params['model-dir']` and the
        final training, validation and test losses are written to the log.
    """
    logging.info(f"Training bq path {params['train-data-dir']}")
    logging.info(f"Validation bq path {params['val-data-dir']}")

    train_dataset = transform_data(df=download_table(params['train-data-dir']))
    test_dataset = transform_data(df=download_table(params['val-data-dir']))

    # Split each frame into features and the 'MPG' label.
    train_features = train_dataset.copy()
    train_labels = train_features.pop('MPG')

    test_features = test_dataset.copy()
    test_labels = test_features.pop('MPG')

    # Create model: the normalization layer learns its statistics from the
    # training features only.
    normalizer = tf.keras.layers.Normalization(axis=-1)
    normalizer.adapt(np.array(train_features))

    dnn_model = build_and_compile_model(normalizer)
    dnn_model.summary()

    history = dnn_model.fit(
        train_features,
        train_labels,
        validation_split=0.2,
        verbose=0, epochs=100
    )

    test_loss = dnn_model.evaluate(
        test_features,
        test_labels,
        verbose=0
    )

    # Log metrics. This module runs as a plain training script, so there are no
    # pipeline `metrics`/`model` artifacts in scope; report the last-epoch
    # values through the logger instead.
    metrics_training = {metric: values[-1] for metric, values in history.history.items()}
    logging.info(f"loss {metrics_training['loss']}")
    logging.info(f"val_loss {metrics_training['val_loss']}")
    logging.info(f"test_loss {test_loss}")

    # Save the model to GCS
    dnn_model.save(params['model-dir'])




-----

### Step 2: Write a `task.py` file as an entrypoint to your custom model container

In [None]:
%%writefile {MODEL_DIR}/trainer/task.py

import os
import argparse
import logging

from trainer import model


def _env_default(name):
    """Build the argparse keyword arguments for an option backed by an environment variable.

    When the variable `name` is set, its value becomes the option's default.
    Otherwise the option is marked as required, so a missing value produces an
    argparse usage error instead of a KeyError while the parser is being built.
    """
    value = os.environ.get(name)
    if value is None:
        return {'required': True}
    return {'default': value}


if __name__ == '__main__':
    # The root logger defaults to WARNING, which would silently drop the info
    # messages emitted here and in trainer.model.
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    # Vertex custom container training args. These are set by Vertex AI during training but can also be overwritten.
    parser.add_argument('--model-dir', dest='model-dir', type=str,
                        help='GCS URI for saving model artifacts.',
                        **_env_default('AIP_MODEL_DIR'))

    parser.add_argument('--train-data-dir', dest='train-data-dir', type=str,
                        help='BQ URI where the data is',
                        **_env_default('AIP_TRAINING_DATA_URI'))

    parser.add_argument('--val-data-dir', dest='val-data-dir', type=str,
                        help='BQ URI where the data is',
                        **_env_default('AIP_VALIDATION_DATA_URI'))

    args = parser.parse_args()
    # The dest names contain hyphens, so the values are read from the dict
    # rather than as attributes.
    params = args.__dict__

    # Log the values actually used (command-line overrides included).
    logging.info(f"Training bq path {params['train-data-dir']}")
    logging.info(f"Validation bq path {params['val-data-dir']}")

    model.train_model(params)

------

### Step 3: Write a `Dockerfile` for your custom model container

Third, you will write a `Dockerfile` that contains instructions to package your model code in `container` as well as specifies your model code's dependencies needed for execution together in a Docker container.

In [None]:
%%writefile {MODEL_DIR}/Dockerfile
# Specifies base image and tag.
# https://cloud.google.com/vertex-ai/docs/training/pre-built-containers
# NOTE(review): `latest` is a mutable tag, so rebuilds may pick up a different image; consider pinning.
FROM us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-6:latest

# Sets the container working directory used for the dependency installation below.
WORKDIR /root

# Copies requirements.txt on its own, before the source code, so the dependency
# layer below can be reused from cache when only the trainer code changes.
COPY requirements.txt .

# Installs additional packages (-U also upgrades packages already present in the base image).
RUN pip3 install -U -r requirements.txt

# b/203105209 Removes unneeded file from TF2.5 CPU image for python_module CustomJob training.
# Will be removed on subsequent public Vertex images.
# NOTE(review): the note above mentions the TF2.5 image while FROM uses tf-cpu.2-6 - confirm this step is still needed.
RUN rm -rf /var/sitecustomize/sitecustomize.py

# Copies the whole build context (the trainer package plus the build files) into /trainer.
COPY . /trainer

# Switches to /trainer so the `trainer` package can be resolved from the working directory.
WORKDIR /trainer

# Sets up the entry point to invoke the trainer: runs trainer/task.py as a module.
ENTRYPOINT ["python", "-m", "trainer.task"]

---------

### Step 4: Write a `requirements.txt` file to specify additional ML code dependencies

These are additional dependencies for your model code not included in the pre-built Vertex TensorFlow images such as TensorFlow and google cloud sdks

In [None]:
%%writefile {MODEL_DIR}/requirements.txt
# Extra packages installed on top of the base training image (see the Dockerfile).
# pandas: DataFrame handling in trainer/model.py (not pinned).
pandas
# Cloud Storage client imported by trainer/model.py.
google-cloud-storage
# BigQuery client used by trainer/model.py to read the data tables (pinned).
google-cloud-bigquery==2.34.3
# NOTE(review): protobuf is not imported directly by the trainer code - presumably a compatibility pin; confirm.
protobuf==3.20.1

----

### Step 5: Use Cloud Build to build and submit your model container to Google Cloud Artifact Registry

Next, you will use [Cloud Build](https://cloud.google.com/build) to build and upload your custom TensorFlow model container to [Google Cloud Artifact Registry](https://cloud.google.com/artifact-registry).

Cloud Build brings reusability and automation to your ML experimentation by enabling you to reliably build, test, and deploy your ML model code as part of a CI/CD workflow. Artifact Registry provides a centralized repository for you to store, manage, and secure your ML container images. This will allow you to securely share your ML work with others and reproduce experiment results.

**Note**: the initial build and submit step will take about 16 minutes but Cloud Build is able to take advantage of caching for faster subsequent builds.

####  5.1. Create Artifact Registry for custom container images

In [None]:
# Name of the Artifact Registry repository that will hold the training image.
ARTIFACT_REGISTRY = "fuel-regression"

In [None]:
# Create the Docker repository in REGION: IMAGE_URI below is built from the same
# REGION, so the repository must live there for the image push to succeed.
!gcloud artifacts repositories create $ARTIFACT_REGISTRY --location=$REGION --repository-format=docker

#### 5.2. Create `cloudbuild.yaml` instructions

In [None]:
# Fully-qualified Artifact Registry path of the training image:
# <region>-docker.pkg.dev/<project>/<repository>/<image>:<tag>
IMAGE_NAME = "fuel-regression"
IMAGE_TAG = "latest"
IMAGE_URI = "{}-docker.pkg.dev/{}/{}/{}:{}".format(
    REGION, PROJECT_ID, ARTIFACT_REGISTRY, IMAGE_NAME, IMAGE_TAG
)
IMAGE_URI

In [None]:
# Cloud Build configuration: a single docker build step that tags the image
# with IMAGE_URI, which is also listed under `images`.
cloudbuild_yaml = "\n".join([
    "steps:",
    "- name: 'gcr.io/cloud-builders/docker'",
    f"  args: [ 'build', '-t', '{IMAGE_URI}', '.' ]",
    "images: ",
    f"- '{IMAGE_URI}'",
])

# Write the configuration next to the Dockerfile.
with open(f"{MODEL_DIR}/cloudbuild.yaml", "w") as yaml_file:
    yaml_file.write(cloudbuild_yaml)

#### 5.3. Build and submit your container image to Artifact Registry using Cloud Build
In a terminal, change into the `container` directory (`cd container`) and run the command below:

```shell
gcloud builds submit --region=us-central1 --config cloudbuild.yaml
```

------

## Vertex Pipelines Introduction

[Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) helps you to automate, monitor, and govern your ML systems by orchestrating your ML workflow in a serverless manner, and storing your workflow's artifacts using Vertex ML Metadata. By storing the artifacts of your ML workflow in Vertex ML Metadata, you can analyze the lineage of your workflow's artifacts — for example, an ML model's lineage may include the training data, hyperparameters, and code that were used to create the model.

You can build your Pipelines using the battle-tested and easy-to-use `KubeFlow Pipelines (KFP) SDK` or ` TensorFlow Extended (TFX) SDK`. 

Within your Vertex Pipeline with `KubeFlow Pipelines (KFP) SDK` you can use either your own custom components using `KubeFlow Components` or already-built components using `Google Cloud Pipeline Components`.

The Google Cloud Components are [documented here](https://google-cloud-pipeline-components.readthedocs.io/en/latest/google_cloud_pipeline_components.aiplatform.html#module-google_cloud_pipeline_components.aiplatform). 

The KubeFlow Components are [documented here](https://www.kubeflow.org/docs/components/pipelines/v1/sdk-v2/python-function-components/)

<img src=img/vertex-pipelines-def.png width=80%>

-----


## Create a KFP Pipeline

To address your business requirements and get your higher performing model into production to deliver value faster, you will define a pipeline using the [**Kubeflow Pipelines (KFP) V2 SDK**](https://www.kubeflow.org/docs/components/pipelines/sdk/v2/v2-compatibility) to orchestrate the training and deployment of your model on [**Vertex Pipelines**](https://cloud.google.com/vertex-ai/docs/pipelines) below.

The pipeline consists of four `Google Cloud Custom Components`:

* `TabularDatasetCreateOp`[(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.TabularDatasetCreateOp): Creates a Tabular Managed Dataset in Vertex AI Datasets.

* `CustomContainerTrainingJobRunOp` [(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.CustomContainerTrainingJobRunOp): trains your custom model container using Vertex Training. This is the same as configuring a Vertex Custom Container Training Job using the Vertex Python SDK you covered in the Vertex AI: Qwik Start lab.

*  `EndpointCreateOp` [(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.EndpointCreateOp): Creates a Google Cloud Vertex Endpoint resource that maps physical machine resources with your model to enable it to serve online predictions. Online predictions have low latency requirements; providing resources to the model in advance reduces latency. 

* `ModelDeployOp`[(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.ModelDeployOp): deploys your model to a Vertex Prediction Endpoint for online predictions.

--------

### Compile the KFP Pipeline

#### Define Pipeline Parameters

In [None]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Replace the placeholder with your own lowercase name. It becomes part of the
# pipeline root path and the `creator` label; if the placeholder is left
# unchanged, 'unknown' is used instead.
USER = 'add-your-name-lowercase'
if USER == 'add-your-name-lowercase':
    USER = 'unknown'


EXPERIMENT_NAME = "fuel-model-google-cloud-components"
EXPERIMENT_DESCRIPTION = "Fuel prediction pipeline using Google Cloud Components and Custom Container"
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root/{USER}"

## Important the pipeline name always has the timestamp as part of the name.
PIPELINE_NAME = f"{EXPERIMENT_NAME}-{TIMESTAMP}"
PIPELINE_PACKAGE_PATH = f'{PIPELINE_NAME}-path.json'


# Must be the lowercase string "true": the pipeline's deploy condition compares
# the parameter against exactly that value, so "True" would skip the deployment.
DEPLOY_ENDPOINT = "true"

MANAGED_DATASET_NAME = 'fuel_dataset'
MODEL_NAME = 'fuel-prediction'
BQ_DATA_DIR = BQ_DATASET_URI

ENDPOINT_DISPLAY_NAME = 'fuel-endpoint'
SERVING_CONTAINER_IMAGE_URI = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
ENDPOINT_MACHINE_TYPE = "n1-standard-4"

# Labels attached to the pipeline job.
LABELS = {
    'creator': USER,
    'workflow': 'fuel-prediction',
    'type': 'regression'}


# Initialise the SDK in the same region that the pipeline components and the
# experiment lookup further down use.
aip.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_NAME,
    experiment=EXPERIMENT_NAME,
    experiment_description=EXPERIMENT_DESCRIPTION)

In [None]:
# PIPELINE
# Defines the workflow: create a managed tabular dataset, train the custom
# container on it and, when requested, create an endpoint and deploy the model.
# All parameters are strings supplied at submission time through `parameter_values`.
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=PIPELINE_NAME
)
def pipeline(
    bq_dataset_uri: str,  # BigQuery source of the managed dataset
    managed_dataset_name: str,  # display name of the managed dataset
    container_uri: str,  # training container image
    staging_bucket:str,  # staging bucket handed to the training job
    bq_destination: str,  # passed to the training job as `bigquery_destination`
    deploy_endpoint:str,  # deployment happens only when this equals "true"
    pipeline_name: str,  # NOTE(review): accepted but not referenced in the body
    endpoint_display_name: str,  # display name of the created endpoint
    endpoint_machine_type:str,  # machine type of the deployed model's replicas
    model_name: str,  # NOTE(review): accepted but not referenced in the body
    model_serving_container_image_uri: str,  # serving image registered with the trained model
    region: str,  # location used by the dataset, training and endpoint steps
    project_id: str  # project used by the dataset, training and endpoint steps
):


    # STEP 1: Create Managed Dataset
    
    create_dataset_op = gcc_aip.TabularDatasetCreateOp(
        display_name=managed_dataset_name,
        bq_source=bq_dataset_uri,
        project=project_id,
        location=region
    ).set_caching_options(True) \
        .set_display_name('create-managed-dataset-op')
    
    ## STEP 2: Create Training Job
    # Runs the custom container on the managed dataset with a 70/20/10
    # training/validation/test split.
    training_op = gcc_aip.CustomContainerTrainingJobRunOp(
        display_name="pipeline-fuel-custom-train",
        container_uri=container_uri,
        project=project_id,
        location=region,
        dataset=create_dataset_op.outputs["dataset"],
        staging_bucket=staging_bucket,
        training_fraction_split=0.7,
        validation_fraction_split=0.2,
        test_fraction_split=0.1,
        bigquery_destination=bq_destination,
        model_serving_container_image_uri=model_serving_container_image_uri,
        model_display_name="fuel-custom-model-pipeline",
        machine_type="n1-standard-4",
    )
    

    ## Step 3: Decision: create the endpoint and deploy the model only when the
    ## `deploy_endpoint` parameter is exactly the string "true" (no metric threshold is checked here).
    with dsl.Condition(
            deploy_endpoint == "true",
            name="deploy_decision",
        ):
        ## Step 4: Create Endpoint
        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project_id,
            location=region,
            display_name=endpoint_display_name,
        ).set_display_name('create-endpoint-op').set_caching_options(True)

        ## Step 5: Deploy Model To Endpoint
        # Uses the model output of the training step and a single dedicated replica.
        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type=endpoint_machine_type
        ).set_display_name('deploy-model-op').set_caching_options(True)
    



In [None]:
# Compile the pipeline function into the job spec file at PIPELINE_PACKAGE_PATH.
compiler.Compiler().compile(package_path=PIPELINE_PACKAGE_PATH, pipeline_func=pipeline)

--------------------

### Execute the KFP Pipeline using Vertex AI Pipelines

In [None]:
# Runtime value for every parameter declared by the pipeline function.
pipeline_parameters = dict(
    bq_dataset_uri=BQ_DATA_DIR,
    managed_dataset_name=MANAGED_DATASET_NAME,
    container_uri=IMAGE_URI,
    staging_bucket=BUCKET_NAME,
    bq_destination=f"bq://{PROJECT_ID}",
    deploy_endpoint=DEPLOY_ENDPOINT,
    pipeline_name=PIPELINE_NAME,
    endpoint_display_name=ENDPOINT_DISPLAY_NAME,
    endpoint_machine_type=ENDPOINT_MACHINE_TYPE,
    model_name=MODEL_NAME,
    model_serving_container_image_uri=SERVING_CONTAINER_IMAGE_URI,
    project_id=PROJECT_ID,
    region=REGION,
)

# Create the job from the compiled spec, then submit it under the experiment.
job = aip.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_parameters,
    labels=LABELS,
    enable_caching=True,
)

job.submit(experiment=EXPERIMENT_NAME)

------------

## Inspect Experiments

In [None]:
def get_experiments_data(
  experiment_name: str,
  project: str,
  location: str
):
    """Fetch the data recorded for a Vertex AI experiment.

    Re-initialises the SDK with the given experiment, project and location,
    which also changes the SDK defaults for subsequent calls.

    Args:
        experiment_name: Name of the experiment to read.
        project: Google Cloud project that owns the experiment.
        location: Region the experiment lives in.

    Returns:
        Whatever `aip.get_experiment_df()` returns for that experiment.
    """
    aip.init(project=project, location=location, experiment=experiment_name)
    return aip.get_experiment_df()

In [None]:
# Fetch the data logged under this notebook's experiment and display it.
df = get_experiments_data(location=REGION, project=PROJECT_ID, experiment_name=EXPERIMENT_NAME)

df

-----------

## Online Predictions with Deployed Endpoint

Retrieve the `Endpoint` deployed by the pipeline and use it to query your model for online predictions.

Configure the `Endpoint()` function below with the following parameters:

*  `endpoint_name`: A fully-qualified endpoint resource name or endpoint ID. Example: "projects/123/locations/us-central1/endpoints/456" or "456" when project and location are initialized or passed.
*  `project_id`: GCP project.
*  `location`: GCP region.

Call `predict()` to return a prediction for a sample car.

In [None]:
# Placeholder: replace with the ID (or fully-qualified resource name) of the
# endpoint deployed by the pipeline before running the next cells.
ENDPOINT_ID = 'insert-your-endpoint-id'

In [None]:
# The Vertex AI SDK is imported in this notebook as `aip`; `vertexai` is never
# imported, so referencing it raises a NameError.
endpoint = aip.Endpoint(ENDPOINT_ID)

In [None]:
# `predict` takes a list of instances, so a single car has to be wrapped in an
# outer list; a flat list would be sent as nine separate one-value instances.
# presumably the nine values follow the training feature order (Cylinders,
# Displacement, Horsepower, Weight, Acceleration, Model Year, Europe, Japan, USA)
# - verify against the column order of the training table.
prediction = endpoint.predict(instances=[[4, 90.0, 75.0, 2125.0, 14.5, 74, 0, 0, 1]])

In [None]:
# Display the response object returned by endpoint.predict().
prediction

----

## Batch Predictions with Created Model

In [None]:
## Create a fake batch file in Cloud Storage by randomly sampling our dataset
import pandas as pd
dataset = pd.read_csv(GCS_DATA_URI, header=None)
# Remove label
dataset = dataset.iloc[:,1:]

batch_data = dataset.sample(10)
batch_data.to_csv('data/batch_data_ex.csv', index=False)
batch_data.head()


In [None]:
## Upload data to Cloud Storage
from src.helper import upload_file_to_gcs

gcs_batch_input_data_path = upload_file_to_gcs(
    project_id=PROJECT_ID,
    target=BUCKET_NAME,
    source='data/batch_data_ex.csv',
    blob_name=f'data/batch_prediction/input_data/fuel_data_{TIMESTAMP}.csv')
gcs_batch_input_data_path

In [None]:
# Define batch job args
TIMESTAMP =datetime.now().strftime("%Y%m%d%H%M%S") 
batch_job_display_name = "fuel-batch-prediction-job"
gcs_batch_data = gcs_batch_input_data_path
instances_format = 'csv'
gcs_dest_results = f'gs://{BUCKET_NAME}/batch_jobs/output/{TIMESTAMP}/'
machine_type = "n1-standard-2"

In [None]:
## List all Models and pick the Model ID
# Use REGION so the listing matches the region used in the model resource name below.
!gcloud ai models list --region=$REGION

In [None]:
# Placeholder: replace with a model ID taken from the `gcloud ai models list` output above.
MODEL_ID = 'insert-your-model-id'

In [None]:
# Fully-qualified resource name of the model picked above.
model_resource_name = 'projects/{}/locations/{}/models/{}'.format(PROJECT_ID, REGION, MODEL_ID)
model_resource_name

In [None]:
# Look up the existing model by its full resource name (project and location are part of the name).
model = aip.Model(model_resource_name)

In [None]:
# Launch the batch prediction job with the arguments defined in the
# "Define batch job args" cell, so changing `instances_format` there takes effect here.
batch_prediction_job = model.batch_predict(
    job_display_name=batch_job_display_name,
    instances_format=instances_format,  # 'csv' here; e.g. 'jsonl' for JSON Lines input
    gcs_source=[gcs_batch_data],
    gcs_destination_prefix=gcs_dest_results,
    machine_type=machine_type,  # must be present
)

------

# IMPORTANT! CLEAN UP ALL RESOURCES CREATED