In [None]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.gcp import use_gcp_secret
from kfp.components import ComponentStore
from os import path
import json

In [None]:
# Component store used to resolve pipeline components by name: local
# directories are listed first, followed by the remote URL prefix.
cs = ComponentStore(
    local_search_paths=['.', '{{config.output_package}}'],
    url_search_prefixes=['{{config.github_component_url}}'],
)

In [None]:
# Components whose names are chosen by the template configuration.
pre_process_op = cs.load_component('{{config.preprocess.component}}')

# Components with fixed names: hyperparameter tuning and tuned-parameter lookup.
hpt_op = cs.load_component('hptune')
param_comp = cs.load_component('get_tuned_params')

# Training and deployment components, again chosen by the configuration.
train_op = cs.load_component('{{config.train.component}}')
deploy_op = cs.load_component('{{config.deploy.component}}')


In [None]:
# Pipeline definition: preprocess -> hyperparameter tuning -> tuned-parameter
# lookup -> training -> deployment.
# NOTE(review): this file appears to be a template; the placeholder expressions
# in the parameter defaults are presumably filled in from `config` when the
# notebook is generated - confirm against the generator.
@dsl.pipeline(
    name='KFP-Pipelines Example',
    description='Kubeflow pipeline generated from ai-pipeline asset'
)
def pipeline_sample(
   project_id='{{config.project_id}}',
   region = '{{config.region}}',
    python_module = '{{config.train.python_module}}',
    package_uri = '{{config.train.python_package}}',
    dataset_bucket = '{{config.bucket_id}}',
    staging_bucket = 'gs://{{config.bucket_id}}',
    job_dir_hptune = 'gs://{{config.bucket_id}}/hptune',
    job_dir_train = 'gs://{{config.bucket_id}}/train',
    runtime_version_train = '{{config.runtime_version}}',
    runtime_version_deploy = '{{config.runtime_version}}',
    hptune_config='{{config.hptune.config}}',
    model_id='{{config.deploy.model_id}}',
    version_id='{{config.deploy.version_id}}',
    # JSON-encoded flat list of '--name', 'value' pairs, one pair per
    # hptune argument default emitted by the template loop below.
    common_args_hpt=json.dumps([
        {% for arg in config.hptune.args %}          {% set name = arg.name %}          {% set value = arg.default %}                    '--{{name}}', '{{value}}' ,
        {% endfor %}        ]),
    # Same encoding as above, built from the train argument defaults.
    common_args_train=json.dumps([
        {% for arg in config.train.args %}          {% set name =  arg.name %}          {% set value = arg.default%}                    '--{{name}}', '{{value}}' ,
        {% endfor %}        ]),
    replace_existing_version=True
):

    # The body only wires component tasks together; each call below creates
    # one pipeline step from a component loaded earlier in the notebook.

    # Preprocess task: every templated component argument is forwarded from
    # the pipeline parameter of the same name.
    pre_process_task = pre_process_op(
        {% for arg in config.preprocess.component_args %}
          {% set name = arg.name %}
        {{name}}={{name}},
        {% endfor %}
     )

    # Hyperparameter tuning task, using the hptune job directory and the
    # hptune argument list.
    hpt_task = hpt_op (
         region = region,
         python_module = python_module,
         package_uri = package_uri,
         staging_bucket = staging_bucket,
         job_dir = job_dir_hptune,
         config=hptune_config,
         runtime_version = runtime_version_train,
         args = common_args_hpt ,
    )
    # No preprocess output is passed to the tuning task, so the ordering is
    # declared explicitly.
    hpt_task.after(pre_process_task)

    # Get the best hyperparameters: takes the tuning task's 'job_id' output
    # together with the base training arguments.
    # NOTE(review): to_struct() presumably yields a serializable reference to
    # the output - confirm against the installed kfp version.
    param_task = param_comp (
        project_id=project_id,
        hptune_job_id=hpt_task.outputs['job_id'].to_struct(),
        common_args=common_args_train,
    )

    # Train task: its args come from the 'tuned_parameters_out' output of the
    # parameter lookup task; package_uri is wrapped in a one-element JSON list.
    # Several inputs are passed as empty strings - presumably the component
    # treats those as unset; verify against the component definition.
    train_task = train_op (
        project_id = project_id,
        python_module = python_module,
        package_uris = json.dumps([package_uri.to_struct()]),
        region = region,
        args = str(param_task.outputs['tuned_parameters_out']) ,
        job_dir = job_dir_train,
        python_version = '',
        runtime_version = runtime_version_train,
        master_image_uri = '',
        worker_image_uri = '',
        training_input = '',
        job_id_prefix = '',
        wait_interval = '30'
    )

    # Deploy task: model_uri is the training task's 'job_dir' output with the
    # templated model output prefix appended.
    # Alternative model_uri values left over from development:
         #model_uri=train_task.outputs['job_dir'],
         #model_uri='gs://poc-bucket-0120/train/out/export/exporter',
    deploy_model = deploy_op(
         model_uri=train_task.outputs['job_dir'].to_struct()+'{{config.train.model_out_prefix}}',
         project_id=project_id,
         model_id=model_id,
         version_id=version_id,
         runtime_version=runtime_version_deploy,
         replace_existing_version=replace_existing_version
    )
    # Register the use_gcp_secret('user-gcp-sa') transformer on the pipeline
    # configuration (presumably applied to every op at compile time - confirm).
    kfp.dsl.get_pipeline_conf().add_op_transformer(use_gcp_secret('user-gcp-sa'))




In [None]:
# Connect to the Kubeflow Pipelines deployment named in the configuration.
client = kfp.Client(host='{{config.kfp_deployment_url}}')

# Submit a run of the pipeline; with no argument overrides, the parameter
# defaults declared on pipeline_sample are used.
client.create_run_from_pipeline_func(
    pipeline_func=pipeline_sample,
    arguments={},
)