# Demonstrate Kubeflow Pipelines with Dask and MLRun
  --------------------------------------------------------------------


## Create a project to host our functions, jobs and artifacts

Projects package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git repository.

The following code creates a new project in a local directory and initializes Git tracking for it.

In [1]:
import os
import mlrun
import warnings
warnings.filterwarnings("ignore")

# set project name and dir
project_name = 'sk-project-dask'
project_dir = './project'

# specify artifacts target location
artifact_path = mlrun.set_environment(api_path = mlrun.mlconf.dbpath or 'http://mlrun-api:8080',
                                      artifact_path = os.path.abspath('./'),
                                      project = project_name,)

# set project
sk_dask_proj = mlrun.new_project(project_name, project_dir, init_git=True)



## Init Dask Cluster

In [2]:
import mlrun
# define a new Dask cluster function (created in code, not loaded from a file)
dsf = mlrun.new_function(name="mydask", kind="dask", image="mlrun/ml-models")

# set up function specs for dask
dsf.spec.remote = True             # use a remote cluster
dsf.spec.replicas = 5              # number of workers
dsf.spec.service_type = 'NodePort'
dsf.with_limits(mem="6G")          # memory limit per worker
dsf.spec.nthreads = 5              # threads per worker

> 2020-11-23 12:59:31,056 [info] using in-cluster config.


In [3]:
dsf.save()



'060deda27db0d90a1ca0c0434ca4cad278b14877'

In [4]:
dsf.apply(mlrun.mount_v3io())

<mlrun.runtimes.daskjob.DaskCluster at 0x7f6ed3b96d50>

In [6]:
dsf.client

> 2020-11-23 13:03:30,184 [info] trying dask client at: tcp://mlrun-mydask-ce9d12ee-0.default-tenant:8786
> 2020-11-23 13:03:30,192 [info] using remote dask scheduler (mlrun-mydask-ce9d12ee-0) at: tcp://mlrun-mydask-ce9d12ee-0.default-tenant:8786


Client
* Scheduler: tcp://mlrun-mydask-ce9d12ee-0.default-tenant:8786
* Dashboard: http://mlrun-mydask-ce9d12ee-0.default-tenant:8787/status

Cluster
* Workers: 5
* Cores: 25
* Memory: 30.00 GB
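
The totals in the client summary follow from the function spec. As a quick sanity check, here is a minimal sketch of the arithmetic, assuming the dashboard counts each worker thread as a core and that the `6G` limit applies per worker (the variable names are illustrative, not MLRun attributes):

```python
# Values copied from the Dask function spec above
replicas = 5          # dsf.spec.replicas
nthreads = 5          # dsf.spec.nthreads
mem_limit_gb = 6      # dsf.with_limits(mem="6G"), assumed to apply per worker

# Cluster-wide totals are the per-worker values multiplied by the worker count
total_cores = replicas * nthreads
total_memory_gb = replicas * mem_limit_gb

print(f"Workers: {replicas}  Cores: {total_cores}  Memory: {total_memory_gb:.2f} GB")
```

Changing `replicas`, `nthreads`, or the memory limit should scale the reported totals accordingly.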


## Load and run functions

Load function objects from a .py file, a .yaml file, or the function hub (marketplace).

### Function #1

In [7]:
# load function from local file
dsf = sk_dask_proj.set_function('/User/demos/demos/project/sklearn-classifier-dask.py', 
                                name='dask_classifier', 
                                kind="job",
                                image="mlrun/ml-models")

### Function #2

In [8]:
describejob = sk_dask_proj.set_function("/User/demos/demos/project/describe.py", 
                                          name='describe', 
                                          kind="job", 
                                          image="mlrun/ml-models")

## Create a Fully Automated ML Pipeline

#### Add more functions to our project to be used in our pipeline (from the functions hub/marketplace)

Describe the data, then train and evaluate a model with Dask.

#### Define and save a pipeline 

The following workflow definition is written to a file. It describes a Kubeflow execution graph (DAG)<br>
and how functions and data are connected to form an end-to-end pipeline.

* Ingest data
* Describe data
* Train, test and evaluate with dask

Check the code below to see how function objects are initialized and used (by name) inside the workflow.<br>
The `workflow.py` file has two parts: initializing the function objects and defining the pipeline DSL (connecting the function inputs and outputs).
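
To make the idea of an execution graph concrete, here is a minimal standard-library sketch (it uses neither Kubeflow nor MLRun) of how a dependency such as the workflow's `train.after(describe)` determines run order. The step names `describe` and `train` mirror the variables in the workflow file; the dictionary layout is an assumption made for illustration only:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a step; its value is the set of steps it must run after.
# "train" depends on "describe", mirroring train.after(describe).
dependencies = {
    "describe": set(),
    "train": {"describe"},
}

# A topological sort yields an order in which every step runs
# only after all of its dependencies have completed.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['describe', 'train']
```

Kubeflow resolves the real graph itself; the sketch only shows why declaring the dependency is enough to fix the order of the two steps.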

> Note: a pipeline can also include CI steps such as building container images and deploying models; the example below does not include such steps.


In [9]:
%%writefile project/workflow.py
from kfp import dsl
from mlrun import mount_v3io

# params
funcs       = {}
LABELS      = "VendorID"
DROP        = 'congestion_surcharge'
#DATA_URL    = "/User/iris.csv"
DATA_URL    = "/User/yellow_tripdata.csv"
DASK_CLIENT = "tcp://mlrun-mydask-ce9d12ee-0.default-tenant:8786"

# init_functions configures function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        f.apply(mount_v3io())
     
@dsl.pipeline(
    name="Demo training pipeline",
    description="Shows how to use mlrun"
)
def kfpipeline():
    
    # describe data
    describe = funcs['describe'].as_step(
        handler="describe",
        params={"dask_address"  : DASK_CLIENT},
        inputs={"dataset"       : DATA_URL}
    )
    
    # get data, train, test and evaluate 
    train = funcs['dask_classifier'].as_step(
        name="train-skrf",
        handler="train_model",
        params={"label_column"    : LABELS,
                "dask_address"    : DASK_CLIENT,
                "test_size"       : 0.10,
                "model_pkg_class" : "sklearn.ensemble.RandomForestClassifier",
                "drop_cols"       : DROP},
        inputs={"dataset"         : DATA_URL},
        outputs=['model', 'test_set']
    )
    
    train.after(describe)

Overwriting project/workflow.py


In [10]:
# register the workflow file as "main", embed the workflow code into the project YAML
sk_dask_proj.set_workflow('main', 'workflow.py', embed=True)

Save the project definitions to a file (project.yaml). It is recommended to commit all changes to a Git repo.

In [11]:
sk_dask_proj.save()

<a id='run-pipeline'></a>
## Run a pipeline workflow
Use the `run` method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.<br>
The workflow ID is returned and can be used to track progress; alternatively, use the hyperlinks in the output.

> Note: The same run can be started from the CLI:<br>
    `mlrun project my-proj/ -r main -p "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"`

The `dirty` flag allows us to run a project with uncommitted changes (when the notebook is in the same Git directory, the project will always be dirty).<br>
The `watch` flag waits for the pipeline to complete and prints the results.
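
The artifact path used in this section contains a `{{workflow.uid}}` placeholder, so each run can write to its own directory. The sketch below illustrates the substitution with plain string handling, assuming the placeholder is replaced at run time with a unique ID of the workflow execution. `resolve_artifact_path` and the example ID are hypothetical and are not part of MLRun or Kubeflow:

```python
import os

# Template for the artifact target; the placeholder is kept verbatim
# until a concrete workflow ID is known.
template = os.path.join(os.path.abspath("."), "pipe", "{{workflow.uid}}")

def resolve_artifact_path(template: str, workflow_uid: str) -> str:
    """Hypothetical helper: substitute a workflow ID for the placeholder."""
    return template.replace("{{workflow.uid}}", workflow_uid)

# "example-workflow-uid" is a made-up ID used only for illustration
resolved = resolve_artifact_path(template, "example-workflow-uid")
print(resolved)
```

With a different ID per run, artifacts from separate runs should not overwrite each other.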

In [12]:
artifact_path = os.path.abspath('./pipe/{{workflow.uid}}')
run_id = sk_dask_proj.run(
    'main',
    arguments={}, 
    artifact_path=artifact_path, 
    dirty=False, watch=True)



> 2020-11-23 13:03:41,610 [info] Pipeline run id=09454d62-623a-46f7-a0b5-0f0728f1fa2a, check UI or DB for progress
> 2020-11-23 13:03:41,610 [info] waiting for pipeline run completion


| uid | start | state | name | results | artifacts |
| --- | --- | --- | --- | --- | --- |
| ...0d88b337 | Nov 23 13:04:07 | completed | train-skrf | micro=0.9541915853120504<br>macro=0.8253826095402947<br>precision-2=0.8270983635403836<br>precision-1=0.8389381392490554<br>precision-4=0.06947890818858561<br>recall-2=0.7464190812945299<br>recall-1=0.9035754541403445<br>recall-4=0.009504412763068567<br>f1-2=0.7846903797355651<br>f1-1=0.870057958851424<br>f1-4=0.016721409375933114 | ROCAUC<br>ClassificationReport<br>ConfusionMatrix<br>FeatureImportances<br>model<br>standard_scaler<br>label_encoder<br>test_set |
| ...338fb234 | Nov 23 13:03:49 | completed | describe-describe | | describe |


**[back to top](#top)**