# Using MLRun with Dask Distributed Jobs

In [1]:
# recommended: install the same package versions that the workers use
#!pip install dask==2.12.0 distributed==2.14.0

## Writing the function code

In [2]:
# function that will be distributed
def inc(x):
    return x + 2

When running a Dask function, the MLRun context has an extra attribute, `dask_client`,
which is initialized from the function spec (below) and can be used to submit Dask commands.

In [3]:
# wrapper function, uses the dask client object
def hndlr(context, x=1, y=2):
    context.logger.info("params: x={},y={}".format(x, y))
    print("params: x={},y={}".format(x, y))
    # submit() returns a future; result() blocks until the worker finishes
    future = context.dask_client.submit(inc, x)
    print(future)
    result = future.result()
    print(result)
    context.log_result("y", result)
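To try the handler logic before any cluster exists, one option is a small stand-in context. The sketch below is an assumption-heavy test double, not MLRun code: `StandInContext` is a hypothetical class, and the standard library `ThreadPoolExecutor` replaces the Dask client only because its `submit()` also returns a future with a `.result()` method.

```python
from concurrent.futures import ThreadPoolExecutor


class StandInContext:
    """Hypothetical test double; NOT the MLRun context class."""

    def __init__(self, executor):
        # Stand-in for the Dask client: only submit() is used by the handler
        self.dask_client = executor
        self.results = {}
        self.logger = self  # reuse this object as a trivial logger

    def info(self, message):
        print("INFO:", message)

    def log_result(self, key, value):
        self.results[key] = value


def inc(x):
    return x + 2


def hndlr(context, x=1, y=2):
    context.logger.info("params: x={},y={}".format(x, y))
    future = context.dask_client.submit(inc, x)
    context.log_result("y", future.result())


with ThreadPoolExecutor(max_workers=2) as pool:
    ctx = StandInContext(pool)
    hndlr(ctx, x=12)

print(ctx.results)  # {'y': 14}
```

This only checks the handler's own logic; serialization, worker images, and cluster connectivity still need a real Dask run.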

In [4]:
# mlrun: end-code
# marks the end of a code section, do not delete

In [6]:
from mlrun import new_function, mlconf, code_to_function, mount_v3io, new_task

# mlconf.dbpath = 'http://mlrun-api:8080'

## Define a Dask function object
Dask functions can be local (local workers) or remote (containers in the cluster).
For `remote` functions, you can specify the number of replicas (optional) or leave it unset for auto-scale.

We use `code_to_function()`, which packs the function code into the function object/YAML and eliminates the need to update the function image. Use `new_function()` instead if the function code is already part of the image or can be mounted remotely (e.g. via a v3io mount).

The Dask function spec has several unique attributes:
* **.remote** - bool, `True` for clustered (remote) Dask, `False` for local
* **.replicas** - number of desired replicas, keep 0 for auto-scale
* **.min_replicas, .max_replicas** - set the replica range for auto-scale
* **.scheduler_timeout** - the cluster is killed after this period of inactivity, default is `'60 minutes'`
* **.nthreads** - number of worker threads
* **.kfp_image** - optional, container image used by the KFP pipeline runner (defaults to `mlrun/dask`)

To access the Dask dashboard or scheduler remotely, use the `NodePort` service type (set **.service_type** to `'NodePort'`) and specify the external IP in the MLRun configuration (`mlconf.remote_host`). On an Iguazio cluster this is set automatically.
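As an illustration of the attributes listed above, an auto-scaling configuration might look like the following sketch. The function name `mydask-autoscale` and all values are arbitrary choices for this example, and it assumes `new_function(..., kind="dask")` is available in your MLRun version.

```python
from mlrun import new_function, mlconf

# Hypothetical function name; the values below are examples only
fn = new_function("mydask-autoscale", kind="dask")

fn.spec.remote = True                     # run workers as containers in the cluster
fn.spec.replicas = 0                      # 0 means auto-scale
fn.spec.min_replicas = 1                  # lower bound for auto-scale
fn.spec.max_replicas = 4                  # upper bound for auto-scale
fn.spec.scheduler_timeout = "30 minutes"  # kill the cluster after inactivity
fn.spec.nthreads = 2                      # worker threads
fn.spec.service_type = "NodePort"         # expose dashboard/scheduler externally

# Outside Iguazio, the external IP must be set manually (placeholder value):
# mlconf.remote_host = "<external-ip>"
```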

In [7]:
# create the function from the notebook code + annotations, add volumes
dsf = code_to_function("mydask", kind="dask").apply(mount_v3io())

In [8]:
dsf.spec.image = "mlrun/ml-models"
dsf.spec.remote = True
dsf.spec.replicas = 1
dsf.spec.service_type = "NodePort"

## Build the function with extra packages
We can skip the build section if we don't add packages. In that case, specify an existing image instead (e.g. `dsf.spec.image='mlrun/ml-models'`, which contains most of the packages you may need).

In [9]:
# uncomment if you want to add packages to the workers and build a new image
# dsf.build_config(base_image='mlrun/ml-models', commands=['pip install pandas'])
# dsf.deploy()

## Run a task using our distributed dask function (cluster)

In [10]:
myrun = dsf.run(handler=hndlr, params={"x": 12})

In [11]:
# get the function status and addresses
dsf.status.to_dict()

### Accessing the dask client directly
You can get the Dask client object and cluster information by reading the function's `client` attribute.

> Note: the cluster can time out. When you access the client, MLRun verifies that the cluster is live and active. If it is not, MLRun restarts the Dask cluster and refreshes the function object with the latest addresses and status.

In [12]:
c = dsf.client
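The check-then-restart behavior described in the note can be pictured with the following sketch. It is purely illustrative and is not MLRun's implementation: `ClusterHandle`, `FakeClient`, and `start_cluster` are hypothetical names, and the liveness check is a plain flag.

```python
class ClusterHandle:
    """Hypothetical illustration of check-then-restart; not MLRun code."""

    def __init__(self, start_cluster, is_alive):
        self._start_cluster = start_cluster  # callable returning a new client
        self._is_alive = is_alive            # callable(client) -> bool
        self._client = None

    @property
    def client(self):
        # Start (or restart) the cluster only when no live client is available
        if self._client is None or not self._is_alive(self._client):
            self._client = self._start_cluster()
        return self._client


class FakeClient:
    def __init__(self, generation):
        self.generation = generation
        self.alive = True


starts = []


def start_cluster():
    starts.append(len(starts) + 1)
    return FakeClient(starts[-1])


handle = ClusterHandle(start_cluster, is_alive=lambda c: c.alive)
first = handle.client    # first access starts the cluster
same = handle.client     # still alive, so the same client is reused
first.alive = False      # simulate a scheduler timeout
second = handle.client   # dead client detected, cluster restarted
```

The practical takeaway is to read the client from the function object each time you need it rather than caching it for long periods, so a timed-out cluster gets replaced.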

### Access a dask function using the DB
If we want to access the Dask function (or its cluster), we can load the function object from the DB, assuming we have already called `.run()` or `.save()` on it.

This is useful for loading the same function in a different notebook or container, or after restarting the notebook.

In [14]:
from mlrun import import_function

# Function URL format: db://<project>/<name>[:tag]
dsf_obj = import_function("db://default/mydask")
c = dsf_obj.client
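If you build these URLs in several places, a tiny helper can keep the format consistent. `function_db_url` is a hypothetical convenience function, not part of MLRun; it only follows the `db://<project>/<name>[:tag]` pattern shown in the comment above.

```python
def function_db_url(project, name, tag=None):
    """Build a db://<project>/<name>[:tag] function URL (hypothetical helper)."""
    url = "db://{}/{}".format(project, name)
    if tag:
        url += ":{}".format(tag)
    return url


print(function_db_url("default", "mydask"))            # db://default/mydask
print(function_db_url("default", "mydask", "latest"))  # db://default/mydask:latest
```

The resulting string can then be passed to `import_function()` as in the cell above.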

## Building a Pipeline using dask functions

In [15]:
import kfp
from kfp import dsl
from mlrun import run_pipeline

In [16]:
@dsl.pipeline(name="dask_pipeline")
def dask_pipe(x=1, y=10):
    # use_db option will use a function (DB) pointer instead of adding the function spec to the YAML
    myrun = dsf.as_step(
        new_task(handler=hndlr, name="dask_pipeline", params={"x": x, "y": y}),
        use_db=True,
    )

    # if the step (dask client) needs v3io access, add: .apply(mount_v3io())

    # if it's a new image, we may want to tell Kubeflow to reload the image
    # myrun.container.set_image_pull_policy('Always')

In [17]:
# for pipeline debug
kfp.compiler.Compiler().compile(dask_pipe, "daskpipe.yaml", type_check=False)

In [18]:
arguments = {"x": 4, "y": -5}
artifact_path = "/User/test"
run_id = run_pipeline(
    dask_pipe,
    arguments,
    artifact_path=artifact_path,
    run="DaskExamplePipeline",
    experiment="dask pipe",
)

In [None]:
from mlrun import wait_for_pipeline_completion, get_run_db

wait_for_pipeline_completion(run_id)
db = get_run_db().connect()
db.list_runs(project="default", labels=f"workflow={run_id}").show()