# **DASK DISTRIBUTED** 

Dask.distributed is a `centrally managed`, `distributed`, `dynamic task scheduler`. The central `dask-scheduler` process coordinates the actions of several `dask-worker` processes spread across multiple machines and the concurrent requests of several clients.

![DASK DISTRIBUTED](img/distributed-overview.svg)

## **DASK DISTRIBUTED SCHEDULER**

In [6]:
import dask
from dask.distributed import Client
import numpy as np


In [9]:
client = Client(n_workers=4, processes=4,threads_per_worker=4, memory_limit='1GB')
# client = Client()
client


Perhaps you already have a cluster running?
Hosting the HTTP server on port 40611 instead


0,1
Client  Scheduler: tcp://127.0.0.1:44479  Dashboard: http://127.0.0.1:40611/status,Cluster  Workers: 4  Cores: 16  Memory: 4.00 GB


#### **WORKING OF THE DISTRIBUTED SCHEDULER**

To begin, the futures interface (derived from the built-in `concurrent.futures`) allow map-reduce like functionality. We can submit individual functions for evaluation with one set of inputs, or evaluated over a sequence of inputs with `submit()` and `map()`. Notice that the call returns immediately, giving one or more futures, whose status begins as "pending" and later becomes "finished". There is no blocking of the local Python session.

In [6]:
def func(x):
    return x + 1

fut = client.submit(func, 9)
fut

In [5]:
fut

In [7]:
x = client.submit(np.random.random, (10000, 10000))
type(x)

distributed.client.Future

In [8]:
x.result().shape #slow as Gather the numpy array to the local process, access the .shape attribute

(10000, 10000)

In [9]:
client.submit(lambda a: a.shape, x).result() #fast as  Send a lambda function up to the cluster to compute the shape

(10000, 10000)

#### **SCHEDULING STATE AND POLICIES**

Internally, the scheduler moves tasks between a fixed set of states, notably released, waiting, no-worker, processing, memory, error.

Tasks flow along the following states with the following allowed transitions:

![DASK DISTRIBUTED](img/task-state.svg)

    * Released: Known but not actively computing or in memory
    * Waiting: On track to be computed, waiting on dependencies to arrive in memory
    * No-worker: Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all).
    * Processing: Actively being computed by one or more workers
    * Memory: In memory on one or more workers
    * Erred: Task computation, or one of its dependencies, has encountered an error
    * Forgotten (not actually a state): Task is no longer needed by any client or dependent task]

[Indepth scheduling and worker state documentation](https://distributed.dask.org/en/latest/scheduling-state.html)

#### Choosing workers:
    1. If the task has no major dependencies and no restrictions then we find the least occupied worker.
    2. Otherwise, if a task has user-provided restrictions (for example it must run on a machine with a GPU) then we restrict the available pool of workers to just that set, otherwise we consider all workers
    3. From among this pool of workers we determine the workers to whom the least amount of data would need to be transferred.
    4. We break ties by choosing the worker that currently has the fewest tasks, counting both those tasks in memory and those tasks processing currently.

#### **WORKER**

#### The two main functions of worker:
    1. Compute tasks as directed by the scheduler
    2. Store and serve computed results to other workers or clients (Data is stored locally in a dictionary in the .data attribute that maps keys to the results of function calls. i.e worker.data)

### **Thread Pooling tip**:
    1. **CASE-1** : Run dask-worker with many threads and one process
        * Where computations are mostly numeric in nature(NumPy and Pandas computations
        * Releases the GIL
        * Computations occur in the same process as the Worker communication server so that they can access and share data efficiently between each other.
        *Reduces communication costs and generally simplifies deployment.
        
    2. **CASE-2** : Run dask-worker with many process and one thread per process
        * Where computations are mostly Python code
        * Don't release the GIL
        * Not much inter-worker communication required.

        

### Worker Stealing
     Computation to Communication Ratio
     Saturated Worker Burden
     Replicate Popular Data
     Steal from the Rich
     Restrictions

Note: We can disable work stealing by setting `DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING="False"` on ` dask-scheduler`
     
    

### **DASK COMPUTATION**
Data and Computation in Dask.distributed are always in one of three states



![title](img/dask-computation.png)



### **Build scalable custom applications** 

    
    • Dynamically submit tasks
    • Millisecond latencies
    • Coroutines with async/await
    • high level coordination
        ◦ Locks
        ◦ Queues
        ◦ Pub/Sub
        ◦ Actors
    • Integrates with any asyncio/tornado

[Dask limitations](https://distributed.dask.org/en/latest/limitations.html)