# An Introduction to the Kubeflow Pipelines SDK

## Imports

I like to put all my imports, along with a few helper functions, at the top of the notebook.

In [1]:
import os
import time

import kfp
from kfp import dsl
from kfp import compiler

from random import SystemRandom
from string import ascii_lowercase as lc

rand = SystemRandom()


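def upload_pipeline(client: kfp.Client, metadata: dict, pipeline_function):
    """Compile `pipeline_function` to the package path in `metadata` and upload it to KFP."""
    compiler.Compiler().compile(
        pipeline_function,
        metadata.get("pipeline_package_path"))

    return client.upload_pipeline(
        metadata.get("pipeline_package_path"),
        metadata.get("pipeline_name"))


def random_string():
    """Return a random four-letter lowercase suffix used to keep pipeline names unique."""
    return ''.join(rand.choice(lc) for _ in range(4))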


def experiment_metadata(
    namespace: str,
    experiment_name: str,
    experiment_description: str,
    pipeline_name: str,
    pipeline_description: str
):
    """Create Metadata for Kubeflow Pipeline Experiment."""

    _namespace = namespace.lower().replace(" ", "-")
    _experiment_name = f"{namespace}-{experiment_name}".lower().replace(" ", "-")
    _experiment_description = experiment_description
    _pipeline_name = f"{_experiment_name}-{pipeline_name}-{random_string()}".lower().replace(" ", "-")
    _pipeline_description = pipeline_description
    _run_name = f"{time.strftime('%Y%m%d-%H%M%S')}-{_pipeline_name}"
    _pipeline_package_path = f"{_run_name}.yaml.zip"

    print("--------------------------")
    print("Metadata")
    print("--------------------------")
    print("Namespace")
    print(f"Name:\t\t{_namespace}")
    print("--------------------------")
    print("Experiment")
    print(f"Name:\t\t{_experiment_name}")
    print(f"Description:\t{_experiment_description}")
    print("--------------------------")
    print("Pipeline")
    print(f"Name:\t\t{_pipeline_name}")
    print(f"Description:\t{_pipeline_description}")
    print(f"Zipped YAML:\t{_pipeline_package_path}")
    print("--------------------------")
    print("Run")
    print(f"Name:\t\t{_run_name}")
    print("--------------------------")

    return {
        "namespace": _namespace,
        "experiment_name": _experiment_name,
        "experiment_description": _experiment_description,
        "pipeline_name": _pipeline_name,
        "pipeline_description": _pipeline_description,
        "run_name": _run_name,
        "pipeline_package_path": _pipeline_package_path
    }

## Metadata

Fill out the metadata for the run, pipeline and experiment! You set the first five fields; the last two are generated for you.

1. `namespace`: Your namespace.
1. `experiment_name`: Your pipelines are run in an experiment. Give your experiment a unique and descriptive name.
1. `experiment_description`: Provide a short description; it will be a gift to your future self. Keep it brief, since the KFP backend rejects descriptions that are too long for its database column (see the error at the end of this notebook).
1. `pipeline_name`: Name your pipeline. Pipeline names must be unique, so a random four-letter suffix is appended automatically. Try to be descriptive.
1. `pipeline_description`: The more metadata the better!
1. `pipeline_package_path`: The location of the zipped YAML that describes the compiled pipeline. It is derived from the run name.
1. `run_name`: Generated automatically by prefixing the current date and time to the pipeline name, which already contains the namespace, the experiment name and the random suffix.
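To make the naming scheme concrete, here is a kfp-free sketch that mirrors the string handling in `experiment_metadata`. The timestamp and the random suffix are passed in explicitly so the result is reproducible; `slugify` and `build_names` are hypothetical helpers, not part of KFP. With the values used in this notebook and the suffix `nktb`, it reproduces the run name printed by the metadata cell.

```python
def slugify(text: str) -> str:
    """Lowercase and replace spaces with hyphens, as experiment_metadata does."""
    return text.lower().replace(" ", "-")


def build_names(namespace: str, experiment_name: str, pipeline_name: str,
                suffix: str, timestamp: str) -> dict:
    """Hypothetical helper: derive every generated name from the user-supplied fields."""
    experiment = slugify(f"{namespace}-{experiment_name}")
    pipeline = slugify(f"{experiment}-{pipeline_name}-{suffix}")
    run = f"{timestamp}-{pipeline}"  # timestamp format: %Y%m%d-%H%M%S
    return {
        "namespace": slugify(namespace),
        "experiment_name": experiment,
        "pipeline_name": pipeline,
        "run_name": run,
        "pipeline_package_path": f"{run}.yaml.zip",
    }


names = build_names("bryanpaget", "Introduction to Kubeflow Pipelines Python SDK!",
                    "Simple Pipeline", suffix="nktb", timestamp="20220510-144247")
print(names["run_name"])
# 20220510-144247-bryanpaget-introduction-to-kubeflow-pipelines-python-sdk!-simple-pipeline-nktb
```

Only spaces are replaced, so punctuation such as the `!` in the experiment name survives into every generated name.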

In [2]:
# Fill in the following 5 metadata fields:
namespace = "bryanpaget"

experiment_name = "Introduction to Kubeflow Pipelines Python SDK!"
experiment_description = "The Kubeflow Pipelines SDK provides a set of Python packages that you can use to specify and run your machine learning (ML) workflow as a pipeline."

pipeline_name = "Simple Pipeline"
pipeline_description = "Just an Example Pipeline."

# -------------------------------------------------

# Metadata is created here:
metadata = experiment_metadata(
    namespace, experiment_name, experiment_description,
    pipeline_name, pipeline_description)

--------------------------
Metadata
--------------------------
Namespace
Name:		bryanpaget
--------------------------
Experiment
Name:		bryanpaget-introduction-to-kubeflow-pipelines-python-sdk!
Description:	The Kubeflow Pipelines SDK provides a set of Python packages that you can use to specify and run your machine learning (ML) workflow as a pipeline. Version 2 of the SDK adds support for tracking pipeline runs and artifacts using ML Metadata. Starting with Kubeflow Pipelines 1.6, you can build and run pipelines in v2 compatibility mode.
--------------------------
Pipeline
Name:		bryanpaget-introduction-to-kubeflow-pipelines-python-sdk!-simple-pipeline-nktb
Description:	Just an Example Pipeline.
Zipped YAML:	20220510-144247-bryanpaget-introduction-to-kubeflow-pipelines-python-sdk!-simple-pipeline-nktb.yaml.zip
--------------------------
Run
Name:		20220510-144247-bryanpaget-introduction-to-kubeflow-pipelines-python-sdk!-simple-pipeline-nktb
--------------------------


## Pipeline Parameters

This is where you populate a dictionary with your pipeline's parameters. Its keys must match the argument names of the pipeline function (`a` to `e` here). For this simple example we just need five integers.

In [3]:
pipeline_parameters = {
    'a': 5,
    'b': 5,
    'c': 8,
    'd': 10,
    'e': 18
}

## Function Operator

This is a simple operator for Kubeflow: `average_op` is a factory that returns a `dsl.ContainerOp` running a prebuilt averaging image. For the next demo I'll do something more interesting. In the meantime, here is the documentation on writing your own components: https://www.kubeflow.org/docs/components/pipelines/sdk/component-development/#writing-your-component-definition-file


In [4]:
def average_op(*numbers):
    """
    Factory for average ContainerOps: accepts an arbitrary number of input numbers,
    returning a ContainerOp that passes those numbers to the underlying Docker image
    for averaging.

    For dsl.ContainerOp:

        name (String): What will show up on the pipeline viewer.
        image (String): The container image that KFP runs to do the work.
        command (List): Put the commands for the container here.
        arguments (Dictionary): Passes each number as a separate command line argument.
                                Note that these arguments get serialized to strings
        file_outputs (Dictionary): Expect an output file called out.txt to be
                                   generated KFP can read this file and bring it back automatically

    Returns: output collected from ./out.txt from inside the container

    """

    if len(numbers) < 1:
        raise ValueError("You must specify at least one number to average.")

    return dsl.ContainerOp(
        name="average",
        image="k8scc01covidacr.azurecr.io/kfp-components/average:v1",
        command=["python", "average.py"],
        arguments=numbers,
        file_outputs={'data': './out.txt'}
    )
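The source of the `average:v1` image is not shown here, so, as an assumption, here is a minimal sketch of what an `average.py` following the contract in the docstring could look like: it parses its command-line arguments (which arrive as strings), computes the arithmetic mean, and writes it to `./out.txt` so that `file_outputs` can pick it up. The function names are hypothetical.

```python
from pathlib import Path


def parse_numbers(args):
    """Convert command-line arguments (always strings in KFP) back to floats."""
    return [float(arg) for arg in args]


def average(numbers):
    """Arithmetic mean; mirrors the 'at least one number' check in average_op."""
    if not numbers:
        raise ValueError("You must specify at least one number to average.")
    return sum(numbers) / len(numbers)


def main(args, out_path="./out.txt"):
    """Average the arguments and write the result where file_outputs expects it."""
    result = average(parse_numbers(args))
    # file_outputs={'data': './out.txt'} tells KFP to read this file as the op's output.
    Path(out_path).write_text(str(result))
    return result
```

Inside the image, the script's entry point would call `main(sys.argv[1:])`; under this sketch, `python average.py 5 5` writes `5.0` to `./out.txt`.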

## Pipeline

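This is where the pipeline is created using the `@dsl.pipeline` decorator. Passing one op's `.output` into another op makes the second step depend on the first, which is how KFP builds the pipeline graph.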

In [5]:
@dsl.pipeline(name=pipeline_name, description=pipeline_description)
def pipeline(a, b, c, d, e):

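    # Average the first two pairs of inputs independently.
    avg_1 = average_op(a, b)
    avg_2 = average_op(d, e)

    # Combine the two intermediate averages.
    avg_3 = average_op(avg_1.output, avg_2.output)

    # Average the remaining input with the combined result.
    average_result_overall = average_op(c, avg_3.output)

    # Runs at compile time, so this prints the ContainerOp definition, not the result.
    print(average_result_overall)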
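Since `avg_1` and `avg_2` do not consume each other's output, they can run in parallel, while the last step has to wait for `avg_3`. As a kfp-free sanity check, the sketch below mirrors that graph with `graphlib` (Python 3.9+) and plain arithmetic. It assumes the `average` image computes the arithmetic mean of its arguments, and that the third step is meant to combine `avg_1` and `avg_2` (averaging `avg_1` with itself would leave `avg_2` unused). `DEPENDENCIES` and `run_locally` are hypothetical names.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from statistics import mean

# Each step lists the steps whose outputs it consumes (its dependencies).
DEPENDENCIES = {
    "avg_1": set(),
    "avg_2": set(),
    "avg_3": {"avg_1", "avg_2"},
    "overall": {"avg_3"},
}


def run_locally(a, b, c, d, e):
    """Evaluate the pipeline's arithmetic in dependency order."""
    steps = {
        "avg_1": lambda r: mean([a, b]),
        "avg_2": lambda r: mean([d, e]),
        "avg_3": lambda r: mean([r["avg_1"], r["avg_2"]]),
        "overall": lambda r: mean([c, r["avg_3"]]),
    }
    results = {}
    for step in TopologicalSorter(DEPENDENCIES).static_order():
        results[step] = steps[step](results)
    return results


results = run_locally(a=5, b=5, c=8, d=10, e=18)
print(results["overall"])  # 8.75
```

With the parameters defined earlier, the intermediate values are 5, 14 and 9.5, so a successful run would be expected to report 8.75 (as a string read from `out.txt`) under these assumptions.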

## Publish Pipeline and Run Pipeline in an Experiment

First, `kfp.Client()` connects to the KFP API. The pipeline is then compiled and uploaded, the experiment is fetched (or created if it does not exist yet), and finally the pipeline is run inside that experiment.

In [6]:
client = kfp.Client()

response = upload_pipeline(client, metadata, pipeline)

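try:
    # Reuse the experiment if it already exists. get_experiment takes
    # `experiment_name` (not `name`) and raises ValueError if nothing matches.
    experiment = client.get_experiment(
        experiment_name=experiment_name,
        namespace=namespace)
except ValueError:
    experiment = client.create_experiment(
        name=experiment_name,
        description=experiment_description,
        namespace=namespace)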

run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=metadata.get("run_name"),
    pipeline_package_path=metadata.get("pipeline_package_path"),
    params=pipeline_parameters)  # values for the pipeline's a, b, c, d, e arguments

{'ContainerOp': {'is_exit_handler': False, 'human_name': 'average', 'display_name': None, 'name': 'average-4', 'node_selector': {}, 'volumes': [], 'tolerations': [], 'affinity': {}, 'pod_annotations': {}, 'pod_labels': {}, 'num_retries': 0, 'retry_policy': None, 'backoff_factor': None, 'backoff_duration': None, 'backoff_max_duration': None, 'timeout': 0, 'init_containers': [], 'sidecars': [], 'loop_args': None, '_component_spec_inputs_with_pipeline_params': [], '_inputs': [], 'dependent_names': [], 'enable_caching': True, 'attrs_with_pipelineparams': ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations', '_container', 'artifact_arguments', '_parameter_arguments'], '_is_v2': False, '_container': {'args': ['{{pipelineparam:op=;name=c}}',
          '{{pipelineparam:op=average-3;name=data}}'],
 'command': ['python', 'average.py'],
 'env': None,
 'env_from': None,
 'image': 'k8scc01covidacr.azurecr.io/kfp-components/average



ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'content-type': 'application/json', 'date': 'Tue, 10 May 2022 14:42:48 GMT', 'content-length': '790', 'x-envoy-upstream-service-time': '64', 'server': 'envoy'})
HTTP response body: {"error":"Create experiment failed.: InternalServerError: Failed to add experiment to experiment table: Error 1406: Data too long for column 'Description' at row 1: Error 1406: Data too long for column 'Description' at row 1","code":13,"message":"Create experiment failed.: InternalServerError: Failed to add experiment to experiment table: Error 1406: Data too long for column 'Description' at row 1: Error 1406: Data too long for column 'Description' at row 1","details":[{"@type":"type.googleapis.com/api.Error","error_message":"Internal Server Error","error_details":"Create experiment failed.: InternalServerError: Failed to add experiment to experiment table: Error 1406: Data too long for column 'Description' at row 1: Error 1406: Data too long for column 'Description' at row 1"}]}
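Catching every exception with a bare `except:` around `get_experiment` is risky: a mistake in the call itself, such as passing `name=` where the kfp v1 client expects `experiment_name=`, raises a `TypeError` that is silently swallowed, so the code always falls through to `create_experiment`. Here is a kfp-free sketch of a get-or-create pattern that only falls back to creation on "not found". `FakeClient` and `get_or_create_experiment` are hypothetical, and the assumption that a missing experiment raises `ValueError` reflects my understanding of the kfp v1 client.

```python
class FakeClient:
    """Hypothetical in-memory stand-in for the experiment methods of kfp.Client."""

    def __init__(self):
        self._experiments = {}

    def get_experiment(self, experiment_name=None, namespace=None):
        # Assumption: like the kfp v1 client, an unknown name raises ValueError.
        key = (namespace, experiment_name)
        if key not in self._experiments:
            raise ValueError(f"No experiment is found with name {experiment_name}.")
        return self._experiments[key]

    def create_experiment(self, name, description=None, namespace=None):
        experiment = {"name": name, "description": description, "namespace": namespace}
        self._experiments[(namespace, name)] = experiment
        return experiment


def get_or_create_experiment(client, name, description, namespace):
    """Return the named experiment, creating it only when it is not found."""
    try:
        return client.get_experiment(experiment_name=name, namespace=namespace)
    except ValueError:
        # Only "not found" falls through to creation; a TypeError caused by a
        # wrong keyword (e.g. name= instead of experiment_name=) still surfaces.
        return client.create_experiment(name=name, description=description,
                                        namespace=namespace)


fake_client = FakeClient()
first = get_or_create_experiment(fake_client, "demo", "Short description.", "bryanpaget")
second = get_or_create_experiment(fake_client, "demo", "Ignored.", "bryanpaget")
print(first is second)  # True
```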
