# Kubeflow Pipelines

Kubeflow Pipelines is one of the most important features of Kubeflow. It promises to make your AI experiments reproducible, composable (i.e. made of interchangeable components), scalable, and easily shareable.

 

A pipeline is a codified representation of a machine learning workflow, analogous to the sequence of steps described in the first image. It includes the components of the workflow and their respective dependencies. More specifically, a pipeline is a directed acyclic graph (DAG) with a containerized process on each node, and it runs on top of Argo.

Each pipeline component, represented as a block, is a self-contained piece of code, packaged as a Docker image. It has inputs (arguments) and outputs, and it performs one step in the pipeline. In the example pipeline above, the `transform_data` step requires arguments that are produced as outputs of the `extract_data` and `generate_schema` steps, and its own outputs are dependencies for `train_model`.

 

Your ML code is wrapped into components, where you can:



 - Specify parameters – which become available to edit in the dashboard and configurable for every run.



 - Attach persistent volumes – without persistent volumes, we would lose all the data if our notebook were terminated for any reason.



 - Specify artifacts to be generated – graphs, tables, selected images, models – which end up conveniently stored on the Artifact Store, inside the Kubeflow dashboard.

## Example

### Install Libraries

In [None]:
!pip install kfp

In [5]:
import kfp

from kfp import dsl
from kfp.components import func_to_container_op
from kfp.v2.dsl import (
    component,
    Output,
    ClassificationMetrics,
    Metrics,
    HTML,
    Markdown
)
from typing import NamedTuple

### Functions:

In [103]:
def add(a: float, b: float) -> NamedTuple('outputs', [('sum', float)]):
    """Calculates sum of two arguments"""
    # The single declared output ('sum') is returned positionally as a
    # one-element list.
    total = a + b
    return [total]

In [104]:
def multiply(a: float, b: float) -> NamedTuple('outputs', [('multiply', float)]):
    """Calculates multiplication of two arguments"""
    # The single declared output ('multiply') is returned positionally as a
    # one-element list.
    product = a * b
    return [product]

In [105]:
def for_loop(start: int, end: int) -> NamedTuple('outputs', [('range', list)]):
    # Materialize the integers start, start + 1, ..., end - 1 and return them
    # as the single declared output ('range'), wrapped in a one-element list.
    values: list = list(range(start, end))
    return [values]

### Simple Kubeflow Pipeline: 

In [58]:
@dsl.pipeline(
    name='Simple Pipeline',
    description='',
)
def kf_pipeline():
    # Two-step pipeline: add 1 + 1, then multiply the sum by itself.

    # Wrap the plain Python functions as components on a python:3.8 image.
    add_op = kfp.components.create_component_from_func(
        add,
        base_image="python:3.8",
    )
    multiply_op = kfp.components.create_component_from_func(
        multiply,
        base_image="python:3.8",
    )

    add_task = add_op(a=1, b=1)
    # Both multiply inputs are fed from the add step's output.
    multiply_op(a=add_task.output, b=add_task.output)
# Compile the pipeline function into a package file.
kfp.compiler.Compiler().compile(
    pipeline_func=kf_pipeline,
    package_path='kf_pipeline.tar.zip',
)

# Create the "simple" experiment and submit a run of the pipeline to it.
kfp_client = kfp.Client()
kfp_client.create_experiment(name="simple")
kfp_client.create_run_from_pipeline_func(
    pipeline_func=kf_pipeline,
    arguments={},
    run_name='simple_run',
    experiment_name='simple',
)

### Conditional Pipeline:

In [80]:
@dsl.pipeline(
    name='Conditional Pipeline',
    description='',
)
def kf_pipeline():
    # Add 1 + 1, then pick one of two multiply steps based on the sum.

    # Wrap the plain Python functions as components on a python:3.8 image.
    add_op = kfp.components.create_component_from_func(
        add,
        base_image="python:3.8",
    )
    multiply_op = kfp.components.create_component_from_func(
        multiply,
        base_image="python:3.8",
    )

    add_task = add_op(a=1, b=1)
    add_task.set_display_name('Task 1: Add')
    total = add_task.output

    # The sum is compared against the string "2.0" in both conditions.
    with dsl.Condition(total == "2.0"):
        # Sum matched: multiply the sum by itself.
        squared_task = multiply_op(a=total, b=total)
        squared_task.set_display_name('Task 2: Multiply')

    with dsl.Condition(total != "2.0"):
        # Sum did not match: multiply the constants 1 * 1 instead.
        fallback_task = multiply_op(a=1, b=1)
        fallback_task.set_display_name('Task 2: Multiply')
# Compile the pipeline function into a package file.
kfp.compiler.Compiler().compile(
    pipeline_func=kf_pipeline,
    package_path='kf_pipeline_conditional.tar.zip',
)

# Create the "conditional" experiment and submit a run of the pipeline to it.
kfp_client = kfp.Client()
kfp_client.create_experiment(name="conditional")
kfp_client.create_run_from_pipeline_func(
    pipeline_func=kf_pipeline,
    arguments={},
    run_name='conditional_run',
    experiment_name='conditional',
)

### Parallel Pipeline:

In [107]:
@dsl.pipeline(
    name='Parallel Pipeline',
    description='',
)
def kf_pipeline():
    # Build a list, fan out one multiply step per list item, then run a
    # final add step after the fan-out.

    # Wrap the plain Python functions as components on a python:3.8 image.
    list_op = kfp.components.create_component_from_func(
        for_loop,
        base_image="python:3.8",
    )
    multiply_op = kfp.components.create_component_from_func(
        multiply,
        base_image="python:3.8",
    )
    add_op = kfp.components.create_component_from_func(
        add,
        base_image="python:3.8",
    )

    list_task = list_op(start=1, end=3)
    list_task.set_display_name('Task 1: Create List')

    # Each item of the generated list is multiplied by itself.
    with dsl.ParallelFor(list_task.output) as item:
        multiply_task = multiply_op(a=item, b=item)
        multiply_task.set_display_name('Task 2: Multiply')

    # The add step takes constant inputs, so its ordering after the multiply
    # step is declared explicitly.
    add_task = add_op(a=1, b=1)
    add_task.after(multiply_task)
    add_task.set_display_name('Task 3: Add')
# Compile the pipeline function into a package file.
kfp.compiler.Compiler().compile(
    pipeline_func=kf_pipeline,
    package_path='kf_pipeline_parallel.tar.zip',
)

# Create the "parallel" experiment and submit a run of the pipeline to it.
kfp_client = kfp.Client()
kfp_client.create_experiment(name="parallel")
kfp_client.create_run_from_pipeline_func(
    pipeline_func=kf_pipeline,
    arguments={},
    run_name='parallel_run',
    experiment_name='parallel',
)

RunPipelineResult(run_id=7606a1a2-33ba-4b23-b3e6-4f8b104ec891)

### Pre-defined Pipeline Components:

In [108]:
# Load reusable component definitions from component.yaml files in the
# kubeflow/pipelines repository. Each URL contains a commit hash.

# Dataset download and CSV -> Apache Parquet conversion.
chicago_taxi_dataset_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/e3337b8bdcd63636934954e592d4b32c95b49129/components/datasets/Chicago%20Taxi/component.yaml'
)
convert_csv_to_apache_parquet_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/0d7d6f41c92bdc05c2825232afe2b47e5cb6c4b3/components/_converters/ApacheParquet/from_CSV/component.yaml'
)

# XGBoost training and prediction on CSV data.
xgboost_train_on_csv_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/567c04c51ff00a1ee525b3458425b17adbe3df61/components/XGBoost/Train/component.yaml'
)
xgboost_predict_on_csv_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/31939086d66d633732f75300ce69eb60e9fb0269/components/XGBoost/Predict/component.yaml'
)

# XGBoost training and prediction on Apache Parquet data.
xgboost_train_on_parquet_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/0ae2f30ff24beeef1c64cc7c434f1f652c065192/components/XGBoost/Train/from_ApacheParquet/component.yaml'
)
xgboost_predict_on_parquet_op = kfp.components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/31939086d66d633732f75300ce69eb60e9fb0269/components/XGBoost/Predict/from_ApacheParquet/component.yaml'
)

@kfp.dsl.pipeline(name='xgboost')
def xgboost_pipeline():
    # Train and evaluate XGBoost regressors on Chicago Taxi data in two
    # formats (CSV and Apache Parquet), including cross-format predictions.

    # Fetch up to 10000 rows of January 2019 trips; 'tips' is the first
    # selected column.
    dataset_task = chicago_taxi_dataset_op(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
        limit=10000,
    )
    csv_data = dataset_task.output

    # --- CSV: train, then predict on the same data ---
    csv_train_task = xgboost_train_on_csv_op(
        training_data=csv_data,
        label_column=0,
        objective='reg:squarederror',
        num_iterations=200,
    )
    csv_model = csv_train_task.outputs['model']

    xgboost_predict_on_csv_op(
        data=csv_data,
        model=csv_model,
        label_column=0,
    )

    # --- Apache Parquet: convert, train, then predict on the same data ---
    convert_task = convert_csv_to_apache_parquet_op(csv_data)
    parquet_data = convert_task.output

    parquet_train_task = xgboost_train_on_parquet_op(
        training_data=parquet_data,
        label_column_name='tips',
        objective='reg:squarederror',
        num_iterations=200,
    )
    parquet_model = parquet_train_task.outputs['model']

    xgboost_predict_on_parquet_op(
        data=parquet_data,
        model=parquet_model,
        label_column_name='tips',
    )

    # --- Cross-format checks: each model predicts on the other format ---
    xgboost_predict_on_parquet_op(
        data=parquet_data,
        model=csv_model,
        label_column_name='tips',
    )

    xgboost_predict_on_csv_op(
        data=csv_data,
        model=parquet_model,
        label_column=0,
    )


# No explicit host is given to the client (host=None).
kfp_endpoint = None

# Submit a run of the XGBoost pipeline with no pipeline arguments.
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
    pipeline_func=xgboost_pipeline,
    arguments={},
)

RunPipelineResult(run_id=b15a5069-f2d1-462b-a7fd-8a9e27522970)

### Add Visualization to Pipeline Task

In [None]:
@component(
    # The PyPI distribution is 'scikit-learn'. The old 'sklearn' alias is
    # deprecated and can no longer be installed, which would break this step
    # at package-install time.
    packages_to_install=['scikit-learn'],
    base_image='python:3.9'
)
def iris_sgdclassifier(test_samples_fraction: float, metrics: Output[ClassificationMetrics]):
    """Train an SGD classifier on the Iris dataset and log a confusion matrix.

    Args:
        test_samples_fraction: Value passed as ``test_size`` to
            ``train_test_split``.
        metrics: Output artifact on which the confusion matrix is logged.
    """
    # Imports live inside the function body, as in the original component.
    from sklearn import datasets, model_selection
    from sklearn.linear_model import SGDClassifier
    from sklearn.metrics import confusion_matrix

    iris_dataset = datasets.load_iris()
    # Only the training part of the split is used below; the held-out part
    # is not evaluated in this example.
    train_x, test_x, train_y, test_y = model_selection.train_test_split(
        iris_dataset['data'], iris_dataset['target'], test_size=test_samples_fraction)

    classifier = SGDClassifier()
    classifier.fit(train_x, train_y)
    # 3-fold cross-validated predictions on the training data, so the
    # confusion matrix compares each training label with its prediction.
    predictions = model_selection.cross_val_predict(classifier, train_x, train_y, cv=3)
    metrics.log_confusion_matrix(
        ['Setosa', 'Versicolour', 'Virginica'],
        confusion_matrix(train_y, predictions).tolist()  # .tolist() to convert np array to list.
    )

@dsl.pipeline(name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    # Single-step pipeline: run the Iris classifier component.
    # The resulting task is not referenced again, so it is not bound to a name.
    # NOTE(review): iris_sgdclassifier is a kfp.v2 component while `dsl` here
    # is the v1 kfp.dsl -- confirm whether the run must be submitted in
    # v2-compatible mode.
    iris_sgdclassifier(test_samples_fraction=0.3)
# No explicit host is given to the client (host=None).
kfp_endpoint = None

# Submit a run of the metrics visualization pipeline with no pipeline arguments.
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
    pipeline_func=metrics_visualization_pipeline,
    arguments={},
)