# Using MLRUN function locally, as a Kubernetes Job, and in a Workflow

In [None]:
# Optional: replace the installed mlrun package with the build from the
# `development` branch of the mlrun GitHub repository.
# NOTE(review): the install is not pinned to a tag or commit, so the installed
# code can change between runs - consider pinning for reproducibility.
!pip uninstall -y mlrun
!pip install git+https://github.com/mlrun/mlrun.git@development

In [1]:
# nuclio: ignore
# if the nuclio-jupyter package is not installed run !pip install nuclio-jupyter
import nuclio 

## Define function and its dependencies 

In [2]:
# Build dependency of the function image: install pandas.
# NOTE(review): `-c` is presumably nuclio-jupyter's "config only" flag (record the
# command in the build spec without running it in this kernel) - confirm in its docs.
%nuclio cmd -c pip install pandas
# Base image the function image is built from.
%nuclio config spec.build.baseImage = "python:3.6-jessie"

%nuclio: setting spec.build.baseImage to 'python:3.6-jessie'


In [3]:
import os

def training(context, p1=1, p2=2):
    """Demo training handler: report two results, label the run, store an artifact.

    Args:
        context: run context supplied by the caller; it must expose ``name``,
            ``uid``, ``logger.info``, ``log_result``, ``set_label`` and
            ``log_artifact``.
        p1: value the reported results are derived from (accuracy is
            ``p1 * 2``, loss is ``p1 * 3``).
        p2: only echoed in the printed parameter summary.
    """
    # echo the run identity and the parameters it was started with
    print(f'Run: {context.name} (uid={context.uid})')
    print(f'Params: p1={p1}, p2={p2}')
    context.logger.info('started training')

    # placeholder - no real training happens in this example

    # report the scalar results of the run, accuracy first and then loss
    for result_key, factor in (('accuracy', 2), ('loss', 3)):
        context.log_result(result_key, p1 * factor)

    # attach a label (tag) to the run itself
    context.set_label('category', 'tests')

    # store a small in-memory artifact and label the artifact
    artifact_labels = {'framework': 'xgboost'}
    context.log_artifact('model.txt', body=b'abc is 123', labels=artifact_labels)

def validation(context, model):
    """Demo validation handler: print the model input and log an HTML artifact.

    Args:
        context: run context; it must expose ``name``, ``uid``, ``logger.info``
            and ``log_artifact``.
        model: input item; it must expose a ``url`` attribute and a ``get()``
            method whose result is printed.
    """
    # echo the run identity
    print(f'Run: {context.name} (uid={context.uid})')

    # alternative left disabled in the original:
    #   model = context.get_object('model', model)
    model_url = model.url
    model_body = model.get()
    print('file - {}:\n{}\n'.format(model_url, model_body))

    context.logger.info('started validation')

    # log a tiny HTML report; the viewer hint is passed through to log_artifact
    report_body = b'<b> validated </b>'
    context.log_artifact('validation.html', body=report_body, viewer='web-app')

def listfiles(context, path='/'):
    """List a directory, print its entries, and return a one-line summary.

    Args:
        context: run context (not used by this handler).
        path: directory to list; defaults to the filesystem root.

    Returns:
        The string ``'<path> contain <count> files'``, where ``count`` is the
        number of entries returned by ``os.listdir(path)``.
    """
    entries = os.listdir(path)
    print(entries)
    entry_count = len(entries)
    return '{} contain {} files'.format(path, entry_count)

## Load MLRUN and specify defaults 

In [4]:
# nuclio: end-code
# (the end-code marker tells nuclio to stop parsing the notebook from this cell on)

# set the mlrun db path through the MLRUN_DBPATH environment variable
# (can also be specified in the run_start command)
%env MLRUN_DBPATH=/User/mlrun
# optional (currently disabled): package source for mlrun, here the `development` branch
# NOTE(review): the recorded output of this cell shows MLRUN_PACKAGE_PATH being set,
# so this line was active when the outputs were captured - re-enable it or re-run the cell.
#%env MLRUN_PACKAGE_PATH=git+https://github.com/mlrun/mlrun.git@development

from mlrun import new_function, code_to_function, NewTask
from mlrun.platforms import mount_v3io

env: MLRUN_DBPATH=/User/mlrun
env: MLRUN_PACKAGE_PATH=git+https://github.com/mlrun/mlrun.git@development


## Test the code locally
The functions above can be tested locally. Parameters, inputs, and outputs can be specified in the API or in the runspec object.<br>
We create a `function`, which defines the runtime environment (type, code, image, ..), and `run` tasks/experiments using that function.<br>
(We use the `local` runtime by default; later on we will use a `job` runtime for running containers, and we can use other runners like MpiJob, Spark, Dask, Nuclio, ..)

In each run we can specify the function, inputs, parameters/hyper-parameters, etc. (check the `RunTemplate` class for details).<br>
In Jupyter, runs print a summary table with metadata and links to data artifacts; this can be disabled with `visible=False` in `.run()`.

In [5]:
# create a function object with the default settings (the `local` runtime)
fn = new_function()
# run the `listfiles` handler on /User and keep the resulting run object
list_run = fn.run(handler=listfiles, params={'path': '/User'})

[mlrun] 2019-09-30 23:21:12,522 starting run None uid=6b8b48659f514766b31e68d5089efba3
['.bash_history', '.bash_profile', '.bashrc', '.config', '.gitignore', '.igz', '.ipynb_checkpoints', '.ipython', '.jupyter', '.local', '.python_history', '.pythonlibs', '.viminfo', '.vimrc', '1', 'LICENSE', 'README.md', 'Untitled.ipynb', 'assets', 'demos', 'examples', 'experiment-tracking', 'getting-started', 'igz-tutorials-get.sh', 'mlrun', 'update-tutorials.ipynb', 'v3io', 'welcome.ipynb']



uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...9efba3,0,Sep 30 23:21:12,completed,,kind=handlerowner=iguaziohost=jupyter-h41nyj9oi0-ztkn5-7f7b6dbb85-h44sv,,path=/User,return=/User contain 28 files,


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid 6b8b48659f514766b31e68d5089efba3 
[mlrun] 2019-09-30 23:21:12,682 run executed, status=completed


## Running and linking multiple tasks
In the next example we run two functions, `training` and `validation`, and pass the result from one to the other.<br>
We will see in the 'job' example that linking works even when the tasks run in different processes or containers, or in a workflow.

In [6]:
# run the training handler with p1=5
train_run = fn.run(handler=training, params={'p1': 5})
# look up the 'model.txt' output of the training run ...
model_path = train_run.output('model.txt')
# ... and pass it to the validation handler as its `model` input
validation_run = fn.run(handler=validation, inputs={'model': model_path})

[mlrun] 2019-09-30 23:21:21,903 starting run None uid=359e8a98a83c4e9b95513c90150d1d32
[mlrun] 2019-09-30 23:21:21,934 started training
Run:  (uid=359e8a98a83c4e9b95513c90150d1d32)
Params: p1=5, p2=2



uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...0d1d32,0,Sep 30 23:21:21,completed,,kind=handlerowner=iguaziohost=jupyter-h41nyj9oi0-ztkn5-7f7b6dbb85-h44svcategory=tests,,p1=5,accuracy=10loss=15,model.txt


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid 359e8a98a83c4e9b95513c90150d1d32 
[mlrun] 2019-09-30 23:21:22,073 run executed, status=completed
[mlrun] 2019-09-30 23:21:22,074 starting run None uid=6513327e0004420b8e87c46c6f548087
[mlrun] 2019-09-30 23:21:22,108 started validation
Run:  (uid=6513327e0004420b8e87c46c6f548087)
file - model.txt:
b'abc is 123'




uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...548087,0,Sep 30 23:21:22,completed,,kind=handlerowner=iguaziohost=jupyter-h41nyj9oi0-ztkn5-7f7b6dbb85-h44sv,model,,,validation.html


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid 6513327e0004420b8e87c46c6f548087 
[mlrun] 2019-09-30 23:21:22,222 run executed, status=completed


## Define cluster jobs and build images 
In order to run in a cluster, we need to package our code and dependencies.<br>
The `code_to_function` call will automatically form a `Function` with the list of dependencies and the runtime configuration.<br>
You can apply KubeFlow modifiers to configure resources like volumes; `mount_v3io()` adds an iguazio v3io volume (home of the current user) to the function.

The `build()` command is optional: it pre-builds all the dependencies, so the runs will be faster. Note that the code and params can be updated per run.

In [7]:
# create an ML function with the `job` runtime from the notebook and attach it to the iguazio data fabric (v3io)
# NOTE: this rebinds `fn`, which until now held the function used for the local runs
fn = code_to_function(runtime='job').apply(mount_v3io())

# prepare an image from the dependencies, so we won't need to build the image on every run
fn.build(image='mlrun/nuctest:latest')

[mlrun] 2019-09-16 07:28:03,331 building image (mlrun/nuctest:latest)
FROM python:3.6-jessie
WORKDIR /run
RUN pip install pandas
RUN pip install git+https://github.com/mlrun/mlrun.git@development
ENV PYTHONPATH /run
[mlrun] 2019-09-16 07:28:03,333 using in-cluster config.
[mlrun] 2019-09-16 07:28:03,349 Pod mlrun-build-j9bbw created
..
[36mINFO[0m[0000] Downloading base image python:3.6-jessie     
2019/09/16 07:28:05 No matching credentials were found, falling back on anonymous
[36mINFO[0m[0000] Unpacking rootfs as cmd RUN pip install pandas requires it. 
[36mINFO[0m[0010] Taking snapshot of full filesystem...        
[36mINFO[0m[0013] Skipping paths under /kaniko, as it is a whitelisted directory 
[36mINFO[0m[0013] Skipping paths under /empty, as it is a whitelisted directory 
[36mINFO[0m[0013] Skipping paths under /var/run, as it is a whitelisted directory 
[36mINFO[0m[0013] Skipping paths under /dev, as it is a whitelisted directory 
[36mINFO[0m[0013] Skipping paths

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7efe5ef862e8>

### Run the function on the cluster (build or use pre-built image)
Note that the `listfiles` call will return the same results as in the local run, since the function shares the same filesystem.<br>
`with_code()` will inject the latest code into the function in case we made changes (it doesn't require a new build).

In [8]:
# run `listfiles` again, this time through the `job` function (executed in a pod)
fn.run(handler=listfiles, params={'path': '/User'})

[mlrun] 2019-09-30 23:24:21,757 starting run nuclio-jobs uid=c5beec088ffd43e7b9555afa817c0405
[mlrun] 2019-09-30 23:24:21,784 using in-cluster config.
[mlrun] 2019-09-30 23:24:21,804 Pod nuclio-jobs-jzjn7 created
....
[mlrun] 2019-09-30 23:24:30,919 starting run nuclio-jobs uid=c5beec088ffd43e7b9555afa817c0405
['.bash_history', '.bash_profile', '.bashrc', '.config', '.gitignore', '.igz', '.ipynb_checkpoints', '.ipython', '.jupyter', '.local', '.python_history', '.pythonlibs', '.viminfo', '.vimrc', '1', 'LICENSE', 'README.md', 'Untitled.ipynb', 'assets', 'demos', 'examples', 'experiment-tracking', 'getting-started', 'igz-tutorials-get.sh', 'mlrun', 'update-tutorials.ipynb', 'v3io', 'welcome.ipynb']

type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid c5beec088ffd43e7b9555afa817c0405 
[mlrun] 2019-09-30 23:24:31,014 run executed, status=completed


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...7c0405,0,Sep 30 23:24:30,completed,nuclio-jobs,kind=localowner=iguaziohost=nuclio-jobs-jzjn7,,path=/User,return=/User contain 28 files,


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid c5beec088ffd43e7b9555afa817c0405 
[mlrun] 2019-09-30 23:24:36,057 run executed, status=completed


<mlrun.model.RunObject at 0x7ff4ccf8c940>

In [9]:
# define a run template reused by the runs below: specify the artifacts output path
# and add a label (can be used for search later)
run_base = NewTask(out_path='/User/mlrun/data').set_label('stage', 'dev')

In [10]:
# run our training task with hyper params (one iteration per listed `p1` value)
# and select the iteration with the max accuracy
run = run_base.copy().with_hyper_params({'p1': [2,6,4]}, selector='max.accuracy')
# with_code() injects the latest notebook code into the function before running
# NOTE(review): `params` also sets p1=9 while p1 is a hyper-parameter; the recorded
# output shows the iterations using 2, 6 and 4 - confirm the explicit p1 is intended
train_run = fn.with_code().run(run, handler=training, name='my-training', params={'p1': 9})
# 'model.txt' output of the training run, used as the input of the validation step
model_path = train_run.output('model.txt')

[mlrun] 2019-09-30 23:25:08,874 starting run my-training uid=a7206ee62a544a759b9abc94beed6f9b
[mlrun] 2019-09-30 23:25:08,912 Pod my-training-v8vtd created
..
[mlrun] 2019-09-30 23:25:13,644 starting run my-training uid=a7206ee62a544a759b9abc94beed6f9b
[mlrun] 2019-09-30 23:25:13,716 started training
Run: my-training (uid=a7206ee62a544a759b9abc94beed6f9b-1)
Params: p1=2, p2=2

[mlrun] 2019-09-30 23:25:13,821 started training
Run: my-training (uid=a7206ee62a544a759b9abc94beed6f9b-2)
Params: p1=6, p2=2

[mlrun] 2019-09-30 23:25:13,927 started training
Run: my-training (uid=a7206ee62a544a759b9abc94beed6f9b-3)
Params: p1=4, p2=2

type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid a7206ee62a544a759b9abc94beed6f9b 
[mlrun] 2019-09-30 23:25:14,108 run executed, status=completed


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...ed6f9b,0,Sep 30 23:25:13,completed,my-training,stage=devkind=localowner=iguazio,,p1=9,best_iteration=2accuracy=12loss=18,model.txtiteration_results


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid a7206ee62a544a759b9abc94beed6f9b 
[mlrun] 2019-09-30 23:25:19,237 run executed, status=completed


In [11]:
# run validation, using the best model result from the previous step
fn.run(run_base, handler=validation, name='my-validation', inputs={'model': model_path})

[mlrun] 2019-09-30 23:43:20,172 starting run my-validation uid=17769e1d5e7c4d1c92d7f6f808ac0c46
[mlrun] 2019-09-30 23:43:20,210 Pod my-validation-t4pgp created
..
[mlrun] 2019-09-30 23:43:24,943 starting run my-validation uid=17769e1d5e7c4d1c92d7f6f808ac0c46
[mlrun] 2019-09-30 23:43:25,017 started validation
Run: my-validation (uid=17769e1d5e7c4d1c92d7f6f808ac0c46)
file - /User/mlrun/data/2/model.txt:
b'abc is 123'


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid 17769e1d5e7c4d1c92d7f6f808ac0c46 
[mlrun] 2019-09-30 23:43:25,079 run executed, status=completed


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...ac0c46,0,Sep 30 23:43:25,completed,my-validation,stage=devkind=localowner=iguaziohost=my-validation-t4pgp,model,,,validation.html


type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid 17769e1d5e7c4d1c92d7f6f808ac0c46 
[mlrun] 2019-09-30 23:43:30,188 run executed, status=completed


<mlrun.model.RunObject at 0x7ff4f91e4198>

In [12]:
# list all jobs: the pods started by mlrun (builds, jobs, remote functions) and their state
!mlrun get po 

[mlrun] 2019-09-30 23:44:01,484 using in-cluster config.
state      started          type     name
Succeeded  Sep 24 13:10:58  build    mlrun-build-hp6tb
Succeeded  Sep 24 17:11:39  build    mlrun-build-jh5ff
Succeeded  Sep 30 23:22:43  build    mlrun-build-mdsjr
Succeeded  Sep 24 03:38:03  build    mlrun-build-trt6b
Succeeded  Sep 24 13:13:39  job      my-training-gkt7l
Succeeded  Sep 24 17:14:38  job      my-training-h6kl6
Succeeded  Sep 24 17:13:38  job      my-training-jw6sp
Succeeded  Sep 30 23:25:08  job      my-training-v8vtd
Succeeded  Sep 24 13:13:50  job      my-validation-hnwtr
Succeeded  Sep 30 23:43:20  job      my-validation-t4pgp
Succeeded  Sep 24 17:13:48  job      my-validation-v2dg5
Running    Sep 30 23:14:27  remote   mysrv2-69bcb5f8fb-dkffl
Running    Sep 24 06:38:47  remote   mysrv3-54d4c9d975-7262v
Succeeded  Sep 24 13:13:25  job      nuclio-jobs-42hl2
Succeeded  Sep 24 13:12:33  job      nuclio-jobs-48bvd
Succeeded  Sep 24 17:14:37  job      nuclio-jobs-8pk4l
Suc

In [13]:
# check the logs of a specific job pod
# NOTE(review): the pod name below is hard-coded from an earlier session; replace it
# with a name taken from the `mlrun get po` listing before re-running this cell
!mlrun watch my-training-gkt7l

[mlrun] 2019-09-30 23:45:09,508 using in-cluster config.

[mlrun] 2019-09-24 13:13:45,340 starting run my-training uid=ca4c0d98365047969ae1e43b8ea53cd6
[mlrun] 2019-09-24 13:13:45,459 started training
Run: my-training (uid=ca4c0d98365047969ae1e43b8ea53cd6-1)
Params: p1=2, p2=2

[mlrun] 2019-09-24 13:13:45,555 started training
Run: my-training (uid=ca4c0d98365047969ae1e43b8ea53cd6-2)
Params: p1=6, p2=2

[mlrun] 2019-09-24 13:13:45,636 started training
Run: my-training (uid=ca4c0d98365047969ae1e43b8ea53cd6-3)
Params: p1=4, p2=2

type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid ca4c0d98365047969ae1e43b8ea53cd6 
[mlrun] 2019-09-24 13:13:45,808 run executed, status=completed
Pod my-training-gkt7l last status is: succeeded


## Create a KubeFlow Pipeline

In [14]:
import kfp
from kfp import dsl

In [15]:
# location for the artifacts produced by the pipeline steps
# NOTE(review): `{{workflow.uid}}` is presumably a template placeholder resolved by the
# workflow engine at run time (one directory per workflow run) - confirm
artifacts_path = 'v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/'

In [16]:
@dsl.pipeline(
    name='job test',
    description='Shows how to use mlrun.'
)
def tr_pipeline(p1=9):
    # describe the training task: handler name, artifact location, and the
    # artifact exposed as a step output
    task = NewTask(
        handler='training',
        out_path=artifacts_path,
        outputs=['model.txt'],
    )
    # forward the pipeline parameter to the handler as `p1`
    run = task.with_params(p1=p1)

    # turn the task into a pipeline step and apply the v3io mount to it
    step = fn.to_step(run)
    train = step.apply(mount_v3io())

In [17]:
# compile the pipeline function into the file 'trpipe.yaml'
kfp.compiler.Compiler().compile(tr_pipeline, 'trpipe.yaml')

In [18]:
# create a pipelines client for the 'default-tenant' namespace
client = kfp.Client(namespace='default-tenant')
# pipeline arguments: p1=8 instead of the default p1=9 declared by tr_pipeline
arguments = {'p1': 8}
# submit the pipeline function as run 'tr 1' under the experiment 'tr'
run_result = client.create_run_from_pipeline_func(tr_pipeline, arguments, run_name='tr 1', experiment_name='tr')