# Running Dask on the cluster with mlrun

The Dask framework enables users to parallelize their Python code and run it as a distributed process on an Iguazio cluster, which can significantly accelerate performance. <br>
In this notebook you'll learn how to create a Dask cluster and then an MLRun function that runs as a Dask client. <br>
The notebook also demonstrates how to parallelize a custom algorithm using Dask Delayed.

For more information on Dask on Kubernetes, see https://kubernetes.dask.org/en/latest/

## Basic configuration

Import mlrun and dask. nuclio is used only to convert the notebook code into an MLRun function.

In [1]:
# Make sure that mlrun is installed. If it is already installed, skip this step.
# To install mlrun, run the following:

!/User/align_mlrun.sh

Both server & client are aligned (0.7.1).


## Load sample data

In [2]:
!mkdir -p /User/examples/

In [3]:
import mlrun
import requests

source_url = mlrun.get_sample_path("/data/Taxi/yellow_tripdata_2019-01_subset.csv")
response = requests.get(source_url, allow_redirects=True)
with open('/User/examples/ytrip.csv', 'wb') as csv_file:
    csv_file.write(response.content)

In [4]:
# nuclio: ignore
import nuclio 

In [5]:
from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem

from dask.distributed import Client
from dask import delayed
from dask import dataframe as dd

import warnings
import numpy as np
import os

warnings.filterwarnings("ignore")

### Create a Python function

This simple function reads a CSV file into a Dask dataframe and runs describe and a group-by count on the dataset. <br>
It also shows how to use Dask Delayed to run a Python API that Dask does not support natively, so that Dask executes it as a distributed task. <br>
In this case the function calls NumPy's asmatrix, which interprets the input as a matrix. Wrapping the call in Dask Delayed turns it into a lazy task that runs on the Dask cluster when compute() is called.
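
The Delayed pattern can be tried on its own, without a cluster. The following is a minimal sketch, not the notebook's function: `to_matrix` and the in-memory `chunks` are hypothetical stand-ins for a real computation and real data partitions, and the tasks run on Dask's default local scheduler.

```python
import numpy as np
from dask import delayed


def to_matrix(rows):
    """Plain NumPy call that Dask does not parallelize natively."""
    return np.asmatrix(rows)


# Hypothetical in-memory chunks standing in for partitions of a larger dataset.
chunks = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]

# delayed() wraps each call in a lazy task; nothing runs yet.
tasks = [delayed(to_matrix)(chunk) for chunk in chunks]

# A second delayed step stacks the per-chunk results into one matrix.
combined = delayed(np.vstack)(tasks)

# compute() executes the task graph; independent tasks may run in parallel.
result = combined.compute()
print(result.shape)
```

The two `to_matrix` tasks do not depend on each other, so the scheduler is free to run them concurrently. A single delayed call, by contrast, is just one task.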

In [6]:
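def test_dask(context: MLClientCtx,
              dataset: DataItem,
              dask_client: str = None) -> None:
    # Connect to an existing scheduler if an address is given;
    # otherwise start a local Dask cluster.
    if dask_client:
        client = Client(dask_client)
    else:
        client = Client()

    # Read the dataset as a Dask dataframe.
    df = dataset.as_df(df_module=dd)
    # Summary statistics and per-vendor row counts.
    df_describe = df.describe().compute()
    df_grpby = df.groupby("VendorID").count().compute()
    # Wrap a NumPy call that Dask does not support natively in a delayed task.
    df_matrix = delayed(np.asmatrix)(df).compute()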

In [7]:
# nuclio: end-code

### Set up the environment

In [8]:
import mlrun
artifact_path = mlrun.set_environment(api_path = mlrun.mlconf.dbpath or 'http://mlrun-api:8080',
                                      artifact_path = os.path.abspath('./'))

> 2021-10-12 09:43:54,745 [info] loaded project default from MLRun DB


### Convert the code to an MLRun function

Use code_to_function to convert the code to an MLRun function, and specify the configuration for the Dask process (for example, replicas and memory). <br>
Note that the resource configurations are per worker.

In [9]:
fn = mlrun.code_to_function("test_dask",  kind='job', handler="test_dask").apply(mlrun.mount_v3io())

### Initialize the Dask cluster

In [10]:
dsf = mlrun.new_function("dask_init", kind='dask', image='mlrun/ml-models').apply(mlrun.mount_v3io())

In [11]:
dsf.spec.remote = True
dsf.spec.replicas = 2
dsf.spec.max_replicas = 4
dsf.spec.service_type = "NodePort"
dsf.with_requests(mem='2G', cpu='2')
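
Since the resource settings are per worker, the total that the cluster requests from Kubernetes grows with the number of replicas. The following is a rough arithmetic sketch, not part of the MLRun API: `total_requests` is a hypothetical helper, and it assumes that every worker carries the same request and that max_replicas is the upper bound when the cluster scales.

```python
def total_requests(replicas: int, cpu_per_worker: float, mem_gb_per_worker: float) -> dict:
    """Aggregate per-worker resource requests across all workers (hypothetical helper)."""
    return {
        "cpu": replicas * cpu_per_worker,
        "mem_gb": replicas * mem_gb_per_worker,
    }


# Values taken from the cell above: 2 CPUs and 2G of memory per worker.
baseline = total_requests(replicas=2, cpu_per_worker=2, mem_gb_per_worker=2)
scaled = total_requests(replicas=4, cpu_per_worker=2, mem_gb_per_worker=2)

print(baseline)
print(scaled)
```

These totals describe what is requested for the workers, not what the Dask client later reports as cluster cores or memory.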

In [12]:
client = dsf.client
client

> 2021-10-12 09:44:06,095 [info] trying dask client at: tcp://mlrun-dask-init-b76f44f5-0.default-tenant:8786
> 2021-10-12 09:44:06,116 [info] using remote dask scheduler (mlrun-dask-init-b76f44f5-0) at: tcp://mlrun-dask-init-b76f44f5-0.default-tenant:8786


Client: Scheduler: tcp://mlrun-dask-init-b76f44f5-0.default-tenant:8786, Dashboard: http://mlrun-dask-init-b76f44f5-0.default-tenant:8787/status
Cluster: Workers: 2, Cores: 2, Memory: 8.34 GB


## Replace DASK_CLIENT with the client scheduler address (see above)

In [13]:
DATA_URL = '/User/examples/ytrip.csv'
DASK_CLIENT = client.scheduler.address
# e.g. DASK_CLIENT = 'tcp://mlrun-dask-init-9d8122b2-b.default-tenant:8786'
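
If the address is pasted by hand rather than read from the client, a quick sanity check can catch a malformed value before the job is submitted. The following is a small sketch using only the standard library: `check_scheduler_address` is a hypothetical helper, and it deliberately accepts only the tcp://host:port form seen in this notebook, although Dask may accept other forms.

```python
from urllib.parse import urlparse


def check_scheduler_address(address: str) -> tuple:
    """Return (host, port) for a tcp://host:port address, or raise ValueError."""
    parsed = urlparse(address)
    if parsed.scheme != "tcp":
        raise ValueError(f"expected a tcp:// address, got: {address!r}")
    if not parsed.hostname or parsed.port is None:
        raise ValueError(f"address must include a host and a port: {address!r}")
    return parsed.hostname, parsed.port


# Address copied from the client output shown earlier in this notebook.
host, port = check_scheduler_address(
    "tcp://mlrun-dask-init-b76f44f5-0.default-tenant:8786"
)
print(host, port)
```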

### Run the function

When you run the function, the result includes a link. Clicking this link takes you to the Dask monitoring dashboard.

In [14]:
fn.run(handler = test_dask,
       inputs={"dataset": DATA_URL},
       params={"dask_client": DASK_CLIENT})

> 2021-10-12 09:44:06,154 [info] starting run test-dask-test_dask uid=3d3ec4b558a140b5849700dc5d77c97c DB=http://mlrun-api:8080
> 2021-10-12 09:44:06,317 [info] Job is running in the background, pod: test-dask-test-dask-w28ff
> 2021-10-12 09:44:20,436 [info] run executed, status=completed
final state: completed


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
default,...5d77c97c,0,Oct 12 09:44:11,completed,test-dask-test_dask,v3io_user=dani kind=job owner=dani host=test-dask-test-dask-w28ff,dataset,dask_client=tcp://mlrun-dask-init-b76f44f5-0.default-tenant:8786,,





> 2021-10-12 09:44:25,557 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7fe5ec4a1d50>

## Track the progress in the UI

Users can view the progress and detailed information in the MLRun UI by clicking the uid above. <br>
To track the Dask progress in the Dask UI, click the "Dashboard" link in the "Client" section above.