### Build and Run Multi-Step Kubeflow Pipeline from Notebook using Lightweight Components 


Lightweight Python components do not require you to build a new container image for every code change. They’re intended for fast iteration in a notebook environment. This allows you to quickly build a multi-step pipeline from a Jupyter notebook and run it on Kubeflow.

**Advantages over container components**

- Faster iteration: No need to build new container image after every change (building images takes some time).
- Easier authoring: Components can be created in a local environment. Docker and Kubernetes are not required.

[Read more on the requirements for building lightweight components](https://www.kubeflow.org/docs/pipelines/sdk/lightweight-python-components)


In [None]:
# Install dependencies
#!pip install kfp
#!pip install python-dotenv
#!pip install adal --upgrade


## Step 1 : Process Data 

In [None]:
def process_data(train_datapath) -> str:
    """Placeholder data-processing step.

    Prints the incoming training data path and returns the location of the
    processed data, which is the input path with '/processed' appended.
    """
    print(train_datapath)
    return train_datapath + '/processed'

## Step 2 : Train Model

In [None]:
#Advanced function
#Demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple
def train_model(train_data_path:str, epoch : int) -> NamedTuple('TrainModelOutput', [('model_path', str), ('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    '''Placeholder training step returning a model path, UI metadata and metrics.

    No real training is performed. The function demonstrates pip installs,
    imports, a nested helper and multiple outputs inside a component function.

    Args:
        train_data_path: Path of the training data; the model path is derived from it.
        epoch: Number of epochs. Not used by this placeholder.

    Returns:
        A TrainModelOutput namedtuple holding the model path plus the UI
        metadata and the metrics, both serialized as JSON strings.
    '''
    #Pip installs inside a component function.
    #NOTE: installs should be placed right at the beginning to avoid upgrading a package
    # after it has already been imported and cached by python
    # If possible avoid installs and include them in base image
    import subprocess
    import sys
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn'])

    #Imports inside a component function:
    import sklearn
    import json
    from collections import namedtuple

    #This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        # Uses the built-in divmod so the helper does not depend on numpy being imported.
        return divmod(dividend, divisor)

    #Train model placeholder
    model_path = train_data_path + "/trained_model/model.pkl"
    print(model_path)

    # Sample UI metadata describing the model output:
    metadata = {
      'outputs' : [{
        'type': 'model',
        'source': '/path/to/model',
      }]
    }
    with open('/mlpipeline_ui_metadata.json', 'w') as f:
        json.dump(metadata, f)

    # Kept as a number (not a string): numberValue must be a numeric value.
    train_loss = 11
    metrics = {
        'metrics': [{
          'name': 'loss', # The name of the metric. Visualized as the column name in the runs table.
          'numberValue':  train_loss, # The value of the metric. Must be a numeric value.
          'format': "PERCENTAGE",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
        }]
      }
    with open('/mlpipeline-metrics.json', 'w') as f:
        json.dump(metrics, f)

    TrainModelOutput = namedtuple('TrainModelOutput', ['model_path', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return TrainModelOutput(model_path, json.dumps(metadata), json.dumps(metrics))

## Step 3 : Evaluate Model

In [None]:
from typing import NamedTuple
def evaluate_model(trained_model_path:str) -> NamedTuple('EvaluateModelOutput', [('mlpipeline_metrics', 'Metrics')]):
    '''Placeholder evaluation step that reports an accuracy metric.

    No real evaluation is performed. The function prints the model path,
    writes the metrics to /mlpipeline-metrics.json and returns them.

    Args:
        trained_model_path: Path of the trained model to evaluate.

    Returns:
        An EvaluateModelOutput namedtuple whose mlpipeline_metrics field is
        the metrics dictionary serialized as a JSON string.
    '''
    import json
    from collections import namedtuple
    print(trained_model_path)
    #Evaluate model placeholder

    # Kept as a number (not a string): numberValue must be a numeric value.
    accuracy_score = 70
    metrics = {
        'metrics': [{
          'name': 'accuracy_score', # The name of the metric. Visualized as the column name in the runs table.
          'numberValue':  accuracy_score, # The value of the metric. Must be a numeric value.
          'format': "PERCENTAGE",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
        }]
      }
    with open('/mlpipeline-metrics.json', 'w') as f:
        json.dump(metrics, f)

    # Runtime type name matches the one declared in the return annotation.
    EvaluateModelOutput = namedtuple('EvaluateModelOutput', ['mlpipeline_metrics'])
    return EvaluateModelOutput(json.dumps(metrics))
    

## Build and compile Kubeflow pipeline

In [None]:
import kfp.dsl as dsl
import kfp
import kfp.components as comp
import kfp.compiler as compiler
@dsl.pipeline(
   name='Local Dev pipeline',
   description='Pipeline demonstrating local dev experiece with Kubeflow'
)
def tacosandburritos_train():
    """Three-step pipeline: process data, train a model, then evaluate it.

    Each step is a Python function converted to a container op with
    func_to_container_op, and all steps run on the same base image.
    """
    base_image = 'kubeflowyoacr.azurecr.io/mexicanfood/notebook-comp:latest'

    process_data_op = comp.func_to_container_op(process_data, base_image=base_image)
    process_data_task = process_data_op("/train_data") #Returns a dsl.ContainerOp class instance.

    train_model_op = comp.func_to_container_op(train_model, base_image=base_image)
    train_model_task = train_model_op(process_data_task.output, 5) #Returns a dsl.ContainerOp class instance.

    evaluate_model_op = comp.func_to_container_op(evaluate_model, base_image=base_image)
    # train_model declares several outputs, so the model path is selected by name
    # instead of relying on the single-output shortcut `.output`.
    evaluate_model_task = evaluate_model_op(train_model_task.outputs['model_path']) #Returns a dsl.ContainerOp class instance.
    
    
if __name__ == '__main__':
    # Compile the pipeline function into the package file uploaded later on.
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(tacosandburritos_train, 'pipeline.tar.gz')

In [None]:
%%bash
# Verify compiled pipeline in directory
ls *.tar.gz

## Load Environment Variables from .env

**Note**: Make sure the .env file is in the same directory as this notebook. Do not check a .env file containing secret values into your repo. Refer to the env.example file in the repo.

In [None]:
from dotenv import load_dotenv
# Load environment variables from the .env file
load_dotenv()

## Upload and run pipeline on kubeflow 

Upload the compiled pipeline to Kubeflow, look up the experiment named by `EXP_NAME`, and start a run.

In [None]:
import os
from kubernetes import client as k8s_client
import kfp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as components
from kfp.azure import use_azure_secret
import uuid
import subprocess
import adal

# Acquire an Azure AD token for the service principal and use it to talk to Kubeflow Pipelines.
# Required settings are read with os.environ[...] so that a missing variable fails
# immediately with a KeyError naming it, instead of an opaque error on None later on.
authorityHostUrl = "https://login.microsoftonline.com"
GRAPH_RESOURCE = '00000002-0000-0000-c000-000000000000'
authority_url = authorityHostUrl + '/' + os.environ["TENANT_ID"]
context = adal.AuthenticationContext(authority_url)
token = context.acquire_token_with_client_credentials(
    GRAPH_RESOURCE, os.environ["SP_APP_ID"], os.environ["SP_APP_SECRET"])
client = kfp.Client(os.environ.get("KFP_HOST"), existing_token=token['accessToken'])
# Package produced by the compile step; expected in the current working directory.
pipeline_file = 'pipeline.tar.gz'
# A random UUID suffix is appended to the configured pipeline name.
pipeline_name = os.environ["PIPELINE_NAME"] + "-" + str(uuid.uuid4())
# Upload pipeline
pipeline = client.pipeline_uploads.upload_pipeline(pipeline_file, name=pipeline_name)
# NOTE(review): get_experiment looks the experiment up by name; presumably it must already exist.
exp = client.get_experiment(experiment_name=os.environ["EXP_NAME"])
# Create parameters and pass them to run_pipeline if your pipeline takes parameters
# pipeline_params = {}
# pipeline_params["paramname"] = paramvalue
# Run pipeline
client.run_pipeline(exp.id, job_name=os.environ.get("RUN_NAME"), pipeline_id=pipeline.id)