In [1]:
import warnings
warnings.filterwarnings("ignore")

from dask.distributed import Client, progress

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

Connection method: Cluster object
Cluster type: LocalCluster
Dashboard: http://127.0.0.1:61195/status

Status: running
Using processes: True
Workers: 4
Total threads: 8
Total memory: 7.45 GiB

Comm: tcp://127.0.0.1:61196
Started: Just now

Comm: tcp://127.0.0.1:61233
Total threads: 2
Memory: 1.86 GiB
Dashboard: http://127.0.0.1:61239/status
Nanny: tcp://127.0.0.1:61198
Local directory: C:\Users\harri\Data\First_Dask_Projects\Working_with_Dask_Arrays\dask-worker-space\worker-q6xny8ru

Comm: tcp://127.0.0.1:61235
Total threads: 2
Memory: 1.86 GiB
Dashboard: http://127.0.0.1:61238/status
Nanny: tcp://127.0.0.1:61199
Local directory: C:\Users\harri\Data\First_Dask_Projects\Working_with_Dask_Arrays\dask-worker-space\worker-zbdfmlgc

Comm: tcp://127.0.0.1:61234
Total threads: 2
Memory: 1.86 GiB
Dashboard: http://127.0.0.1:61237/status
Nanny: tcp://127.0.0.1:61200
Local directory: C:\Users\harri\Data\First_Dask_Projects\Working_with_Dask_Arrays\dask-worker-space\worker-zkpy3b6a

Comm: tcp://127.0.0.1:61236
Total threads: 2
Memory: 1.86 GiB
Dashboard: http://127.0.0.1:61240/status
Nanny: tcp://127.0.0.1:61201
Local directory: C:\Users\harri\Data\First_Dask_Projects\Working_with_Dask_Arrays\dask-worker-space\worker-llq5amuv


In [4]:
import dask.array as da
import numpy as np

Below, you create a 10000x10000 array of random numbers. Note that the `chunks` parameter is set to `(1000, 1000)`, which has no counterpart when you generate random arrays with NumPy. By setting `chunks`, you tell Dask to represent the array as many NumPy arrays of size 1000x1000 (or smaller, if the array cannot be divided evenly). In this case, there will be 100 NumPy arrays of size 1000x1000.
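
The chunk layout described above can be inspected without computing anything. The following is a minimal sketch using the array attributes `numblocks`, `npartitions`, `chunksize`, and `chunks`; the small array `u` is a hypothetical example added only to show the uneven case.

```python
import dask.array as da

# Same shape and chunking as in this lesson; nothing is computed here.
x = da.random.random((10000, 10000), chunks=(1000, 1000))

print(x.numblocks)    # (10, 10): number of chunks along each axis
print(x.npartitions)  # 100: total number of chunks
print(x.chunksize)    # (1000, 1000): shape of the largest chunk

# Hypothetical small array whose shape is not divisible by the chunk size:
# the last chunk along each axis is smaller.
u = da.ones((10, 10), chunks=(4, 4))
print(u.chunks)       # ((4, 4, 2), (4, 4, 2))
```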

The code below does the following:

1. Creates a random Dask array of size 10000x10000.
2. Adds this array to its transpose.
3. Slices the resulting array (every second row, columns 5000 onward) and calculates the mean of each remaining row.

As usual, call `compute()` to make Dask evaluate the results. Note that you can measure the runtime of the following cell using Jupyter Notebook's `%%time` cell magic.

In [5]:
%%time
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z.compute()

Wall time: 1.4 s


array([0.99642456, 1.00330299, 0.99330855, ..., 1.00003267, 0.99461197,
       0.99903126])

First, notice that the code above is almost identical to what you would write with NumPy. The only difference is that you set the `chunks` parameter when generating the random Dask array.

Second, the code block took about 1.4 seconds to run.

Now, do the same thing using NumPy arrays:

In [6]:
%%time
x = np.random.random((10000, 10000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)

Wall time: 11.6 s
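
To check that the two versions compute the same thing, and not just that one is faster, both can be run on the same input. This is a minimal sketch on a smaller array; the seed, the 1000x1000 shape, and the `(250, 250)` chunking are assumptions chosen to keep it quick, not values from this lesson.

```python
import numpy as np
import dask.array as da

# Fixed seed so that both versions see identical data (illustrative choice).
rng = np.random.default_rng(0)
a = rng.random((1000, 1000))

# NumPy version: evaluated eagerly.
z_np = (a + a.T)[::2, 500:].mean(axis=1)

# Dask version on the same data: builds a task graph, evaluated by compute().
x = da.from_array(a, chunks=(250, 250))
z_da = (x + x.T)[::2, 500:].mean(axis=1)
result = z_da.compute()

# allclose rather than exact equality, because the chunked mean may
# sum the values in a different order than NumPy does.
print(np.allclose(z_np, result))
```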


## Persisting data in memory

So far, you've seen that you can parallelize computations if you use Dask arrays instead of NumPy arrays. Moreover, using Dask, you can even work with data that doesn't fit into memory. However, if you have enough memory available for an array and just want to speed up the computations using Dask, then you can persist the data in memory so that later computations read the chunks from memory instead of regenerating or reloading them. If you do this, future computations on the persisted array can be much faster.

You can see this demonstrated if you run the same computations before and after you persist your Dask array in memory. First, run the computations without persisting the array:

In [11]:
x = da.random.random((10000, 10000), chunks=(1000, 1000))

In [12]:
%%time
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z.compute()

Wall time: 672 ms


array([1.00011085, 1.0010304 , 1.0044223 , ..., 0.99001502, 1.0140081 ,
       1.00272311])

In [13]:
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# This persists the 'x' array into the memory
x.persist()

| | Array | Chunk |
|---|---|---|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (10000, 10000) | (1000, 1000) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |


In [14]:
%%time
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z.compute()

Wall time: 1.05 s


array([1.00017381, 0.99760467, 0.99788435, ..., 0.99282811, 0.99918092,
       0.99486942])

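This didn't produce the expected outcome: the run after `persist()` took 1.05 s, compared with 672 ms before. Persisting the data in memory should make the computation faster, not slower. The most likely cause is that `persist()` returns a new, persisted array instead of modifying `x` in place. The cell above discards that return value, so `x` still refers to the lazy, unpersisted array.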
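
A minimal sketch of the usual pattern, in which the return value of `persist()` is assigned back to the variable so that later operations build on the in-memory chunks. The smaller shape and chunk size here are assumptions to keep the example quick. It runs on Dask's default scheduler if no `Client` is active, and no timing is claimed for it.

```python
import dask.array as da

# Smaller array than in the lesson (illustrative sizes).
x = da.random.random((2000, 2000), chunks=(500, 500))

# persist() returns a new collection backed by computed chunks.
# It does not change `x` in place, so the result has to be assigned.
x = x.persist()

# The same computation as above, now built on the persisted array.
y = x + x.T
z = y[::2, 1000:].mean(axis=1)
result = z.compute()

print(result.shape)  # (1000,)
```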

## Drawbacks of Dask arrays

As is the case for Dask DataFrames, the `dask.array` package doesn't implement the entire NumPy interface. The main reasons are the following:

1. The Dask project is an ongoing one, and NumPy has a huge API. So, implementing all of it takes time.

2. Some operations, like sorting, are difficult to parallelize, as discussed in the previous lesson. So, some functionalities around sorting are deliberately not supported in Dask.

3. If the shape of an operation's result depends on the values in the inputs, then Dask cannot know that shape without computing the data. Because of Dask's lazy evaluation strategy, such operations are either not implemented or return arrays with unknown chunk sizes.

That being said, many of the most commonly used functionalities of NumPy are available in Dask arrays.
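
The third point can be seen with boolean-mask indexing, where the number of selected elements depends on the data. In practice, Dask accepts this operation but reports the resulting shape as unknown (`nan`) until the chunk sizes are computed. This is a small sketch with an arbitrary 1000-element array, not an exhaustive description of what Dask supports.

```python
import math
import dask.array as da

x = da.random.random(1000, chunks=250)

# How many elements pass the mask depends on the values,
# so Dask cannot know the output shape while the array is still lazy.
y = x[x > 0.5]
shape_before = y.shape
print(shape_before)            # (nan,)

# compute_chunk_sizes() evaluates the chunks to find their lengths
# and updates `y` in place.
y.compute_chunk_sizes()
print(y.shape)                 # a concrete length; the value varies per run
```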

# Assignment
## 1. Change the code you worked with above to chunks=(250, 250). How long does it take to run?

In [18]:
%%timeit
x = da.random.random((10000, 10000), chunks=(250, 250))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z.compute()

2.57 s ± 282 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Answer: The code took ~2.5 seconds per loop

## 2. Change the code to chunks=(500, 500). How long does it take to run?

In [20]:
%%timeit
x = da.random.random((10000, 10000), chunks=(500, 500))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z.compute()

1.08 s ± 91.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Answer: The code took ~1 second per loop. The code was faster with the larger chunk size. With smaller chunks, Dask has to manage many more of them (1,600 chunks at 250x250 versus 400 at 500x500), and the scheduling overhead grows with the number of chunks.
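
The chunk counts behind this answer can be checked without running any computation. The sketch below only counts chunks for the three chunk sizes used in this notebook. It does not measure time, and the `counts` dictionary is a helper introduced for illustration.

```python
import dask.array as da

shape = (10000, 10000)
counts = {}  # hypothetical helper: chunk edge length -> number of chunks

for size in (250, 500, 1000):
    # Building the array is lazy, so this loop is cheap.
    x = da.random.random(shape, chunks=(size, size))
    counts[size] = x.npartitions
    print(f"chunks=({size}, {size}): {x.npartitions} chunks")
```

Halving the chunk edge length quadruples the number of chunks for a 2D array, so the amount of scheduling work grows quickly as chunks shrink.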

In [22]:
client.close()