In [1]:
import pandas as pd
import numpy as np
import sys
sys.path.append(r'D:\notebook-dir\00 - PYTHON\COMMON-FUNCS')
import func_pandas_dask

## Example 1

In [2]:
# Synthetic benchmark data: two columns ('X', 'Y') of 100,000 random integers in [0, 1000).
df = pd.DataFrame({
    column: np.random.randint(low=0, high=1000, size=100000)
    for column in ('X', 'Y')
})
df.shape

(100000, 2)

In [3]:
# Preview the first rows of the generated frame.
df.head()

Unnamed: 0,X,Y
0,611,326
1,987,438
2,662,466
3,729,788
4,307,10


In [4]:
def add_squares(df):
    """Return ``X**2 + Y**2`` for the given pandas object.

    ``df`` only needs to expose ``X`` and ``Y`` attributes. In this notebook it
    is called both with a single row (via ``DataFrame.apply(..., axis=1)``) and
    with a whole partition (via ``map_partitions``), so the result is either a
    scalar or a Series.
    """
    x_squared = df.X ** 2
    y_squared = df.Y ** 2
    return x_squared + y_squared

In [5]:
%%time
df['add_squares'] = df.apply(add_squares, axis=1)

Wall time: 4.25 s


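Using Dask to improve: `map_partitions` calls `add_squares` once per partition, so the arithmetic runs vectorized on whole columns instead of once per row.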

In [6]:
import dask.dataframe as dd

# Wrap the pandas frame in a Dask DataFrame split into four partitions.
ddf = dd.from_pandas(data=df, npartitions=4)

In [7]:
# The row count of a Dask DataFrame is lazy (a Delayed); only the column count is concrete.
ddf.shape

(Delayed('int-5f19ef0e-941b-4204-b945-51ba1eb0101a'), 3)

In [8]:
%%time
ddf['z'] = ddf.map_partitions(add_squares, meta=(None, 'int64')).compute()

Wall time: 23 ms


## Example 2

In [10]:
# Baseline: time a single call of costly_simulation on a four-element parameter list.
%time func_pandas_dask.costly_simulation([1, 2, 3, 4])

Wall time: 312 ms


10

#### Define the set of input parameters to call the function

In [11]:
# 500 parameter sets, each with four uniform random values in [0, 1).
input_params = pd.DataFrame(
    data=np.random.random((500, 4)),
    columns=['param_a', 'param_b', 'param_c', 'param_d'],
)
input_params.head()

Unnamed: 0,param_a,param_b,param_c,param_d
0,0.342824,0.176634,0.31082,0.217479
1,0.123808,0.688909,0.136515,0.339706
2,0.232922,0.433013,0.999422,0.799384
3,0.32108,0.88765,0.3398,0.399582
4,0.598161,0.672402,0.744704,0.513175


Without using Dask

In [12]:
# Container for the outputs of the sequential (non-parallel) simulation run.
results = []

In [13]:
%%time
for parameters in input_params.values[:10]:
    result = func_pandas_dask.costly_simulation(parameters)
    results.append(result)

Wall time: 6.34 s


In [14]:
# Display the collected simulation outputs.
results

[1.0477570000020648,
 1.28893852021938,
 2.464741109456914,
 1.9481112892351282,
 2.528442098091119,
 1.925597382087811,
 2.2679394859872275,
 3.150628176096079,
 2.294002387413728,
 2.387377257401147]

#### Optimize: using Python's `multiprocessing` module

In [22]:
import multiprocessing
from multiprocessing import Pool
# Import only the progress-bar class instead of `from tqdm import *`, so it is
# clear where the name comes from and the notebook namespace is not polluted.
from tqdm import tqdm

# Number of logical CPUs reported by the OS; used below to size the worker pool.
print("Number of cpu : ", multiprocessing.cpu_count())

Number of cpu :  4


In [37]:
%%time
if __name__ == '__main__':  
    p = Pool(processes = multiprocessing.cpu_count())
    
    output = []   
    url_per_chunk = 1    
    for result in tqdm(p.imap(func=func_pandas_dask.costly_simulation, iterable=input_params.values[:10], chunksize=url_per_chunk), 
                       total=len(input_params.values[:10])/url_per_chunk):
        output.append(result)

100%|████████████████████████████████████████████████████████████████████████████████| 10/10.0 [00:01<00:00,  5.18it/s]

Wall time: 2.01 s





In [48]:
%%time
if __name__ == '__main__':  
    p = Pool(processes = multiprocessing.cpu_count())
    
    output = []   
    url_per_chunk = 1    
    for result in tqdm(p.imap(func=func_pandas_dask.costly_simulation, iterable=input_params.values, chunksize=url_per_chunk), 
                       total=len(input_params.values)/url_per_chunk):
        output.append(result)

100%|██████████████████████████████████████████████████████████████████████████████| 500/500.0 [01:05<00:00,  7.62it/s]

Wall time: 1min 5s





#### Optimize : Use Dask `Delayed`

We can call `dask.delayed` on our function to make it lazy. 

Rather than computing its result immediately, it records what we want to compute as a task in a graph that we'll run later on parallel hardware. 

Using `dask.delayed` is a relatively straightforward way to parallelize an existing code base.

In [25]:
import dask

In [38]:
%%time
lazy_results = []
for parameters in input_params.values[:10]:
    
    lazy_result = dask.delayed(func_pandas_dask.costly_simulation)(parameters)
    
    lazy_results.append(lazy_result)

Wall time: 1 ms


In [39]:
# Inspect one entry: it is a Delayed placeholder, not a computed value.
lazy_results[0]

Delayed('costly_simulation-e226acfe-50c8-44cf-bcab-4dd1ff438c56')

#### Run in parallel
The `lazy_results` list contains information about ten calls to `costly_simulation` that have not yet been run. 

Call `.compute()` when you want your result as normal Python objects.

If you started Client() above then you may want to watch the status page during computation.

In [40]:
# Execute all delayed calls; dask.compute returns their results together as a tuple.
%time dask.compute(*lazy_results)

Wall time: 1.87 s


(1.0477570000020648,
 1.28893852021938,
 2.464741109456914,
 1.9481112892351282,
 2.528442098091119,
 1.925597382087811,
 2.2679394859872275,
 3.150628176096079,
 2.294002387413728,
 2.387377257401147)

Run this on all of our input parameters:

In [49]:
# Build one Delayed task per parameter set (all rows); nothing is executed yet.
lazy_results = [
    dask.delayed(func_pandas_dask.costly_simulation)(parameters)
    for parameters in input_params.values
]

In [51]:
%%time
#futures = dask.persist(*lazy_results)  # trigger computation in the background
# NOTE(review): despite its name, `futures` receives whatever dask.compute returns,
# which for the ten-task run earlier in this notebook was a tuple of computed values.
futures = dask.compute(*lazy_results)  

Wall time: 1min 2s
