In [None]:
!conda install -y dask

Fetching package metadata .............
Solving package specifications: .

Package plan for installation in environment /Users/iveslab/anaconda3:

The following packages will be UPDATED:

    anaconda:  5.0.1-py36h6e48e2d_1  --> custom-py36ha4fed55_0
    dask:      0.15.3-py36hc3ad2d6_0 --> 0.15.4-py36h886f2ba_0
    dask-core: 0.15.3-py36hc0be6b7_0 --> 0.15.4-py36h8292370_0

## The following material is mainly borrowed from [Andrea Zonca](https://github.com/sdsc/sdsc-summer-institute-2017/tree/master/hpc3_python_hpc/dask_array)

# Array Multicore

Instead of running trivially parallel, independent tasks, here we want to use multiple threads to process different parts of the same array simultaneously. `dask.array` provides this by mirroring the `numpy` API: we replace `numpy` functions with their `dask.array` counterparts (for example, `np.sin` becomes `da.sin`). The key concept is the chunk: the array is split into blocks, and each block is processed separately, possibly by a different thread. For a matrix, for example, we define a 2D block size; each block is computed independently, and the partial results are then combined to produce the final answer. See <http://dask.pydata.org/>
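To make the chunk idea concrete, here is a hand-rolled sketch that uses only NumPy and the standard library, not dask itself. The array is cut into 2D blocks, a `ThreadPoolExecutor` evaluates the same elementwise expression used later in this notebook on each block, and each result is written into its slot of the output. The helper name `chunked_apply`, the worker count, and the small demo shape are assumptions for illustration. Threads can help here because NumPy releases the GIL inside many of its compiled loops; dask's threaded scheduler relies on the same property, but it also builds and schedules a task graph, which this sketch does not attempt.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor


def expr(block):
    # Same elementwise expression as the B / B_dask cells in this notebook.
    return block**2 + np.sin(block) * block * np.log(block)


def chunked_apply(A, func, chunks, max_workers=4):
    """Apply an elementwise `func` to `A` block by block with a thread pool.

    Hypothetical helper for illustration only; it is not how dask.array
    is implemented internally.
    """
    out = np.empty_like(A)
    rows, cols = chunks
    # Top-left corner of every (rows x cols) block; edge blocks may be smaller.
    starts = [(i, j)
              for i in range(0, A.shape[0], rows)
              for j in range(0, A.shape[1], cols)]

    def work(start):
        i, j = start
        # Each task reads and writes a disjoint slice, so no locking is needed.
        out[i:i + rows, j:j + cols] = func(A[i:i + rows, j:j + cols])

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() waits for every block and re-raises any worker exception.
        list(pool.map(work, starts))
    return out


# Small seeded demo array (an assumption; the notebook uses a larger A).
rng = np.random.default_rng(0)
A_demo = rng.random((400, 1000))
B_chunked = chunked_apply(A_demo, expr, chunks=(100, 250))
print(np.allclose(B_chunked, expr(A_demo)))  # compare against the unchunked result
```

This only works as written for elementwise operations, where every output block depends on exactly one input block. Reductions such as a sum also need a final step that combines the per-block partial results, which is the "accumulate" step mentioned above.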

In [1]:
import numpy as np
import dask.array as da

In [17]:
A = np.random.rand(2000, 10000)  # 2000 rows x 10000 columns = 2e7 float64 values (~160 MB)

In [18]:
A.size / 1e6

20.0

In [8]:
A

array([[ 0.26533543,  0.94998648,  0.54994589, ...,  0.39632963,
         0.45731833,  0.18297459],
       [ 0.92654543,  0.48855329,  0.38547281, ...,  0.2091409 ,
         0.31271892,  0.29216024],
       [ 0.85106591,  0.25506796,  0.37556645, ...,  0.62049405,
         0.99839855,  0.1635665 ],
       ..., 
       [ 0.10126687,  0.22915469,  0.84885799, ...,  0.1570216 ,
         0.30999634,  0.37005143],
       [ 0.65590774,  0.03027586,  0.30563167, ...,  0.58972369,
         0.48380212,  0.68404124],
       [ 0.22055967,  0.05540368,  0.06977018, ...,  0.99643675,
         0.38992508,  0.28613238]])

In [None]:
%time B = A**2 + np.sin(A) * A * np.log(A)

In [10]:
A_dask = da.from_array(A, chunks=(1000, 2000))

In [11]:
A_dask.numblocks

(2, 5)
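The `(2, 5)` output corresponds to a 2000 × 10000 array cut into 1000 × 2000 blocks: 2000 / 1000 = 2 block rows and 10000 / 2000 = 5 block columns. When a dimension is not a multiple of the chunk size, the last block along that axis is smaller but still counts as a block. A small, hypothetical helper reproduces the count with integer ceiling division:

```python
def num_blocks(shape, chunks):
    """Number of blocks along each axis when `shape` is cut into `chunks`.

    Hypothetical helper; for regular chunks, dask reports the same
    quantity as the `.numblocks` attribute.
    """
    # (s + c - 1) // c is ceiling division on integers: a partial edge
    # block still counts as one block.
    return tuple((s + c - 1) // c for s, c in zip(shape, chunks))


print(num_blocks((2000, 10000), (1000, 2000)))    # (2, 5)
print(num_blocks((2500, 10000), (1000, 2000)))    # (3, 5): last row block has 500 rows
print(num_blocks((20000, 100000), (1000, 2000)))  # (20, 50)
```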

In [15]:
%time B_dask = (A_dask**2 + da.sin(A_dask) * A_dask * da.log(A_dask)).compute()

CPU times: user 629 ms, sys: 154 ms, total: 782 ms
Wall time: 322 ms


In [16]:
assert np.allclose(B, B_dask)

In [8]:
from concurrent.futures import ProcessPoolExecutor

def sum_row(line):
    # Parse one line of whitespace-separated integers and return their sum.
    return sum(int(x) for x in line.split())



In [16]:
%%time
with ProcessPoolExecutor(max_workers=4) as executor:
    with open('numbers.txt') as fh:
        b = sum(executor.map(sum_row, fh, chunksize=2))

CPU times: user 882 ms, sys: 290 ms, total: 1.17 s
Wall time: 1.06 s


In [17]:
%%time
with ProcessPoolExecutor(max_workers=4) as executor:
    with open('numbers.txt') as fh:
        b = sum(executor.map(sum_row, fh, chunksize=20))

CPU times: user 100 ms, sys: 44.2 ms, total: 145 ms
Wall time: 133 ms
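The drop in wall time from `chunksize=2` to `chunksize=20` is most likely per-task overhead: `ProcessPoolExecutor.map` groups its input into chunks, and each chunk is pickled, sent to a worker process, and has its result sent back. The sketch below mimics that grouping with the standard library and counts how many tasks each setting produces. The `batches` helper and the in-memory stand-in for `numbers.txt` (whose contents are not shown in this notebook) are hypothetical.

```python
from itertools import islice


def sum_row(line):
    # Same per-line work as in the notebook: sum the integers on one line.
    return sum(int(x) for x in line.split())


def batches(iterable, chunksize):
    """Yield successive lists of at most `chunksize` items (hypothetical helper).

    Mimics the grouping ProcessPoolExecutor.map performs before sending work
    to its worker processes; it is not the standard-library code itself.
    """
    it = iter(iterable)
    while True:
        batch = list(islice(it, chunksize))
        if not batch:
            return
        yield batch


# Hypothetical stand-in for numbers.txt: 1000 lines of five integers each.
lines = [" ".join(str(n) for n in range(i, i + 5)) for i in range(1000)]

for chunksize in (2, 20):
    n_tasks = sum(1 for _ in batches(lines, chunksize))
    print(f"chunksize={chunksize}: {n_tasks} tasks sent to the pool")
# chunksize=2: 500 tasks sent to the pool
# chunksize=20: 50 tasks sent to the pool

# Batching changes how the work is shipped, not the answer.
direct_total = sum(sum_row(line) for line in lines)
batched_total = sum(sum_row(line) for batch in batches(lines, 20) for line in batch)
print(batched_total == direct_total)
```

Fewer, larger tasks amortize the pickling and inter-process communication cost, at the price of coarser load balancing when lines differ a lot in cost. Note that `chunksize` only has an effect with `ProcessPoolExecutor`; `ThreadPoolExecutor.map` ignores it.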
