# Define and run a distributed training pipeline

In this notebook we use **MLRun** to run all the functions we wrote in the [mlrun-mpijob-classify](mlrun-mpijob-classify.ipynb) and [nuclio-serving-tf-images](nuclio-serving-tf-images.ipynb) notebooks as a **Kubeflow Pipeline**.

**Kubeflow Pipelines** supplies the orchestration that runs the pipeline, while **MLRun** supplies an easy interface to define the pipeline and launch the serving function at the end.

We will show how to:
* Run remote functions from notebooks using `code_to_function`
* Run saved functions from our DB using `import_function`
* Define and launch a Kubeflow Pipeline
* Access the DB from code and list the pipeline's entries

In [1]:
# nuclio: ignore
import nuclio

In [2]:
from mlrun import new_function, code_to_function, get_run_db, mount_v3io, mlconf, new_model_server, v3io_cred, import_function
import os
 
mlconf.dbpath = 'http://mlrun-api:8080'

In [3]:
base_dir = '/User/mlrun/examples'
images_path = os.path.join(base_dir, 'images')
model_name = 'cat_vs_dog_v1'

## Import and define ML functions for our pipeline (utils, training, serving)

With `code_to_function`, MLRun parses the given Python file and builds a function from it.

In [4]:
# data import and labeling 
utilsfn = code_to_function(name='file_utils', filename='./utils.py',
                           image='mlrun/mlrun:latest')
#utilsfn.deploy()

Using `import_function` we import the Horovod training function from our DB.  
As the output shows, the function's deployment parameters were saved with it, such as replicas, GPU configuration, mounts, runtime, and the code source.

In [5]:
# read the training function object from MLRun DB
trainer_fn = import_function('db://horovod-trainer')
trainer_fn.to_dict()

{'kind': 'mpijob',
 'metadata': {'name': 'horovod-trainer',
  'tag': 'latest',
  'hash': '9232685b13eda1a7ab3e8d09a3228c949e5c2c05',
  'project': 'default',
  'updated': 'Fri, 27 Dec 2019 09:54:56 GMT'},
 'spec': {'command': '/User/mlrun-demos/demos/image-classification/horovod_training.py',
  'args': [],
  'image': 'mlrun/mpijob:latest',
  'volumes': [{'flexVolume': {'driver': 'v3io/fuse',
     'options': {'accessKey': '275eeda5-5d83-427e-adda-ddb469370fb5',
      'container': 'users',
      'subPath': '/admin'}},
    'name': 'v3io'}],
  'volume_mounts': [{'mountPath': '/User', 'name': 'v3io'}],
  'env': [{'name': 'V3IO_API', 'value': 'v3io-webapi.default-tenant.svc:8081'},
   {'name': 'V3IO_USERNAME', 'value': 'admin'},
   {'name': 'V3IO_ACCESS_KEY',
    'value': '275eeda5-5d83-427e-adda-ddb469370fb5'}],
  'description': '',
  'replicas': 4,
  'image_pull_policy': 'Always',
  'build': {'commands': []}}}
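
Because `to_dict()` returns a plain nested dictionary, the saved deployment settings can be checked programmatically before the function is added to a pipeline. The following is a minimal sketch that uses a trimmed, hand-copied subset of the output above rather than a live `trainer_fn`; the helper name `summarize_function` is hypothetical.

```python
# A trimmed copy of the dictionary printed above (not fetched from a live DB).
function_dict = {
    'kind': 'mpijob',
    'metadata': {'name': 'horovod-trainer', 'tag': 'latest', 'project': 'default'},
    'spec': {
        'image': 'mlrun/mpijob:latest',
        'volume_mounts': [{'mountPath': '/User', 'name': 'v3io'}],
        'replicas': 4,
    },
}


def summarize_function(fn_dict):
    """Collect the deployment settings worth checking before a pipeline run."""
    spec = fn_dict.get('spec', {})
    return {
        'name': fn_dict['metadata']['name'],
        'kind': fn_dict['kind'],
        'image': spec.get('image'),
        'replicas': spec.get('replicas'),
        'mount_paths': [m['mountPath'] for m in spec.get('volume_mounts', [])],
    }


summary = summarize_function(function_dict)
print(summary)
```

With a live function object, the same helper could presumably be called as `summarize_function(trainer_fn.to_dict())`.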

Passing `filename=<jupyter notebook file>` to `new_model_server` makes MLRun parse the given Jupyter notebook and build our model server from it.

> All the annotations given in the notebook will be parsed and saved to the function normally

The model server deploys the models given under `models={<model_name>: <model_file_path>}` using the class given in `model_class=<model_class_name>`.  
Just like any other MLRun function we can set our environment variables, workers and add mounts.

The model server will provide us with a `/<model_name>/predict` endpoint where we can query the model.
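
As a rough sketch of what a query to that endpoint could look like, the snippet below builds (but does not send) an HTTP request with the standard library. The base address, the JSON body, and the helper name `build_predict_request` are assumptions for illustration; the payload the server actually accepts is defined by the model class in the serving notebook.

```python
import json
import urllib.request


def build_predict_request(base_url, model_name, payload):
    """Build a POST request for the /<model_name>/predict endpoint."""
    url = f"{base_url.rstrip('/')}/{model_name}/predict"
    body = json.dumps(payload).encode('utf-8')
    return urllib.request.Request(
        url, data=body,
        headers={'Content-Type': 'application/json'}, method='POST')


# Hypothetical address and payload; replace with the deployed function's values.
request = build_predict_request('http://localhost:8080', 'cat_vs_dog_v1',
                                {'data_url': 'http://example.com/cat.jpg'})
print(request.get_method(), request.full_url)
```

Once the function is deployed, passing the request to `urllib.request.urlopen` would send it.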

In [6]:
# inference function
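inference_function = new_model_server('tf-images-server',
                                      filename='./nuclio-serving-tf-images.ipynb',
                                      model_class='TFModel')
# NOTE: classes_map_filepath is not defined earlier in this notebook;
# set it to the path of the classes map file before running this cell.
inference_function.set_env('classes_map', classes_map_filepath)
inference_function.with_http(workers=2)
inference_function.apply(mount_v3io())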

<mlrun.runtimes.function.RemoteRuntime at 0x7fbaf0e2ccc0>

## Create and run the pipeline

In this part we define the Kubeflow Pipeline that runs our process.  
MLRun makes this simple: calling `<fn>.as_step()` turns a function into a Kubeflow pipeline step. Parameters and inputs are set as usual and are deployed as defined in the pipeline.  

The pipeline order is determined in two ways:
* We can specify it explicitly with `<fn>.after(<previous fn>)`.
* We can pass a function a parameter or input that is taken from a previous function.  
  Example: `models={'cat_vs_dog_v1': train.outputs['model']}` in the inference function definition takes the model file from the training function.
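
Both rules amount to declaring edges in a dependency graph, which the orchestrator then sorts into an execution order. The toy model below illustrates this with the standard library only; the `Step` class and its methods are hypothetical stand-ins, not the Kubeflow Pipelines or MLRun API.

```python
from graphlib import TopologicalSorter  # requires Python 3.9+


class Step:
    """Toy stand-in for a pipeline step (not a real kfp or MLRun object)."""

    def __init__(self, name, inputs=None):
        self.name = name
        # Consuming another step's output creates an implicit dependency.
        self.depends_on = {src.name for src in (inputs or {}).values()}

    def after(self, other):
        # An explicit ordering constraint, like `<fn>.after(<previous fn>)`.
        self.depends_on.add(other.name)
        return self


download = Step('download')
label = Step('label').after(download)
train = Step('train', inputs={'categories_map': label})
deploy = Step('deploy', inputs={'model': train})

# Map each step to the set of steps it depends on, then sort.
graph = {s.name: s.depends_on for s in (download, label, train, deploy)}
order = list(TopologicalSorter(graph).static_order())
print(order)
```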
  
Note that a function must call `log_artifact` for an artifact, and the artifact's name must be listed in the step's `outputs` parameter, to expose it to the pipeline for later use.
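
A minimal sketch of that contract: the handler receives the MLRun context, logs an artifact under a key, and the pipeline step lists the same key in `outputs`. The handler name `build_report`, the artifact key `report`, and the `FakeContext` stub (used only so the example runs without an MLRun cluster) are hypothetical.

```python
def build_report(context, source_dir):
    """Hypothetical handler: logs one artifact that later steps can consume."""
    body = f'files scanned under {source_dir}'
    # The key passed here must match an entry in the step's `outputs` list,
    # e.g. utilsfn.as_step(..., handler='build_report', outputs=['report']).
    context.log_artifact('report', body=body)


class FakeContext:
    """Local stub that only records calls; in a real run MLRun supplies the context."""

    def __init__(self):
        self.artifacts = {}

    def log_artifact(self, key, body=None):
        self.artifacts[key] = body


ctx = FakeContext()
build_report(ctx, '/User/mlrun/examples/images/cats_n_dogs')
print(ctx.artifacts)
```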

In [7]:
import kfp
from kfp import dsl

In [8]:
artifacts_path = 'v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/'

In [9]:
@dsl.pipeline(
    name='Image classification training pipeline',
    description='Shows how to use mlrun with horovod.'
)
def hvd_pipeline(
    images_path = '/User/mlrun/examples/images', 
    source_dir='/User/mlrun/examples/images/cats_n_dogs'
):
    open_archive = utilsfn.as_step(name='download', handler='open_archive',
                                   out_path=images_path, 
                                   params={'target_dir': images_path},
                                   inputs={'archive_url': 'http://iguazio-sample-data.s3.amazonaws.com/catsndogs.zip'},
                                   outputs=['content']).apply(mount_v3io())
              
    label = utilsfn.as_step(name='label', handler='categories_map_builder',
                            out_path=images_path,
                            params={'source_dir': source_dir}, 
                            outputs=['categories_map', 'file_categories']).apply(mount_v3io()).after(open_archive)
    
    train = trainer_fn.as_step(name='train', 
                               params = {'epochs' : 8,
                                         'checkpoints_dir' : '/User/mlrun/examples/checkpoints',
                                         'model_path' : '/User/mlrun/examples/models/cats_n_dogs.hd5'},
                               inputs = {'data_path' : source_dir,
                                         'categories_map': label.outputs['categories_map'],
                                         'file_categories': label.outputs['file_categories']},                               
                               out_path=images_path, 
                               outputs=['model']).apply(v3io_cred())

    # deploy the model using nuclio functions
    deploy = inference_function.deploy_step(project = 'horovod', models={'cat_vs_dog_v1': train.outputs['model']})


In [10]:
# for debugging: compile the pipeline DSL to a YAML file
kfp.compiler.Compiler().compile(hvd_pipeline, 'hvd_pipeline.yaml')

In [11]:
client = kfp.Client(namespace='default-tenant')
arguments = {}
run_result = client.create_run_from_pipeline_func(hvd_pipeline, arguments, experiment_name='horovod1')

In [14]:
# connect to the run db 
db = get_run_db().connect()

In [16]:
# query the DB with filter on workflow ID (only show this workflow) 
db.list_runs('', labels=f'workflow={run_result.run_id}').show()

| uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| ...4dc101 | 0 | Dec 27 11:28:40 | completed | train | host=train-f5c53e73-worker-0<br>kind=mpijob<br>mlrun/job=train-f5c53e73<br>owner=admin<br>workflow=4692bf15-8ca5-4db0-abff-41822481fe70 | categories_map<br>data_path<br>file_categories | batch_size=64<br>checkpoints_dir=/User/mlrun/examples/checkpoints<br>epochs=8<br>image_channels=3<br>image_height=128<br>image_width=128<br>model_path=/User/mlrun/examples/models/cats_n_dogs.hd5 | accuracy=0.8470312356948853<br>loss=0.3525520624220371 | model<br>summary.html |
| ...7cc3c8 | 0 | Dec 27 11:28:02 | completed | label | host=image-classification-training-pipeline-wrc4c-968241013<br>kind=<br>owner=admin<br>workflow=4692bf15-8ca5-4db0-abff-41822481fe70 | | source_dir=/User/mlrun/examples/images/cats_n_dogs | | categories_map<br>file_categories |
| ...046e26 | 0 | Dec 27 11:27:38 | completed | download | host=image-classification-training-pipeline-wrc4c-887206401<br>kind=<br>owner=admin<br>workflow=4692bf15-8ca5-4db0-abff-41822481fe70 | archive_url | target_dir=/User/mlrun/examples/images | | content |
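
The same filter can be expressed over plain run records, which is handy when post-processing results. This sketch assumes each run is a dictionary with `metadata.labels` and `status.results` fields and uses hand-written records based on the table above; the helper name `runs_for_workflow` and the extra `other` record are hypothetical.

```python
workflow_id = '4692bf15-8ca5-4db0-abff-41822481fe70'

# Hand-written records mirroring the table above (assumed structure).
runs = [
    {'metadata': {'name': 'train', 'labels': {'workflow': workflow_id}},
     'status': {'state': 'completed',
                'results': {'accuracy': 0.8470312356948853,
                            'loss': 0.3525520624220371}}},
    {'metadata': {'name': 'label', 'labels': {'workflow': workflow_id}},
     'status': {'state': 'completed', 'results': {}}},
    # Made-up record from a different workflow, included to show the filter.
    {'metadata': {'name': 'other', 'labels': {'workflow': 'some-other-id'}},
     'status': {'state': 'completed', 'results': {}}},
]


def runs_for_workflow(run_list, wf_id):
    """Keep only the runs labeled with the given workflow ID."""
    return [r for r in run_list
            if r['metadata'].get('labels', {}).get('workflow') == wf_id]


selected = runs_for_workflow(runs, workflow_id)
for run in selected:
    print(run['metadata']['name'], run['status']['results'].get('accuracy'))
```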
