# A 5-min Quickstart to dask

**Dask makes it easy to parallelize your code**. Dask can help you in the following cases:

- Speed up computations that are currently just using one of your cores.
- Compute on data that doesn't fit in memory, but does fit on disk.
- Perform computations in a distributed cluster of computers.


## Install

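Dask is easily installable through conda or pip:

    conda install dask
    # or
    pip install "dask[complete]"

With pip, a plain ``pip install dask`` installs only the core library. The ``[complete]`` extra also pulls in the optional dependencies, such as NumPy and pandas, that the collections below need.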

## Collections

There are three default collections, or interfaces, to dask: ``dask.array``, ``dask.dataframe`` and ``dask.bag``. Users coming from the PyData and SciPy ecosystem are probably already familiar with ``numpy.array`` and ``pandas.DataFrame``. Their dask equivalents provide a very similar interface while allowing parallel computations. The third collection, ``dask.bag``, is primarily used for semi-structured data, like JSON blobs or log files.

### dask.array

If you are familiar with ``numpy.array``, you'll notice that ``dask.array`` is very similar. One difference when creating an array with dask is that you need to specify the block size of your chunks. Dask splits the array into blocks of that shape and can work on them in parallel. For more information on chunks read the [FAQ](http://dask.readthedocs.org/en/latest/faq.html): *In ``dask.array`` what is ``chunks``?* and *How do I select a good value for ``chunks``?*.
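To make the ``chunks`` argument concrete, here is a small sketch. The names ``x`` and ``y`` are illustrative. It shows how a ``(5000, 1000)`` array is cut into blocks, including the case where the block size does not divide the shape evenly:

```python
import dask.array as da

# A 5000 x 1000 array cut into 1000 x 1000 blocks:
# 5 blocks along the rows, 1 block along the columns
x = da.ones((5000, 1000), chunks=(1000, 1000))
print(x.chunks)     # ((1000, 1000, 1000, 1000, 1000), (1000,))
print(x.numblocks)  # (5, 1)

# The block size does not have to divide the shape evenly:
# the last block along an axis is simply smaller
y = da.ones((5000, 1000), chunks=(1500, 1000))
print(y.chunks)     # ((1500, 1500, 1500, 500), (1000,))
```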


```python
>>> import dask.array as da
>>> import numpy as np
>>> # Create a numpy array of ones
>>> np_ones = np.ones((5000, 1000))
>>> np_ones
array([[ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       ...,
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.]])
>>> # Create a dask array of ones
>>> da_ones = da.ones((5000, 1000), chunks=(1000, 1000))
```

Another difference is that dask operations are lazy. They only build up a graph of tasks, and the actual computation is triggered by calling the ``.compute()`` method.
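Here is a minimal sketch of this lazy behaviour, using illustrative names. Building an expression returns another dask array immediately, and only ``.compute()`` runs the work. Each of the 5000 x 1000 elements is 1 + 1 = 2, so the sum is 10,000,000:

```python
import dask.array as da

x = da.ones((5000, 1000), chunks=(1000, 1000))

# Building the expression only records tasks in a graph; no data is created yet
y = (x + 1).sum()
print(type(y).__name__)  # Array

# .compute() executes the graph and returns a concrete NumPy value
total = y.compute()
print(total)             # 10000000.0
```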

```python
>>> da_ones.compute()
array([[ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       ...,
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.]])
```

You can interact with a ``dask.array`` similarly to a ``numpy.array``:

```python
>>> # Interact with a numpy array
>>> np_y = np.log(np_ones + 1)[:5].sum(axis=1)
>>> np_y
array([ 693.14718056,  693.14718056,  693.14718056,  693.14718056,
        693.14718056])
>>> # Interact with a dask array
>>> da_y = da.log(da_ones + 1)[:5].sum(axis=1)
```
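Dask keeps track of how chunks change as operations are applied. The sketch below assumes the same shapes as above. It inspects the chunk structure of the sliced and reduced result and checks the values against NumPy:

```python
import numpy as np
import dask.array as da

x = da.ones((5000, 1000), chunks=(1000, 1000))
y = da.log(x + 1)[:5].sum(axis=1)

# Slicing keeps only the part of the first block that is needed,
# and summing over axis 1 removes that axis from the chunk structure
print(y.shape)   # (5,)
print(y.chunks)  # ((5,),)

# The values match the NumPy computation
expected = np.log(np.ones((5, 1000)) + 1).sum(axis=1)
print(np.allclose(y.compute(), expected))  # True
```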

If the result fits in memory, you can return it as a ``numpy.array``:

```python
>>> # Return the result in a numpy.array (fits in memory)
>>> np_da_y = np.array(da_y)
>>> np_da_y
array([ 693.14718056,  693.14718056,  693.14718056,  693.14718056,
        693.14718056])
```

If it doesn't, you can store the result to a file:

```python
>>> # Store the result to disk (for results that don't fit in memory)
>>> import h5py
>>> da_y.to_hdf5('myfile.hdf5', 'result')
```

```
$ ls
dask-quickstart.ipynb          log.workers
environment.yml                myfile.hdf5
iris.csv                       sample.json
log.client                     twitter_2014-10-10_13h.json.gz
log.scheduler
```


```python
>>> # Read results from file
>>> f = h5py.File('myfile.hdf5', mode='r')
>>> dset = f['/result']
>>> # dset is one-dimensional (shape (5,)), so chunks needs one entry per dimension
>>> da_result = da.from_array(dset, chunks=(5,))
>>> da_result
dask.array<x_7, shape=(5,), chunks=((5,)), dtype=float64>
>>> # Get values of da_result
>>> da_result.compute()
array([ 693.14718056,  693.14718056,  693.14718056,  693.14718056,
        693.14718056])
```
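When the target is not HDF5, ``da.store`` writes a dask array chunk by chunk into any object that supports NumPy-style slice assignment. The sketch below uses a temporary ``.npy`` file created with NumPy's ``open_memmap``, so no extra dependency such as ``h5py`` is needed. The file name ``result.npy`` is hypothetical:

```python
import os
import tempfile

import numpy as np
import dask.array as da

x = da.ones((5000, 1000), chunks=(1000, 1000))
y = da.log(x + 1)[:5].sum(axis=1)

# Hypothetical output path in a temporary directory
path = os.path.join(tempfile.mkdtemp(), "result.npy")

# Pre-allocate an on-disk .npy file and let dask write into it chunk by chunk
target = np.lib.format.open_memmap(path, mode="w+", dtype=y.dtype, shape=y.shape)
da.store(y, target)
target.flush()

# Read it back lazily as a dask array with the same chunks
loaded = da.from_array(np.load(path, mmap_mode="r"), chunks=y.chunks)
print(loaded.compute())
```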

For more information on ``dask.array`` visit the [array](http://dask.readthedocs.org/en/latest/array-overview.html) section. The reference guide for the ``dask.array`` [API](http://dask.readthedocs.org/en/latest/array-api.html) provides a detailed description of all the available methods and their arguments.

### dask.dataframe

Likewise, a ``dask.dataframe`` behaves like a ``pandas.DataFrame``, with some slight alterations due to the parallel nature of dask. A dask dataframe is split along its index into many smaller pandas dataframes, called partitions.

```python
>>> import dask.dataframe as dd
>>> import pandas as pd
>>> df = pd.read_csv('iris.csv')
>>> ddf = dd.read_csv('iris.csv')
```

You can then interact with the dataframe in a similar manner.

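```python
>>> # In pandas
>>> df.head()
```

|   | sepal_length | sepal_width | petal_length | petal_width | species |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |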


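```python
>>> # In dask
>>> ddf.head()
```

|   | sepal_length | sepal_width | petal_length | petal_width | species |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |
| 5 | 5.4 | 3.9 | 1.7 | 0.4 | setosa |
| 6 | 4.6 | 3.4 | 1.4 | 0.3 | setosa |
| 7 | 5.0 | 3.4 | 1.5 | 0.2 | setosa |
| 8 | 4.4 | 2.9 | 1.4 | 0.2 | setosa |
| 9 | 4.9 | 3.1 | 1.5 | 0.1 | setosa |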


With all dask collections (e.g. Array, Bag, DataFrame) one triggers computation by calling the ``.compute()`` method.

```python
>>> # In pandas
>>> max_sepal_length_setosa = df[df.species == 'setosa'].sepal_length.max()
>>> max_sepal_length_setosa
5.7999999999999998
>>> # In dask
>>> d_max_sepal_length_setosa = ddf[ddf.species == 'setosa'].sepal_length.max()
>>> d_max_sepal_length_setosa.compute()
5.7999999999999998
```

The ``pandas.DataFrame`` API is huge, and dask currently doesn't support all of it; ``dask.dataframe`` implements a subset. For a list of the currently supported API, take a look at the [*What definitely works*](http://dask.readthedocs.org/en/latest/dataframe.html#what-definitely-works) section. The reference guide for the ``dask.dataframe`` [API](http://dask.readthedocs.org/en/latest/dataframe-api.html) provides a detailed description of all the available methods and their arguments.

### dask.bag

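If you are working with semi-structured data, like JSON blobs or log files, ``dask.bag`` provides an interface for running parallel computations on it. A bag behaves like a parallel list of Python objects, with operations such as ``map``, ``filter`` and ``pluck``.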

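```python
>>> import dask.bag as db
>>> import json
>>> # Get tweets as a dask.bag from compressed JSON files
>>> # (db.read_text replaces db.from_filenames from older dask releases;
>>> #  gzip compression is inferred from the .gz extension)
>>> b = db.read_text('*.json.gz').map(json.loads)
```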
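Below is a self-contained sketch of reading gzip-compressed JSON lines with ``db.read_text``. It assumes dask's bag dependencies are installed, and the temporary files and their records are made up for illustration. With the default settings, each compressed file becomes one partition:

```python
import gzip
import json
import os
import tempfile

import dask.bag as db

# Hypothetical records written to two gzipped JSON-lines files
tmpdir = tempfile.mkdtemp()
records = [{"id": i, "lang": "pt" if i % 2 else "en"} for i in range(6)]
for part in range(2):
    path = os.path.join(tmpdir, f"tweets-{part}.json.gz")
    with gzip.open(path, "wt") as f:
        for rec in records[part * 3:(part + 1) * 3]:
            f.write(json.dumps(rec) + "\n")

# One line per element, decoded into Python dicts
b = db.read_text(os.path.join(tmpdir, "*.json.gz")).map(json.loads)
print(b.npartitions)                                           # 2
print(b.count().compute())                                     # 6
print(b.filter(lambda r: r["lang"] == "pt").count().compute())  # 3
```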

```python
>>> # Take two items in dask.bag
>>> b.take(2)
({u'contributors': None,
  u'coordinates': None,
  u'created_at': u'Fri Oct 10 17:19:35 +0000 2014',
  u'entities': {u'hashtags': [],
   u'symbols': [],
   u'trends': [],
   u'urls': [],
   u'user_mentions': []},
  u'favorite_count': 0,
  u'favorited': False,
  u'filter_level': u'medium',
  u'geo': None,
  u'id': 520624700389871616,
  u'id_str': u'520624700389871616',
  u'in_reply_to_screen_name': None,
  u'in_reply_to_status_id': None,
  u'in_reply_to_status_id_str': None,
  u'in_reply_to_user_id': None,
  u'in_reply_to_user_id_str': None,
  u'lang': u'pt',
  u'place': None,
  u'possibly_sensitive': False,
  u'retweet_count': 0,
  u'retweeted': False,
  u'source': u'<a href="http://www.facebook.com/twitter" rel="nofollow">Facebook</a>',
  u'text': u'Ebola entrou no Brasil pelo Paran\xe1 e rapidinho chegou ao RJ.\n\nEnquanto isso, minhas compras do Ebay t\xe3o travadas na alf\xe2ndega h\xe1 3 meses.',
  u'timestamp_ms': u'1412961575973',
  u'truncated': False,
  u'user': {u'contributor
# ... (output truncated)
```

```python
>>> # Count the frequencies of user locations
>>> freq = b.pluck('user').pluck('location').frequencies()
>>> df = freq.to_dataframe()
>>> df
dd.DataFrame<bag-8, divisions=(None, None)>
>>> df.compute()
```

|   | 0 | 1 |
|---|---|---|
| 0 |  | 20916 |
| 1 | Natal | 2 |
| 2 | Planet earth. Sheffield. | 1 |
| 3 | Mad, USERA | 1 |
| 4 | Brasilia DF - Brazil | 2 |
| 5 | Rondonia Cacoal | 1 |
| 6 | msftsrep \|\| 4/5. | 1 |
| 7 | México, D.F. | 3 |
| 8 | Paris 18e | 1 |
| 9 | Scottsbluff, NE | 3 |
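The same ``pluck``/``frequencies`` pipeline can be tried on a few in-memory records. Here is a sketch with hypothetical JSON lines. Note that a JSON ``null`` location becomes ``None`` and is counted like any other value:

```python
import json
import dask.bag as db

# Hypothetical JSON lines standing in for the tweets
lines = [
    '{"user": {"location": "Natal"}}',
    '{"user": {"location": "Paris 18e"}}',
    '{"user": {"location": "Natal"}}',
    '{"user": {"location": null}}',
]

b = db.from_sequence(lines, npartitions=2).map(json.loads)

# pluck extracts a field from every record; frequencies counts each distinct value
freq = b.pluck("user").pluck("location").frequencies()
counts = dict(freq.compute())
print(counts["Natal"])  # 2
print(counts[None])     # 1
```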


## Distributed

### dask.distributed

If you'd like to run your computations on a distributed cluster instead of a single workstation, you can use ``dask.distributed``. There are many possible options for configuring your cluster. A good full example of using dask.distributed with Anaconda Cluster can be found in this [blogpost](http://continuum.io/blog/dask-distributed-cluster).

Once you have a cluster set up, you'll need to create a ``dask.distributed.Client`` connected to it. Computations are then sent to the cluster when you call ``compute()``. The following example is adapted from the blogpost:

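```python
>>> import json
>>> import dask.bag as db
>>> from dask.distributed import Client

# client connected to 50 nodes, 2 workers per node
>>> dc = Client('tcp://localhost:9000')
# reading from S3 requires the s3fs package
>>> b = db.read_text('s3://githubarchive-data/2015-*.json.gz').map(json.loads)

# top_commits is built from b in the blogpost; its definition is not shown here

# use the single-machine threaded scheduler
>>> top_commits.compute(scheduler='threads')

# use the distributed cluster: once a Client exists it is the default scheduler
# (older dask releases wrote this as top_commits.compute(get=dc.get))
>>> top_commits.compute()
[(u'mirror-updates', 1463019),
 (u'KenanSulayman', 235300),
 (u'greatfirebot', 167558),
 (u'rydnr', 133323),
 (u'markkcc', 127625)]
```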
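To try ``dask.distributed`` without a cluster, ``Client`` can also start a local scheduler and workers itself. Here is a minimal sketch, assuming the ``distributed`` package is installed. The worker and thread counts are arbitrary illustrative values, and ``processes=False`` keeps everything inside the current Python process:

```python
import dask.array as da
from dask.distributed import Client

# Start a local scheduler and 2 single-threaded workers as threads in this process
client = Client(processes=False, n_workers=2, threads_per_worker=1)

x = da.ones((5000, 1000), chunks=(1000, 1000))

# The new Client registers itself as the default scheduler, so compute() uses it
total = x.sum().compute()
print(total)  # 5000000.0

client.close()
```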

## Final remarks

This 5-min quickstart guide is aimed at dask **users** and only provides a very general introduction. Dask is more than a set of collections. Under the hood, operations on those collections are represented as a task graph, and dask's schedulers know how to execute it. More advanced users might be interested in writing their own [custom](http://dask.readthedocs.org/en/latest/custom-graphs.html) graphs, understanding in more detail the shared-memory [scheduler](http://dask.readthedocs.org/en/latest/shared.html) or the [distributed](http://dask.readthedocs.org/en/latest/distributed.html) one, or profiling the execution of the graph through dask's [diagnostics](http://dask.readthedocs.org/en/latest/diagnostics.html) utilities.
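As a taste of the custom graphs mentioned above, a task graph can be written by hand as a plain dict. Each key maps either to a value or to a tuple made of a callable followed by its arguments. Arguments that name other keys are replaced by those keys' results. Here is a minimal sketch run with the threaded scheduler; the key names are arbitrary:

```python
from operator import add
from dask.threaded import get

# A hand-written task graph
dsk = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),         # z = x + y
    "w": (sum, ["x", "y", "z"]),  # w = sum([x, y, z])
}

print(get(dsk, "z"))  # 3
print(get(dsk, "w"))  # 6
```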