## Command Line


This is the most fundamental way to deploy Dask on multiple machines. In production environments, this process is often automated by some other resource manager, so it is rare that people need to follow these instructions explicitly. But since we want to learn how to "build" a cluster, we will study how to start one from the command line.

A ```dask.distributed``` network consists of one ```dask-scheduler``` process and several ```dask-worker``` processes that connect to that scheduler. These are normal Python processes that can be executed from the command line. We launch the dask-scheduler executable in one process and the dask-worker executable in several processes, possibly on different machines.

Hence, unlike in the previous lectures, today we want to create a cluster "from scratch", using IP addresses and real workers instead of the automatic "local cluster" created by Dask.



First, let's see how the ```dask-scheduler``` command works:

In [None]:
%%bash

dask-scheduler --help

In [None]:
%%bash 

#dask-scheduler   # run it inside a real bash terminal, not from Jupyter

Then run the ```dask-worker``` command on the rest of the nodes. Let's see how it works:

In [None]:
%%bash

dask-worker --help

This command must be given the address of the node that hosts ```dask-scheduler```:


In [None]:
%%bash
#dask-worker 192.168.1.12:8786   # run it inside a real command line, not from Jupyter

### Basic concepts
The scheduler and workers both need to accept TCP connections on an open port. By default, the scheduler binds to port 8786 and the worker binds to a random open port. If you are behind a firewall, you may have to open particular ports or tell Dask to listen on a different port.

Dask workers are run within a *nanny* process that *monitors* the worker process and restarts it if necessary.

As we saw in the last lecture, Dask schedulers and even workers host interactive diagnostic web servers using the Bokeh server. These are optional, but generally useful to users. The diagnostic server on the scheduler is particularly valuable, and is served on port 8787.
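Before starting workers, it can help to verify that the scheduler ports are actually reachable from your machine. The following is a minimal sketch that uses only the Python standard library; the helper name `port_is_open` and the address `127.0.0.1` are assumptions for illustration, so replace the address with the host that runs ```dask-scheduler```.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


# Hypothetical address: replace it with the host that runs dask-scheduler
scheduler_host = "127.0.0.1"
for name, port in [("scheduler", 8786), ("dashboard", 8787)]:
    state = "reachable" if port_is_open(scheduler_host, port) else "not reachable"
    print("{} port {}: {}".format(name, port, state))
```

If a port is reported as not reachable while the scheduler is running, a firewall between the two machines is a likely cause.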

### Try to create your first cluster

First, run the cell below to identify your network card and find out what your IP address is:

In [None]:
%%bash 

ifconfig  # only for those of you who do not use the Docker cluster

Now open your command line and run:

In [None]:
#dask-scheduler --host IP --port 8786

At this point open another command line and run this command:

In [None]:
#dask-worker IP:8786 --nprocs 2 #this command creates 2 workers

When everything is up and running, try to connect to the dashboard and take a look at your new cluster!
If you have installed all the packages correctly, you should be able to access the dashboard at: IP:8787

### Run an old exercise over the new cluster

Try to count how many words there are in all the documents, using the cluster.

In [None]:
from sklearn.datasets import fetch_20newsgroups
from dask.distributed import Client
import time

categories = [
     'comp.graphics',
     'comp.os.ms-windows.misc',
     'comp.sys.ibm.pc.hardware',
     'comp.sys.mac.hardware',
     'comp.windows.x',
     'misc.forsale',
     'rec.autos',
     'rec.motorcycles',
     'rec.sport.baseball',
     'rec.sport.hockey',
     'sci.crypt',
     'sci.electronics',
     'sci.med',
     'sci.space'
]

dataset = fetch_20newsgroups(subset='train', categories=categories ).data

print("Texts document present on the dataset: "+str(len(dataset)))

def count_word_in_statement(text):
    """
    This function takes a text as input and returns the number of words it contains
    """
    #time.sleep(0.1)
    splitted_words = text.split()
    return len(splitted_words)

Sequential code:

In [None]:
import time
start = time.time()


total_words_in_all_data = 0
for index in range(0, len(dataset)):
    total_words_in_all_data = total_words_in_all_data + count_word_in_statement(dataset[index])

    
end = time.time()
print("Total word in the dataset: {}".format(total_words_in_all_data))
print("Computation took {}s".format(end-start))

Distributed code:

In [None]:
client = Client('dask-scheduler:8786') #change your setting

In [None]:
import time
start = time.time()

#futures = client.map(count_word_in_statement, dataset)
futures = [client.submit(count_word_in_statement, data) for data in dataset]

futures = client.submit(sum, futures)
total_words_in_all_data = client.gather(futures)

    
end = time.time()
print("Total word in the dataset: {}".format(total_words_in_all_data))
print("Computation took {}s".format(end-start))


In [None]:
client.close()

### Why did the sequential code take more than 10 times less than the distributed version in this case?

In general, when you deal with a cluster you have to think about the *overhead*. You can think of the overhead as the extra time, beyond the computation itself, that is needed to get your data processed.
Typically, in a cluster there are two kinds of overhead:
+ scheduler overhead: serializing the objects that must be sent to the workers
+ connection overhead: the speed of the network connection between the cluster nodes

In the first case, the scheduler adds about one millisecond of overhead per task or Future object. Although this may sound fast or inconsequential, it is quite slow if you run a large number of tasks: more tasks mean more time spent creating their Future objects.
In light of the above, if your functions run faster than 100ms or so, then you might not see any
speedup from using distributed computing and, even worse, performance may actually degrade.
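To get a feeling for these numbers, here is a back-of-the-envelope sketch. It relies on a deliberately simplified model that is an assumption of this example, not a Dask formula: the scheduler pays a fixed overhead once per task, the tasks are split evenly across the workers, and network transfer is ignored. The function name `estimate_times` and the task count of 8000 are hypothetical.

```python
def estimate_times(n_tasks, task_seconds, n_workers, overhead_per_task=0.001):
    """Rough sequential and distributed run-time estimates, in seconds.

    Simplified model (an assumption, not a Dask formula): the scheduler pays
    `overhead_per_task` once per task, and the tasks themselves are split
    evenly across `n_workers`. Network transfer time is ignored.
    """
    sequential = n_tasks * task_seconds
    distributed = n_tasks * overhead_per_task + n_tasks * task_seconds / n_workers
    return sequential, distributed


# Compare very short, medium, and long tasks on a hypothetical 2-worker cluster
for task_seconds in (0.00001, 0.01, 0.1):
    seq, dist = estimate_times(n_tasks=8000, task_seconds=task_seconds, n_workers=2)
    print("task={}s  sequential={:.2f}s  distributed={:.2f}s".format(task_seconds, seq, dist))
```

Because the model leaves out serialization and network costs, real measurements are likely to be less favorable to the distributed version than these estimates.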

In the second case things are different. The connection overhead may depend on several factors, including the stability of the network, its type (wired, WiFi, or optical fiber), and its bandwidth.

This is what is happening in the previous example.
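One common way to reduce the per-task overhead is to send fewer, larger tasks: each task processes a batch of documents instead of a single one. The sketch below shows the idea on a tiny stand-in list so that it runs without a cluster. The helper names `chunked` and `count_words_in_batch`, and the batch size of 500 in the commented lines, are assumptions chosen for illustration.

```python
def chunked(items, size):
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def count_words_in_batch(texts):
    """Count the words of every text in the batch and return the total."""
    return sum(len(text.split()) for text in texts)


# Tiny stand-in for the real dataset, so the sketch runs without a cluster
sample = ["hello world", "dask is fun", "one", "two words"]
batches = list(chunked(sample, 2))
total = sum(count_words_in_batch(batch) for batch in batches)
print(len(batches), "batches,", total, "words")

# On a cluster the same idea would look like this (not executed here):
# futures = [client.submit(count_words_in_batch, batch) for batch in chunked(dataset, 500)]
# total = client.submit(sum, futures).result()
```

With batches of 500 documents the scheduler has to handle a few dozen tasks instead of several thousand, so the fixed cost per task weighs much less on the total time.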

Let's try to simulate an intensive computation (a sleep of 10ms, 10 times less than the 100ms threshold mentioned above):

In [None]:
def count_word_in_statement(text):
    """
    This function takes a text as input and returns the number of words it contains
    """
    splitted_words = text.split()
    time.sleep(0.01)
    return len(splitted_words)

Try to run the sequential code and wait for it to finish:

In [None]:
import time
start = time.time()


total_words_in_all_data = 0
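warned = False
for index in range(0, len(dataset)):
    total_words_in_all_data = total_words_in_all_data + count_word_in_statement(dataset[index])
    elapsed = time.time() - start
    # Warn only once, instead of on every iteration after the 50s mark
    if elapsed >= 50 and not warned:
        print("More than {:.0f}s of computation time...".format(elapsed))
        warned = True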

    
end = time.time()
print("Total word in the dataset: {}".format(total_words_in_all_data))
print("Computation took {}s".format(end-start))

Distributed version:

In [None]:
client = Client('dask-scheduler:8786') #change your setting

In [None]:
import time
start = time.time()

futures = [client.submit(count_word_in_statement, data) for data in dataset]
futures = client.submit(sum, futures)
total_words_in_all_data = client.gather(futures)

    
end = time.time()
print("Total word in the dataset: {}".format(total_words_in_all_data))
print("Computation took {}s".format(end-start))

In [None]:
client.close()

What happens if we increase the number of workers or the number of threads per worker?

If you feel like a hero and you are not afraid of growing old in front of your PC, you can try to compare the sequential code and the distributed code after increasing the sleep time to 100ms or 1s.

### How do the distribution and the scheduling of the processes work?

#### How is a worker chosen?
Even though you can impose restrictions, e.g. on worker resources, Dask automatically decides which worker is best suited for each task.
If a task has significant data dependencies, or if the workers are under heavy load, this choice of worker can strongly impact global performance, so the decision matters.

Dask follows these rules before assigning a task to a worker:
+ If the task has no major dependencies and no restrictions, Dask picks the least occupied worker.
+ Otherwise, if the task has user-provided restrictions (for example, it must run on a machine with a GPU), the pool of available workers is restricted to just that set; if not, all workers are considered.
+ From this pool of workers, Dask determines the workers to whom the least amount of data would need to be transferred (less overhead on the cluster and hence a faster computation).
+ Ties are broken by assigning the task to the worker that currently has the fewest tasks.
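The rules above can be sketched in a few lines of plain Python. This is a simplified model written for this lecture, not Dask's actual decision function: the name `choose_worker`, the dictionary layout, and the example workers are all assumptions made for illustration.

```python
def choose_worker(workers, task):
    """Pick a worker name for `task`, or None if no worker qualifies.

    `workers` maps a worker name to a dict with:
      - "tags": set of capabilities, e.g. {"GPU"}
      - "data": dict of dependency name -> size in bytes already held
      - "n_tasks": number of tasks currently assigned
    `task` is a dict with "needs" (set of required tags) and
    "deps" (dict of dependency name -> size in bytes).
    """
    # User restrictions shrink the pool of candidate workers
    pool = {name: w for name, w in workers.items() if task["needs"] <= w["tags"]}
    if not pool:
        return None

    def bytes_to_transfer(worker):
        # Only dependencies the worker does not hold yet must be moved
        return sum(size for dep, size in task["deps"].items() if dep not in worker["data"])

    # Least data to move first, then fewest tasks among the remaining candidates
    return min(pool, key=lambda name: (bytes_to_transfer(pool[name]), pool[name]["n_tasks"]))


workers = {
    "w1": {"tags": {"GPU"}, "data": {"x": 100}, "n_tasks": 5},
    "w2": {"tags": set(), "data": {}, "n_tasks": 0},
    "w3": {"tags": {"GPU"}, "data": {}, "n_tasks": 1},
}
print(choose_worker(workers, {"needs": set(), "deps": {}}))
print(choose_worker(workers, {"needs": {"GPU"}, "deps": {"x": 100}}))
```

In the first call there is nothing to transfer and no restriction, so the least occupied worker wins. In the second call only the GPU workers are candidates, and the one that already holds `x` is preferred even though it is busier.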


Dask also allows the worker decision function to be modified, which makes a cluster more flexible and customizable. Workloads whose performance depends heavily on how tasks are assigned can therefore be sped up by customizing and optimizing this decision.

Breaking the dependencies is necessary in some cases, especially if a node has many children. In this case, each node with its children must be removed from the graph and computed on its own. This has a huge impact on performance and memory. On the other hand, it means that when a user submits a task, the computation graph must be scanned to find and optimize this kind of dependency.


#### How is the next task chosen?
Typically, Dask follows these rules to choose the next task to execute:
+ Run tasks on a first-come-first-served basis for fairness between multiple clients
+ Run tasks that are part of the critical path in an effort to reduce total running time and minimize straggler workloads
+ Run tasks that allow us to release many dependencies in an effort to keep the memory footprint small
+ Run tasks that are related so that large chunks of work can be completely eliminated before running new chunks of work

As you can see, part of the overhead on a cluster is caused by optimizing the choice of which task to execute. Even though these rules are specific to Dask, most clusters, even those based on other frameworks and architectures, follow similar approaches.

On the other hand, some computational fields may require a different approach to deciding which tasks to execute, e.g. a last-in-first-out approach, or assigning a priority to each task so that some processes run first.

In some cases, Dask's optimization also exploits a partially last-in-first-out approach. When a worker finishes a task, the immediate dependencies of that task get top priority. This encourages a behavior of finishing ongoing work immediately before starting new work. This often conflicts with the first-come-first-served objective but often results in shorter total runtimes and significantly reduced memory footprints.


#### Where are these decisions made?

The decisions are made in small steps, at different stages of the computation, by the client, the scheduler, and the workers:

+ As we submit a graph from the *client* to the scheduler we automatically assign a numeric priority to each task of that graph. This priority focuses on computing deeply before broadly, preferring critical paths, and preferring nodes with many dependencies.

+ When the graph reaches the scheduler the scheduler changes each of these numeric priorities into a tuple of two numbers, the first of which is an increasing counter, the second of which is the client-generated priority described above. This per-graph counter encourages a first-in-first-out policy between computations. All tasks from a previous call to compute have a higher priority than all tasks from a subsequent call to compute (or submit, persist, map, or any operation that generates futures).

+ Whenever a task is ready to run the scheduler assigns it to a worker. The scheduler does not wait based on priority. However when the worker receives these tasks it considers their priorities when determining which tasks to prioritize for communication or for computation. 
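The two-number priority described above can be illustrated with ordinary Python tuples, which compare element by element. This is a toy model, not Dask's internal code: it assumes that a smaller tuple means "run earlier", and the names `graph_counter` and `scheduler_priorities` as well as the client priorities are made up for the example.

```python
import heapq
import itertools

graph_counter = itertools.count()  # one increasing number per submitted graph


def scheduler_priorities(client_priorities):
    """Turn {task: client_priority} into {task: (graph_number, client_priority)}."""
    graph_number = next(graph_counter)
    return {task: (graph_number, prio) for task, prio in client_priorities.items()}


# Two graphs submitted one after the other, with made-up client priorities
first = scheduler_priorities({"a1": 1, "a0": 0})
second = scheduler_priorities({"b0": 0})

# A worker holding all three tasks would pick them in priority order
queue = [(prio, task) for task, prio in {**first, **second}.items()]
heapq.heapify(queue)
order = [heapq.heappop(queue)[1] for _ in range(len(queue))]
print(order)
```

Because the graph number is the first element of the tuple, every task of the first graph comes before any task of the second one, which is the first-in-first-out behavior between computations. Within a graph, the client-generated priority decides.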


### Worker Resources

Let's suppose that you want to run a process on a cluster, but only on those machines that have a GPU or at least 16GB of RAM. Now imagine that you have a cluster of ten computers, four of which have a GPU while the others do not. In this case we want to balance tasks across the cluster with these resource constraints in mind, allocating GPU-constrained tasks to GPU-enabled workers. Additionally, we need to constrain the number of GPU tasks that run concurrently on any given worker to ensure that we respect the provided limits.
Clearly, this situation arises not only for GPUs but for many resources: tasks that require a large amount of memory at runtime, special disk access, or access to special hardware.

When you require workers with particular resources, you must make sure that those resources are available on the cluster.
Otherwise your tasks will never be executed.

Let's try an example together:

First, stop the scheduler and the workers of your cluster. Then start the scheduler again and bring up two workers with these commands:

+ ```dask-worker dask-scheduler:8786 --nprocs 1 --nthreads 1 --resources "GPU=2"```
+ ```dask-worker dask-scheduler:8786 --nprocs 1 --nthreads 1 --resources "GPU=1"```

In [None]:
import numpy as np
client = Client('dask-scheduler:8786') ##change your settings

In [None]:
client

In [None]:
matrices = []

for i in range(100):
    matrices.append(np.random.rand(4,3))

def compute_polynomial_kernel(matrix):
    # Polynomial kernel (X X^T + 1)^d with degree d = 2
    polynomial_degree = 2
    return np.power((np.dot(matrix, matrix.T)+1), polynomial_degree)

Since we are working on a matrix computation, let's suppose that we want to exploit the multiple GPUs available only on some workers.
Assume that each task needs 3 GPUs:

In [None]:
processed = [client.submit(compute_polynomial_kernel, matrix, resources={'GPU': 3}) for matrix in matrices]

kernels = client.gather(processed)

for i in kernels:
    print("Kernel is: {}".format(i))
    print()
client.close()

Nothing happens, but the code keeps running... Let's try to add, on the fly, a worker with 3 GPUs:

```dask-worker 192.168.1.12:8786 --nprocs 1 --nthreads 1 --resources "GPU=3"```
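Why did the tasks wait until this worker joined? Resources are declared per worker and are not pooled across the cluster, so a task that asks for 3 GPUs needs a single worker that declares at least 3. The plain-Python sketch below mimics that check; the function name `workers_able_to_run` and the worker names are assumptions for illustration, not part of Dask.

```python
def workers_able_to_run(workers, required):
    """Return the names of workers whose declared resources cover `required`."""
    return [
        name
        for name, available in workers.items()
        if all(available.get(res, 0) >= amount for res, amount in required.items())
    ]


# The two workers started earlier: 2 + 1 GPUs, but never 3 on the same worker
cluster = {"worker-1": {"GPU": 2}, "worker-2": {"GPU": 1}}
print(workers_able_to_run(cluster, {"GPU": 3}))

# The worker added on the fly declares 3 GPUs on its own
cluster["worker-3"] = {"GPU": 3}
print(workers_able_to_run(cluster, {"GPU": 3}))
```

The first list is empty, which corresponds to the tasks staying pending; after the third worker joins, there is exactly one candidate.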

### Exercise 1:

Compute the trace of each generated matrix. Execute this code on 2 workers, each with 2 "SpecialCPU" resources.
You must use the ```map``` function.

In [None]:
matrices = [np.random.randint(low=m, high=m+1, size=(4, 3)) for m in (range(11))]

In [None]:
client = Clien

### Exercise 2:

Execute the code of the "howmany_within_range" exercise from the previous lecture on a worker with 256GB of RAM. The ```map``` function is not allowed.
    

    