<a href="https://colab.research.google.com/github/deltorobarba/machinelearning/blob/master/kubeflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

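# Kubeflow Pipelines: Chicago Crime Prediction

This notebook defines a three-step Kubeflow pipeline, then compiles it and submits a run. The steps are:

1. Export daily Chicago crime counts from BigQuery to GCS.
2. Train a model on AI Platform.
3. Deploy the trained model.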

In [0]:
# imports

In [0]:
%%capture

# Install the SDK (Uncomment the code if the SDK is not installed before)
!pip3 install --upgrade pip -q
!pip3 install kfp --upgrade -q
!pip3 install pandas --upgrade -q

In [0]:
import json

import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp

import pandas as pd

import time

In [0]:
# Pipeline constants: GCP project, GCS paths, trainer settings and pipeline metadata

In [0]:
# Required parameters -- point these at your own GCP project and bucket.
PROJECT_ID = 'lunar-demo'
GCS_WORKING_DIR = 'gs://lunar-demo-kubeflow-bucket/chicagocrime'  # No ending slash

# Optional parameters.
REGION = 'us-central1'
RUNTIME_VERSION = '1.13'
# JSON-encoded list of trainer package locations.
PACKAGE_URIS = json.dumps(['gs://chicago-crime/chicago_crime_trainer-0.0.tar.gz'])

# Output directory suffixed with a Unix timestamp. time.time() is evaluated once,
# when this cell runs, so re-run the cell to get a fresh directory.
TRAINER_OUTPUT_GCS_PATH = f'{GCS_WORKING_DIR}/train/output/{int(time.time())}/'
DATA_GCS_PATH = f'{GCS_WORKING_DIR}/reports.csv'
PYTHON_MODULE = 'trainer.task'

# Command-line arguments for the trainer module, JSON-encoded like PACKAGE_URIS.
# NOTE(review): '--job-dir' here is GCS_WORKING_DIR, whereas the training step's
# job_dir is TRAINER_OUTPUT_GCS_PATH -- confirm which one the trainer should use.
# The data path is baked in at this point, so overriding the pipeline's
# data_gcs_path parameter at run time does not change what the trainer reads.
TRAINER_ARGS = json.dumps(
    ['--data-file-url', DATA_GCS_PATH,
     '--job-dir', GCS_WORKING_DIR]
)

# Pipeline / experiment metadata.
EXPERIMENT_NAME = 'Chicago Crime Prediction'
PIPELINE_NAME = 'Chicago Crime Prediction'
PIPELINE_FILENAME_PREFIX = 'chicago'
PIPELINE_DESCRIPTION = ''

In [0]:
# Download Data
# Define a download function that uses the BigQuery component

In [0]:
# BigQuery query component, loaded from a fixed kubeflow/pipelines commit so the
# component definition does not change underneath this notebook.
bigquery_query_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/bigquery/query/component.yaml'
)

# Daily crime counts from the public Chicago crime dataset, oldest day first.
QUERY = """
    SELECT count(*) as count, TIMESTAMP_TRUNC(date, DAY) as day
    FROM `bigquery-public-data.chicago_crime.crime`
    GROUP BY day
    ORDER BY day
"""

def download(project_id, data_gcs_path):
    """Create the pipeline step that runs QUERY and writes its result to GCS.

    Args:
        project_id: GCP project the BigQuery job runs in.
        data_gcs_path: GCS path passed to the component as ``output_gcs_path``.

    Returns:
        The task returned by ``.apply(...)`` after attaching the
        ``user-gcp-sa`` secret via ``gcp.use_gcp_secret``.
    """
    query_task = bigquery_query_op(
        query=QUERY,
        project_id=project_id,
        output_gcs_path=data_gcs_path,
    )
    return query_task.apply(gcp.use_gcp_secret('user-gcp-sa'))

In [0]:
# Train the model
# Run training code that will pre-process the data and then submit a training job to AI Platform

In [0]:
# AI Platform (ML Engine) training component, pinned to the same commit as the
# other components.
mlengine_train_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/ml_engine/train/component.yaml'
)

def train(project_id,
          trainer_args,
          package_uris,
          trainer_output_gcs_path,
          gcs_working_dir,
          region,
          python_module,
          runtime_version):
    """Create the pipeline step that submits an AI Platform training job.

    Args:
        project_id: GCP project for the training job.
        trainer_args: Trainer command-line arguments (forwarded as ``args``).
        package_uris: Locations of the trainer package(s).
        trainer_output_gcs_path: GCS directory forwarded as ``job_dir``.
        gcs_working_dir: Accepted so existing positional callers keep working;
            it is NOT forwarded to the component.
        region: GCP region for the job.
        python_module: Module to run inside the trainer package.
        runtime_version: AI Platform runtime version.

    Returns:
        The task returned by ``.apply(...)`` after attaching the
        ``user-gcp-sa`` secret via ``gcp.use_gcp_secret``.
    """
    component_inputs = dict(
        project_id=project_id,
        python_module=python_module,
        package_uris=package_uris,
        region=region,
        args=trainer_args,
        job_dir=trainer_output_gcs_path,
        runtime_version=runtime_version,
    )
    training_task = mlengine_train_op(**component_inputs)
    return training_task.apply(gcp.use_gcp_secret('user-gcp-sa'))

In [0]:
# Deploy model
# Deploy the model with the ID given from the training step

In [0]:
# AI Platform (ML Engine) deploy component, pinned to the same commit as the
# other components.
mlengine_deploy_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/ml_engine/deploy/component.yaml'
)

def deploy(
    project_id,
    model_uri,
    model_id,
    runtime_version):
    """Create the pipeline step that deploys a trained model to AI Platform.

    Args:
        project_id: GCP project to deploy into.
        model_uri: Location of the trained model artifacts (the pipeline
            passes the training step's ``job_dir`` output).
        model_id: Model name (the pipeline passes the training ``job_id``).
        runtime_version: AI Platform runtime version for serving.

    Returns:
        The task returned by ``.apply(...)`` after attaching the
        ``user-gcp-sa`` secret via ``gcp.use_gcp_secret``.
    """
    deployment_task = mlengine_deploy_op(
        model_uri=model_uri,
        project_id=project_id,
        model_id=model_id,
        runtime_version=runtime_version,
        # Judging by their names, these flags replace an existing version of the
        # same name and mark the new version as default -- see the component spec.
        replace_existing_version=True,
        set_default=True,
    )
    return deployment_task.apply(gcp.use_gcp_secret('user-gcp-sa'))

In [0]:
# Define pipeline

In [0]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description=PIPELINE_DESCRIPTION
)
def pipeline(
    data_gcs_path=DATA_GCS_PATH,
    gcs_working_dir=GCS_WORKING_DIR,
    project_id=PROJECT_ID,
    python_module=PYTHON_MODULE,
    region=REGION,
    runtime_version=RUNTIME_VERSION,
    package_uris=PACKAGE_URIS,
    trainer_output_gcs_path=TRAINER_OUTPUT_GCS_PATH,
    trainer_args=TRAINER_ARGS,
):
    """Chain the download, train and deploy steps into one pipeline.

    Every argument becomes a run-time pipeline parameter whose default is the
    matching constant from the configuration cell. Plain default values are
    used instead of explicitly constructed ``dsl.PipelineParam`` objects, which
    the KFP v1 compiler deprecates. The parameters are intentionally left
    without type annotations: adding ``str`` hints may trigger KFP's type
    check against the components' declared input types -- verify before adding.

    NOTE(review): the default ``trainer_args`` already embeds DATA_GCS_PATH, so
    overriding ``data_gcs_path`` at submit time does not change the file the
    trainer reads unless ``trainer_args`` is overridden as well.
    """
    download_task = download(project_id, data_gcs_path)

    # Training consumes no output of the download step, so the ordering must be
    # declared explicitly with .after().
    train_task = train(
        project_id,
        trainer_args,
        package_uris,
        trainer_output_gcs_path,
        gcs_working_dir,
        region,
        python_module,
        runtime_version,
    ).after(download_task)

    # Deployment reads the training step's outputs, which already orders it
    # after training.
    deploy(
        project_id,
        train_task.outputs['job_dir'],
        train_task.outputs['job_id'],
        runtime_version,
    )

# Reference for invocation later
pipeline_func = pipeline

In [0]:
# compile pipeline

In [0]:
# Compile the pipeline function into a package file that is submitted below.
pipeline_filename = f'{PIPELINE_FILENAME_PREFIX}.pipeline.tar.gz'
compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=pipeline_filename)

In [0]:
# Submit the pipeline for execution

In [0]:
# Specify pipeline argument values
arguments = {}

# Get or create an experiment and submit a pipeline run
client = kfp.Client()
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except:
    experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)