# Kubeflow pipeline as AI Core workflow


This notebook contains the code for the corresponding blog post.

## Simple Kubeflow pipeline


The pipeline will have two steps:
1. Download the California housing dataset from a URL and store it as an artifact
2. Use the stored dataset to train a Linear Regression model and make predictions which are also stored as an artifact

Using the Kubeflow Python SDK `kfp`, we create Python functions for the two steps in our pipeline. The first function is called `make_step_download`. It uses pandas to read a CSV file from a URL, replaces missing values with 0, and saves the result as an artifact. The parameter `output_csv: comp.OutputPath('CSV')` tells Kubeflow that this function outputs a CSV file.

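The second function, `make_step_train`, receives a CSV file as input (`input_csv: comp.InputPath('CSV')`) and again outputs a CSV file with predictions (`output_csv: comp.OutputPath('CSV')`). As in the first function, all necessary libraries are imported inside the function body, because each component only runs the function's own code in its container. The function trains a linear regression model on 70% of the dataset, prints the RMSE on the remaining 30%, and saves the test-set predictions to a CSV file.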
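`comp.OutputPath` and `comp.InputPath` mean that the step functions never return their data: the runtime hands each function a file path, and the pipeline connects one step's output path to the next step's input path. The following stdlib-only sketch imitates that contract locally. The function names, file names and toy values are hypothetical, and the mean predictor merely stands in for the linear regression model; this is not how KFP itself moves artifacts between containers.

```python
import csv
import os
import tempfile


# Hypothetical stand-ins for make_step_download / make_step_train. Like the real
# components, they exchange data only through file paths chosen by the runner.
def download_step(output_csv: str) -> None:
    # Toy rows, not the real housing dataset
    with open(output_csv, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["median_house_value"])
        writer.writerows([[100.0], [300.0]])


def train_step(input_csv: str, output_csv: str) -> None:
    with open(input_csv, newline="") as f:
        values = [float(row["median_house_value"]) for row in csv.DictReader(f)]
    mean = sum(values) / len(values)  # placeholder "model": always predict the mean
    with open(output_csv, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["prediction"])
        writer.writerows([[mean] for _ in values])


def run_locally(workdir: str) -> str:
    # The runner, not the step, picks the paths: the download step's output
    # path is handed to the train step as its input path.
    download_out = os.path.join(workdir, "download_output.csv")
    train_out = os.path.join(workdir, "train_output.csv")
    download_step(output_csv=download_out)
    train_step(input_csv=download_out, output_csv=train_out)
    return train_out


with tempfile.TemporaryDirectory() as workdir:
    with open(run_locally(workdir), newline="") as f:
        print(list(csv.reader(f)))
```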

After defining these functions, we use `kfp.components.create_component_from_func` to turn them into steps for our Kubeflow pipeline. Here we can also define the base Docker image and the Python packages that should be installed.

Afterwards, we create a pipeline function that uses the previously defined steps. It also defines that the output of the first step is the input to the second step. This pipeline function can now be compiled to a YAML workflow file using the Kubeflow compiler `kfp.compiler.Compiler()`.

This YAML file can be uploaded to a Kubeflow cluster and then the pipeline can be started. 
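The pipeline function never states an execution order explicitly: passing `download_step.outputs['output_csv']` into `step_train` creates a data dependency, and the steps can run in any order that respects such dependencies. As a rough illustration (not how the KFP compiler is implemented), Python's standard `graphlib` module (Python 3.9+) derives that order from a hand-written dependency map; the map is an assumption modeled on the compiled template names.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each step -> the steps whose outputs it consumes.
# In my_pipeline, the train step consumes the download step's output_csv.
consumes_outputs_of = {
    "make-step-download": set(),
    "make-step-train": {"make-step-download"},
}

# static_order() yields every producer before the steps that consume its output
execution_order = list(TopologicalSorter(consumes_outputs_of).static_order())
print(execution_order)  # ['make-step-download', 'make-step-train']
```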

In [13]:
```python
import kfp
import kfp.components as comp
kfp.__version__
```

```
'1.8.9'
```

In [3]:
```python
def make_step_download(output_csv: comp.OutputPath('CSV')):
    # All imports live inside the function: only this function's code runs in the container
    import pandas as pd
    url = "https://raw.githubusercontent.com/ageron/handson-ml/master/datasets/housing/housing.csv"
    df = pd.read_csv(url)
    df.fillna(0, inplace=True)  # replace missing values with 0
    df.to_csv(output_csv, index=False)

def make_step_train(input_csv: comp.InputPath('CSV'), output_csv: comp.OutputPath('CSV')):
    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error
    import pandas as pd

    df = pd.read_csv(input_csv)
    X = df[['total_rooms', 'households', 'latitude', 'longitude', 'total_bedrooms', 'population', 'median_income']]
    y_target = df['median_house_value']

    # Hold out 30% of the rows to evaluate the model
    X_train, X_test, y_train, y_test = train_test_split(X, y_target, test_size=0.3)
    model = LinearRegression()
    model.fit(X_train, y_train)

    y_predict = model.predict(X_test)
    rmse = mean_squared_error(y_test, y_predict, squared=False)  # squared=False returns the RMSE
    print("RMSE =", rmse)

    # Store the test-set predictions as this step's output artifact
    pd.DataFrame(y_predict).to_csv(output_csv)


step_download = kfp.components.create_component_from_func(
    func=make_step_download,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'])

step_train = kfp.components.create_component_from_func(
    func=make_step_train,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4', 'scikit-learn'])  # 'scikit-learn' is the PyPI name of sklearn

def my_pipeline():
    download_step = step_download()
    # Pass the download step's output artifact to the train step as its input
    step_train(download_step.outputs['output_csv'])

kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml')
```
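`mean_squared_error(y_test, y_predict, squared=False)` returns the root mean squared error, RMSE = sqrt(mean((y_true - y_pred)^2)), which is the value the train step prints. A plain-Python sketch of the same formula (the `rmse` helper is hypothetical, not part of scikit-learn) makes the number concrete. Newer scikit-learn releases deprecate the `squared` parameter in favor of `sklearn.metrics.root_mean_squared_error`; with the unpinned `scikit-learn` package on `python:3.7`, pip can only resolve an older release that still accepts `squared`.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Errors are 2 and 0, so RMSE = sqrt((4 + 0) / 2) = sqrt(2)
print(rmse([3.0, 5.0], [1.0, 5.0]))
```

Because the errors are squared, swapping the two arguments does not change the result, but `(y_true, y_pred)` is the order scikit-learn documents.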

## Transform Kubeflow pipeline to AI Core Workflow

In the following, we first recompile the pipeline with a custom base image and then show the changes we have to make to the compiled YAML file so that it works with AI Core.

In [4]:
```python
# Use a previously built Docker image as the base image. No packages are
# installed at runtime, so the image must already contain pandas and scikit-learn.
step_download = kfp.components.create_component_from_func(
    func=make_step_download,
    base_image='docker.io/flxschneider/text-train:0.0.1')

step_train = kfp.components.create_component_from_func(
    func=make_step_train,
    base_image='docker.io/flxschneider/text-train:0.0.1')

def my_pipeline():
    download_step = step_download()
    step_train(download_step.outputs['output_csv'])

kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='workflow.yaml')
```

In [5]:
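```python
from ruamel.yaml import YAML

# A YAML instance replaces ruamel.yaml's deprecated module-level safe_load()/dump()
yaml = YAML(typ="safe")
with open("workflow.yaml", "r") as stream:
    workflow = yaml.load(stream)
```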

The beginning of the workflow YAML file looks like this:

```
{'apiVersion': 'argoproj.io/v1alpha1',
 'kind': 'Workflow',
 'metadata': {'generateName': 'my-pipeline-',
  'annotations': {'pipelines.kubeflow.org/kfp_sdk_version': '1.8.9',
   'pipelines.kubeflow.org/pipeline_compilation_time': '2022-01-18T12:57:16.488742',
   'pipelines.kubeflow.org/pipeline_spec': '{"name": "My pipeline"}'},
  'labels': {'pipelines.kubeflow.org/kfp_sdk_version': '1.8.9'}},
```

In [6]:
```python
# 1. Change the kind from Workflow to WorkflowTemplate
workflow["kind"] = "WorkflowTemplate"
```

In [7]:
```python
# 2. Define a name for the workflow
name = "ca-housing-linreg"
workflow["metadata"]["name"] = name
```

In [8]:
```python
# 3. Add the AI Core annotations, including the kind of each output artifact
AI_Core_annotations = {'scenarios.ai.sap.com/description': "CA Housing linear regression",
                       'scenarios.ai.sap.com/name': "ca-housing-train-scenario",
                       'executables.ai.sap.com/description': "CA Housing linear regression",
                       'executables.ai.sap.com/name': name,
                       'artifacts.ai.sap.com/make-step-download-output_csv.kind': "dataset",
                       'artifacts.ai.sap.com/make-step-train-output_csv.kind': "dataset"}

# Merge with the existing KFP annotations; on duplicate keys the AI Core values win
workflow["metadata"]["annotations"] = {**workflow["metadata"]["annotations"], **AI_Core_annotations}
```
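Step 3 hard-codes one `artifacts.ai.sap.com/<artifact name>.kind` annotation per output artifact, and the artifact names come from the compiled templates (for example `make-step-download-output_csv`). A hypothetical helper that derives these keys from the workflow itself keeps the annotations in sync if steps are renamed or added. It is sketched here on a minimal stand-in dict rather than the real compiled file, and it marks every output artifact with the same kind, so filter the result if some outputs need a different one.

```python
def artifact_kind_annotations(workflow: dict, kind: str = "dataset") -> dict:
    """Hypothetical helper: build one '<artifact name>.kind' annotation per output artifact."""
    annotations = {}
    for template in workflow["spec"]["templates"]:
        # Templates without outputs (like the pipeline's own template) are skipped
        for artifact in template.get("outputs", {}).get("artifacts", []):
            annotations[f"artifacts.ai.sap.com/{artifact['name']}.kind"] = kind
    return annotations


# Minimal stand-in for the compiled workflow: only the fields used above
sample_workflow = {"spec": {"templates": [
    {"name": "make-step-download",
     "outputs": {"artifacts": [{"name": "make-step-download-output_csv"}]}},
    {"name": "make-step-train",
     "outputs": {"artifacts": [{"name": "make-step-train-output_csv"}]}},
    {"name": "my-pipeline", "dag": {"tasks": []}},
]}}

print(artifact_kind_annotations(sample_workflow))
```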


In [9]:
```python
# 4. Add the AI Core labels
AI_Core_labels = {'scenarios.ai.sap.com/id': "ca-housing",
                  'executables.ai.sap.com/id': name,
                  'ai.sap.com/version': "1.0.2"}

workflow["metadata"]["labels"] = {**workflow["metadata"]["labels"], **AI_Core_labels}
```
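Steps 3 and 4 merge dictionaries with `{**existing, **new}`: keys from the right-hand dict win, and a missing `annotations` or `labels` entry would raise a `KeyError`. A small hypothetical helper (not part of KFP or AI Core) that keeps the same override behavior but also tolerates a missing or empty map could look like this; the example dict is a stand-in.

```python
def merge_metadata(workflow: dict, field: str, extra: dict) -> None:
    """Hypothetical helper: merge `extra` into workflow["metadata"][field] in place.

    Missing or empty (None) maps are created, and keys in `extra` override
    existing keys, exactly as {**existing, **extra} does.
    """
    metadata = workflow.setdefault("metadata", {})
    metadata[field] = {**(metadata.get(field) or {}), **extra}


example = {"metadata": {"annotations": {"pipelines.kubeflow.org/kfp_sdk_version": "1.8.9"},
                        "labels": None}}
merge_metadata(example, "annotations", {"scenarios.ai.sap.com/name": "ca-housing-train-scenario"})
merge_metadata(example, "labels", {"ai.sap.com/version": "1.0.2"})
print(example["metadata"])
```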

In [10]:
```python
# 5. Add an imagePullSecret if the Docker image is not public
#    (the name must match a Docker registry secret registered in AI Core)
workflow["spec"]["imagePullSecrets"] = [{"name": "docker-registry-secret2"}]
```

After these changes, the beginning of the file looks like this:

```
{'apiVersion': 'argoproj.io/v1alpha1',
 'kind': 'WorkflowTemplate',
 'metadata': {'generateName': 'my-pipeline-',
  'annotations': {'pipelines.kubeflow.org/kfp_sdk_version': '1.8.9',
   'pipelines.kubeflow.org/pipeline_compilation_time': '2022-01-18T12:59:24.447573',
   'pipelines.kubeflow.org/pipeline_spec': '{"name": "My pipeline"}',
   'scenarios.ai.sap.com/description': 'CA Housing linear regression',
   'scenarios.ai.sap.com/name': 'ca-housing-train-scenario',
   'executables.ai.sap.com/description': 'CA Housing linear regression',
   'executables.ai.sap.com/name': 'ca-housing-linreg',
   'artifacts.ai.sap.com/make-step-download-output_csv.kind': 'dataset',
   'artifacts.ai.sap.com/make-step-train-output_csv.kind': 'dataset'},
  'labels': {'pipelines.kubeflow.org/kfp_sdk_version': '1.8.9',
   'scenarios.ai.sap.com/id': 'ca-housing',
   'executables.ai.sap.com/id': 'ca-housing-linreg',
   'ai.sap.com/version': '1.0.2'},
  'name': 'ca-housing-linreg'},
```

In [11]:
```python
# 6. Store each output artifact uncompressed (archive: none) and expose it
#    as a global workflow output under its own name (globalName)

# templates[0] is the download step
name = workflow["spec"]["templates"][0]["outputs"]["artifacts"][0]["name"]
workflow["spec"]["templates"][0]["outputs"]["artifacts"][0]["archive"] = {"none": {}}
workflow["spec"]["templates"][0]["outputs"]["artifacts"][0]["globalName"] = name

# templates[1] is the train step
name = workflow["spec"]["templates"][1]["outputs"]["artifacts"][0]["name"]
workflow["spec"]["templates"][1]["outputs"]["artifacts"][0]["archive"] = {"none": {}}
workflow["spec"]["templates"][1]["outputs"]["artifacts"][0]["globalName"] = name

# Show the modified output artifact of the download step
workflow["spec"]["templates"][0]["outputs"]["artifacts"][0]
```

```
{'name': 'make-step-download-output_csv',
 'path': '/tmp/outputs/output_csv/data',
 'archive': {'none': {}},
 'globalName': 'make-step-download-output_csv'}
```
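Step 6 patches the first output artifact of `templates[0]` and `templates[1]` by index. In Argo, `archive: {none: {}}` stores the output file as-is instead of the default tar/gzip archive, and `globalName` exports the artifact as a workflow-level output. The hypothetical helper below applies the same two settings to every output artifact of every template, so the code does not depend on how many steps the pipeline has; the sample dict is a minimal stand-in for the compiled workflow.

```python
def expose_output_artifacts(workflow: dict) -> list:
    """Hypothetical helper: apply step 6 to every output artifact of every template.

    Returns the names of the artifacts it changed.
    """
    exposed = []
    for template in workflow["spec"]["templates"]:
        for artifact in template.get("outputs", {}).get("artifacts", []):
            artifact["archive"] = {"none": {}}         # keep the file as-is, not as a .tgz
            artifact["globalName"] = artifact["name"]  # export it as a workflow-level output
            exposed.append(artifact["name"])
    return exposed


# Minimal stand-in for the compiled workflow
sample_workflow = {"spec": {"templates": [
    {"name": "make-step-download",
     "outputs": {"artifacts": [{"name": "make-step-download-output_csv",
                                "path": "/tmp/outputs/output_csv/data"}]}},
    {"name": "make-step-train",
     "outputs": {"artifacts": [{"name": "make-step-train-output_csv",
                                "path": "/tmp/outputs/output_csv/data"}]}},
    {"name": "my-pipeline", "dag": {"tasks": []}},
]}}

print(expose_output_artifacts(sample_workflow))
```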

Next, we replace the template named `my-pipeline`, which is also our entrypoint, with a template that lists the steps our pipeline should run. The outer `steps` list runs in order, and each inner list holds the steps of one stage:

```python
{'name': 'my-pipeline',
 'steps': [[{'name': 'make-step-download', 'template': 'make-step-download'}],
           [{'name': 'make-step-train', 'template': 'make-step-train'}]]}
```

In [14]:
```python
# 7. Replace the KFP-generated pipeline template (templates[2], named my-pipeline)
#    with a steps template that runs the two steps one after another
new = {'name': 'my-pipeline',
       'steps': [[{'name': 'make-step-download', 'template': 'make-step-download'}],
                 [{'name': 'make-step-train', 'template': 'make-step-train'}]]}
workflow["spec"]["templates"][2] = new
```
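Step 7 writes the steps template by hand and swaps it in at index 2. The following sketch, with hypothetical helper names, builds the same dictionary from a list of step names and replaces the template by its `name`, which stays correct even if the templates appear in a different order. It assumes, as in the notebook, that each step uses a template of the same name.

```python
def sequential_steps_template(name: str, step_names: list) -> dict:
    """Hypothetical helper: an Argo steps template that runs the given templates in order.

    The outer `steps` list runs sequentially; steps inside one inner list run
    in parallel. One step per inner list gives a strictly sequential pipeline.
    """
    return {"name": name,
            "steps": [[{"name": step, "template": step}] for step in step_names]}


def replace_template(workflow: dict, new_template: dict) -> None:
    """Hypothetical helper: replace the template with the same name instead of using an index."""
    templates = workflow["spec"]["templates"]
    for i, template in enumerate(templates):
        if template["name"] == new_template["name"]:
            templates[i] = new_template
            return
    raise KeyError(f"no template named {new_template['name']!r}")


new = sequential_steps_template("my-pipeline", ["make-step-download", "make-step-train"])

# Minimal stand-in for the compiled workflow
sample_workflow = {"spec": {"entrypoint": "my-pipeline", "templates": [
    {"name": "make-step-download"},
    {"name": "make-step-train"},
    {"name": "my-pipeline", "dag": {"tasks": []}},
]}}
replace_template(sample_workflow, new)
print(sample_workflow["spec"]["templates"][2])
```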

## Save the new workflow

In [15]:
```python
# Overwrite workflow.yaml with the AI Core-compatible WorkflowTemplate
with open("workflow.yaml", "w") as f:
    yaml.dump(workflow, f)
```

This YAML file can now be pushed to a GitHub repository that is connected to an AI Core instance. In short: if we build the Docker image beforehand and apply these small changes to the compiled YAML file, we can use the Kubeflow SDK to create workflows for AI Core.
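Before pushing `workflow.yaml`, a quick self-check can catch a skipped cell. The hypothetical `check_ai_core_template` helper below only re-checks the changes made in this notebook (kind, name, annotations, labels, `globalName` and the artifact kind annotations); it is an assumption-based sketch, not AI Core's actual validation, and AI Core may enforce further rules.

```python
REQUIRED_ANNOTATIONS = ("scenarios.ai.sap.com/name", "executables.ai.sap.com/name")
REQUIRED_LABELS = ("scenarios.ai.sap.com/id", "executables.ai.sap.com/id", "ai.sap.com/version")


def check_ai_core_template(workflow: dict) -> list:
    """Hypothetical helper: return a list of problems; an empty list means every check passed.

    It only re-checks the changes made in this notebook.
    """
    problems = []
    metadata = workflow.get("metadata") or {}
    annotations = metadata.get("annotations") or {}
    labels = metadata.get("labels") or {}

    if workflow.get("kind") != "WorkflowTemplate":
        problems.append("kind is not WorkflowTemplate")
    if not metadata.get("name"):
        problems.append("metadata.name is missing")
    problems += [f"annotation {k} is missing" for k in REQUIRED_ANNOTATIONS if k not in annotations]
    problems += [f"label {k} is missing" for k in REQUIRED_LABELS if k not in labels]

    for template in (workflow.get("spec") or {}).get("templates", []):
        for artifact in template.get("outputs", {}).get("artifacts", []):
            if "globalName" not in artifact:
                problems.append(f"artifact {artifact['name']} has no globalName")
            if f"artifacts.ai.sap.com/{artifact['name']}.kind" not in annotations:
                problems.append(f"artifact {artifact['name']} has no kind annotation")
    return problems


print(check_ai_core_template({"kind": "Workflow"}))
```

For example, calling `check_ai_core_template(workflow)` right before the save cell lists anything these particular checks find missing.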