# Some more features

Here we briefly summarize a few more features that may be of interest, even for beginners.

In [2]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:63517")
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:63517 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 4 |
| | Memory: 17.18 GB |


## Calculating multiple outputs

Sometimes we need multiple outputs from a computation. Until now, however, all we have seen is a series of delayed computations followed by a final ```compute()``` call. It is possible to recover **multiple** intermediate results, and to do so **without computational penalty**. Let's consider this example:

In [2]:
import dask
import dask.array as da

In [3]:
my_array = da.random.random((1000,1000,250))

In [4]:
my_array

| | Array | Chunk |
|---|---|---|
| Bytes | 2.00 GB | 125.00 MB |
| Shape | (1000, 1000, 250) | (250, 250, 250) |
| Count | 16 Tasks | 16 Chunks |
| Type | float64 | numpy.ndarray |


We want to calculate the difference between the maximum and minimum projections along the third axis, but we also want to inspect the maximum projection itself.

In [5]:
# maximum and minimum projections along the third axis
maxproj = my_array.max(axis=2)
minproj = my_array.min(axis=2)
difference = maxproj - minproj

If we calculate things separately, the maximum projection is computed twice:

In [31]:
%%time
maxp = maxproj.compute()
diffp = difference.compute()

CPU times: user 7.7 s, sys: 915 ms, total: 8.61 s
Wall time: 3.16 s


With ```dask.compute```, it is computed only once, because Dask merges the two task graphs and re-uses the shared intermediate results:

In [32]:
%%time
result = dask.compute(maxproj, difference)

CPU times: user 4 s, sys: 548 ms, total: 4.54 s
Wall time: 1.76 s


In [33]:
result[0].shape

(1000, 1000)

In [34]:
result[1].shape

(1000, 1000)
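
The sharing of intermediate results can be made visible on a toy example by counting how often a function actually runs. This is only a minimal sketch: it assumes the local synchronous scheduler instead of the distributed client used above, and the names `expensive`, `add_one` and `calls` are hypothetical helpers introduced for illustration.

```python
import dask
from dask import delayed

calls = []  # records every real execution of `expensive`

@delayed
def expensive(x):
    calls.append(x)
    return x * 2

@delayed
def add_one(y):
    return y + 1

base = expensive(10)      # shared intermediate result
derived = add_one(base)   # depends on `base`

# Separate compute() calls: the shared step runs once per call.
calls.clear()
base.compute(scheduler="synchronous")
derived.compute(scheduler="synchronous")
separate_calls = len(calls)

# A single dask.compute() call: the graphs are merged, the shared step runs once.
calls.clear()
base_val, derived_val = dask.compute(base, derived, scheduler="synchronous")
shared_calls = len(calls)

print(separate_calls, shared_calls, base_val, derived_val)
```

The same reasoning applies to the arrays above: `maxproj` is part of the graph of `difference`, so passing both to one ```dask.compute``` call lets Dask evaluate it a single time.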

## Delayed as decorators

We have seen that we can use ```delayed()``` to create Dask versions of regular functions, and use it like this:

In [6]:
from dask import delayed

In [7]:
def inc(x):
    return x+1

In [8]:
val = delayed(inc)(10)

In [9]:
val.compute()

11

If we need to re-use the same function multiple times and want to avoid calling ```delayed()``` over and over, or if we want a simple way to run code with or without Dask, we can use ```delayed()``` as a decorator, since it is applied to a **function**:

In [10]:
@delayed
def inc2(x):
    return x+1

In [11]:
val = inc2(10)

In [12]:
val

Delayed('inc2-2434875f-4f74-48d4-8047-9d9279a03c3f')
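
Decorated functions can be chained like regular function calls, and nothing runs until ```compute()``` is called. The following is a small sketch under the assumption that the local synchronous scheduler is used (so it runs without a cluster); the functions `inc` and `add` are illustrative only.

```python
from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(a, b):
    return a + b

# Builds a small task graph; no function body has run yet.
total = add(inc(1), inc(2))

# Only now are inc(1), inc(2) and add(...) actually executed.
result = total.compute(scheduler="synchronous")
print(result)
```

Removing the two `@delayed` lines turns this back into plain Python, apart from the final ```compute()``` call, which is what makes the decorator form convenient for switching Dask on and off.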

## Dask bags

Besides dask-arrays and dask-dataframes, there is one more "container" type that one can use: the dask bag. It is mainly used to handle unstructured or semi-structured data. These are rather specific applications, so we won't go into details. However, one can also create a dask-bag from, e.g., a list:

In [13]:
import dask.bag as db
import numpy as np

In [14]:
mylist = np.random.random(1000000)

In [15]:
my_bag = db.from_sequence(mylist,partition_size=100)

Then one can do standard operations like map, groupby etc.:

In [16]:
squared = my_bag.map(lambda x:x**2).compute()

As you can see, this is not particularly efficient. Bags are best used to import data, which is then transformed into a dask-array or dask-dataframe.
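
For semi-structured data, a typical bag pipeline filters out incomplete records and extracts one field before doing any numerical work. The sketch below uses made-up records and assumes the local synchronous scheduler; `records` and its field names are hypothetical.

```python
import dask.bag as db

# Hypothetical semi-structured records: one entry lacks the "value" field.
records = [
    {"name": "a", "value": 1},
    {"name": "b", "value": 2},
    {"name": "c"},
    {"name": "d", "value": 4},
]

bag = db.from_sequence(records, npartitions=2)

# Keep only complete records, then extract the "value" field.
values = bag.filter(lambda r: "value" in r).pluck("value")

squared = values.map(lambda v: v ** 2).compute(scheduler="synchronous")
total = values.sum().compute(scheduler="synchronous")
print(squared, total)
```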

An interesting feature for importing data is the ability to read zip files:

In [17]:
test = db.read_text('../Data/Chicago_taxi/chicago_taxi_trips_2016_01.csv.zip')

In [18]:
myfile = test.compute()
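
To try reading compressed text without the taxi dataset, one can write a small compressed file and read it back. Note that this sketch swaps in gzip instead of zip, uses a temporary file with invented contents, and assumes the local synchronous scheduler; ```read_text``` infers the compression from the `.gz` extension.

```python
import gzip
import os
import tempfile

import dask.bag as db

with tempfile.TemporaryDirectory() as tmpdir:
    # Hypothetical file with made-up contents, written as gzip-compressed text.
    path = os.path.join(tmpdir, "example.csv.gz")
    with gzip.open(path, "wt") as f:
        f.write("id,fare\n1,7.5\n2,12.0\n")

    # Each element of the bag is one line of text; strip the trailing newline.
    lines = db.read_text(path).map(str.strip)
    header, *rows = lines.compute(scheduler="synchronous")

fares = [float(row.split(",")[1]) for row in rows]
print(header, fares)
```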

## Persisting in memory

Until now, the only time we actually accessed the data was when calling ```compute()```. If at some point we have a dask object that fits into RAM and that we don't want to recalculate every time, we can make it **persist**. For example:

In [28]:
my_array = da.random.random((1000,1000,2500))

In [31]:
my_array

| | Array | Chunk |
|---|---|---|
| Bytes | 20.00 GB | 125.00 MB |
| Shape | (1000, 1000, 2500) | (250, 250, 250) |
| Count | 160 Tasks | 160 Chunks |
| Type | float64 | numpy.ndarray |


In [43]:
max_proj = my_array.max(axis = 2)

If we want to access a single element of the resulting matrix, Dask has to run the underlying calculation again on every call, because no intermediate result is kept in memory:

In [44]:
%%time
max_proj[0,0].compute()

CPU times: user 10.3 ms, sys: 2.39 ms, total: 12.7 ms
Wall time: 1.12 s


0.9999381693346302

In [46]:
%%time
meanval = max_proj.mean()
meanval.compute()

CPU times: user 277 ms, sys: 19.5 ms, total: 297 ms
Wall time: 13.7 s


0.9996004565004427

Now we can make this object persist:

In [47]:
max_proj = max_proj.persist()

The object is still a dask object, so we can still use it in further calculations:

In [48]:
max_proj

| | Array | Chunk |
|---|---|---|
| Bytes | 8.00 MB | 500.00 kB |
| Shape | (1000, 1000) | (250, 250) |
| Count | 16 Tasks | 16 Chunks |
| Type | float64 | numpy.ndarray |


Now, however, any downstream operation is much faster:

In [52]:
meanval = max_proj.mean()

In [54]:
%%time
meanval = max_proj.mean()
meanval.compute()

CPU times: user 13.3 ms, sys: 5.05 ms, total: 18.3 ms
Wall time: 73.7 ms


0.9996004565004427

Accessing a single value is now much faster as well:

In [55]:
%%time
max_proj[0,0].compute()

CPU times: user 4.97 ms, sys: 1.89 ms, total: 6.86 ms
Wall time: 11.2 ms


0.9999381693346302
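
The same pattern can be tried on a tiny array whose result is easy to verify by hand. This is a minimal sketch, assuming the local synchronous scheduler rather than the distributed client used above; the array `data` and its chunking are chosen only for illustration.

```python
import numpy as np
import dask.array as da

# Small deterministic array: values 0..23 arranged as (2, 3, 4).
data = da.from_array(np.arange(24.0).reshape(2, 3, 4), chunks=(1, 3, 2))

max_proj = data.max(axis=2)

# persist() evaluates the graph and keeps the chunk results in memory,
# but still returns a dask array rather than a NumPy array.
persisted = max_proj.persist(scheduler="synchronous")
is_still_dask = isinstance(persisted, da.Array)

# Downstream operations start from the stored chunks.
mean_value = float(persisted.mean().compute(scheduler="synchronous"))
print(is_still_dask, persisted.shape, mean_value)
```

The maxima along the last axis are 3, 7, 11, 15, 19 and 23, so their mean is 13.0. With a distributed client, ```persist()``` returns immediately and the chunks are computed in the background on the workers.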