# [10 minutes to Dask](https://docs.dask.org/en/latest/10-minutes-to-dask.html)

## Scheduling and diagnostics

It can be very helpful to have a diagnostics dashboard where you can watch your tasks as they are processed.

By default, when you call `compute` on a **Dask object** (i.e. after you have generated a **task graph**), Dask uses a thread pool on your computer to run computations in parallel (Dask Bags use a process pool instead). If you want access to the **diagnostics dashboard**, use the **distributed scheduler** instead.

Despite having “distributed” in its name, the distributed scheduler works well on both single and multiple machines. Think of it as the “advanced scheduler”.
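Even without a distributed `Client`, you can pick among Dask's local schedulers per call through the `scheduler=` keyword of `compute`, or for a whole block with `dask.config.set`. A minimal sketch; the small array `x` is illustrative and not part of the tutorial:

```python
import dask
import dask.array as da

# Illustrative array: 1,000 integers split into 10 chunks of 100
x = da.arange(1_000, chunks=100)
total = x.sum()  # lazy: only builds a task graph

# Default local scheduler for arrays: a thread pool
threaded = total.compute()

# Single-threaded scheduler, handy for debugging with pdb or print
synchronous = total.compute(scheduler="synchronous")

# The same choice applied to every compute inside the block
with dask.config.set(scheduler="synchronous"):
    in_context = total.compute()

print(threaded, synchronous, in_context)  # 499500 499500 499500
```

Once a `Client` is created, it registers itself as the default scheduler, so plain `compute()` calls go through the distributed scheduler and show up on the dashboard.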

```python
from dask.distributed import Client

client = Client()
client
```

This is how you connect to a cluster that is already running:

```python
from dask.distributed import Client

client = Client("<url-of-scheduler>")
client
```

Displaying the client shows the scheduler address and the available resources, for example: `<Client: 'tcp://127.0.0.1:41703' processes=4 threads=12, memory=31.08 GiB>`

There are a variety of ways to set up a remote cluster.
Refer to [how to deploy dask clusters](https://docs.dask.org/en/latest/how-to/deploy-dask-clusters.html) for more information.
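To experiment without a remote machine, one option is to start a `LocalCluster` and connect to it by address, just as you would pass a remote scheduler's URL. A minimal sketch, assuming `dask.distributed` is installed; `processes=False` keeps the workers as threads in the current process, and the names `local_cluster` and `demo_client` are illustrative:

```python
from dask.distributed import Client, LocalCluster

# Two in-process workers with one thread each (no subprocesses are spawned)
local_cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)

# Connect by address, as with "<url-of-scheduler>" for a remote cluster;
# set_as_default=False leaves any client created earlier as the default
demo_client = Client(local_cluster.scheduler_address, set_as_default=False)

# Run one small task on the cluster and fetch its result
future = demo_client.submit(sum, [1, 2, 3])
total = future.result()
print(total)  # 6

# Shut down the client first, then the cluster
demo_client.close()
local_cluster.close()
```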

# Dask Collections

The high-level **Dask Collections** are **Dask DataFrames**, **Dask Arrays**, and **Dask Bags**.

# Dask DataFrames

```python
import numpy as np
import pandas as pd

import dask.dataframe as dd
```

Let's start with a **Pandas DataFrame**

```python
# "1h" means hourly; the uppercase "H" alias is deprecated in recent pandas
index = pd.date_range("2021-09-01", periods=2400, freq="1h")
df = pd.DataFrame({"a": np.arange(2400), "b": list("abcaddbe" * 300)}, index=index)
df
```

You can create a **Dask DataFrame** from existing data (here, **df**), optionally specifying how it should be split into chunks (here, the number of partitions, **npartitions**).

```python
ddf = dd.from_pandas(df, npartitions=10)
ddf
```

Now we have a DataFrame with 2 columns and 2400 rows composed of 10 partitions where each partition has 240 rows.

Each partition represents a piece of the data.

Here are some key properties of a DataFrame:

```python
# Check the index values covered by each partition
ddf.divisions
```

```python
# Access a particular partition
ddf.partitions[1]
```

### Dask DataFrames - Indexing

**Dask DataFrames** are indexed the same way as **Pandas DataFrames**:

```python
df["2021-10-01": "2021-10-09 5:00"]
```

```python
ddf["2021-10-01": "2021-10-09 5:00"]
```

### Dask DataFrames - Computation

But remember:

> **Dask is lazily evaluated**. The result from a computation isn’t computed until you ask for it. Instead, a **Dask task graph** for the computation is produced.

Whenever you have a Dask object and want the result, call its `compute` method:

```python
ddf["2021-10-01": "2021-10-09 5:00"].compute()
```

**Dask DataFrames** match existing **Pandas methods**, so they should feel familiar.

See the [Dask DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html) for all implemented methods.

Call the method to set up the task graph, and then call `compute` to get the result.

```python
ddf.a.mean()
```

```python
ddf.a.mean().compute()
```

```python
ddf.b.unique()
```

```python
ddf.b.unique().compute()
```

Methods can be chained together just like in **Pandas**:

```python
result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100
result
```

```python
result.compute()
```

In addition to triggering computation, we can inspect the **task graph** to figure out what’s going on:

> Note that visualizing the **task graph** requires the **graphviz** system package (https://graphviz.org/download/) plus its Python wrapper (`pip install graphviz`).

```python
result.dask
```

```python
result.visualize()
```

# Dask Arrays

```python
import numpy as np
import pandas as pd

import dask.array as da
```

Let's start with a **Numpy Array**

```python
data = np.arange(100000).reshape(200, 500)
data
```

You can create a **Dask Array** from existing data (here, **data**), optionally specifying how it should be split into chunks (here, **chunks**):

```python
a = da.from_array(data, chunks=(100, 100))
a
```

Now we have a 2D array with the shape (200, 500) composed of 10 chunks where each chunk has the shape (100, 100). Each chunk represents a piece of the data.
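The chunk count follows from dividing each dimension by its chunk size: (200 / 100) × (500 / 100) = 2 × 5 = 10 blocks. When a dimension does not divide evenly, the last chunk along it is simply smaller. A sketch; the chunk shape (150, 200) is illustrative and not used elsewhere in this tutorial:

```python
import numpy as np
import dask.array as da

data = np.arange(100000).reshape(200, 500)

a = da.from_array(data, chunks=(100, 100))
print(a.numblocks)    # (2, 5): blocks along each axis
print(a.npartitions)  # 10 blocks in total

# Chunk sizes that do not divide the shape leave a smaller final chunk
uneven = da.from_array(data, chunks=(150, 200))
print(uneven.chunks)  # ((150, 50), (200, 200, 100))
```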

Here are some key properties of an Array:

```python
# Inspect the chunks
a.chunks
```

```python
# Access a particular block of data
a.blocks[1, 3]
```

### Dask Arrays - Indexing

**Dask Arrays** are indexed the same way as **Numpy Arrays**:

```python
a[:50, 200]
```
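Slicing a Dask Array yields the same values as slicing the NumPy array it wraps, and `blocks[i, j]` corresponds to a plain slice over the chunk boundaries. A sketch checking both against `data`:

```python
import numpy as np
import dask.array as da

data = np.arange(100000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))

# First 50 rows of column 200: same values as the NumPy slice
column = a[:50, 200].compute()
print(np.array_equal(column, data[:50, 200]))  # True

# Block (1, 3) covers rows 100-199 and columns 300-399
block = a.blocks[1, 3].compute()
print(np.array_equal(block, data[100:200, 300:400]))  # True
```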

### Dask Arrays - Computation

But remember:

> **Dask is lazily evaluated**. The result from a computation isn’t computed until you ask for it. Instead, a **Dask task graph** for the computation is produced.

Whenever you have a Dask object and want the result, call its `compute` method:

```python
a.mean()
```

```python
a.mean().compute()
```

**Dask Arrays** match existing **Numpy methods**, so they should feel familiar.

See the [Dask Array API](https://docs.dask.org/en/latest/array-api.html) for all implemented methods.

Call the method to set up the task graph, and then call `compute` to get the result.

```python
np.sin(a)
```

```python
np.sin(a).compute()
```
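Calling a NumPy ufunc such as `np.sin` on a Dask Array does not compute anything yet: Dask intercepts the call (through NumPy's `__array_ufunc__` protocol) and returns another lazy Dask Array. A sketch confirming this and checking the values against NumPy:

```python
import numpy as np
import dask.array as da

data = np.arange(100000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))

lazy = np.sin(a)
print(type(lazy).__name__)  # Array: still a lazy Dask Array
print(np.allclose(lazy.compute(), np.sin(data)))  # True
```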

```python
a.T
```

```python
a.T.compute()
```

Methods can be chained together just like in **Numpy**:

```python
b = a.max(axis=1)[::-1] + 10
b
```

```python
b[:10].compute()
```
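The chained expression mirrors the NumPy one step for step: the row-wise maximum, reversed, plus 10. A sketch checking the first ten values against plain NumPy:

```python
import numpy as np
import dask.array as da

data = np.arange(100000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))

b = a.max(axis=1)[::-1] + 10                # lazy Dask expression
expected = data.max(axis=1)[::-1] + 10      # eager NumPy equivalent

first_ten = b[:10].compute()
print(first_ten)
print(np.array_equal(first_ten, expected[:10]))  # True
```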

In addition to triggering computation, we can inspect the **task graph** to figure out what’s going on:

> Note that visualizing the **task graph** requires the **graphviz** system package (https://graphviz.org/download/) plus its Python wrapper (`pip install graphviz`).

```python
b.dask
```

```python
b.visualize()
```
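If graphviz is not available, the graph can still be inspected as a plain mapping from task keys to tasks, for example to see how the number of tasks grows with the number of chunks. A sketch using the collection method `__dask_graph__()`; the helper `task_count` is hypothetical, and since exact key names and task counts vary between Dask versions, only relative sizes are compared:

```python
import dask.array as da

def task_count(arr):
    """Hypothetical helper: number of tasks in the unoptimized graph."""
    return len(dict(arr.__dask_graph__()))

coarse = da.ones((200, 500), chunks=(100, 100)).sum()  # 10 input chunks
fine = da.ones((200, 500), chunks=(20, 20)).sum()      # 250 input chunks

print(task_count(coarse), task_count(fine))
```

Smaller chunks mean more tasks, and every task adds scheduling overhead, which is why chunk size is worth choosing deliberately.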

# Dask Bags

```python
import numpy as np
import pandas as pd

import dask.bag as db
```

```python
b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)
b
```

Now we have a sequence with 8 items composed of 2 partitions where each partition has 4 items in it. Each partition represents a piece of the data.
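To see how the eight items were split, each partition can be turned into a `Delayed` object with `to_delayed` and computed. A sketch; it pins the single-threaded scheduler because Bags otherwise default to a process pool, which is more than this tiny example needs:

```python
import dask
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)

# One Delayed object per partition; compute them all in one call
partitions = dask.compute(*b.to_delayed(), scheduler="synchronous")

print(b.npartitions)  # 2
print(partitions)     # the items held by each partition
```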

### Dask Bags - Indexing

A Bag is an unordered collection allowing repeats. So it is like a list, but it doesn’t guarantee an ordering among elements. There is no way to index Bags since they are not ordered.
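Although a Bag cannot be indexed, you can still peek at a few items with `take`, which computes immediately and, by default, only looks at the first partition. A sketch, again pinning the synchronous scheduler:

```python
import dask
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)

with dask.config.set(scheduler="synchronous"):
    first_three = b.take(3)  # already computed, no .compute() needed

print(first_three)  # (1, 2, 3)
```

Which items come back depends on how the data was partitioned, not on any ordering guarantee.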

### Dask Bags - Computation

But remember:

> **Dask is lazily evaluated**. The result from a computation isn’t computed until you ask for it. Instead, a **Dask task graph** for the computation is produced.

Whenever you have a Dask object and want the result, call its `compute` method:

```python
b.compute()
```

Dask Bag implements operations like `map`, `filter`, `fold`, and `groupby` on collections of generic Python objects:

```python
b.filter(lambda x: x % 2)
```

```python
b.filter(lambda x: x % 2).compute()
```

```python
b.distinct()
```

```python
b.distinct().compute()
```
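The cells above show `filter` and `distinct`. For reductions, `fold` combines all items, and `foldby` is a grouped fold that is usually much cheaper than `groupby`, which has to shuffle data between partitions. A sketch summing all items, then summing them by parity (the parity key is illustrative):

```python
import operator

import dask
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)

with dask.config.set(scheduler="synchronous"):
    # Add every item: within each partition first, then across partitions
    total = b.fold(operator.add).compute()

    # Group by parity (key 1 = odd, key 0 = even) and add within each group
    by_parity = dict(b.foldby(lambda x: x % 2, operator.add).compute())

print(total)      # 24
print(by_parity)  # sum per parity key
```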

Methods can be chained together:

```python
c = db.zip(b, b.map(lambda x: x * 10))
c
```

```python
c.compute()
```

In addition to triggering computation, we can inspect the **task graph** to figure out what’s going on:

> Note that visualizing the **task graph** requires the **graphviz** system package (https://graphviz.org/download/) plus its Python wrapper (`pip install graphviz`).

```python
c.dask
```

```python
c.visualize()
```

# Low-Level Interfaces

Often, when parallelizing existing code bases or building custom algorithms, you run into code that is parallelizable but isn’t just a big **DataFrame** or **Array**. This is where **Dask Delayed** and **Dask Futures** come in.

# Dask Delayed

**Dask Delayed** lets you wrap individual function calls into a lazily constructed **task graph**:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(x, y):
    return x + y

a = inc(1)       # no work has happened yet
b = inc(2)       # no work has happened yet
c = add(a, b)    # no work has happened yet

result = c.compute()  # This triggers all of the above computations
result
```

```python
c.visualize()
```
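Delayed is most useful when the calls come from ordinary Python control flow such as a loop. A sketch that builds one task per input and adds them up with a delayed `sum`; the inputs `range(10)` are illustrative:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

# One lazy task per input; nothing runs inside the loop
parts = [inc(i) for i in range(10)]

# Wrapping the builtin sum makes the final reduction lazy too;
# Dask finds the Delayed objects inside the list argument
total = dask.delayed(sum)(parts)

print(total.compute())  # 55
```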

# Dask Futures

Unlike the interfaces described so far, **Futures** are **eager**. Computation starts as soon as the function is submitted:

```python
from dask.distributed import Client

client = Client()

def inc(x):
    return x + 1

def add(x, y):
    return x + y

a = client.submit(inc, 1)     # work starts immediately
b = client.submit(inc, 2)     # work starts immediately
c = client.submit(add, a, b)  # work starts immediately

result = c.result()  # block until work finishes, then gather result
result
```

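```python
# A Future is not a lazy task graph, so it has no `visualize` method;
# follow the running work on the diagnostics dashboard instead
client.dashboard_link
```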
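For many inputs, `client.map` submits one task per element and `client.gather` collects the results; futures can also be passed into later `submit` calls, as with `add` above. A sketch that starts its own in-process client (`processes=False`) so it runs standalone; the name `demo_client` is illustrative:

```python
from dask.distributed import Client

def inc(x):
    return x + 1

# Local in-process cluster just for this sketch
demo_client = Client(processes=False)

futures = demo_client.map(inc, range(10))  # tasks start running right away
results = demo_client.gather(futures)      # block until all are finished

# Futures can be arguments to further tasks; Dask substitutes their results
total = demo_client.submit(sum, futures).result()

print(results)  # the values 1 through 10
print(total)    # 55

demo_client.close()
```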