# Parquet to Dask
Load a parquet dataset into a dask cluster.

Once the client connects to the dask cluster, we can check package consistency by calling `dask_client.get_versions(check=True)`. In `mlrun`, scheduler and worker nodes are guaranteed to have the same package versions, since mismatches between them would likely cause serialization failures. The client (in this notebook or in some other process) may have different versions, and this mismatch is **less likely** to cause issues. The following installs make such a mismatch **unlikely** (as of this writing). **They may need to be repeated periodically as the dockerfiles refresh, unless you specify an image tag.**

    !python -m pip install --upgrade pip
    !python -m pip uninstall -y cmake
    !python -m pip install cmake
    !python -m pip install scikit-build

    !python -m pip install lz4
    !python -m pip install git+https://github.com/Blosc/python-blosc.git@v1.8.3
    !python -m pip install cloudpickle==1.2.2

    !python -m pip install numpy==1.17.4
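
The mismatch report that `get_versions` prints (see the `lz4` and `msgpack` tables in the run output further down) can also be derived by hand. The following is a minimal sketch, not part of the original notebook: the helper `find_mismatches` is hypothetical, and the nested `{'packages': {...}}` layout is an assumption about the dictionary that `get_versions` returns, which may differ between `distributed` releases. The `lz4` and `msgpack` versions are the ones reported in this notebook's output; the scheduler's `numpy` entry is made up so that one package matches.

```python
def find_mismatches(versions):
    """Return {package: {node: version}} for packages whose versions differ."""
    # Collect client, scheduler and (if present) workers under one mapping.
    nodes = {"client": versions["client"], "scheduler": versions["scheduler"]}
    nodes.update(versions.get("workers", {}))

    # Regroup the data by package name instead of by node.
    by_package = {}
    for node, info in nodes.items():
        for package, version in info["packages"].items():
            by_package.setdefault(package, {})[node] = version

    # Keep only packages that appear with more than one distinct version.
    return {
        package: seen
        for package, seen in by_package.items()
        if len(set(seen.values())) > 1
    }


# Assumed layout; the numpy version on the scheduler is illustrative only.
sample_versions = {
    "client": {"packages": {"lz4": "3.0.2", "msgpack": "0.6.1", "numpy": "1.17.4"}},
    "scheduler": {"packages": {"lz4": "2.2.1", "msgpack": "0.6.2", "numpy": "1.17.4"}},
    "workers": {},
}

mismatches = find_mismatches(sample_versions)
for package, seen in sorted(mismatches.items()):
    print(package, seen)
```

In a live session the input would presumably be `dask_client.get_versions()` (without `check=True`, so that the call returns the dictionary rather than reporting on it).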

## clean up
delete stale pods

In [1]:
# !mlrun clean -p -r

In [2]:
import mlrun

mlrun.mlconf.dbpath = 'http://mlrun-api:8080'

## parameters


In [3]:
DESCRIPTION        = 'load parquet dataset into a dask cluster'       

IMAGE              = 'yjbds/mlrun-dask:dev'                           # custom image
JOB_KIND           = 'dask'                                           

TASK_NAME          = 'user-task-parq-to-dask'                         # arbitrary task name

ARTIFACTS_PATH     = '/User/repos/demos/dask/artifacts'
YAML_PATH          = '/User/repos/demos/dask/yaml'

DATA_PATH          = '/User/repos/demos/dask/dataset/partitions'
DOWNSAMPLE         = 0.01
PARTITION_COLS     = ['Year', 'Month']                                # overridden by the next line
PARTITION_COLS     = None                                             # this assignment wins

DASK_SHARDS        = 8
DASK_THREADS_PER   = 8
MEMORY_LIMIT       = '5GB'

DASK_KEY           = 'airlines'

## load and configure function

In [4]:
FUNC_PY   = '/User/repos/demos/dask/code/parquet-to-dask.py'
FUNC_YAML = '/User/repos/demos/dask/yaml/parquet-to-dask.yaml'

HANDLER   = 'parquet_to_dask'

#### some of the following cells can be commented out if this notebook has already been run.

In [5]:
# load function from a local Python file
parq2dask = mlrun.new_function(command=FUNC_PY, 
                               image=IMAGE,
                               kind=JOB_KIND)

parq2dask.spec.remote = True
parq2dask.spec.replicas = DASK_SHARDS 
parq2dask.spec.max_replicas = DASK_SHARDS
parq2dask.spec.service_type = 'NodePort'
parq2dask.spec.image_pull_policy = 'Always'
parq2dask.spec.build.image = IMAGE

parq2dask.export(FUNC_YAML)
# parq2dask = mlrun.import_function(FUNC_YAML)

[mlrun] 2020-02-17 14:25:32,208 function spec saved to path: /User/repos/demos/dask/yaml/parquet-to-dask.yaml


In [6]:
parq2dask.apply(mlrun.mount_v3io())
parq2dask.deploy()

# create and run the task
parq_to_dask_task = mlrun.NewTask(
    TASK_NAME, 
    handler=HANDLER,  
    params={
        'parquet_url'      : DATA_PATH,
        'sample'           : DOWNSAMPLE,
        'shards'           : DASK_SHARDS,
        'threads_per'      : DASK_THREADS_PER,
        'memory_limit'     : MEMORY_LIMIT,
        'dask_key'         : DASK_KEY,
        'target_path'      : ARTIFACTS_PATH})

# run
rn = parq2dask.run(parq_to_dask_task)

[mlrun] 2020-02-17 14:25:32,604 starting remote build, image: yjbds/mlrun-dask:dev
[mlrun] 2020-02-17 14:25:32,894 starting run user-task-parq-to-dask uid=08ab94fc8039498e81c52326fd9a8f03  -> http://mlrun-api:8080
[mlrun] 2020-02-17 14:25:34,266 saving function: parquet-to-dask, tag: latest
[mlrun] 2020-02-17 14:25:43,075 using remote dask scheduler (mlrun-parquet-to-dask-8f759519-5) at: tcp://mlrun-parquet-to-dask-8f759519-5.default-tenant:8786



lz4
+-----------+---------+
|           | version |
+-----------+---------+
| client    | 3.0.2   |
| scheduler | 2.2.1   |
+-----------+---------+

msgpack
+-----------+---------+
|           | version |
+-----------+---------+
| client    | 0.6.1   |
| scheduler | 0.6.2   |
+-----------+---------+


[mlrun] 2020-02-17 14:25:43,076 sleeping...
[mlrun] 2020-02-17 14:27:43,176 found cluster...
[mlrun] 2020-02-17 14:27:43,176 <Client: 'tcp://10.233.64.32:8786' processes=8 threads=8, memory=32.87 GB>
[mlrun] 2020-02-17 14:29:53,820 column header ['Year' 'Month' 'DayofMonth' 'DayOfWeek' 'CRSDepTime' 'UniqueCarrier'
 'ArrDelay' 'Origin' 'Dest' 'Distance']
[mlrun] 2020-02-17 14:29:54,484 log artifact scheduler at /User/repos/demos/dask/artifacts/scheduler.json, size: None, db: Y

[mlrun] 2020-02-17 14:29:54,514 run ended with state 




uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...9a8f03,0,Feb 17 14:25:34,completed,user-task-parq-to-dask,kind=dask owner=admin host=jupyter-1-56795c755c-pn656,,parquet_url=/User/repos/demos/dask/dataset/partitions sample=0.01 shards=8 threads_per=8 memory_limit=5GB dask_key=airlines target_path=/User/repos/demos/dask/artifacts,,scheduler


to track results use .show() or .logs() or in CLI: 
!mlrun get run 08ab94fc8039498e81c52326fd9a8f03  , !mlrun logs 08ab94fc8039498e81c52326fd9a8f03 
[mlrun] 2020-02-17 14:29:54,562 run executed, status=completed


### the scheduler artifact can be shared

In [7]:
print(rn.metadata.to_yaml())

uid: 9f721ce6c1b54eec947812b2dcf4077c
name: user-task-parq-to-dask
project: ''
labels:
  kind: dask
  owner: admin
  host: jupyter-1-56795c755c-bss6b
iteration: 0



In [8]:
rn.metadata.uid

'9f721ce6c1b54eec947812b2dcf4077c'

In [9]:
rn.outputs['scheduler']

'/User/repos/demos/dask/artifacts/scheduler.json'

#### What's the scheduler address?

In [10]:
import json
with open(rn.outputs['scheduler']) as f:   # close the file once it is parsed
    scheduler_info = json.load(f)
scheduler_info

{'type': 'Scheduler',
 'id': 'Scheduler-dbf3387b-ef6f-4d2a-88d3-0d00c2f9cbf5',
 'address': 'tcp://10.233.64.49:8786',
 'services': {},
 'workers': {'tcp://10.233.64.52:46074': {'type': 'Worker',
   'id': 'tcp://10.233.64.52:46074',
   'host': '10.233.64.52',
   'resources': {},
   'local_directory': '/worker-rn044qhe',
   'name': 'tcp://10.233.64.52:46074',
   'nthreads': 1,
   'memory_limit': 4109136640,
   'last_seen': 1581934194.464667,
   'services': {},
   'metrics': {'cpu': 2.0,
    'memory': 92114944,
    'time': 1581934194.4638023,
    'read_bytes': 285.78142099415084,
    'write_bytes': 811.3794190463303,
    'num_fds': 22,
    'executing': 0,
    'in_memory': 0,
    'ready': 0,
    'in_flight': 0,
    'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}},
   'nanny': 'tcp://10.233.64.52:42829'},
  'tcp://10.233.64.53:37758': {'type': 'Worker',
   'id': 'tcp://10.233.64.53:37758',
   'host': '10.233.64.53',
   'resources': {},
   'local_directory': '/worker-uuxi6ni8',

### notebook cluster

Let's connect a client in this notebook to the cluster, using the scheduler file from our MLRun artifact:

In [11]:
# import dask
# import dask.dataframe as dd
# from dask.distributed import Client, LocalCluster

#client = Client(scheduler_file=rn.outputs['scheduler'])

#df = client.get_dataset(DASK_KEY)

### create a component 'on the fly' to summarise the table

The nice thing about having a dask cluster loaded with all your data is that you can write _quick and dirty_ jobs in your notebook, in a local file, or in a GitHub repo. Here we use a local file:

In [12]:
# create the function from a local Python file
summ = mlrun.new_function(command='/User/repos/demos/dask/code/describe.py', 
                          image=IMAGE,
                          kind='job')

summ.spec.build.image = IMAGE

summ.export('/User/repos/demos/dask/yaml/describe.yaml')

summ.apply(mlrun.mount_v3io())

summ.deploy(skip_deployed=True, with_mlrun=False)

# create the task
summ_task = mlrun.NewTask(
    'user-task-my-sum', 
    handler='table_summary',  
    params={
        'dask_key'   :  DASK_KEY,
        'dask_client': 'scheduler.json',
        'target_path':  ARTIFACTS_PATH,
        'name'       : 'table-summary.csv',
        'key'        : 'table-summary'})

# run
rn2 = summ.run(summ_task)

rn2.outputs

[mlrun] 2020-02-17 11:24:37,259 function spec saved to path: /User/repos/demos/dask/yaml/describe.yaml
[mlrun] 2020-02-17 11:24:37,267 starting run user-task-my-sum uid=953a38322f474c958be0044685a6280d  -> http://mlrun-api:8080
[mlrun] 2020-02-17 11:24:37,334 Job is running in the background, pod: user-task-my-sum-zk96n
[mlrun] 2020-02-17 11:33:16,255 log artifact table-summary at /User/repos/demos/dask/artifacts/table-summary.csv, size: None, db: Y

[mlrun] 2020-02-17 11:33:16,282 run executed, status=completed
final state: succeeded


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...a6280d,0,Feb 17 11:24:45,completed,describe,host=user-task-my-sum-zk96n kind=job owner=admin,,dask_client=scheduler.json dask_key=airlines key=table-summary name=table-summary.csv target_path=/User/repos/demos/dask/artifacts,,table-summary


to track results use .show() or .logs() or in CLI: 
!mlrun get run 953a38322f474c958be0044685a6280d  , !mlrun logs 953a38322f474c958be0044685a6280d 
[mlrun] 2020-02-17 11:33:18,866 run executed, status=completed


{'table-summary': '/User/repos/demos/dask/artifacts/table-summary.csv'}

### tests

In [13]:
import pandas as pd
pd.read_csv(rn2.outputs['table-summary'])

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,CRSDepTime,ArrDelay,Distance
0,1235350.0,1235350.0,1235350.0,1235350.0,1235350.0,1209458.0,1233356.0
1,1998.624,6.553846,15.71438,3.940669,1334.374,7.015957,701.7913
2,6.226752,3.444445,8.786459,1.989997,476.5202,30.75238,551.7167
3,1987.0,1.0,1.0,1.0,0.0,-1287.0,11.0
4,1995.0,5.0,11.0,3.0,1127.5,-4.0,405.0
5,2003.0,9.0,20.0,5.0,1530.0,4.25,696.0
6,2008.0,12.0,31.0,7.0,2063.75,65.5,2490.5
7,2008.0,12.0,31.0,7.0,2400.0,1634.0,4983.0
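
The summary table has no row labels, only the index 0 to 7. Its shape (a count row, followed by values that never decrease from row 3 to row 7) is consistent with the output of a `describe()` call, so the sketch below labels the rows under that assumption. The statistic names in `STAT_NAMES` are an assumption, not something the notebook states, and `label_rows` is a hypothetical helper; the numbers are copied from the output above.

```python
import csv
import io

# Assumed row order, matching what describe() usually produces.
STAT_NAMES = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]

SUMMARY_CSV = """\
Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,CRSDepTime,ArrDelay,Distance
0,1235350.0,1235350.0,1235350.0,1235350.0,1235350.0,1209458.0,1233356.0
1,1998.624,6.553846,15.71438,3.940669,1334.374,7.015957,701.7913
2,6.226752,3.444445,8.786459,1.989997,476.5202,30.75238,551.7167
3,1987.0,1.0,1.0,1.0,0.0,-1287.0,11.0
4,1995.0,5.0,11.0,3.0,1127.5,-4.0,405.0
5,2003.0,9.0,20.0,5.0,1530.0,4.25,696.0
6,2008.0,12.0,31.0,7.0,2063.75,65.5,2490.5
7,2008.0,12.0,31.0,7.0,2400.0,1634.0,4983.0
"""


def label_rows(text):
    """Map each numbered row to its assumed statistic name."""
    reader = csv.DictReader(io.StringIO(text))
    index_column = reader.fieldnames[0]  # the unnamed index column
    labelled = {}
    for row in reader:
        stat = STAT_NAMES[int(row.pop(index_column))]
        labelled[stat] = {column: float(value) for column, value in row.items()}
    return labelled


summary = label_rows(SUMMARY_CSV)

# If row 0 is a non-null count, this is the number of rows lacking ArrDelay.
missing_arr_delay = int(summary["count"]["Year"] - summary["count"]["ArrDelay"])
print(missing_arr_delay)
```

Under the same assumption, the gap between the `Year` and `ArrDelay` counts suggests that a small share of the sampled rows have no recorded arrival delay.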


distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/conda/lib/python3.6/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/conda/lib/python3.6/site-packages/distributed/client.py", line 1306, in _close
    await asyncio.wait_for(asyncio.gather(*coroutines), 2)
  File "/conda/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
  File "/conda/lib/python3.6/asyncio/futures.py", line 327, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/conda/lib/python3.6/asyncio/tasks.py", line 250, in _wakeup
    future.result()
  File "/conda/lib/python3.6/asyncio/futures.py", line 238, in result
    raise CancelledError
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/conda/lib/python3.6/site-packages/distributed/utils.py", line 662, in lo