# Scaling parallelization to servers, clusters and larger machines

- scaling up to high-performance computing (HPC) systems is increasingly important in data science

- more data means more memory usage

- parallelizable code is crucial for getting results in a reasonable time

A small astrophysics example:

A student wants to calculate complex models of an astrophysical object, but the values of the model parameters are not well known. What can we do?

One possibility is to fit the model to the data.
That, however, does not work well here because the model is high-dimensional: it has 13 parameters, 9 of which are not well known.

Another possibility is to calculate a large grid of models spanning the plausible range of each parameter.
Afterwards we can test each model for agreement with the data (see $\chi^2$ testing).
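For reference, a common form of the $\chi^2$ statistic is sketched below. The notation is introduced here and is not from the original: $N$ data points $d_i$ with uncertainties $\sigma_i$, and model predictions $m_i(\boldsymbol{\theta})$ for a parameter vector $\boldsymbol{\theta}$.

$$\chi^2(\boldsymbol{\theta}) = \sum_{i=1}^{N} \frac{\left(d_i - m_i(\boldsymbol{\theta})\right)^2}{\sigma_i^2}$$

In a grid search, each grid point $\boldsymbol{\theta}$ costs one model evaluation, and models with a smaller $\chi^2$ agree better with the data.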

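But how many models do we need? If we want a meaningful spacing across the whole parameter range, we need to calculate $\approx 10^{9}$ models (for example, 10 grid values for each of the 9 poorly known parameters already give $10^{9}$ combinations).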

One model evaluation takes about 30 seconds in optimized C code. A quick calculation ($10^{9} \times 30\,\mathrm{s} = 3 \times 10^{10}\,\mathrm{s}$) shows that this would take around 1000 years of computing time on a single CPU.
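A back-of-the-envelope sketch of how the runtime shrinks with more cores. It uses the numbers from the text and assumes perfect linear scaling with no communication or scheduling overhead, which real clusters only approximate.

```python
SECONDS_PER_YEAR = 365.25 * 24 * 3600   # Julian year in seconds
SECONDS_PER_DAY = 24 * 3600

n_models = 10**9          # grid size from the text
seconds_per_model = 30    # runtime of one model evaluation from the text

total_seconds = n_models * seconds_per_model
print(f"1 core: {total_seconds / SECONDS_PER_YEAR:.0f} years")

# idealized assumption: n cores finish n times faster
for n_cores in (100, 1_000, 10_000):
    days = total_seconds / n_cores / SECONDS_PER_DAY
    print(f"{n_cores} cores: {days:.0f} days")
```

Even with ten thousand cores, the grid would still need roughly 35 days under these idealized assumptions.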

The following notebook shows how such a task can be sent to a cluster with Dask so that the calculations finish in a reasonable time.

### From the dask-project page

#### Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.

### What does that mean?

Sometimes our data is simply too large to fit into our computer's or server's memory.
Dask can simplify our life by splitting the data into partitions that are loaded and processed piece by piece, so the whole dataset never has to fit into memory at once.

### Let us experiment with these collections before we scale up to clusters

In [1]:
import pandas as pd
import random
import dask.dataframe as ddf
import time

names = ["Albert","John","Richard","Henry","William"]
surnames = ["Goodman","Black","White","Green","Joneson"]
salaries = [500*random.randint(10,30) for _ in range(10)]

def generate_random_person(names, surnames, salaries):
    # pick one random element from each list
    return {"name": random.choice(names),
            "surname": random.choice(surnames),
            "salary": random.choice(salaries)}

def generate_people(k):
    return [generate_random_person(names, surnames, salaries) for _ in range(k)]

def benchmark(function, function_name):
    start = time.time()
    function()
    end = time.time()
    print("{0} seconds for {1}".format((end - start), function_name))


df = pd.DataFrame(generate_people(5000000),columns=["name","surname","salary"])

dd = ddf.from_pandas(df, npartitions=8)  # create a dask dataframe from the pandas dataframe, split into 8 partitions

# add some columns
df['bonus'] = df['salary']*.5
dd['bonus'] = dd['salary']*.5

def get_bonus(df):
    df['bonus'] = df['salary']*.5

def test_1():
    get_bonus(df)  # pandas: the new column is computed immediately

def test_2():
    get_bonus(dd)  # dask: the operation is only recorded in the task graph (lazy)

benchmark(test_1, 'pandas df')
benchmark(test_2, 'dask df')

# for dask data frames we have to call the "compute" function to actually run the calculation. This is called lazy evaluation.
# https://en.wikipedia.org/wiki/Lazy_evaluation
%time dd['salary'].mean().compute()
%time df['salary'].mean()

0.03229212760925293 seconds for pandas df
0.0043182373046875 seconds for dask df
CPU times: user 3.81 s, sys: 4.97 s, total: 8.78 s
Wall time: 1.11 s
CPU times: user 28.7 ms, sys: 26.7 ms, total: 55.5 ms
Wall time: 6.94 ms


10599.9766

### This is not a speedup. Yet!

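As we saw in the last chapter, distributing work also takes time: Dask has to schedule at least one task per partition and combine the partial results, and for a cheap operation like a mean this overhead dominates. Maybe we just need more partitions? Note that beyond some point more partitions do not speed anything up, because every additional partition adds scheduling overhead.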

In [2]:
dd1 = ddf.from_pandas(df, npartitions=16)
%time dd1['salary'].mean().compute()
dd2 = ddf.from_pandas(df, npartitions=24)
%time dd2['salary'].mean().compute()
dd3 = ddf.from_pandas(df, npartitions=40)
%time dd3['salary'].mean().compute()
dd4 = ddf.from_pandas(df, npartitions=80)
%time dd4['salary'].mean().compute()

CPU times: user 176 ms, sys: 162 ms, total: 339 ms
Wall time: 46.7 ms
CPU times: user 72.8 ms, sys: 6.93 ms, total: 79.7 ms
Wall time: 24.5 ms
CPU times: user 76.3 ms, sys: 986 µs, total: 77.3 ms
Wall time: 32 ms
CPU times: user 77.9 ms, sys: 11.2 ms, total: 89.1 ms
Wall time: 50.6 ms


10599.9766

### We are still slower than the built-in pandas function
A mean is a cheap, vectorized operation, so pandas finishes it in milliseconds and there is little work to parallelize.
Let us try something more expensive: a plain Python function applied element-wise with the ```apply``` method.

In [3]:
def f(x):
    return (13*x+5)%7

# meta=(name, dtype) describes the output so that dask does not have to infer it
print('8 partitions')
%time dd['salary'].apply(f, meta=('salary', 'int64')).compute()
print('16 partitions')
%time dd1['salary'].apply(f, meta=('salary', 'int64')).compute()
print('24 partitions')
%time dd2['salary'].apply(f, meta=('salary', 'int64')).compute()
print('40 partitions')
%time dd3['salary'].apply(f, meta=('salary', 'int64')).compute()
print('80 partitions')
%time dd4['salary'].apply(f, meta=('salary', 'int64')).compute()

8 partitions
CPU times: user 5.43 s, sys: 5.41 s, total: 10.8 s
Wall time: 2.47 s
16 partitions
CPU times: user 1.74 s, sys: 310 ms, total: 2.05 s
Wall time: 1.43 s
24 partitions
CPU times: user 1.63 s, sys: 410 ms, total: 2.04 s
Wall time: 1.42 s
40 partitions
CPU times: user 1.36 s, sys: 28.7 ms, total: 1.38 s
Wall time: 1.37 s
80 partitions
CPU times: user 1.38 s, sys: 25.6 ms, total: 1.41 s
Wall time: 1.39 s


0          5
1          6
2          2
3          6
4          3
5          2
6          4
7          5
8          5
9          6
10         6
11         2
12         3
13         5
14         2
15         3
16         4
17         5
18         2
19         6
20         2
21         3
22         5
23         2
24         4
25         3
26         2
27         2
28         2
29         6
          ..
4999970    5
4999971    5
4999972    5
4999973    6
4999974    2
4999975    2
4999976    2
4999977    2
4999978    5
4999979    5
4999980    2
4999981    5
4999982    5
4999983    2
4999984    5
4999985    2
4999986    2
4999987    2
4999988    3
4999989    4
4999990    5
4999991    5
4999992    2
4999993    6
4999994    4
4999995    2
4999996    2
4999997    5
4999998    3
4999999    2
Name: salary, Length: 5000000, dtype: int64

### How fast is plain pandas on a single core?

In [4]:
print('1 core pandas')
%time df['salary'].apply(f)

1 core pandas
CPU times: user 1.3 s, sys: 141 ms, total: 1.44 s
Wall time: 1.44 s


0          5
1          6
2          2
3          6
4          3
5          2
6          4
7          5
8          5
9          6
10         6
11         2
12         3
13         5
14         2
15         3
16         4
17         5
18         2
19         6
20         2
21         3
22         5
23         2
24         4
25         3
26         2
27         2
28         2
29         6
          ..
4999970    5
4999971    5
4999972    5
4999973    6
4999974    2
4999975    2
4999976    2
4999977    2
4999978    5
4999979    5
4999980    2
4999981    5
4999982    5
4999983    2
4999984    5
4999985    2
4999986    2
4999987    2
4999988    3
4999989    4
4999990    5
4999991    5
4999992    2
4999993    6
4999994    4
4999995    2
4999996    2
4999997    5
4999998    3
4999999    2
Name: salary, Length: 5000000, dtype: int64

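### We can see a speedup already, but only a small one
The best Dask run (1.37 s wall time, 40 partitions) is only slightly faster than single-core pandas (1.44 s). Dask DataFrames use a thread pool by default, and a pure-Python function such as ```f``` holds Python's global interpreter lock (GIL) while it runs, so the threads mostly take turns. With larger datasets the fixed overhead matters less, but Python-level work like this scales much better when it is vectorized or run in separate processes.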

### Exercise 10.1
Use the built-in dask.DataFrame functions to get an overview of the data.
Very important functions are ```groupby```, ```loc```, ```merge```, ```corr``` and row-wise selections.

Given two dataframes, compare the runtimes of the pandas and dask functions.
Also find out what limits the parallelizability.

In [None]:
### Your Code here

## Scale to clusters
Since we do not have a server cluster at hand, we can create a cluster on our local machine.
We can then ```submit``` function calls to our cluster. Each call immediately returns a ```Future```, a handle to a result that is computed in the background. Calling ```result``` waits for the computation to finish and returns the value.

In [5]:
from dask.distributed import Client

def inc(x):
    return x + 1

def add(x, y):
    return x + y


client = Client(processes=False)  # start local workers as threads

a = client.submit(inc, 10)  # calls inc(10) in background thread or process
b = client.submit(inc, 20)  # calls inc(20) in background thread or process

a

In [6]:
a.result()

11

```Futures``` can be passed as arguments to further ```submit``` calls. Dask tracks these dependencies automatically and only runs a task once its inputs are available.

In [7]:
c = client.submit(add, a, b)
print(c)
print(c.result())

<Future: status: pending, key: add-c1877172ef0a985e7ef7c1efeb50953c>
32


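There is also a ```map``` function that submits one task per input element. Every task carries some scheduling overhead, so this pays off only when each call does enough work.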

In [8]:
futures = client.map(inc, range(1000))

We can collect the results of such a map either with a list comprehension over the futures or, usually more efficiently, with the built-in ```gather``` function.

In [9]:
# results = [future.result() for future in futures]
results = client.gather(futures)  # this can be faster
print(results[:10])

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
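Because ```client.map``` creates one task per element, a common remedy for very cheap functions is to batch the inputs so that each task does more work. The helper ```inc_batch``` and the batch size of 100 are illustrative choices for this sketch, not part of the Dask API.

```python
from dask.distributed import Client

def inc(x):
    return x + 1

def inc_batch(batch):
    # one task handles a whole batch, so the scheduling overhead is paid once per batch
    return [inc(x) for x in batch]

client = Client(processes=False)

data = list(range(1000))
batches = [data[i:i + 100] for i in range(0, len(data), 100)]  # 10 tasks instead of 1000
futures = client.map(inc_batch, batches)

# gather returns the batch results in submission order; flatten them again
results = [y for batch in client.gather(futures) for y in batch]
client.close()
print(results[:10])
```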


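### Working with bigger datasets
We can pass bigger dataframes directly as inputs to ```submit```, as before, but then the data is sent along with every task that uses it.
Alternatively, we can ```scatter``` the data to the workers once and pass the resulting ```Future``` to as many tasks as we like. This is useful when several tasks on multiple workers need the same data.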

In [10]:
df = pd.DataFrame([range(100)])
remote_df = client.scatter(df)
remote_df

In [11]:
fut = client.submit(inc, remote_df)
fut

In [12]:
fut.result()

   0  1  2  3  4  5  6  7  8   9  ...  90  91  92  93  94  95  96  97  98   99
0  1  2  3  4  5  6  7  8  9  10  ...  91  92  93  94  95  96  97  98  99  100


In [13]:
client.close()
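When many tasks need the same data, ```scatter``` with ```broadcast=True``` copies it to every worker once, and each task then receives only the small ```Future``` handle. The helper ```column_total``` and the data are made up for this sketch.

```python
import pandas as pd
from dask.distributed import Client

def column_total(df, col):
    # hypothetical helper: sum of one column as a Python int
    return int(df[col].sum())

client = Client(processes=False)

df = pd.DataFrame({"a": range(1000), "b": range(1000, 2000)})
remote_df = client.scatter(df, broadcast=True)   # send the data to all workers once

# every task refers to the same scattered data instead of shipping its own copy
futures = [client.submit(column_total, remote_df, col) for col in df.columns]
totals = client.gather(futures)
client.close()
print(totals)
```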

### There are more Dask data types and features; these are the basics of scheduling work on a local cluster
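The ```Client(processes=False)``` calls above create an implicit local cluster. A ```LocalCluster``` can also be configured explicitly, which makes the number of workers and threads visible; the values below are arbitrary. On a real cluster, the same ```Client``` can instead connect to an already running scheduler by address (e.g. ```Client("tcp://<scheduler-host>:8786")```, where the host is a placeholder), and separate packages such as dask-jobqueue start Dask workers through batch systems like SLURM.

```python
from dask.distributed import Client, LocalCluster

# two workers with one thread each, run as threads inside this Python process
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)
client.wait_for_workers(2)

threads = client.nthreads()   # mapping: worker address -> number of threads
print(threads)

client.close()
cluster.close()
```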


### Exercise 10.2
Start a cluster and calculate the values of a polynomial analogously to Exercise 9.1

In [2]:
from dask.distributed import Client
import numpy as np
import pandas as pd
from dask import compute, delayed


def poly(x,a,b,c,d):
    return a*x**3 + b*x**2 + c*x + d

x = np.linspace(-2,2,50)
df = pd.read_csv('grid_params.csv')  # one row per parameter set (a, b, c, d)

### Your code here
client = Client(processes=False)
values = [delayed(poly)(x,*line) for line in df.values]  # list of lazy tasks, one per parameter set
results = compute(*values, scheduler='distributed')  # run all tasks on the cluster
client.close()
print(results[:1000])

(array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]), array([0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111,
       0.11111111, 0.11111111, 0.11111111, 0.11111111, 0.11111111]), array([0.22222222, 0.22222222, 0.22222222, 0.22222222, 0.22222222,
       0.22222222, 0.22222222, 0.2222

### Coming from joblib or scikit-learn with parallelized workflows

In [6]:
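import joblib  # sklearn.externals.joblib no longer exists in recent scikit-learn versions
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier


client = Client()
X, y = make_classification(n_samples=1000, random_state=0)  # small synthetic example data
model = RandomForestClassifier(n_estimators=100, n_jobs=-1)
with joblib.parallel_backend('dask'):
    # your scikit-learn code: its internal joblib calls now run on the Dask cluster
    model.fit(X, y)
client.close()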
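The ```'dask'``` backend is not specific to scikit-learn: any code written with ```joblib.Parallel``` can use it while a Dask ```Client``` is active. A minimal sketch with a made-up ```square``` function:

```python
import joblib
from dask.distributed import Client

def square(x):
    return x * x

client = Client(processes=False)

# inside this block joblib sends its tasks to the active Dask client
with joblib.parallel_backend("dask"):
    squares = joblib.Parallel()(joblib.delayed(square)(i) for i in range(10))

client.close()
print(squares)
```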