In [1]:
# !pip install --user --upgrade pip

In [2]:
# !pip install --user kfp

In [3]:
import kfp
import kfp.dsl as dsl
import kfp.gcp as gcp
from kfp import components
from kfp.dsl.types import GCSPath
import json
from string import Template
from jinja2 import Template as JinjaTemplate
from typing import NamedTuple
from train.helper_components import usedcars_model_training, evaluate_model, deploy_model

In [4]:
def convert_result(result) -> NamedTuple('Outputs', [
    ('n_estimators', int),
    ('min_samples_split', int),
    ('min_samples_leaf', int),
    ('max_features', str),
    ('max_depth', int),
]):
    """Unpack a JSON list of tuned hyperparameters into typed values.

    Args:
        result: JSON string encoding a list of objects, each carrying a
            ``name`` key (the command-line flag, e.g. ``--n-estimators``)
            and a ``value`` key.

    Returns:
        A tuple ``(n_estimators, min_samples_split, min_samples_leaf,
        max_features, max_depth)`` in the order declared by the ``Outputs``
        annotation; every entry is cast to ``int`` except ``max_features``,
        which is cast to ``str``.

    Raises:
        KeyError: if one of the five expected flags is absent from ``result``.
        ValueError: if ``result`` is not valid JSON or an integer-valued
            entry cannot be converted with ``int()``.
    """
    # Kept as a function-local import; presumably so the function stays
    # self-contained when it is wrapped into a container op.
    import json

    # Later entries win if a flag name appears more than once.
    best_params = {param['name']: param['value'] for param in json.loads(result)}
    print("Hyperparameters for best score", best_params)

    return (
        int(best_params['--n-estimators']),
        int(best_params['--min-samples-split']),
        int(best_params['--min-samples-leaf']),
        str(best_params['--max-features']),
        int(best_params['--max-depth']),
    )

In [5]:
# --- Pipeline configuration -------------------------------------------------
# Relative object paths for the three dataset splits; the pipeline prefixes
# them with its `gcs_root` parameter.
TRAINING_FILE_PATH = 'datasets/training/data.csv'
VALIDATION_FILE_PATH = 'datasets/validation/data.csv'
TESTING_FILE_PATH = 'datasets/testing/data.csv'

# Namespace in which the Katib experiment is created.
# TODO(review): the original cell left this value blank (a syntax error).
# "kubeflow" is a placeholder matching the pipeline's `training_namespace`
# default -- replace it with the namespace of your Katib installation.
KATIB_NAMESPACE = "kubeflow"

# BigQuery source table that the sampling queries read from.
SOURCE_TABLE = 'used_cars.vehicles_min'
PROJECT_ID = "hamoye-296618"
DATASET_ID = "used_cars"

# Image name (with tag); combined with PROJECT_ID into a gcr.io image URI.
TRAINING_IMAGE = "sklearn-usedcars-extratrees:latest"

# Defaults forwarded to the model deployment step.
RUNTIME_VERSION = "2.3"
PYTHON_VERSION = "3.7"
MODEL_ID = 'usedcars_price_regressor'
VERSION_ID = '1'
REPLACE_EXISTING_VERSION = 'True'

# NOTE(review): EVALUATION_METRIC_THRESHOLD is not referenced by the cells
# visible in this notebook.
EVALUATION_METRIC_THRESHOLD = '8000'
EVALUATION_METRIC_NAME = 'rmse'

In [6]:
def generate_sampling_query(source_table_name, num_lots, lots):
    """Render a BigQuery query that selects a hash-based sample of rows.

    Each row is fingerprinted (``FARM_FINGERPRINT`` over its JSON form) and
    assigned to a lot by taking the absolute fingerprint modulo ``num_lots``;
    only rows whose lot number is listed in ``lots`` are selected.

    Args:
        source_table_name: Table reference substituted into the ``FROM``
            clause.
        num_lots: Modulus applied to the row fingerprint.
        lots: Iterable of lot numbers to keep (e.g. ``[1, 2, 3]``). A string
            is treated as an already-bracketed list such as ``"[1, 2]"`` and
            has its first and last characters stripped, as before.

    Returns:
        The rendered SQL query string.
    """
    sampling_query_template = """
        SELECT 
        region,
        year,
        manufacturer,
        cylinders,
        fuel, odometer, title_status, transmission, drive, type, paint_color, state, price
        FROM
            `{{ source_table }}` AS vehicle
        WHERE
        MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(vehicle))), {{ num_lots }}) IN ({{ lots }})
        """

    if isinstance(lots, str):
        # Backward compatibility with the former `str(lots)[1:-1]` behaviour.
        lots_sql = lots[1:-1]
    else:
        # Joining element reprs yields exactly what `str(list)[1:-1]` did for
        # lists, but also renders one-element tuples, ranges and generators
        # correctly (no trailing comma, no stray repr text).
        lots_sql = ', '.join(repr(lot) for lot in lots)

    query = JinjaTemplate(sampling_query_template).render(
        source_table=source_table_name,
        num_lots=num_lots,
        lots=lots_sql
    )

    return query

In [7]:
@dsl.pipeline(
    name="Used-Cars",
    description="A pipeline to train and serve the used cars price prediction model"
)
def used_cars_pipeline(
    name="used-cars-{{workflow.uid}}",
    katib_namespace=KATIB_NAMESPACE,
    goal=8000,
    source_table_name=SOURCE_TABLE,
    volume_mount='/mnt',
    training_image="gcr.io/{}/{}".format(PROJECT_ID, TRAINING_IMAGE),
    training_namespace="kubeflow",
    gcs_root="gs://used-cars",
    parallel_trial=10,
    max_trial=10,
    project_id=PROJECT_ID,
    dataset_id=DATASET_ID,
    model_id=MODEL_ID,
    version_id=VERSION_ID,
    dataset_location="US",
    python_version=PYTHON_VERSION,
    runtime_version=RUNTIME_VERSION,
    replace_existing_version=REPLACE_EXISTING_VERSION,
    evaluation_metric_name=EVALUATION_METRIC_NAME
    ):
    """Wire up data splitting, hyperparameter tuning, training, evaluation and deployment.

    Steps, in dependency order:

    1. Three BigQuery component ops receive a sampling query and an output
       path under ``gcs_root``: lots 1-5 (of 10) for training, lot 8 for
       validation and lot 9 for testing.
    2. A Katib launcher op runs a random-search experiment that minimizes
       ``rmse`` over five hyperparameters.
    3. ``convert_result`` turns the launcher output into typed values.
    4. Training, evaluation and deployment ops run one after another.

    Args:
        name: Katib experiment name.
        katib_namespace: Namespace for the Katib experiment.
        goal: Target value for the ``rmse`` objective.
        source_table_name: Table the sampling queries read from.
        volume_mount: Forwarded to the training step.
        training_image: Container image used by each trial job.
        training_namespace: Accepted for interface compatibility; not
            referenced in the pipeline body.
        gcs_root: Root URI under which dataset paths and the job directory
            are built.
        parallel_trial: Number of trials run in parallel.
        max_trial: Maximum number of trials.
        project_id: Forwarded to the BigQuery ops and the deploy step.
        dataset_id: Forwarded to the BigQuery ops.
        model_id: Forwarded to the deploy step.
        version_id: Forwarded to the deploy step.
        dataset_location: Forwarded to the BigQuery ops.
        python_version: Forwarded to the deploy step.
        runtime_version: Forwarded to the deploy step.
        replace_existing_version: Forwarded to the deploy step.
        evaluation_metric_name: Metric name forwarded to the evaluation step.
    """
    bigquery_train_op = kfp.components.load_component_from_file('./train/components/prebuilt/bqtrain.yml')
    bigquery_validation_op = kfp.components.load_component_from_file('./train/components/prebuilt/bqvalidation.yml')
    bigquery_test_op = kfp.components.load_component_from_file('./train/components/prebuilt/bqtest.yml')

    # ---- Training split: lots 1-5 out of 10 ----
    training_query = generate_sampling_query(
        source_table_name=source_table_name,
        num_lots=10,
        lots=[1, 2, 3, 4, 5]
    )

    training_file_path = f'{gcs_root}/{TRAINING_FILE_PATH}'

    create_training_split = bigquery_train_op(
        query=training_query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id='',
        output_gcs_path=training_file_path,
        dataset_location=dataset_location,
    )

    # ---- Validation split: lot 8 out of 10 ----
    validation_query = generate_sampling_query(
        source_table_name=source_table_name,
        num_lots=10,
        lots=[8]
    )

    validation_file_path = f'{gcs_root}/{VALIDATION_FILE_PATH}'

    create_validation_split = bigquery_validation_op(
        query=validation_query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id='',
        output_gcs_path=validation_file_path,
        dataset_location=dataset_location,
    )

    # ---- Testing split: lot 9 out of 10 ----
    testing_query = generate_sampling_query(
        source_table_name=source_table_name,
        num_lots=10,
        lots=[9]
    )

    testing_file_path = f'{gcs_root}/{TESTING_FILE_PATH}'

    create_testing_split = bigquery_test_op(
        query=testing_query,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id='',
        output_gcs_path=testing_file_path,
        dataset_location=dataset_location,
    )

    # ---- Katib experiment settings ----
    objectiveConfig = {
        "type": "minimize",
        "goal": goal,
        "objectiveMetricName": "rmse",
        "additionalMetricNames": ["r2_score"],
    }

    algorithmConfig = {"algorithmName": "random"}

    metricsCollectorSpec = {
        "collector": {
            "kind": "StdOut"
        }
    }

    # Search space; the names double as the command-line flags appended to
    # the trial command below and as the keys read by `convert_result`.
    parameters = [
        {"name": "--n-estimators", "parameterType": "int", "feasibleSpace": {"min": "100", "max": "500"}},
        {"name": "--min-samples-split", "parameterType": "int", "feasibleSpace": {"min": "2", "max": "10"}},
        {"name": "--min-samples-leaf", "parameterType": "int", "feasibleSpace": {"min": "1", "max": "10"}},
        {"name": "--max-features", "parameterType": "categorical", "feasibleSpace": {"list": ["auto", "sqrt", "log2"]}},
        {"name": "--max-depth", "parameterType": "int", "feasibleSpace": {"min": "10", "max": "50"}}
    ]

    # Kubernetes Job manifest used as the trial template. The "{{...}}"
    # strings are Go-template placeholders left unexpanded here; presumably
    # Katib fills them in for each trial.
    rawTemplate = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": "{{.Trial}}",
            "namespace": "{{.NameSpace}}"
        },
        "spec": {
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "{{.Trial}}",
                            "image": str(training_image),
                            "imagePullPolicy": "Always",
                            "command": [
                                "python",
                                "model.py",
                                "--training-file-path={}".format(create_training_split.outputs['output_gcs_path']),
                                "--validation-file-path={}".format(create_validation_split.outputs['output_gcs_path']),
                                "{{- with .HyperParameters}}",
                                "{{- range .}}",
                                "{{.Name}}={{.Value}}",
                                "{{- end}}",
                                "{{- end}}"
                            ]
                        }
                    ]
                }
            }
        }
    }

    trialTemplate = {
        "goTemplate": {
            "rawTemplate": json.dumps(rawTemplate)
        }
    }

    katib_launcher_op = components.load_component_from_file('./train/components/custom/hptuning.yml')

    # NOTE(review): str() on these dicts/lists produces a Python repr
    # (single-quoted), not JSON; this assumes the launcher component accepts
    # that form -- confirm against hptuning.yml.
    katib_op = katib_launcher_op(
        experiment_name=name,
        experiment_namespace=katib_namespace,
        parallel_trial_count=parallel_trial,
        max_trial_count=max_trial,
        objective=str(objectiveConfig),
        algorithm=str(algorithmConfig),
        trial_template=str(trialTemplate),
        parameters=str(parameters),
        metrics_collector=str(metricsCollectorSpec),
        delete_finished_experiment=False
    ).after(create_training_split).after(create_validation_split)

    # ---- Convert the tuning result into typed hyperparameters ----
    convert_op = components.func_to_container_op(convert_result)
    convert = convert_op(katib_op.output)

    # ---- Train with the selected hyperparameters ----
    train_op = components.func_to_container_op(usedcars_model_training,
                                               base_image="gcr.io/deeplearning-platform-release/base-cpu")

    job_dir = '{}/{}'.format(gcs_root, 'jobdir')

    train = train_op(
        volume_mount=volume_mount,
        job_dir=job_dir,
        training_file_path=create_training_split.outputs['output_gcs_path'],
        validation_file_path=create_validation_split.outputs['output_gcs_path'],
        n_estimators=convert.outputs['n_estimators'],
        min_samples_split=convert.outputs['min_samples_split'],
        min_samples_leaf=convert.outputs['min_samples_leaf'],
        max_features=convert.outputs['max_features'],
        max_depth=convert.outputs['max_depth'],
    ).after(convert)

    # ---- Evaluate on the held-out testing split ----
    evaluate_op = components.func_to_container_op(evaluate_model,
                                                  base_image="gcr.io/deeplearning-platform-release/base-cpu")

    evaluate = evaluate_op(
        dataset_path=create_testing_split.outputs['output_gcs_path'],
        model_path=train.outputs['job_dir'],
        metric_name=evaluation_metric_name
    ).after(train)

    # ---- Deploy the trained model ----
    deploy_op = components.func_to_container_op(deploy_model,
                                                base_image="gcr.io/deeplearning-platform-release/base-cpu")

    # Use the pipeline parameters (not the module-level constants) so that
    # run-time overrides of runtime_version / python_version /
    # replace_existing_version actually reach the deploy step. The defaults
    # are the same constants, so default runs are unchanged.
    deploy = deploy_op(
        model_uri=train.outputs['job_dir'].ignore_type(),
        project_id=project_id,
        model_id=model_id,
        version_id=version_id,
        runtime_version=runtime_version,
        python_version=python_version,
        replace_existing_version=replace_existing_version
    ).after(evaluate)

    deploy.name = "deploy model"

In [8]:
# Submit `used_cars_pipeline` as a run through a default-configured client.
# No argument overrides are supplied, so the pipeline's own defaults apply.
pipeline = kfp.Client().create_run_from_pipeline_func(
    pipeline_func=used_cars_pipeline,
    arguments={},
)

