In [ ]:
import os

# Path to the credentials JSON file (presumably a service-account key, going by
# the file name), exported to this process through the
# GOOGLE_APPLICATION_CREDENTIALS environment variable.
GOOGLE_APPLICATION_CREDENTIALS="./user-gcp-sa.json"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = GOOGLE_APPLICATION_CREDENTIALS

In [ ]:
#!pip3 install -r requirements.txt

In [ ]:
#!pip3 install --user google-cloud-automl==0.9.0

In [ ]:
#! pip3 install --user kfp --upgrade
#! pip3 install --user google-cloud-automl

In [ ]:
import kfp
# Create the Kubeflow Pipelines client, pointed at the given host.
client = kfp.Client(host='4709991555f6b0ae-dot-us-central1.notebooks.googleusercontent.com')

# AutoML Serverless ML (Taxi) on KFP

TODO: set GCS location to experiment run
TODO: set num passengers out of categorical
TODO: run on unsampled dataset
TODO: set split column to use fingerprint
TODO: Difference between table and dataset?
TODO: try using a cleanup task with boolean in testing mode
TODO: does generating new stats do anything?

In [ ]:
# Configuration: project, region and bucket passed to the pipeline run.

PROJECT_ID = 'dhodun1'
COMPUTE_REGION = 'us-central1' # Currently us-central1 is the only region
BUCKET = 'dhodun1-central1'

# Raw dataset, not cleaned.
# The query adds tolls to the fare to form `fare_amount`, renames the
# pickup/dropoff coordinates, multiplies passenger_count by 1.0, and keeps only
# the rows whose fingerprinted pickup_datetime satisfies MOD(..., 100000) = 1.
QUERY = '''
SELECT
  (tolls_amount + fare_amount) AS fare_amount,
  pickup_longitude AS pickuplon,
  pickup_latitude AS pickuplat,
  dropoff_longitude AS dropofflon,
  dropoff_latitude AS dropofflat,
  passenger_count*1.0 AS passengers
FROM `nyc-tlc.yellow.trips`
WHERE MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), 100000) = 1
'''


In [ ]:
# AutoML Tables components
! git clone https://github.com/kubeflow/pipelines.git


In [ ]:
from kfp import components

# Component store that searches ./pipelines/components, i.e. the components
# directory of the kubeflow/pipelines repository cloned in the previous cell.
component_store = components.ComponentStore(local_search_paths=['./pipelines/components'])

In [ ]:
# Load the AutoML component definitions from the component store; each call
# returns a factory that is invoked inside a pipeline function to add a step.
automl_create_dataset_for_tables_op = component_store.load_component('gcp/automl/create_dataset_for_tables')
automl_import_data_from_bigquery_op = component_store.load_component('gcp/automl/import_data_from_bigquery')
automl_import_data_from_gcs_op = component_store.load_component('gcp/automl/import_data_from_gcs')
automl_create_model_for_tables_op = component_store.load_component('gcp/automl/create_model_for_tables')
prediction_service_batch_predict_op = component_store.load_component('gcp/automl/prediction_service_batch_predict')
automl_split_dataset_table_column_names_op = component_store.load_component('gcp/automl/split_dataset_table_column_names')

# Load the BigQuery query component from the same store.
bigquery_query_op = component_store.load_component('gcp/bigquery/query')
#automl_create_dataset_for_tables_op = comp.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/b3179d86b239a08bf4884b50dbf3a9151da96d66/components/gcp/automl/create_dataset_for_tables/component.yaml')

In [ ]:
#automl_create_dataset_for_tables_op(PROJECT_ID, COMPUTE_REGION, 'taxi_data',)

## Create Op to change 'passengers' from categorical to numeric
Multiplying by `*1.0` in the BigQuery query should already make `passengers` numeric, but the column is still affected by this BigQuery CSV export bug: https://b.corp.google.com/issues/143356550

In [ ]:
# Resource name of an existing dataset, used when calling
# set_automl_tables_column_type directly from the notebook.
dataset_path = 'projects/978546835329/locations/us-central1/datasets/TBL5712864505831096320'

In [ ]:
from typing import NamedTuple

def set_automl_tables_column_type(
    dataset_path: str,
    column_display_name: str,
    type_code: str,
):
    """Set the type code of a column in an AutoML Tables dataset.

    Installs ``google-cloud-automl==0.9.0`` with pip at call time, looks up
    the dataset, and updates every column spec whose display name equals
    ``column_display_name``.

    Args:
        dataset_path: Resource name of the dataset, passed to
            ``TablesClient.get_dataset`` as ``dataset_name``.
        column_display_name: Display name of the column to update.
        type_code: New type code for the column (for example ``'FLOAT64'``).

    Raises:
        subprocess.CalledProcessError: If the pip install fails.
        ValueError: If no column with ``column_display_name`` exists.
    """
    # All imports stay inside the function body so that it remains
    # self-contained when packaged with func_to_container_op.
    import os
    import sys
    import subprocess

    # Inherit the caller's environment (PATH, HOME, proxy settings, ...) and
    # only add the pip flag; passing a bare dict would wipe everything else.
    pip_env = dict(os.environ, PIP_DISABLE_PIP_VERSION_CHECK='1')
    subprocess.run(
        [sys.executable, '-m', 'pip', 'install', 'google-cloud-automl==0.9.0', '--quiet', '--no-warn-script-location'],
        env=pip_env,
        check=True,
    )

    from google.cloud import automl_v1beta1
    client = automl_v1beta1.TablesClient()

    dataset = client.get_dataset(dataset_name=dataset_path)

    # Materialize the listing before issuing updates.
    column_specs = list(client.list_column_specs(dataset=dataset))

    updated = False
    for column in column_specs:
        if column.display_name != column_display_name:
            continue
        # Per the original author's note, the update appears to kick off a new
        # column-statistics job (took ~1 minute once); nothing here waits for it.
        client.update_column_spec(column_spec_name=column.name, dataset=dataset, type_code=type_code)
        print('Updated column: {} to type code {}. Generating new statistics now...'.format(column_display_name, type_code))
        updated = True

    if not updated:
        # Fail loudly instead of reporting success for a step that did nothing.
        raise ValueError(
            'No column named {!r} found in dataset {}'.format(column_display_name, dataset_path)
        )

# Wrap the function as a pipeline container op that uses the python:3.7 base image.
set_automl_tables_column_type_op = components.func_to_container_op(set_automl_tables_column_type, base_image='python:3.7')

In [ ]:
# Call the plain Python function directly (not as a pipeline step) to switch
# the 'passengers' column of the dataset to FLOAT64.
set_automl_tables_column_type(dataset_path=dataset_path, column_display_name='passengers', type_code='FLOAT64')

In [ ]:
# Define the pipeline

import kfp
from kfp import dsl

def serverless_automl(
    gcp_project_id: str,
    gcp_region: str,
    query: str,
    gcs_bucket: str,
    gcs_temp_directory: str = 'ml-taxi/{}'.format(dsl.RUN_ID_PLACEHOLDER),
    dataset_display_name: str = 'taxi_data',
    dataset_location: str = 'US',
    target_column_name: str = 'fare_amount',
    model_display_name: str = 'taxi_data_model',
    train_budget_milli_node_hours: 'Integer' = 1000,
):
    # Pipeline definition: split the column specs of a hard-coded dataset into
    # target/feature columns, then train a model that minimizes RMSE on
    # `target_column_name`.
    #
    # The dataset-creation, BigQuery-export and GCS-import steps are disabled
    # (kept below as comments), so `query`, `dataset_display_name` and
    # `dataset_location` are only referenced by those disabled steps.
    from kfp.gcp import use_gcp_secret

    # Destination of the BigQuery export; only the disabled steps consume it.
    output_gcs_path = 'gs://{}/{}/bq_taxi_output.csv'.format(gcs_bucket, gcs_temp_directory)

    # --- disabled steps -----------------------------------------------------
    # # Create dataset
    # create_dataset_task = automl_create_dataset_for_tables_op(
    #     gcp_project_id=gcp_project_id,
    #     gcp_region=gcp_region,
    #     display_name=dataset_display_name,
    # )
    #
    # # Query to clean dataset and dump to GCS
    # bigquery_export_task = bigquery_query_op(
    #     query=query,
    #     project_id=gcp_project_id,
    #     output_gcs_path=output_gcs_path,
    #     dataset_location=dataset_location,
    # )
    #
    # # Import data from GCS automl_import_data_from_gcs_op
    # import_data_task = automl_import_data_from_gcs_op(
    #     dataset_path=create_dataset_task.outputs['dataset_path'],
    #     input_uris=[output_gcs_path],
    # ).after(bigquery_export_task)
    # -------------------------------------------------------------------------

    # Stand-ins for the outputs of the disabled steps:
    #   dataset path <- import_data_task.outputs['dataset_path']
    #   dataset id   <- create_dataset_task.outputs['dataset_id']
    existing_dataset_path = 'projects/978546835329/locations/us-central1/datasets/TBL5712864505831096320'
    existing_dataset_id = 'TBL5712864505831096320'

    # Prepare column schemas: separate the target column from the features.
    column_specs_task = automl_split_dataset_table_column_names_op(
        dataset_path=existing_dataset_path,
        table_index=0,
        target_column_name=target_column_name,
    )
    column_outputs = column_specs_task.outputs

    # Train a model. Passing the feature paths explicitly; the original note
    # says all non-target columns are used if None is passed instead.
    train_model_task = automl_create_model_for_tables_op(
        gcp_project_id=gcp_project_id,
        gcp_region=gcp_region,
        display_name=model_display_name,
        dataset_id=existing_dataset_id,
        target_column_path=column_outputs['target_column_path'],
        input_feature_column_paths=column_outputs['feature_column_paths'],
        optimization_objective='MINIMIZE_RMSE',
        train_budget_milli_node_hours=train_budget_milli_node_hours,
    )

    # Apply the 'user-gcp-sa' secret transformer to the ops of this pipeline.
    pipeline_conf = dsl.get_pipeline_conf()
    pipeline_conf.add_op_transformer(use_gcp_secret('user-gcp-sa'))

In [ ]:
# Run the pipeline

from google.cloud import automl

#location_path = automl.AutoMlClient().location_path(PROJECT_ID, COMPUTE_REGION)

In [ ]:
# (Re)create the Kubeflow Pipelines client used to submit the run.
client = kfp.Client(host='4709991555f6b0ae-dot-us-central1.notebooks.googleusercontent.com')

In [ ]:
from google.cloud import automl

#PROJECT_ID = 'dhodun1'
#COMPUTE_REGION = 'us-central1' # Currently us-central1 is only region

#location_path = automl.AutoMlClient().location_path(PROJECT_ID, COMPUTE_REGION)

# Submit a run of the pipeline function defined in this notebook. The function
# is named `serverless_automl`; the previous reference to
# `serverless_ml_taxi_pipeline` pointed at a name that is never defined.
# Pipeline parameters not listed here keep their signature defaults.
client.create_run_from_pipeline_func(
    serverless_automl,
    arguments=dict(
        gcp_project_id=PROJECT_ID,
        gcp_region=COMPUTE_REGION,
        query=QUERY,
        gcs_bucket=BUCKET,
        dataset_display_name='taxi_data',
    )
)

In [ ]:
!gcloud container clusters list

In [ ]:
!gcloud container clusters get-credentials kubeflow-marketplace-1 --zone us-central1-a

In [ ]:
# Inspect the table/column specs of the dataset and set the 'passengers'
# column to a numeric type.
from google.cloud import automl_v1beta1

dataset_path = 'projects/978546835329/locations/us-central1/datasets/TBL5712864505831096320'

# NOTE: this rebinds the notebook-global `client` (previously the kfp.Client)
# to an AutoML client; re-run the kfp.Client cell before submitting a run.
client = automl.AutoMlClient()

list_table_specs_response = client.list_table_specs(dataset_path)
table_specs = list(list_table_specs_response)
print('table_specs=')
print(table_specs)
if not table_specs:
    raise LookupError('Dataset {} has no table specs'.format(dataset_path))
table_spec_name = table_specs[0].name

list_column_specs_response = client.list_column_specs(table_spec_name)
column_specs = list(list_column_specs_response)

# Resource names of the columns displayed as 'passengers'. The last match is
# used, which is what the original loop ended up with.
passenger_columns = [spec.name for spec in column_specs if spec.display_name == 'passengers']
if not passenger_columns:
    raise LookupError("No 'passengers' column found in {}".format(table_spec_name))
passenger_column = passenger_columns[-1]

print('column_specs=')
#print(column_specs)

column = client.get_column_spec(passenger_column)
print(column)

# The column_spec_name/type_code keywords belong to TablesClient, not to
# AutoMlClient.update_column_spec (which expects a ColumnSpec message).
# 'FLOAT64' is the numeric type code already used elsewhere in this notebook.
tables_client = automl_v1beta1.TablesClient()
tables_client.update_column_spec(
    dataset_name=dataset_path,
    column_spec_name=passenger_column,
    type_code='FLOAT64',
)

In [ ]:
# Display the resource name of the current `column` object.
column.name

In [ ]:
# Display the first table spec (an empty subscript `table_specs[]` is a syntax error).
table_specs[0]

In [ ]:
# Run the pipeline function defined in this notebook (`serverless_automl`).
# `query` and `gcs_bucket` have no defaults in the pipeline signature, so they
# must be supplied; the dataset name parameter is `dataset_display_name`.
kfp.run_pipeline_func_on_cluster(
    serverless_automl,
    arguments=dict(
        gcp_project_id=PROJECT_ID,
        gcp_region=COMPUTE_REGION,
        query=QUERY,
        gcs_bucket=BUCKET,
        dataset_display_name='taxi_data',
    )
)