# About this notebook
@author: Yingding Wang\
@created_on: 18.10.2022\
@updated_on: 18.10.2022

This notebook shows how to create a KFP v2 pipeline with the KFP SDK and run it in V2_COMPATIBLE mode.

Note:
* V2-compatible mode, known caveats and breaking changes: https://github.com/kubeflow/pipelines/issues/6133

### Useful JupyterLab Basics

Before you start, consider updating JupyterLab with the command:

```python
!{sys.executable} -m pip install --upgrade --user jupyterlab
```
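Outside a notebook, the `!{sys.executable}` shell escape is not available. A minimal sketch, assuming you want the same upgrade from a plain Python script: build the pip command as an argument list and pass it to `subprocess.run`. The helper names `pip_install_command` and `pip_install` are hypothetical.

```python
import subprocess
import sys
from typing import List


def pip_install_command(package: str, upgrade: bool = True, user: bool = True) -> List[str]:
    """Build `python -m pip install ...` for the interpreter running this code (hypothetical helper)."""
    cmd = [sys.executable, "-m", "pip", "install"]
    if upgrade:
        cmd.append("--upgrade")
    if user:
        cmd.append("--user")
    cmd.append(package)
    return cmd


def pip_install(package: str) -> None:
    # check=True raises subprocess.CalledProcessError if pip exits with a non-zero status
    subprocess.run(pip_install_command(package), check=True)


# Only builds and prints the command; call pip_install("jupyterlab") to actually upgrade.
print(pip_install_command("jupyterlab"))
```

Using `sys.executable` instead of a bare `pip` makes sure packages are installed for the interpreter that runs the code, which is the same reason the notebook cells use `{sys.executable}`.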

### Useful Jupyter Notebook Basics
1. Autocomplete syntax with "Tab".
2. View the docstring with "Shift + Tab".
3. Select the code snippet, right-click, and choose "Show Contextual Help" (to see the function code).
4. Comment out lines on macOS with Cmd + / (use the / on the numeric keypad, not the / on the 7 key).

In [1]:
import sys, os
print(f"Sys version: {sys.version}")

# %env
# os.environ["KF_PIPELINES_SA_TOKEN_PATH"]="/var/run/secrets/kubernetes.io/serviceaccount/token"
# os.environ["KF_PIPELINES_SA_TOKEN_PATH"]="/var/run/secrets/kubeflow/pipelines/token"

Sys version: 3.8.10 | packaged by conda-forge | (default, May 11 2021, 07:01:05) 
[GCC 9.3.0]


In [2]:
from platform import python_version
print(f"current platform python version: {python_version()}")

current platform python version: 3.8.10


### Check and update JupyterLab

In [3]:
!{sys.executable} -m pip show jupyterlab # 3.0.16
# !{sys.executable} -m pip show jupyter_contrib_nbextensions

Name: jupyterlab
Version: 3.4.8
Summary: JupyterLab computational environment
Home-page: https://jupyter.org
Author: Jupyter Development Team
Author-email: jupyter@googlegroups.com
License: 
Location: /home/jovyan/.local/lib/python3.8/site-packages
Requires: ipython, jinja2, jupyter-core, jupyter-server, jupyterlab-server, nbclassic, notebook, packaging, tomli, tornado
Required-by: 


In [4]:
# update JupyterLab (uncomment to run)
#!{sys.executable} -m pip install --upgrade --user jupyterlab

In [5]:
"""upgrade the kfp server api version to 1.7.0 for KF 1.4"""
# !{sys.executable} -m pip uninstall -y kfp-server-api
# !{sys.executable} -m pip install --user --upgrade kfp-server-api==1.7.0
"""upgrade the kfp server api version to 1.8.2 for KF 1.5.1"""
# !{sys.executable} -m pip uninstall -y kfp-server-api
# !{sys.executable} -m pip install --user --upgrade kfp-server-api==1.8.2
"""upgrade the kfp server api version to 1.8.5 for KF 1.6.1 (with backend 2.0.0a5 for V2 compatible)"""
# !{sys.executable} -m pip uninstall -y kfp-server-api
# !{sys.executable} -m pip install --user --upgrade kfp-server-api==1.8.5

'upgrade the kfp server api version to 1.8.5 for KF 1.6.1 (with backend 2.0.0a5 for V2 compatible)'
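The commented-out commands in the cell above pin `kfp-server-api` to the Kubeflow release running on the cluster. A minimal sketch that keeps this mapping in one place, using only the pairs from those pip commands (KF 1.4 → 1.7.0, KF 1.5.1 → 1.8.2, KF 1.6.1 → 1.8.5); the dictionary and function names are hypothetical, and other releases are deliberately not guessed.

```python
from typing import Dict

# kfp-server-api versions taken from the pip commands in the cell above.
# Only these Kubeflow releases are covered; anything else raises an error.
KF_TO_KFP_SERVER_API: Dict[str, str] = {
    "1.4": "1.7.0",
    "1.5.1": "1.8.2",
    "1.6.1": "1.8.5",
}


def kfp_server_api_requirement(kf_release: str) -> str:
    """Return a pip requirement string for the given Kubeflow release."""
    try:
        version = KF_TO_KFP_SERVER_API[kf_release]
    except KeyError:
        raise ValueError(f"No kfp-server-api pin recorded for Kubeflow {kf_release}") from None
    return f"kfp-server-api=={version}"


print(kfp_server_api_requirement("1.6.1"))  # kfp-server-api==1.8.5
```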

In [6]:
%%capture
# use %%capture magic to suppress the output of this cell, comment it to see the installation logs.

# kfp 1.8.14 is used together with kfp-server-api 1.8.5 (see the version check below).
!{sys.executable} -m pip install --upgrade --user kfp==1.8.14

#!{sys.executable} -m pip install --upgrade --user kubernetes==18.20.0
#!{sys.executable} -m pip install --upgrade --user kubernetes==21.7.0

# Restart the kernel
After updating kfp, restart this notebook's kernel.

Jupyter Notebook: Menu -> Kernel -> Restart Kernel

## Check the installed Kubeflow Pipelines SDK versions

In [7]:
import sys

In [8]:
!{sys.executable} -m pip list | grep kfp

kfp                      1.8.14
kfp-pipeline-spec        0.1.16
kfp-server-api           1.8.5
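`pip list | grep kfp` shells out to pip. A minimal sketch that reads the same versions from Python with the standard-library `importlib.metadata` (available from Python 3.8, the kernel version shown above), for example to check the pins programmatically; `installed_versions` is a hypothetical helper.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is not installed."""
    result: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


print(installed_versions(["kfp", "kfp-pipeline-spec", "kfp-server-api"]))
```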


### Check the total resource quota of my Kubeflow namespace

In [9]:
# run the shell command to see the namespace quota (uncomment to run)
# !kubectl describe quota

### Setup
Example Pipeline from
https://github.com/kubeflow/examples/tree/master/pipelines/simple-notebook-pipeline

### Getting started with Python function-based components
https://www.kubeflow.org/docs/components/pipelines/v1/sdk-v2/python-function-components/

In [10]:
from dataclasses import dataclass

# Settings for the components used in the pipeline (the pipeline sets no pipeline root)
@dataclass
class Setup:
    base_image: str = "python:3.8.15"

setup = Setup()
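Python function-based components run inside `setup.base_image`, not in the notebook kernel, so the function body must also work with the image's Python version. A minimal sketch, assuming the image follows the official `python:<major>.<minor>[.<patch>]` tag scheme, that compares the image's major.minor version with the local interpreter; `python_version_from_image` and `same_minor_version` are hypothetical helpers.

```python
import re
import sys
from typing import Optional, Tuple


def python_version_from_image(image: str) -> Optional[Tuple[int, ...]]:
    """Extract the version from tags like 'python:3.8.15' or 'python:3.8.15-slim'; None otherwise."""
    match = re.fullmatch(r"(?:.*/)?python:(\d+)\.(\d+)(?:\.(\d+))?(?:-.*)?", image)
    if match is None:
        return None
    return tuple(int(part) for part in match.groups() if part is not None)


def same_minor_version(
    image: str, local: Tuple[int, int] = (sys.version_info.major, sys.version_info.minor)
) -> bool:
    """True if the image's major.minor matches the local interpreter's major.minor."""
    image_version = python_version_from_image(image)
    return image_version is not None and image_version[:2] == tuple(local[:2])


print(python_version_from_image("python:3.8.15"))  # (3, 8, 15)
```

With the 3.8.10 kernel shown earlier, `same_minor_version(setup.base_image)` compares 3.8 with 3.8 and returns `True`; a mismatch is not necessarily an error, but it is worth a second look.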

In [11]:
import kfp
import kubernetes

# import kfp.compiler as compiler
# use the v2 dsl and components to run the pipeline in V2_COMPATIBLE mode
from kfp.v2 import dsl
from kfp.v2.dsl import (
    ContainerOp,
    component,
    Input,
    Output,
    Artifact,
    Dataset,
    Model
)

EXPERIMENT_NAME = 'demo' # Name of the experiment folder in the KF WebApp UI
EXPERIMENT_DESC = 'demo of kubeflow experiment folder'
# client = kfp.Client()
NAME_SPACE = kfp.Client().get_user_namespace()  # read the user namespace of this notebook

## Connecting KFP Python SDK from Notebook to Pipeline

* Overview https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/
* From Inside KF https://www.kubeflow.org/docs/components/pipelines/v1/sdk/connect-api/#full-kubeflow-subfrom-inside-clustersub

In [12]:
print(kfp.__version__)
print(kubernetes.__version__)

1.8.14
12.0.1


In [13]:
@component(
    base_image=setup.base_image,
    packages_to_install=["numpy==1.23.4"],
    output_component_file="my_cal_op_component.yaml"
)
def add_op(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    import numpy as np
    vec_a = np.array([a])
    vec_b = np.array([b])
    vec_sum = np.add(vec_a, vec_b)
    print(f"vec_a: {vec_a} + vec_b: {vec_b} = {vec_sum}")
    return vec_sum[0]

### Add pod memory and CPU requests and limits

https://github.com/kubeflow/pipelines/pull/5695

In [14]:
def op_resource_transformer(op: ContainerOp, mem_req="200Mi", cpu_req="500m", mem_lim="4000Mi", cpu_lim="4000m"):
    """
    Set the memory/CPU requests and limits of a container op and return the op for chaining.
    op.set_memory_limit('1000Mi') ≈ 1.05 GB (1000 MiB = 1,048,576,000 bytes)
    op.set_cpu_limit('1000m') = 1 CPU core
    """
    return (
        op.set_memory_request(mem_req)
        .set_memory_limit(mem_lim)
        .set_cpu_request(cpu_req)
        .set_cpu_limit(cpu_lim)
    )
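Kubernetes quantities use binary suffixes for memory (`Mi` = 2^20 bytes, so `'1000Mi'` is about 1.05 GB) and milli-units for CPU (`'1000m'` = one core). A minimal sketch that converts such strings so the requests and limits passed to `op_resource_transformer` can be sanity-checked before submitting; it handles only the suffixes listed in the code (a subset of what Kubernetes accepts), and `parse_memory`, `parse_cpu` and `check_request_within_limit` are hypothetical names.

```python
from decimal import Decimal

# Binary and decimal memory suffixes (a subset of what Kubernetes accepts).
_MEMORY_SUFFIXES = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def parse_memory(quantity: str) -> int:
    """Convert a memory quantity such as '200Mi' or '4G' to bytes."""
    # Check two-character suffixes first so 'Mi' is not mistaken for 'M'.
    for suffix in sorted(_MEMORY_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * _MEMORY_SUFFIXES[suffix])
    return int(Decimal(quantity))


def parse_cpu(quantity: str) -> float:
    """Convert a CPU quantity such as '500m' or '2' to cores."""
    if quantity.endswith("m"):
        return float(Decimal(quantity[:-1]) / 1000)
    return float(Decimal(quantity))


def check_request_within_limit(mem_req="200Mi", cpu_req="500m", mem_lim="4000Mi", cpu_lim="4000m") -> bool:
    """Kubernetes rejects a container whose request exceeds its limit; check that up front."""
    return parse_memory(mem_req) <= parse_memory(mem_lim) and parse_cpu(cpu_req) <= parse_cpu(cpu_lim)


print(parse_memory("1000Mi"))  # 1048576000 bytes, about 1.05 GB
print(parse_cpu("1000m"))      # 1.0 core
```

The defaults mirror those of `op_resource_transformer`, so `check_request_within_limit()` validates its default settings.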

### Run a V2 / V2-compatible pipeline
* https://www.kubeflow.org/docs/components/pipelines/v1/sdk-v2/build-pipeline/#compile-and-run-your-pipeline

In [15]:
@dsl.pipeline(
    name='Calculation-Pipeline',
    description='A toy pipeline that performs arithmetic calculations.',
    # No pipeline root is necessary when using the built-in on-prem MinIO artifact repository.
    # You can optionally specify your own pipeline_root, e.g.
    # pipeline_root='minio://mlpipeline/v2/artifacts'
)
def calc_pipeline(
    a: float = 0,
    b: float = 7
):
    increase: float = 4

    first_add_task = op_resource_transformer(add_op(a, increase))
    first_add_task.set_caching_options(False)
    first_add_task.set_display_name("add op 1")
    second_add_task = op_resource_transformer(add_op(first_add_task.output, b))
    second_add_task.set_caching_options(False)
    second_add_task.set_display_name("add op 2")
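Because each step only adds two numbers, the expected result of a run can be worked out locally before submitting it. A minimal plain-Python sketch of the same dataflow (`a + increase`, then `+ b`), without numpy, KFP or a cluster; `local_add` and `local_calc` are hypothetical stand-ins for the two chained `add_op` tasks, not part of the KFP API.

```python
def local_add(a: float, b: float) -> float:
    """Plain-Python equivalent of add_op's arithmetic (numpy add of two one-element vectors)."""
    return float(a + b)


def local_calc(a: float = 0, b: float = 7, increase: float = 4) -> float:
    """Mirror calc_pipeline: first = a + increase, second = first + b."""
    first = local_add(a, increase)
    return local_add(first, b)


# With the run arguments used in this notebook (a=7, b=8): 7 + 4 = 11, then 11 + 8 = 19
print(local_calc(7, 8))  # 19.0
```

With the pipeline defaults (`a=0`, `b=7`) the same calculation gives 0 + 4 + 7 = 11.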

### (Optional) Compile the pipeline to inspect the generated settings

In [16]:
PIPE_LINE_FILE_NAME="calc_pipeline_with_resource_limits_v2"
kfp.compiler.Compiler(
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE
).compile(
    pipeline_func=calc_pipeline,
    package_path=f"{PIPE_LINE_FILE_NAME}.yaml"
)



# Run Pipeline with Multi-user Isolation

https://www.kubeflow.org/docs/components/pipelines/multi-user/

In [17]:
client = kfp.Client()

In [18]:
# Make sure the token volume is mounted at /run/secrets/kubeflow/pipelines
# client.get_experiment(experiment_name=EXPERIMENT_NAME, namespace=NAME_SPACE)
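With multi-user isolation, `kfp.Client()` running inside the cluster authenticates with a service-account token that has to be mounted into the notebook pod. A minimal sketch for checking this before calling the client: it reads the path from the `KF_PIPELINES_SA_TOKEN_PATH` environment variable (shown commented out in the first code cell) and otherwise falls back to `/var/run/secrets/kubeflow/pipelines/token`, the path used there; treating that path as the default is an assumption, and `find_pipelines_token` is a hypothetical helper.

```python
import os
from pathlib import Path
from typing import Mapping, Optional

# Path taken from the commented-out cell at the top of this notebook (assumed default).
DEFAULT_TOKEN_PATH = "/var/run/secrets/kubeflow/pipelines/token"


def find_pipelines_token(env: Mapping[str, str] = os.environ) -> Optional[Path]:
    """Return the token file path if it exists and is non-empty, else None."""
    path = Path(env.get("KF_PIPELINES_SA_TOKEN_PATH", DEFAULT_TOKEN_PATH))
    if path.is_file() and path.stat().st_size > 0:
        return path
    return None


token = find_pipelines_token()
print(f"Pipelines token: {token if token else 'not mounted'}")
```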

In [19]:
# client.list_pipelines()
# print(NAME_SPACE)
# client.list_experiments(namespace=NAME_SPACE)
# exp = client.create_experiment(EXPERIMENT_NAME, description=EXPERIMENT_DESC)

In [20]:
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}
pipeline_config: kfp.dsl.PipelineConf = kfp.dsl.PipelineConf().set_image_pull_policy("IfNotPresent")
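The run arguments are given as strings (`'7'`, `'8'`) although `calc_pipeline` declares `float` parameters; the KFP v1 API transmits parameter values as strings, and the run in this notebook succeeded with them. A minimal sketch for validating such a dictionary locally against a function's type annotations, so that a typo like `'7,5'` fails before submission; `coerce_arguments` and `calc_pipeline_stub` are hypothetical, the stub only mirrors `calc_pipeline`'s parameters, and the sketch handles plain callable annotations such as `float`, `int` or `str`, not KFP's own type conversion.

```python
import inspect
from typing import Any, Callable, Dict


def coerce_arguments(func: Callable[..., Any], arguments: Dict[str, str]) -> Dict[str, Any]:
    """Convert string arguments with the callable's parameter annotations (e.g. float('7') -> 7.0)."""
    params = inspect.signature(func).parameters
    coerced: Dict[str, Any] = {}
    for name, raw in arguments.items():
        if name not in params:
            raise KeyError(f"Unknown pipeline parameter: {name}")
        annotation = params[name].annotation
        # Unannotated parameters are passed through unchanged.
        coerced[name] = raw if annotation is inspect.Parameter.empty else annotation(raw)
    return coerced


def calc_pipeline_stub(a: float = 0, b: float = 7) -> None:
    """Hypothetical stand-in with the same parameters as calc_pipeline."""


print(coerce_arguments(calc_pipeline_stub, {"a": "7", "b": "8"}))  # {'a': 7.0, 'b': 8.0}
```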

In [21]:
run_result = client.create_run_from_pipeline_func(
    pipeline_func=calc_pipeline, 
    arguments=arguments,
    experiment_name=EXPERIMENT_NAME, 
    namespace=NAME_SPACE,
    enable_caching=False,
    pipeline_conf=pipeline_config,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE
)

### Wait for the run to complete
* doc: https://kubeflow-pipelines.readthedocs.io/en/latest/_modules/kfp/_client.html

In [22]:
seconds_to_wait = 60
max_try = 2

In [23]:
from datetime import timedelta
from kfp_server_api.models.api_run_detail import ApiRunDetail
timeout = timedelta(seconds=seconds_to_wait)

# Check the run status; retry up to max_try times if a timeout occurs.
current_try = 0
while current_try < max_try:
    try:
        # wait for the run to complete within the given timeout
        kfp_run: ApiRunDetail = run_result.wait_for_run_completion(timeout)
        print(kfp_run.run.status)
        break
    except TimeoutError:
        print("Timeout detected, try to get run status again.")
    finally:
        current_try += 1

Timeout detected, try to get run status again.
Succeeded
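A minimal, KFP-independent sketch of the same retry pattern that returns `None` when every attempt times out, so the caller can decide what to do next. `wait_fn` stands for any callable that raises `TimeoutError`, such as `lambda: run_result.wait_for_run_completion(timeout).run.status`; `wait_with_retries` and `fake_wait` are hypothetical helpers.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_with_retries(wait_fn: Callable[[], T], max_try: int = 2) -> Optional[T]:
    """Call wait_fn up to max_try times, retrying only on TimeoutError; return None if all attempts time out."""
    for attempt in range(1, max_try + 1):
        try:
            return wait_fn()
        except TimeoutError:
            print(f"Timeout detected on attempt {attempt}/{max_try}.")
    return None


# Usage with a fake waiter that times out once and then succeeds.
responses = iter([TimeoutError(), "Succeeded"])


def fake_wait() -> str:
    item = next(responses)
    if isinstance(item, Exception):
        raise item
    return item


print(wait_with_retries(fake_wait))  # Succeeded
```

Returning from inside the `try` replaces the explicit `break` and counter of the loop above; other exceptions (for example API errors) still propagate unchanged.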
