In [None]:
import json
import os

import yaml
from kfp import dsl
import kfp.compiler as compiler
from kfp.components import load_component_from_url, load_component_from_file

In [None]:
# Turn on IPython's "greedy" tab-completion setting for this notebook session
%config IPCompleter.greedy=True

# Parameters

The next cell is handled via papermill parameters (papermill may override these values when it executes the notebook).

In [None]:
# Papermill parameters cell: every value here is a default that papermill may
# override at execution time.  `None` means "not supplied".
param_file = None
container_tag = None
output_pipeline_filename = "training_pipeline.tar.gz"

github_event_ref = None
pr_num = None

# For debugging until multi-tenant working.
# SECURITY: credentials must not be committed to source control, so they are
# read from the environment instead (None when the variable is unset).  Any
# key that was previously committed in this cell should be rotated.
minio_url = "http://minimal-tenant1-minio.minio:9000"
minio_access_key = os.environ.get("MINIO_ACCESS_KEY")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY")

Defaults for debugging/running locally

(These defaults only take effect for values that papermill did not override in the parameters cell.)

In [None]:
# `debugging` is True only when none of the papermill-overridable inputs were
# supplied, i.e. every one is still at its None default (which probably means
# the notebook is being run by hand).
debugging = all(
    setting is None
    for setting in (param_file, container_tag, github_event_ref, pr_num)
)

In [None]:
# Fall back to debugging values for any input that was not supplied
if github_event_ref is None:
    github_event_ref = "refs/pull/22/merge"
if pr_num is None:
    pr_num = 22
if param_file is None:
    param_file = "params.yml"
if container_tag is None:
    container_tag = "7a8078420cb7ace889500f8864d4eec0e0114e53"

In [None]:
# Read the SVM hyperparameters from the YAML parameter file
with open(param_file) as fin:
    params = yaml.safe_load(fin)

svm_c, svm_gamma = params['svm_c'], params['svm_gamma']

In [None]:
# Echo the effective settings, one per line, so they show up in the run log
print(
    f"Running with github_event_ref = {github_event_ref}",
    f"Running with pr_num = {pr_num}",
    f"Running with param_file = {param_file}",
    f"Running with svm_c = {svm_c}",
    f"Running with svm_gamma = {svm_gamma}",
    f"Running with container_tag = {container_tag}",
    sep="\n",
)

# Fixed settings

In [None]:
# Address passed to curl by the notification ops defined further down
callback_url = 'kubemlopsbot-svc.kubeflow.svc.cluster.local:8080'

# Event type names placed in the callback payload.  TRAIN_FINISH_EVENT is
# also what get_callback_payload checks to decide whether to add a status.
TRAIN_START_EVENT = "Training Started"
TRAIN_FINISH_EVENT = "Training Finished"

# Helpers

In [None]:
def task_use_image(image_name):
    """
    Build a modifier that points an existing kfp task at a different image

    From https://github.com/kaizentm/kubemlops/blob/master/code/utils/kfp_helper.py

    The returned callable sets ``task.image`` to ``image_name`` and hands the
    same task object back.

    Usage:
      my_task = task_op_factory()
      my_task.apply(task_use_image(my_image_name))
    """
    def _apply_image(op):
        # Mutates the task that was passed in; no copy is made
        setattr(op, "image", image_name)
        return op

    return _apply_image


def component_use_image(component, image_name):
    """
    Helper to update the image used by a component (eg: task factory)

    Note that this edits component in place (and copying or deepcopying
    does not prevent that).

    Args:
      component: component whose ``component_spec.implementation`` exposes
        ``to_dict()`` / ``from_dict()`` and describes a container.
      image_name: full image reference to store as the container image.

    Returns:
      The same ``component`` object that was passed in, so the result can be
      assigned as shown below.  (Callers that ignore the return value are
      unaffected.)

    Usage:
      task_op_factory = load_component_from_url(...)
      task_op_factory = component_use_image(task_op_factory, image_name)
    """
    # Round-trip the implementation through a dict so the image can be edited
    implementation = component.component_spec.implementation.to_dict()
    implementation['container']['image'] = image_name
    component.component_spec.implementation = \
        component.component_spec.implementation.from_dict(implementation)
    return component


def load_component_from_file_and_pin_version(component_filename,
                                             image_repo_name, image_name, tag):
    """
    Load a component from file and pin its image to repo_name/image_name:tag

    A trailing ':' on the repo/image part and a leading ':' on the tag are
    stripped before the two are joined with a single ':'.

    Returns the loaded component, with its image updated.
    """
    repo_and_image = f"{image_repo_name}/{image_name}".rstrip(':')
    pinned_image = ":".join([repo_and_image, tag.lstrip(':')])

    component = load_component_from_file(component_filename)
    component_use_image(component, pinned_image)
    return component


def get_callback_payload(event_type, github_event_ref, pr_num):
    """
    Return the JSON string sent to the callback service for an event

    The payload carries the event type, the GitHub ref (under the 'sha' key),
    the PR number and the kfp run id placeholder.  For TRAIN_FINISH_EVENT it
    also carries a 'status' entry holding the '{{workflow.status}}' template
    (presumably resolved by the workflow engine at run time).
    """
    payload = {
        'event_type': event_type,
        'sha': github_event_ref,
        'pr_num': pr_num,
        'run_id': dsl.RUN_ID_PLACEHOLDER,
    }
    if event_type == TRAIN_FINISH_EVENT:
        payload['status'] = '{{workflow.status}}'
    return json.dumps(payload)

# Components

In [None]:
# Op factories for copying data to / from MinIO, loaded from component YAML
# definitions hosted on the scribner-iowa-pipeline branch of
# StatCan/kubeflow-mlops.
# TODO: These should be pulled from a more long-term home of
# reusable components
copy_to_minio_op = load_component_from_url('https://raw.githubusercontent.com/StatCan/kubeflow-mlops/scribner-iowa-pipeline/components/copy_to_minio.yaml')  # noqa: E501
copy_from_minio_op = load_component_from_url('https://raw.githubusercontent.com/StatCan/kubeflow-mlops/scribner-iowa-pipeline/components/copy_from_minio.yaml')  # noqa: E501

In [None]:
# DEBUGGING ONLY.  THIS JUST GIVES ME AUTO COMPLETES IN MY NOTEBOOK
# Guarded by the `debugging` flag so that it does not run when the notebook
# is executed with real parameters.  build_pipeline() loads its own pinned
# copies of these components, so nothing else relies on these two names.
if debugging:
    train_component = load_component_from_file("../../containers/iowa-train/component.yaml")  # noqa: E501
    score_component = load_component_from_file("../../containers/iowa-score/component.yaml")  # noqa: E501

In [None]:
def _get_callback_op(name, event_type, github_event_ref, pr_num):
    """
    Build the curl ContainerOp shared by all notification ops

    The op runs ``curl -d <payload> <callback_url>`` in the curlimages/curl
    image, where the payload comes from get_callback_payload().
    """
    return dsl.ContainerOp(
        name=name,
        image="curlimages/curl",
        command=['curl'],
        arguments=[
            '-d', get_callback_payload(event_type, github_event_ref, pr_num),
            callback_url
        ]
    )


def get_finish_notification_op(github_event_ref, pr_num):
    """Return the op that sends the TRAIN_FINISH_EVENT callback"""
    return _get_callback_op('Notify on finish', TRAIN_FINISH_EVENT,
                            github_event_ref, pr_num)


def get_start_notification_op(github_event_ref, pr_num):
    """Return the op that sends the TRAIN_START_EVENT callback"""
    return _get_callback_op('Notify on start', TRAIN_START_EVENT,
                            github_event_ref, pr_num)


def get_report_metrics_op(github_event_ref, pr_num, metrics):
    """
    Return the op that reports metrics through the callback service

    ``metrics`` is a mapping of metric name to value.  Each entry becomes one
    markdown bullet, and the resulting text is sent as the payload's event
    type.
    """
    # Use the CI's generic event handling to write a custom metric description
    # TODO: Formalize this in the event_dispatcher.py?
    header = f"Metrics from scoring for runid {dsl.RUN_ID_PLACEHOLDER}:"
    bullets = "".join(f"\n* {k}: {v}" for k, v in metrics.items())
    event_md = header + bullets

    return _get_callback_op('Report metrics', event_md,
                            github_event_ref, pr_num)

# Pipeline

In [None]:
def build_pipeline(svm_c: float, svm_gamma: float,
                   container_tag: str = "latest", github_event_ref: str = None,
                   pr_num: int = None):
    """
    Returns a training pipeline pinned to containers and model params

    This pattern makes sense if you're using CI to produce a pipeline that
    will recreate the same trained model when fed the same data.  The
    resultant pipeline definition will clearly show the parameters/containers
    used. If doing a hyperparameter search with this pattern, the search must
    be done through the CI system (eg: make 10 branches, edit params on
    each branch, commit each branch, they all build independent containers and
    pipelines, then all run and report their results).

    For hyperparameter searches, a more efficient pattern might be to make the
    training pipeline accept hyperparameters as well as dataset.  But if we do
    this, it means the same pipeline can later be used with different args so
    maybe CD workflows are harder to trace?  Would need to try it out.

    Args:
      svm_c: value passed to the train component as ``svm_c``.
      svm_gamma: value passed to the train component as ``svm_gamma``.
      container_tag: image tag that the train and score components are
        pinned to.
      github_event_ref: forwarded to the start/finish/metrics notification
        ops for inclusion in their callback payloads.
      pr_num: forwarded to the notification ops alongside github_event_ref.

    Returns:
      The ``training_pipeline`` function (decorated with ``dsl.pipeline``),
      which closes over the arguments above.
    """
    # TODO: Add default minio creds?
    image_repo_name = "k8scc01covidmlopsacr.azurecr.io/mlops"

    # Set up any components that require version pinning

    # Use train/score from a template component.yaml held elsewhere.
    # Alternatively we could define the component by:
    # train_component = dsl.ContainerOp(
    #     "train",
    #     image=...,
    #     ...
    # )

    train_component = load_component_from_file_and_pin_version(
        component_filename="../../containers/iowa-train/component.yaml",
        image_repo_name=image_repo_name,
        image_name="iowa-train",
        tag=container_tag
    )

    score_component = load_component_from_file_and_pin_version(
        component_filename="../../containers/iowa-score/component.yaml",
        image_repo_name=image_repo_name,
        image_name="iowa-score",
        tag=container_tag
    )

    # Define the pipeline
    @dsl.pipeline(
        name="Iowa Liquor Training Pipeline",
        description="Trains a pipeline to classify liquor based on its name using a specific model and hyperparameters"  # noqa: E501
    )
    def training_pipeline(
        data_train: str,
        data_test: str,
        # TODO: Handle these automatically once multitenancy is available
        minio_url: str,
        minio_access_key: str,
        minio_secret_key: str,
    ):
        """
        Pipeline for training the Iowa liquor categorization pipeline
        """

        # Ops created below, keyed by a human-readable step name
        operations = {}

        # The finish notification is handed to dsl.ExitHandler, and every
        # other op is created inside that handler's `with` block.
        exit_callback = get_finish_notification_op(github_event_ref=github_event_ref,  # noqa: E501
                                                   pr_num=pr_num)
        with dsl.ExitHandler(exit_callback):

            start_callback = get_start_notification_op(github_event_ref=github_event_ref,  # noqa: E501
                                                       pr_num=pr_num)

            # Both data-copy ops are explicitly ordered after the start
            # notification via .after(start_callback).
            # TODO: Temporarily using .ignore_type()
            operations['get training data'] = copy_from_minio_op(
                minio_url=minio_url.ignore_type(),
                minio_access_key=minio_access_key.ignore_type(),
                minio_secret_key=minio_secret_key.ignore_type(),
                minio_source=data_train.ignore_type(),
            ).set_display_name("get training data").after(start_callback)

            operations['get scoring data'] = copy_from_minio_op(
                minio_url=minio_url.ignore_type(),
                minio_access_key=minio_access_key.ignore_type(),
                minio_secret_key=minio_secret_key.ignore_type(),
                minio_source=data_test.ignore_type(),
            ).set_display_name("get scoring data").after(start_callback)

            # Train on the copied training data using the hyperparameters
            # captured from build_pipeline's arguments
            operations['train'] = train_component(
                data_train=operations['get training data'].output,
                svm_gamma=svm_gamma,
                svm_c=svm_c
            )

            # Score the trained model against the copied scoring data
            operations['score'] = score_component(
                data=operations['get scoring data'].output,
                model=operations['train'].outputs['model'],
            )

            # Report the score op's 'f1' output through the callback service
            metrics = {
                'f1': operations['score'].outputs['f1']
            }
            operations['report_score'] = get_report_metrics_op(
                github_event_ref=github_event_ref,
                pr_num=pr_num,
                metrics=metrics,
            )

    return training_pipeline

In [None]:
# Build the pipeline function, pinned to the hyperparameters and image tag
# resolved in the cells above
pipeline = build_pipeline(
    svm_c=svm_c,
    svm_gamma=svm_gamma,
    container_tag=container_tag,
    github_event_ref=github_event_ref,
    pr_num=pr_num,
)

In [None]:
# Compile the pipeline function into a package at output_pipeline_filename.
# The result is deliberately NOT assigned to `compiler`: doing so would
# replace the imported kfp.compiler module alias with compile()'s return
# value, and re-running this cell would then fail on `compiler.Compiler`.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=output_pipeline_filename,
    # TEMP WHILE I FIGURE OUT HOW TO USE
    # TYPES FOR MINIO URL
    # handled atm above by .ignore_type()
    # type_check=False,
)

# For debugging in notebook

Use the code below to submit the pipeline directly from this notebook.

In [None]:
# # Uses debugging guard to avoid accidentally deploying ipynb with debug code
# # in production.  Not foolproof, but better than nothing...  Should comment
# # this block out before using in production just in case
# if debugging:
#     from utilities import get_minio_credentials

#     # Get minio credentials using a helper
#     minio_tenant = 'minimal'
#     minio_settings = get_minio_credentials(minio_tenant, strip_http=False)
#     minio_url = minio_settings["url"]
#     minio_access_key = minio_settings["access_key"]
#     minio_secret_key = minio_settings["secret_key"]

#     arguments = {
#         'data_train': 'andrew-scribner/iowa/processed/train/2020-08-13_18:02:01_train.csv',  # noqa: E501
#         'data_test': 'andrew-scribner/iowa/processed/test/2020-08-13_18:02:01_test.csv',  # noqa: E501
#         'minio_url': minio_url,
#         'minio_access_key': minio_access_key,
#         'minio_secret_key': minio_secret_key,
#     }

#     import kfp
#     client = kfp.Client()
#     run = client.create_run_from_pipeline_package(
#         pipeline_file=output_pipeline_filename,
#         arguments=arguments,
#     )