# Dask Experiments

## Setting Up
The following page explains in more detail how to set up Dask on a variety of local and distributed hardware:
https://docs.dask.org/en/latest/setup.html

There are two ways to set up Dask on a local computer, and both are explored in this notebook.
Setting up on a local computer is important for grasping the concepts, even if the end objective is a multi-computer configuration.

There are several multi-computer setup techniques. This document explores two of them: manual setup and SSH setup.


In [1]:
from dask.distributed import Client, LocalCluster


## Single Machine (local PC)

The `dask.distributed` scheduler works well on a single machine. It is sometimes preferred over the default scheduler.
You can create a `dask.distributed` scheduler by importing `Client` and creating an instance with no arguments. This overrides whatever default scheduler was previously set. In this section, the word distributed means distributed programmatically across the local physical computer, not distributed across several physical machines.

The `Client()` call used here is shorthand for creating a `LocalCluster` and then passing it to your client.
You may want to look at the wider range of keyword arguments available on `LocalCluster` to understand the options for handling the mixture of threads and processes, specifying explicit ports, and so on. For example, if you use the `LocalCluster` approach you can set the number of workers.

Note that `Client()` and `LocalCluster()` take many optional arguments to configure the server.

You can navigate to `http://localhost:8787/status` to see the diagnostic dashboard if you have Bokeh installed.

Starting the local client also starts a local cluster, with the scheduler and all workers running on the local computer.

The `client.scheduler_info()` command provides information about the client, server and worker setup.

The `client.shutdown()` command shuts down the scheduler and workers that the client is connected to.

https://docs.dask.org/en/latest/setup/single-distributed.html   
https://distributed.dask.org/en/latest/api.html#distributed.Client   

Dask is not the only means to do parallel processing.  See also this tutorial on multiprocessing.  
https://sebastianraschka.com/Articles/2014_multiprocessing.html

In [2]:
# to use the local host
useSimpleClient = True

if useSimpleClient:
    # simplified setup, less control
    client = Client() 
else:
    # Set up a local cluster, more control
    # n_workers is set explicitly here; if omitted, Dask picks a default based on the available cores
    cluster = LocalCluster(n_workers=1)
    client = Client(cluster)

client.get_versions(check=True)

client

Client:  Scheduler: tcp://127.0.0.1:64865  Dashboard: http://127.0.0.1:8787/status  
Cluster:  Workers: 4  Cores: 8  Memory: 34.00 GB


In [3]:
client.scheduler_info()

{'type': 'Scheduler',
 'id': 'Scheduler-e40d7349-ee4d-481f-b416-bc9b75b87b92',
 'address': 'tcp://127.0.0.1:64865',
 'services': {'dashboard': 8787},
 'workers': {'tcp://127.0.0.1:64887': {'type': 'Worker',
   'id': 3,
   'host': '127.0.0.1',
   'resources': {},
   'local_directory': 'V:\\work\\WorkN\\miscellania\\dask\\dask-worker-space\\worker-sz1aoteq',
   'name': 3,
   'nthreads': 2,
   'memory_limit': 8500740096,
   'last_seen': 1586869485.3058014,
   'services': {},
   'metrics': {'cpu': 0.0,
    'memory': 52654080,
    'time': 1586869485.2592325,
    'read_bytes': 0.0,
    'write_bytes': 0.0,
    'executing': 0,
    'in_memory': 0,
    'ready': 0,
    'in_flight': 0,
    'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}},
   'nanny': 'tcp://127.0.0.1:64868'},
  'tcp://127.0.0.1:64889': {'type': 'Worker',
   'id': 2,
   'host': '127.0.0.1',
   'resources': {},
   'local_directory': 'V:\\work\\WorkN\\miscellania\\dask\\dask-worker-space\\worker-a3868zvm',
   'name': 2,

In [4]:
# client.shutdown()

In [5]:
#client.get_versions(check=True)

There are a few different ways to interact with the cluster through the client:

1. The Client satisfies most of the standard concurrent.futures - PEP-3148 interface with `.submit`, `.map` functions and `Future` objects, allowing the immediate and direct submission of tasks https://docs.python.org/3/library/concurrent.futures.html.
1. The Client registers itself as the default Dask scheduler, and so runs all Dask collections such as `dask.array`, `dask.bag`, `dask.dataframe` and `dask.delayed`.
1. The Client has additional methods for manipulating data remotely. See the full API for a thorough list https://distributed.dask.org/en/latest/api.html.
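
The first point can be illustrated with the standard library alone. The sketch below uses `concurrent.futures.ThreadPoolExecutor`, not Dask, to show the `submit`, `map` and `Future.result` calls that the Client mirrors. The functions `inc` and `add` are the same small helpers used later in this notebook. One difference to keep in mind: the standard library's `map` returns an iterator of results, whereas `Client.map` returns a list of futures.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules one call and returns a Future immediately
    future = executor.submit(inc, 10)
    single_result = future.result()  # blocks until the call has finished

    # map() applies one function to many inputs
    mapped_results = list(executor.map(inc, [2, 4, 6]))

    # Unlike Dask, the standard library cannot take a Future as an argument,
    # so results are retrieved before they are passed to the next call
    total = executor.submit(add, single_result, sum(mapped_results)).result()

print(single_result, mapped_results, total)
```

With a Dask `Client` in place of the executor, the `submit` and `result` calls keep the same shape, which is why code written against `concurrent.futures` is usually easy to port.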

### Dask Dashboard

The Dask dashboard is a Bokeh-based display of what is taking place in the server and its workers. For this to work the Python `bokeh` package must be installed.

Workers capture durations associated with tasks. For each task that passes through a worker, start and stop times are recorded for each of the following:

- Serialization (gray)
- Dependency gathering from peers (red)
- Disk I/O to collect local data (orange)
- Execution times (colored by task)

The main way to observe these times is with the task stream plot on the scheduler's /status page where the colors of the bars correspond to the colors listed above.

https://docs.dask.org/en/latest/diagnostics-distributed.html  
https://distributed.dask.org/en/latest/diagnosing-performance.html  
https://medium.com/@kartikbhanot/dask-scheduler-dashboard-understanding-resource-and-task-allocation-in-local-machines-bc5aa60eca6e  
https://www.youtube.com/watch?v=N_GqzcuGLCY  

There is also a Dask extension for JupyterLab, `dask-labextension`.  
https://github.com/dask/dask-labextension

In [6]:
# to obtain the dashboard for the client
def clientDashboardURI(client):
    """Extract the bokeh dashboard URI from the client's scheduler info
    """
    
    dashboardURI = client.scheduler_info()['address'].rsplit(':',1)[0]+':'
    dashboardURI += str(client.scheduler_info()['services']['dashboard'])
    
    return dashboardURI.replace('tcp:','http:')

print(clientDashboardURI(client))

http://127.0.0.1:8787
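
As an alternative sketch, the same URI can be built by parsing the scheduler address with the standard library instead of replacing substrings. The function name `dashboard_url` is hypothetical, and the sample dictionary holds only the two keys that are needed, copied from the `scheduler_info()` output shown earlier. The `/status` suffix is the dashboard page mentioned above.

```python
from urllib.parse import urlsplit


def dashboard_url(scheduler_info):
    """Build the dashboard URL from a scheduler-info dictionary."""
    # The address has the form 'tcp://host:port'; only the host is reused
    host = urlsplit(scheduler_info['address']).hostname
    port = scheduler_info['services']['dashboard']
    return f'http://{host}:{port}/status'


# Sample values copied from the scheduler information shown earlier
sample_info = {
    'address': 'tcp://127.0.0.1:64865',
    'services': {'dashboard': 8787},
}
print(dashboard_url(sample_info))
```

With a running client, the call would be `dashboard_url(client.scheduler_info())`.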


Open a browser window with the above URI (for the currently running client). Set the `if` condition to `True` and execute the code below to observe the dashboard in action.
The code below is not meaningful; it is only meant to keep Dask busy for a while.

While the code is executing, click on the different tabs in the dashboard to learn about the different displays.

In [7]:
if False:
    import dask.array as da
    x = da.random.random((10000, 10000,10), chunks=(1000,1000,5))
    y = da.random.random((10000, 10000,10), chunks=(1000,1000,5))
    z = (da.arcsin(x)+da.arccos(y)).sum(axis=(1,2))
    z.compute()

### Preparing functions

Create a few functions that will be used in the examples.

In [8]:
# to create simple tasks to experiment with
def inc(x):
    return x + 1

def add(x, y):
    return x + y


### Single function calls

Using the client created above, a single function and a single data item will be dispatched to the scheduler and a worker. All of the scheduling work is transparent.

We can submit individual function calls with the `client.submit` method.  
The simple example here would execute much faster as normal in-line Python code; the idea is only to show the Dask method.

The `submit` function returns a `Future`, which refers to a (future) remote result. This result may not yet be completed, but eventually it will complete. The result stays in the remote thread/process/worker until you ask for it back explicitly by calling the `result` method on the future itself (not a client method, as in the map case below).

https://distributed.dask.org/en/latest/client.html

In [9]:
# to create a single future
x1 = client.submit(inc, 10)
print(x1)

<Future: pending, key: inc-083b5e2ba45c380966bcb963d4544e5e>


In [10]:
# to see what a future looks like
print(x1)

<Future: finished, type: builtins.int, key: inc-083b5e2ba45c380966bcb963d4544e5e>


In [11]:
# to retrieve a result from a future
x1r = x1.result()
print(x1r)

11


In [12]:
# to check if a future resets or disappears after its initial retrieval
print(x1)
x1r = x1.result()
print(x1r)

<Future: finished, type: builtins.int, key: inc-083b5e2ba45c380966bcb963d4544e5e>
11


You can pass futures as inputs to submit. Dask automatically handles dependency tracking; once all input futures have completed, they will be moved onto a single worker (if necessary), and then the computation that depends on them will be started. You do not need to wait for inputs to finish before submitting a new task; Dask will handle this automatically:

In [13]:
# to create a graph of futures
x1 = client.submit(inc, 10)
x2 = client.submit(inc, 10)
print(x1)
print(x2)
xs = client.submit(add,x1,x2)
print(xs)

<Future: finished, type: builtins.int, key: inc-083b5e2ba45c380966bcb963d4544e5e>
<Future: finished, type: builtins.int, key: inc-083b5e2ba45c380966bcb963d4544e5e>
<Future: pending, key: add-6e6cf98db4b0523bbdc831467bd5720b>


In [14]:
# to print the result of the graph final output
xsr = xs.result()
print(xsr)

22


### Multiple function calls


Using the client created above, a single function and multiple data items will be dispatched to the scheduler and workers. All of the scheduling work is transparent.

Similar to Python's `map`, you can use `Client.map` to call the same function on many inputs.

This returns a list of futures.
These results live on the distributed workers.

We can submit tasks on futures. The function will go to the machine where the futures are stored and run on the result once it has completed. In the example below the list of futures is sent to the built-in Python `sum()` function, which adds the elements of an iterable and returns the sum.


In [15]:
# to create a map of input values for a function
futures  = client.map(inc, [2, 4, 6])
for future in futures:
    print(future)

<Future: pending, key: inc-423e89f2660795305fd6c03c55d1de5d>
<Future: pending, key: inc-98feef5b8b4598158d0e47a6d4acde9d>
<Future: pending, key: inc-d016ddc28876119154a07cfbe98a9ed7>


The results stay in the remote thread/process/worker until you ask for them back explicitly by calling the client's `gather` method (not the future's `result` method), because here a list of futures must be processed.

In [16]:
# to gather the list of results
futuresr = client.gather(futures)
print(futuresr)

[3, 5, 7]


In [17]:
# to pass a list of futures into another submit
total = client.submit(sum, futures)
print(total)
 
totalr = total.result()
print(totalr)

<Future: pending, key: sum-c34d59da0efacac3f60627d03084e274>
15


## Multiple Machines (local PC and Remote Server(s))
### Setting up


For the sake of this discussion, suppose the computers have the following IP addresses (replace these with the IP addresses on your system):
1. Local computer running the `dask.distributed.Client`:  `146.64.202.163`
1. Computer running the scheduler: `146.64.246.94`
1. Computer running the workers: `146.64.246.94`

In this case the scheduler and worker run on the same server, but they could be run on different computers.

All of the computers must have exactly the same versions of Python, dask, pickle, etc., otherwise the following will not work. If the required versions are installed in a separate Python environment, that environment must be activated first.

Start the scheduler and workers:

1. **Start the scheduler on the server**. Log in to the server (`146.64.246.94`), then activate the Python environment and start the scheduler:

        conda activate devpy37
        dask-scheduler

    If the server already has the necessary Python, dask and pickle versions in the root Python environment there is no need to first activate the Python environment.  In this case the scheduler can be started from any computer by specifying the hostname. So on the local computer the following command will start the scheduler (if the versions are correct in the root Python):

        dask-scheduler --host 146.64.246.94

    Either of the above should start the scheduler:
    
        (devpy37) username@servername:~$ dask-scheduler
        distributed.scheduler - INFO - -----------------------------------------------
        distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-mxiy5sli
        distributed.scheduler - INFO - -----------------------------------------------
        distributed.scheduler - INFO - Clear task state
        distributed.scheduler - INFO -   Scheduler at:  tcp://146.64.246.94:8786
        distributed.scheduler - INFO -   dashboard at:                     :8787
        distributed.scheduler - INFO - Register worker <Worker 'tcp://146.64.246.94:45347', name: tcp://146.64.246.94:45347, memory: 0, processing: 0>
        distributed.scheduler - INFO - Starting worker compute stream, tcp://146.64.246.94:45347
        distributed.core - INFO - Starting established connection

2. **Start the worker on the server**  Log in to the server (`146.64.246.94`), then activate the Python environment and start the worker:

        dask-worker tcp://146.64.246.94:8786
    
    where the IP address and port number must correspond to the scheduler's IP address and port. This should start the worker task. Note that the dashboard IP address is also given when the workers start. Note that this server has 32 cores and 32 GB memory.
    
        (devpy37) username@servername:~$ dask-worker tcp://146.64.246.94:8786
        distributed.nanny - INFO -         Start Nanny at: 'tcp://146.64.246.94:35459'
        distributed.worker - INFO -       Start worker at:  tcp://146.64.246.94:45347
        distributed.worker - INFO -          Listening to:  tcp://146.64.246.94:45347
        distributed.worker - INFO -          dashboard at:        146.64.246.94:42791
        distributed.worker - INFO - Waiting to connect to:   tcp://146.64.246.94:8786
        distributed.worker - INFO - -------------------------------------------------
        distributed.worker - INFO -               Threads:                         32
        distributed.worker - INFO -                Memory:                   33.71 GB
        distributed.worker - INFO -       Local Directory: /home/username/dask-worker-space/worker-lz08iq0s
        distributed.worker - INFO - -------------------------------------------------
        distributed.worker - INFO -         Registered to:   tcp://146.64.246.94:8786
        distributed.worker - INFO - -------------------------------------------------
        distributed.core - INFO - Starting established connection

3. **Start the client**  When the client is initiated, pass the scheduler IP:port address:

In [18]:
# to start a local client with a remote scheduler
client = Client('146.64.246.94:8786') 
client.get_versions(check=True)


{'scheduler': {'host': (('python', '3.7.6.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '4.9.0-8-amd64'),
   ('machine', 'x86_64'),
   ('processor', ''),
   ('byteorder', 'little'),
   ('LC_ALL', 'None'),
   ('LANG', 'en_ZA.UTF-8')),
  'packages': {'dask': '2.12.0',
   'distributed': '2.12.0',
   'msgpack': '0.6.1',
   'cloudpickle': '1.3.0',
   'tornado': '6.0.4',
   'toolz': '0.10.0',
   'numpy': '1.18.1',
   'lz4': None,
   'blosc': None}},
 'workers': {'tcp://146.64.246.94:45347': {'host': (('python',
     '3.7.6.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '4.9.0-8-amd64'),
    ('machine', 'x86_64'),
    ('processor', ''),
    ('byteorder', 'little'),
    ('LC_ALL', 'None'),
    ('LANG', 'en_ZA.UTF-8')),
   'packages': {'dask': '2.12.0',
    'distributed': '2.12.0',
    'msgpack': '0.6.1',
    'cloudpickle': '1.3.0',
    'tornado': '6.0.4',
    'toolz': '0.10.0',
    'numpy': '1.18.1',
    'lz4': None,
    'blosc': None}}},
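
Because mismatched versions are the most common reason for the remote setup to fail, it can help to compare the package versions programmatically. Passing `check=True` already asks Dask to perform a comparison itself; the sketch below only illustrates how the dictionary shown above could be inspected by hand. The function name `find_version_mismatches` is hypothetical, and the sample data is made up in the same layout as the output above, with the worker's numpy version altered on purpose.

```python
def find_version_mismatches(versions):
    """Return (worker, package, scheduler_version, worker_version) tuples."""
    reference = versions['scheduler']['packages']
    mismatches = []
    for address, info in versions.get('workers', {}).items():
        for package, expected in reference.items():
            found = info['packages'].get(package)
            if found != expected:
                mismatches.append((address, package, expected, found))
    return mismatches


# Hypothetical sample in the same layout as the output above;
# the worker's numpy version is altered on purpose
sample = {
    'scheduler': {'packages': {'dask': '2.12.0', 'numpy': '1.18.1'}},
    'workers': {
        'tcp://146.64.246.94:45347': {
            'packages': {'dask': '2.12.0', 'numpy': '1.17.0'},
        },
    },
}
print(find_version_mismatches(sample))
```

An empty list means that every worker reports the same package versions as the scheduler.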

In [19]:
client.scheduler_info()

{'type': 'Scheduler',
 'id': 'Scheduler-c18c6181-421b-4826-aea7-b1bba21efeb0',
 'address': 'tcp://146.64.246.94:8786',
 'services': {'dashboard': 8787},
 'workers': {'tcp://146.64.246.94:45347': {'type': 'Worker',
   'id': 'tcp://146.64.246.94:45347',
   'host': '146.64.246.94',
   'resources': {},
   'local_directory': '/home/dgriffith/dask-worker-space/worker-lz08iq0s',
   'name': 'tcp://146.64.246.94:45347',
   'nthreads': 32,
   'memory_limit': 33712070656,
   'last_seen': 1586869487.8488212,
   'services': {'dashboard': 42791},
   'metrics': {'cpu': 2.0,
    'memory': 107896832,
    'time': 1586869487.3504004,
    'read_bytes': 16650.21565100753,
    'write_bytes': 1014.9159528091943,
    'num_fds': 25,
    'executing': 0,
    'in_memory': 0,
    'ready': 0,
    'in_flight': 0,
    'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}},
   'nanny': 'tcp://146.64.246.94:35459'}}}
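
The scheduler information is a plain nested dictionary, so it can be reduced to a short summary. The following is a minimal sketch; the function name `summarise_workers` is hypothetical, and the sample dictionary contains only the keys that are used, with values copied from the output above.

```python
def summarise_workers(info):
    """Return worker count, total threads and total memory limit in GB."""
    workers = info['workers'].values()
    return {
        'workers': len(workers),
        'threads': sum(w['nthreads'] for w in workers),
        'memory_gb': round(sum(w['memory_limit'] for w in workers) / 1e9, 2),
    }


# Sample values copied from the output above (one worker, other keys omitted)
sample_info = {
    'workers': {
        'tcp://146.64.246.94:45347': {
            'nthreads': 32,
            'memory_limit': 33712070656,
        },
    },
}
print(summarise_workers(sample_info))
```

With a running client, the call would be `summarise_workers(client.scheduler_info())`. The memory figure agrees with the 33.71 GB reported in the worker start-up log.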

### Running workers remotely

Now execute the same graph as on the single computer before, but now on the remote server.

To see if the server is running, wrap the `Client()` call in a `try`/`except` block.


In [20]:
# to run the graph on a remote server

try:
    client = Client('tcp://146.64.246.94:8786', timeout='2s')
    x1 = client.submit(inc, 10)
    x2 = client.submit(inc, 10)
    print(x1)
    print(x2)
    xs = client.submit(add,x1,x2)
    print(xs)
    # to print the result of the graph final output
    xsr = xs.result()
    print(xsr)
except OSError:  # also covers TimeoutError, which is a subclass of OSError
    print('dask scheduler server is not responding, probably not running.')


<Future: pending, key: inc-083b5e2ba45c380966bcb963d4544e5e>
<Future: pending, key: inc-083b5e2ba45c380966bcb963d4544e5e>
<Future: pending, key: add-6e6cf98db4b0523bbdc831467bd5720b>
22
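
A lighter pre-check is to test whether anything is listening on the scheduler's TCP port before creating a client. The sketch below uses only the standard library; the function name `scheduler_reachable` is hypothetical. It demonstrates the check against a throwaway local listener rather than a real scheduler, so it runs without any server. An open port only shows that some process is listening, not that it is a Dask scheduler.

```python
import socket


def scheduler_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Demonstration against a throwaway local listener, not a real scheduler
listener = socket.socket()
listener.bind(('127.0.0.1', 0))  # port 0 lets the operating system pick a free port
listener.listen(1)
port = listener.getsockname()[1]

reachable_while_open = scheduler_reachable('127.0.0.1', port)
listener.close()
reachable_after_close = scheduler_reachable('127.0.0.1', port)

print(reachable_while_open, reachable_after_close)
```

For the setup described here, the call would be `scheduler_reachable('146.64.246.94', 8786)`.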


### Debugging

If the scheduler fails to start with messages such as those shown below, check the following:

1. If starting the scheduler remotely with `dask-scheduler --host 146.64.246.94`: Does the server's root Python have the correct software and versions? If the scheduler is started remotely the server's root Python is used.

1. If the scheduler is started locally on the server with `dask-scheduler`: Does the currently active Python environment (root or other) have the correct software and versions?

    dask-scheduler --host 146.64.202.118
    distributed.scheduler - INFO - -----------------------------------------------
    distributed.dashboard.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
    distributed.scheduler - INFO - Local Directory: C:\Users\NWillers\AppData\Local\Temp\scheduler-b856sir6
    distributed.scheduler - INFO - -----------------------------------------------
    distributed.scheduler - INFO - Clear task state
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\lib\site-packages\distributed\cli\dask_scheduler.py", line 237, in main
        loop.run_sync(run)
      File "C:\ProgramData\Anaconda3\lib\site-packages\tornado\ioloop.py", line 532, in run_sync
        return future_cell[0].result()
      File "C:\ProgramData\Anaconda3\lib\site-packages\distributed\cli\dask_scheduler.py", line 233, in run
        await scheduler
      File "C:\ProgramData\Anaconda3\lib\site-packages\distributed\scheduler.py", line 1424, in start
        await self.listen(addr, listen_args=self.listen_args)
      File "C:\ProgramData\Anaconda3\lib\site-packages\distributed\core.py", line 319, in listen
        connection_args=listen_args,
      File "C:\ProgramData\Anaconda3\lib\site-packages\distributed\comm\core.py", line 170, in _
        await self.start()
      File "C:\ProgramData\Anaconda3\lib\site-packages\distributed\comm\tcp.py", line 411, in start
        self.port, address=self.ip, backlog=backlog
      File "C:\ProgramData\Anaconda3\lib\site-packages\tornado\netutil.py", line 174, in bind_sockets
        sock.bind(sockaddr)
    OSError: [WinError 10049] The requested address is not valid in its context

The final `OSError: [WinError 10049]` means that the address given to `--host` could not be bound on the computer where the command was run, so also check that this address belongs to that computer.

In [21]:
# to get software versions
# https://github.com/rasbt/watermark
# An IPython magic extension for printing date and time stamps, version numbers, and hardware information. 
# you only need to do this once
#!pip install watermark

%load_ext watermark
%watermark -v -m -p numpy,pandas,dask,distributed,msgpack,cloudpickle,tornado,toolz,lz4,blosc -g 


CPython 3.7.6
IPython 7.13.0

numpy 1.18.1
pandas 1.0.3
dask 2.12.0
distributed 2.12.0
msgpack 0.6.1
cloudpickle 1.3.0
tornado 6.0.4
toolz 0.10.0
lz4 not installed
blosc not installed

compiler   : MSC v.1916 64 bit (AMD64)
system     : Windows
release    : 7
machine    : AMD64
processor  : Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
CPU cores  : 8
interpreter: 64bit
Git hash   : 5e3e2b79edd9f6ff5b240b3a63c899c8de8f283c
