# Welcome to the Dask Tutorial

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">
Dask is a parallel and distributed computing library that scales the existing Python and PyData ecosystem.

Dask can scale up to your full laptop capacity and out to a cloud cluster.

## What is [Dask](https://www.dask.org/)?

There are many parts to the Dask project:
* Collections/API, also known as the "core library"
* Distributed -- to create clusters
* Integrations and the broader ecosystem


### Dask Collections

Dask provides **multi-core** and **distributed+parallel** execution on **larger-than-memory** datasets.

We can think of Dask's APIs (also called collections) at a high and a low level:

<center>
<img src="images/high_vs_low_level_coll_analogy.png" width="75%" alt="High vs Low level clothes analogy">
</center>

*  **High-level collections:**  Dask provides high-level Array, Bag, and DataFrame
   collections that mimic NumPy, lists, and pandas but can operate in parallel on
   datasets that don't fit into memory.
* **Low-level collections:**  Dask also provides low-level Delayed and Futures
   collections that give you finer control to build custom parallel and distributed computations.

### Dask Cluster

Most of the time when you are using Dask, you will be using a distributed scheduler, which exists in the context of a Dask cluster. The Dask cluster is structured as follows:

<center>
<img src="images/distributed-overview.png" width="75%" alt="Distributed overview">
</center>

### Dask Ecosystem

In addition to the core Dask library and its distributed scheduler, the Dask ecosystem connects several additional initiatives, including:

- Dask-ML (parallel scikit-learn-style API)
- Dask-image
- Dask-cuDF
- Dask-sql
- Dask-snowflake
- Dask-mongo
- Dask-bigquery

Community libraries that have built-in dask integrations like:

- Xarray
- XGBoost
- Prefect
- Airflow

Dask deployment libraries:
- Dask-kubernetes
- Dask-YARN
- Dask-gateway
- Dask-cloudprovider
- jobqueue

When we talk about the Dask project, we include all these efforts as part of the community.

## Dask Use Cases

Dask is used in multiple fields such as:

* Geospatial
* Finance
* Astrophysics
* Microbiology
* Environmental science

Check out the Dask [use cases](https://stories.dask.org/en/latest/) page that provides a number of sample workflows.

## Useful Links

*  Reference
    *  [Docs](https://dask.org/)
    *  [Examples](https://examples.dask.org/)
    *  [Code](https://github.com/dask/dask/)
    *  [Blog](https://blog.dask.org/)
*  Ask for help
    *   [`dask`](http://stackoverflow.com/questions/tagged/dask) tag on Stack Overflow, for usage questions
    *   [GitHub issues](https://github.com/dask/dask/issues/new) for bug reports and feature requests
    *   [Discourse forum](https://dask.discourse.group/) for general, non-bug, questions and discussion
    *   Attend a live tutorial



# Dask DataFrame - parallelized pandas

Looks and feels like the pandas API, but for parallel and distributed workflows. 

At its core, the `dask.dataframe` module implements a "blocked parallel" `DataFrame` object. One Dask `DataFrame` is composed of many in-memory pandas `DataFrame`s separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.


<img src="https://docs.dask.org/en/stable/_images/dask-dataframe.svg"
     align="right"
     width="30%"
     alt="Dask DataFrame is composed of pandas DataFrames"/>

**Related Documentation**

* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)
* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [DataFrame examples](https://examples.dask.org/dataframe.html)
* [pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)

## When to use `dask.dataframe`

pandas is great for tabular datasets that fit in memory. A general rule of thumb for pandas is:

> "Have 5 to 10 times as much RAM as the size of your dataset"
>
> ~ Wes McKinney (2017) in [10 things I hate about pandas](https://wesmckinney.com/blog/apache-arrow-pandas-internals/)

Here "size of dataset" means the size of the dataset on disk.

Dask becomes useful when your datasets exceed this rule of thumb.

In this notebook, you will be working with the New York City Airline data. This dataset is only ~200MB, so that you can download it in a reasonable time, but `dask.dataframe` will scale to  datasets **much** larger than memory.



## Create datasets

Create the datasets you will be using in this notebook:

In [None]:
%run prep.py -d flights

## Set up your local cluster

Create a local Dask cluster and connect a client to it. Don't worry about this bit of code for now; you will learn more in the Distributed notebook.

In [None]:
from dask.distributed import Client

client = Client(n_workers=4)
client

### Dask Diagnostic Dashboard

Dask Distributed provides a useful Dashboard to visualize the state of your cluster and computations.

If you're on **JupyterLab or Binder**, you can use the [Dask JupyterLab extension](https://github.com/dask/dask-labextension) (which should be already installed in your environment) to open the dashboard plots:
* Click on the Dask logo in the left sidebar
* Click on the magnifying glass icon, which will automatically connect to the active dashboard (if that doesn't work, you can type/paste the dashboard link http://127.0.0.1:8787 in the field)
* Click on **"Task Stream"**, **"Progress Bar"**, and **"Worker Memory"**, which will open these plots in new tabs
* Re-organize the tabs to suit your workflow!

Alternatively, click on the dashboard link displayed in the Client details above: http://127.0.0.1:8787/status. It will open a new browser tab with the Dashboard.

**Note**: if you are running Dask on a machine that doesn't have a public IP, you can use the included `jupyter-server-proxy` to access the dashboard.
Example URL for dashboard behind proxy: **http://127.0.0.1:<jupyter-port\>/proxy/<dashboard-port\>/status**

## Reading and working with datasets

Let's read an extract of flights in the USA across several years. This data is specific to flights out of the three airports in the New York City area.

In [None]:
import os
import dask

By convention, we import the module `dask.dataframe` as `dd`, and call the corresponding `DataFrame` object `ddf`.

**Note**: The term "Dask DataFrame" is slightly overloaded. Depending on the context, it can refer to the module or the DataFrame object. To avoid confusion, throughout this notebook:
- `dask.dataframe` (note the all lowercase) refers to the API, and
- `DataFrame` (note the CamelCase) refers to the object.

The following filename includes a glob pattern `*`, so all files in the path matching that pattern will be read into the same `DataFrame`.

In [None]:
import dask.dataframe as dd

ddf = dd.read_csv(
    os.path.join("data", "nycflights", "*.csv"), parse_dates={"Date": [0, 1, 2]},
)
ddf

Dask has not loaded the data yet. It has only:
- investigated the input path and found that there are ten matching files
- created a set of tasks, one per chunk (here, one chunk per original CSV file)

Notice that the representation of the `DataFrame` object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

### Lazy Evaluation

Most Dask collections, including Dask `DataFrame`, are evaluated lazily. This means Dask constructs the logic of your computation (called the task graph) immediately, but evaluates it only when necessary. You can view this task graph using `.visualize()`.

You will learn more about this in the Delayed notebook, but for now, note that we need to call `.compute()` to trigger actual computations.

In [None]:
ddf.visualize()

Some functions like `len` and `head` also trigger a computation. Specifically, calling `len` will:
- load actual data (that is, load each file into a pandas DataFrame)
- then apply the corresponding functions to each pandas DataFrame (also known as a partition)
- combine the subtotals to give you the final grand total

In [None]:
# load and count number of rows
len(ddf)

```python
len(ddf)

# ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

# +----------------+---------+----------+
# | Column         | Found   | Expected |
# +----------------+---------+----------+
# | CRSElapsedTime | float64 | int64    |
# | TailNum        | object  | float64  |
# +----------------+---------+----------+

# The following columns also raised exceptions on conversion:

# - TailNum
#   ValueError("could not convert string to float: 'N54711'")

# Usually this is due to dask's dtype inference failing, and
# *may* be fixed by specifying dtypes manually by adding:

# dtype={'CRSElapsedTime': 'float64',
#        'TailNum': 'object'}

# to the call to `read_csv`/`read_table`.

```

Unlike `pandas.read_csv` which reads in the entire file before inferring datatypes, `dask.dataframe.read_csv` only reads in a sample from the beginning of the file (or first file if using a glob). These inferred datatypes are then enforced when reading all partitions.
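The effect of inferring dtypes from a sample can be reproduced with plain pandas. In this sketch, reading with `nrows=3` stands in for Dask's sample (Dask actually samples a number of bytes, controlled by its `sample` keyword), and the made-up CSV mimics the two problem columns from the error above.

```python
import io

import pandas as pd

# Made-up CSV: TailNum is empty and CRSElapsedTime is complete in the
# first rows; both change character further down the file.
csv_text = (
    "CRSElapsedTime,TailNum\n"
    "120,\n"
    "95,\n"
    "110,\n"
    ",N54711\n"
)

# Inference from the first rows only, like Dask's sample
sample = pd.read_csv(io.StringIO(csv_text), nrows=3)
# Inference from the whole file, like pandas normally does
full = pd.read_csv(io.StringIO(csv_text))

print(sample.dtypes)  # CRSElapsedTime int64, TailNum float64 (all NaN)
print(full.dtypes)    # CRSElapsedTime float64 (has NaN), TailNum strings

# Declaring dtypes explicitly makes the sample agree with the full data
fixed = pd.read_csv(
    io.StringIO(csv_text),
    nrows=3,
    dtype={"CRSElapsedTime": "float64", "TailNum": "object"},
)
print(fixed.dtypes)
```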

In this case, the datatypes inferred from the sample are incorrect. In the first rows, `TailNum` has no values, so pandas infers it as `float64`, but later rows contain strings such as `'N54711'` (`object` dtype). Similarly, `CRSElapsedTime` is inferred as `int64` from the sample, but later rows contain missing values, which require `float64`. Note that Dask gives an informative error message about the mismatch. When this happens you have a few options:

- Specify dtypes directly using the `dtype` keyword. This is the recommended solution, as it's the least error prone (better to be explicit than implicit) and also the most performant.
- Increase the size of the `sample` keyword (in bytes)
- Use `assume_missing` to make Dask assume that columns inferred to be `int` (which don't allow missing values) are actually `floats` (which do allow missing values). In our case this would cover `CRSElapsedTime` but not `TailNum`, so it is not enough on its own.

In our case we'll use the first option and directly specify the `dtypes` of the offending columns. 

In [None]:
ddf = dd.read_csv(
    os.path.join("data", "nycflights", "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
)

In [None]:
# load and count number of rows
len(ddf)  # now works

You can view the start and end of the data as you would in pandas:

In [None]:
ddf.head()

In [None]:
ddf.tail() 

### Reading from remote storage

If you're thinking about distributed computing, your data is probably stored remotely on services (like Amazon S3 or Google Cloud Storage) and is in a friendlier format (like Parquet). Dask can read data in various formats directly from these remote locations **lazily** and **in parallel**.

Here's how you can read the NYC taxi cab data from Amazon S3:

```python
ddf = dd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.parquet",
)
```

You can also leverage Parquet-specific optimizations like column selection and metadata handling. Learn more in [the Dask documentation on working with Parquet files](https://docs.dask.org/en/stable/dataframe-parquet.html).

In [None]:
ddf_nyc = dd.read_parquet(
    "s3://dask-data/nyc-taxi/nyc-2015.parquet/part.*.parquet",
    columns=["passenger_count", "tip_amount"],
    storage_options={"anon": True},
)

In [None]:
result = ddf_nyc.groupby("passenger_count").tip_amount.mean().compute()
result

In [None]:
ddf_nyc.head()

In [None]:
(ddf_nyc.groupby("passenger_count").tip_amount.mean()).visualize(engine='cytoscape')

## Computations with `dask.dataframe`

Let's compute the maximum of the flight delay.

With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums.

```python
import os
import pandas as pd

files = os.listdir(os.path.join('data', 'nycflights'))

maxes = []

for file in files:
    df = pd.read_csv(os.path.join('data', 'nycflights', file))
    maxes.append(df.DepDelay.max())

final_max = max(maxes)
```

`dask.dataframe` lets us write pandas-like code that operates on larger-than-memory datasets in parallel.

In [None]:
%%time
result = ddf.DepDelay.max()
print(result.compute())

This creates the lazy computation for us and then runs it.  

**Note:** Dask will delete intermediate results (like the full pandas DataFrame for each file) as soon as possible. This means you can handle datasets that are larger than memory, but repeated computations will have to load all of the data each time. (Run the code above again: is it faster or slower than you would expect?)

You can view the underlying task graph using `.visualize()`:

In [None]:
# notice the parallelism
result.visualize()

## Exercises

In this section you will do a few `dask.dataframe` computations. If you are comfortable with pandas then these should be familiar. You will have to think about when to call `.compute()`.

### 1. How many rows are in our dataset?

_Hint_: how would you check how many items are in a list?

In [None]:
# Your code here

In [None]:
# Solution
len(ddf)

### 2. In total, how many non-canceled flights were taken?

_Hint_: use [boolean indexing](https://pandas.pydata.org/pandas-docs/stable/indexing.html#boolean-indexing).

In [None]:
# Your code here

In [None]:
# Solution
len(ddf[~ddf.Cancelled])

### 3. In total, how many non-canceled flights were taken from each airport?

*Hint*: use [groupby](https://pandas.pydata.org/pandas-docs/stable/groupby.html).

In [None]:
# Your code here

In [None]:
# Solution
ddf[~ddf.Cancelled].groupby("Origin").Origin.count().compute()

### 4. What was the average departure delay from each airport?

In [None]:
# Your code here

In [None]:
# Solution
ddf.groupby("Origin").DepDelay.mean().compute()

### 5. What day of the week has the worst average departure delay?

In [None]:
# Your code here

In [None]:
# Solution
ddf.groupby("DayOfWeek").DepDelay.mean().idxmax().compute()

### 6. Let's say the distance column is erroneous and you need to add 1 to all values, how would you do this?

In [None]:
# Your code here

In [None]:
# Solution
ddf["Distance"].apply(
    lambda x: x + 1
).compute()  # don't worry about the warning, we'll discuss in the next sections

# OR

(ddf["Distance"] + 1).compute()

## Sharing Intermediate Results

When computing all of the above, we sometimes did the same operation more than once. For most operations, `dask.dataframe` names each task deterministically from a hash of its inputs, so duplicate computations get the same name and can be shared and computed only once.

For example, let's compute the mean and standard deviation for departure delay of all non-canceled flights. Since Dask operations are lazy, those values aren't the final results yet. They're just the steps required to get the result.

If you compute them with two calls to compute, there is no sharing of intermediate computations.

In [None]:
non_canceled = ddf[~ddf.Cancelled]
mean_delay = non_canceled.DepDelay.mean()
std_delay = non_canceled.DepDelay.std()

In [None]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

### `dask.compute`

Now let's pass both to a single `dask.compute` call.

In [None]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

Using `dask.compute` takes roughly half the time here. This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations to only be done once instead of twice. In particular, using `dask.compute` only does the following once:

- the calls to `read_csv`
- the filter (`df[~df.Cancelled]`)
- some of the necessary reductions (`sum`, `count`)
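The sharing relies on identical operations getting identical task keys. A minimal way to see this outside of DataFrames is `dask.delayed` with `pure=True`, where the key is derived from the function and its arguments. In this sketch, `load`, `total` and `count` are hypothetical toy functions, a list records how often `load` really runs, and the synchronous scheduler keeps everything in one process so that the list can be inspected.

```python
import dask

calls = []  # records every real execution of load()


@dask.delayed(pure=True)
def load(n):
    calls.append(n)
    return list(range(n))


@dask.delayed(pure=True)
def total(xs):
    return sum(xs)


@dask.delayed(pure=True)
def count(xs):
    return len(xs)


# Two separate load(5) calls produce the same key because pure=True
s = total(load(5))
c = count(load(5))

# Two compute calls: the shared load task runs twice
s.compute(scheduler="sync")
c.compute(scheduler="sync")
print(len(calls))  # 2

# One dask.compute call: the graphs are merged and load runs once
calls.clear()
s_res, c_res = dask.compute(s, c, scheduler="sync")
print(len(calls), s_res, c_res)  # 1 10 5
```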

To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function (you might want to use `filename='graph.pdf'` to save the graph to disk so that you can zoom in more easily):

In [None]:
dask.visualize(mean_delay, std_delay, engine="cytoscape")

### `.persist()`

While using a distributed scheduler (you will learn more about schedulers in the upcoming tutorials), you can keep some _data that you want to use often_ in the _distributed memory_. 

`persist` starts the computation in the background and returns a collection of the same type, backed by "Futures" (more on this later as well) that point to results held in distributed memory. You can use `persist` with any data or computation that fits in memory.

If you want to analyze data only for non-canceled flights departing from JFK airport, you can either have two compute calls like in the previous section:

In [None]:
non_cancelled = ddf[~ddf.Cancelled]
ddf_jfk = non_cancelled[non_cancelled.Origin == "JFK"]

In [None]:
%%time
ddf_jfk.DepDelay.mean().compute()
ddf_jfk.DepDelay.std().compute()

Or, consider persisting that subset of data in memory.

In the "Graph" dashboard plot, the red squares indicate persisted data stored as Futures in memory. You will also notice an increase in consumption in the "Worker Memory" dashboard plot.

In [None]:
ddf_jfk = ddf_jfk.persist()  # returns back control immediately

In [None]:
%%time
ddf_jfk.DepDelay.mean().compute()
ddf_jfk.DepDelay.std().compute()

Analyses on this persisted data are faster because we are not repeating the loading and selecting (non-canceled, JFK departure) operations.

To remove this from memory, we can do the following:

In [None]:
del ddf_jfk
# or: client.cancel(ddf_jfk), but this removes any scheduled jobs and all jobs/pointers to this result!
# The proper term instead of "jobs" would be "futures", but we'll get to that later

See the [distributed documentation on clearing data](https://distributed.dask.org/en/stable/memory.html#clearing-data) for more details.

## Custom code with Dask DataFrame

`dask.dataframe` only covers a small but well-used portion of the pandas API.

This limitation is for two reasons:

1.  The pandas API is *huge*
2.  Some operations are genuinely hard to do in parallel, e.g., sorting.

Additionally, some important operations like `set_index` work, but are slower than in pandas because they include substantial shuffling of data, and may write out to disk.

**What if you want to use some custom functions that aren't (or can't be) implemented for Dask DataFrame yet?**

You can open an issue on the [Dask issue tracker](https://github.com/dask/dask/issues) to discuss how feasible the function would be to implement, and you can consider contributing it to Dask.

In case it's a custom function or tricky to implement, `dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:

- [`map_partitions`](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.map_partitions.html): to run a function on each partition (each pandas DataFrame) of the Dask DataFrame
- [`map_overlap`](https://docs.dask.org/en/latest/generated/dask.dataframe.rolling.map_overlap.html): to run a function on each partition (each pandas DataFrame) of the Dask DataFrame, with some rows shared between neighboring partitions
- [`reduction`](https://docs.dask.org/en/latest/generated/dask.dataframe.Series.reduction.html): for custom row-wise reduction operations.

Let's take a quick look at the `map_partitions()` function:

In [None]:
help(ddf.map_partitions)

The "Distance" column in `ddf` is currently in miles. Let's say we want to convert the units to kilometers and we have a general helper function as shown below. In this case, we can use `map_partitions` to apply this function across each of the internal pandas `DataFrame`s in parallel. 

In [None]:
import pandas as pd


def my_custom_converter(df, multiplier=1):
    return df * multiplier


meta = pd.Series(name="Distance", dtype="float64")

# 1 mile = 1.609344 km
distance_km = ddf.Distance.map_partitions(
    my_custom_converter, multiplier=1.609344, meta=meta
)

In [None]:
distance_km.visualize()

In [None]:
distance_km.head()

### What is `meta`?

Since Dask operates lazily, it doesn't always have enough information to infer the output structure (which includes datatypes) of certain operations.

`meta` is a _suggestion_ to Dask about the output of your computation. Importantly, `meta` _never interferes with the output structure_. Dask uses this `meta` until it can determine the actual output structure.

Even though there are many ways to define `meta`, we suggest using a small pandas Series or DataFrame that matches the structure of your final output.

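It's good practice to always close any Dask cluster you create once you are done with it (for example, with `client.shutdown()`). Here we keep it running, because the following sections reuse `client`.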


# Dask Arrays - parallelized numpy
Parallel, larger-than-memory, n-dimensional array using blocked algorithms. 

*  **Parallel**: Uses all of the cores on your computer
*  **Larger-than-memory**:  Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
*  **Blocked Algorithms**:  Perform large computations by performing many smaller computations.


<img src="https://docs.dask.org/en/stable/_images/dask-array.svg" width="40%" align="right">


In other words, Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.

In this notebook, we'll build some understanding by implementing some blocked algorithms from scratch.
We'll then use Dask Array to analyze large datasets, in parallel, using a familiar NumPy-like API.

**Related Documentation**

* [Array documentation](https://docs.dask.org/en/latest/array.html)
* [Array screencast](https://youtu.be/9h_61hXCDuI)
* [Array API](https://docs.dask.org/en/latest/array-api.html)
* [Array examples](https://examples.dask.org/array.html)

In [None]:
# Reuse client
client

## Blocked Algorithms in a nutshell

Let's compute the sum of the elements of an array side by side, using a NumPy array and a Dask array.

In [None]:
import numpy as np
import dask.array as da

In [None]:
# NumPy array
a_np = np.ones(10)
a_np

We know that we can use `sum()` to compute the sum of the elements of our array, but to show what a blocked operation would look like, let's do:

In [None]:
a_np_sum = a_np[:5].sum() + a_np[5:].sum()
a_np_sum

Now notice that each sum in the computation above is completely independent, so they could be done in parallel.
To do this with a Dask array, we need to define our "slices". We do this by setting the number of elements we want per block with the `chunks` argument.

In [None]:
a_da = da.ones(10, chunks=5)
a_da

**Important!**

Note here that to get two blocks, we specify `chunks=5`, in other words, we have 5 elements per block. 
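The block layout can be checked directly: `chunks` lists the size of every block along each axis, `numblocks` counts them, and `.blocks[i]` selects one block as its own small Dask array. In this sketch, the per-block sums are the Dask counterpart of `a_np[:5].sum() + a_np[5:].sum()`.

```python
import dask
import dask.array as da

a = da.ones(10, chunks=5)
print(a.chunks)     # ((5, 5),): two blocks of 5 elements
print(a.numblocks)  # (2,)

# One independent lazy sum per block, then a final combine step
block_sums = [a.blocks[i].sum() for i in range(a.numblocks[0])]
partial = dask.compute(*block_sums)
print(partial, sum(partial))

# chunks does not have to divide the length evenly
print(da.ones(10, chunks=4).chunks)  # ((4, 4, 2),)
```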

In [None]:
a_da_sum = a_da.sum()
a_da_sum

## Task Graphs

In general, the code that humans write relies on compilers or interpreters to translate it into something a computer can execute. When we move to parallel execution, some of this responsibility shifts from the compiler to the programmer, because parallel frameworks often bring the analysis, optimization, and execution of code into the code itself. In these cases, we often represent the structure of our program explicitly as data within the program itself.

In Dask we use task scheduling, where we break our program into many medium-sized tasks or units of computation. We represent these tasks as nodes in a graph, with edges between nodes if one task depends on data produced by another. We call upon a task scheduler to execute this graph in a way that respects these data dependencies and leverages parallelism where possible, so multiple independent tasks can run simultaneously.
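Dask's low-level graphs really are plain data. This sketch writes a two-block sum like the one above as a Python dict in Dask's documented graph format (keys name results; a tuple whose first element is a function is a task) and runs it with the threaded scheduler. The key names are illustrative.

```python
from operator import add

import numpy as np
import dask.threaded

# A hand-written task graph: keys name results, and a tuple whose first
# element is a function is a task that calls it on the other elements
dsk = {
    "block-0": np.ones(5),             # data for the first block
    "block-1": np.ones(5),             # data for the second block
    "sum-0": (np.sum, "block-0"),      # independent of sum-1
    "sum-1": (np.sum, "block-1"),      # independent of sum-0
    "total": (add, "sum-0", "sum-1"),  # depends on both partial sums
}

# The scheduler runs sum-0 and sum-1 (possibly in parallel), then total
result = dask.threaded.get(dsk, "total")
print(result)  # 10.0
```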

In [None]:
# visualize the low level Dask graph using cytoscape
a_da_sum.visualize(engine="cytoscape")

In [None]:
a_da_sum.compute()

## Performance comparison

Let's try a more interesting example. We will create a large 2-D array of normally distributed values and take the mean along one of its axes: 10_000 x 10_000 in the NumPy version and 30_000 x 30_000 in the Dask version.

**Note:**

If you are running on Binder, the NumPy example might need to be smaller due to memory limits.

### NumPy version

In [None]:
%%time
xn = np.random.normal(10, 0.1, size=(10_000, 10_000))
yn = xn.mean(axis=0)
yn

### Dask array version

In [None]:
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
xd

In [None]:
xd.nbytes / 1e9  # Gigabytes of the input processed lazily

In [None]:
yd = xd.mean(axis=0)
yd

In [None]:
%%time
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()
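Under the hood, a mean along `axis=0` can be computed block by block: each block of rows contributes an independent partial sum and row count, and one final step combines them. The NumPy-only sketch below is a simplified stand-in, not Dask's actual implementation; `blocked_mean_axis0` is a hypothetical helper, checked against `mean(axis=0)`.

```python
import numpy as np


def blocked_mean_axis0(x, block_rows):
    """Mean along axis 0, reading `block_rows` rows at a time."""
    total = np.zeros(x.shape[1])
    count = 0
    for start in range(0, x.shape[0], block_rows):
        block = x[start:start + block_rows]
        # Each block's partial sum is independent of the others,
        # so these could be computed in parallel
        total += block.sum(axis=0)
        count += block.shape[0]
    # Combine step: one division over the accumulated totals
    return total / count


rng = np.random.default_rng(0)
x_small = rng.normal(10, 0.1, size=(1_000, 500))
result = blocked_mean_axis0(x_small, block_rows=300)
print(np.allclose(result, x_small.mean(axis=0)))  # True
```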

**Questions to think about:**

* What happens if the Dask chunks=(10000,10000)?
* What happens if the Dask chunks=(30,30)?

**Exercise:** 

For Dask arrays, compute the mean along `axis=1` of the sum of the x array and its transpose. 

In [None]:
# Your code here

In [None]:
# Solution
x_sum = xd + xd.T
res = x_sum.mean(axis=1)
res.compute()

## Choosing good chunk sizes
This section was inspired by a Dask blog post by Genevieve Buckley, which you can read [here](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes).

A common problem when getting started with Dask arrays is determining a good chunk size. But what is a good size, and how do we determine it?


### Get to know the chunks 

We can think of a Dask array as a big structure composed of smaller chunks, where each chunk is typically a single NumPy array, and the chunks are arranged to form the larger Dask array.

If you have a Dask array and want to know more about its chunks and their sizes, you can use the `chunksize` and `chunks` attributes. If you are in a Jupyter notebook, you can also visualize the Dask array via its HTML representation.

In [None]:
darr = da.random.random((1000, 1000, 1000))
darr

Notice that when we created the Dask array, we did not specify `chunks`. By default, Dask uses `chunks='auto'`, which chooses chunk sizes automatically. To learn more about how auto-chunking works, see the [documentation on automatic chunking](https://docs.dask.org/en/stable/array-chunks.html#automatic-chunking).

`darr.chunksize` shows the largest chunk size along each dimension. If you expect your array to have uniform chunk sizes, this is a good summary of the chunk size information. But if your array has irregular chunks, `darr.chunks` will show you the explicit sizes of all the chunks along all the dimensions of your Dask array.
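Chunks can also be passed explicitly as one tuple of block sizes per axis, which makes the difference between the two attributes visible. In this small sketch the first axis has irregular blocks, so `chunks` lists them all while `chunksize` only reports the largest block along each axis.

```python
import dask.array as da

x = da.ones((10, 6), chunks=((2, 3, 5), (3, 3)))
print(x.chunks)     # ((2, 3, 5), (3, 3))
print(x.chunksize)  # (5, 3): the largest block along each axis
print(x.numblocks)  # (3, 2)
```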

In [None]:
darr.chunksize

In [None]:
darr.chunks

Let's modify our example to explore chunking a bit more. We can rechunk our array:

In [None]:
darr = darr.rechunk({0: -1, 1: 100, 2: "auto"})

In [None]:
darr

In [None]:
darr.chunksize

In [None]:
darr.chunks

**Exercise:**

- What does -1 do when specified as the chunk size along a certain axis?

### Too small is a problem

If your chunks are too small, the amount of actual work done by every task is very tiny, and the overhead of coordinating all these tasks results in a very inefficient process. 

In general, the Dask scheduler takes approximately one millisecond to coordinate a single task. That means we want the computation time of each task to be comparatively large, i.e., on the order of seconds.

Intuitive analogy by Genevieve Buckley:

> Let's imagine we are building a house. It is a pretty big job, and if there were only one worker it would take much too long to build. So we have a team of workers and a site foreman. The site foreman is equivalent to the Dask scheduler: their job is to tell the workers what tasks they need to do.  
Say we have a big pile of bricks to build a wall, sitting in the corner of the building site. If the foreman (the Dask scheduler) tells workers to go and fetch a single brick at a time, then bring each one to where the wall is being built, you can see how this is going to be very slow and inefficient! The workers are spending most of their time moving between the wall and the pile of bricks. Much less time is going towards doing the actual work of mortaring bricks onto the wall.  
Instead, we can do this in a smarter way. The foreman (Dask scheduler) can tell the workers to go and bring one full wheelbarrow load of bricks back each time. Now workers are spending much less time moving between the wall and the pile of bricks, and the wall will be finished much quicker. 

### Too big is a problem

If your chunks are too big, this is also a problem because you will likely run out of memory.

If we load too much data into memory, Dask workers will start to spill data to disk to avoid crashing. Spilling data to disk will slow things down significantly, because of all the extra read and write operations to disk. This is definitely a situation that we want to avoid.

To watch out for this, look at the worker memory plot on the Dask dashboard. Orange bars are a warning you are close to the limit, and gray means data is being spilled to disk - not good! For more tips, see the section on using the Dask dashboard below. To learn more about the memory plot, check the [dashboard documentation](https://docs.dask.org/en/stable/dashboard.html#bytes-stored-and-bytes-per-worker).


### Rules of thumb

- Users have reported that chunk sizes smaller than 1MB tend to be bad. In general, a chunk size between **100MB and 1GB is good**, while going over 1 or 2GB means you have a really big dataset and/or a lot of memory available per worker.
- Upper bound: Avoid very large task graphs. More than 10,000 or 100,000 chunks may start to perform poorly.
- Lower bound: To get the advantage of parallelization, you need the number of chunks to at least equal the number of worker cores available (or better, the number of worker cores times 2). Otherwise, some workers will stay idle.
- The time taken to compute each task should be much larger than the time needed to schedule the task. The Dask scheduler takes roughly 1 millisecond to coordinate a single task, so a good task computation time would be in the order of seconds (not milliseconds).
- Chunks should be aligned with array storage on disk. Modern NDArray storage formats (HDF5, NetCDF, TIFF, Zarr) allow arrays to be stored in chunks so that blocks of data can be pulled efficiently. However, data stores often chunk more finely than is ideal for Dask arrays, so it is common to choose a chunking that is a multiple of your storage chunk size; otherwise you might incur high overhead. For example, if you are loading data that is chunked in blocks of (100, 100), then you might choose a chunking strategy more like (1000, 2000) that is larger but still divisible by (100, 100).
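The rules of thumb above can be turned into quick arithmetic. This sketch uses a hypothetical `describe_chunking` helper for regularly chunked `float64` arrays: it reports the number of chunks, the megabytes per chunk and a rough scheduling cost using the ~1 ms per task figure quoted above, for a single layer of tasks (real graphs contain several tasks per chunk, one per operation). The chunk shapes are the ones from the earlier questions.

```python
import math


def describe_chunking(shape, chunks, itemsize=8, overhead_per_task_s=1e-3):
    """Hypothetical helper: rough numbers for a regularly chunked array.

    itemsize=8 assumes float64; overhead_per_task_s uses the ~1 ms per
    task figure quoted above, for a single layer of tasks.
    """
    n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
    chunk_mb = math.prod(chunks) * itemsize / 1e6
    overhead_s = n_chunks * overhead_per_task_s
    return n_chunks, chunk_mb, overhead_s


shape = (30_000, 30_000)
for chunks in [(10_000, 10_000), (3_000, 3_000), (30, 30)]:
    n, mb, secs = describe_chunking(shape, chunks)
    print(f"chunks={chunks}: {n:,} chunks of {mb:g} MB, ~{secs:g} s scheduling")
```

Very large chunks leave few tasks to parallelize, while tiny chunks make scheduling overhead dominate.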

For more advice on chunking, see https://docs.dask.org/en/stable/array-chunks.html


# dask.delayed - parallelize any code

What if you don't have an array or dataframe? Instead of having blocks where the function is applied to each block, you can decorate functions with `@delayed` and _have the functions themselves be lazy_. 

This is a simple way to use `dask` to parallelize existing codebases or build [complex systems](https://blog.dask.org/2018/02/09/credit-models-with-dask). 

**Related Documentation**

* [Delayed documentation](https://docs.dask.org/en/latest/delayed.html)
* [Delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU)
* [Delayed API](https://docs.dask.org/en/latest/delayed-api.html)
* [Delayed examples](https://examples.dask.org/delayed.html)
* [Delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html)

As we'll see in the [distributed scheduler notebook](05_distributed.ipynb), Dask has several ways of executing code in parallel. We'll use the distributed scheduler through the `dask.distributed.Client` we created earlier. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later.

In [None]:
# Reuse client
client

## A Typical Workflow

Workflows that contain a for-loop can often benefit from delayed. The following example outlines a read-transform-write:

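```python
import dask

# read_a_file, do_a_transformation, write_out_data and filenames
# are placeholders for your own code
@dask.delayed
def process_file(filename):
    data = read_a_file(filename)
    data = do_a_transformation(data)
    destination = f"results/{filename}"
    write_out_data(data, destination)
    return destination

results = []
for filename in filenames:
    results.append(process_file(filename))  # builds a task, nothing runs yet

dask.compute(results)  # runs all the process_file tasks in parallel
```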
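
The outline above relies on placeholder functions. Here is a runnable version of the same read-transform-write pattern; the file names, the upper-casing "transformation", and the temporary directories are assumptions made only so the sketch runs anywhere. `scheduler="threads"` is passed explicitly so it does not depend on a distributed client.

```python
import os
import tempfile

import dask


@dask.delayed
def process_file(src_dir, filename, dest_dir):
    """Read one file, transform it, write the result, return the new path."""
    with open(os.path.join(src_dir, filename)) as f:   # read
        data = f.read()
    data = data.upper()                                # transform (stand-in)
    destination = os.path.join(dest_dir, filename)     # write
    with open(destination, "w") as f:
        f.write(data)
    return destination


# Create a few small input files so the sketch is self-contained
src_dir = tempfile.mkdtemp()
dest_dir = tempfile.mkdtemp()
input_names = [f"part-{i}.txt" for i in range(4)]
for name in input_names:
    with open(os.path.join(src_dir, name), "w") as f:
        f.write(f"record {name}\n")

# The loop only builds tasks; dask.compute runs them in parallel
tasks = [process_file(src_dir, name, dest_dir) for name in input_names]
(written,) = dask.compute(tasks, scheduler="threads")
print(written)
```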

## Basics

First let's make some toy functions, `inc` and `add`, that sleep for a while to simulate work. We'll then time running these functions normally.

In the next section we'll parallelize this code.

In [None]:
from time import sleep


def inc(x):
    sleep(1)
    return x + 1


def add(x, y):
    sleep(1)
    return x + y

We time the execution of this normal code using the `%%time` cell magic, a special command provided by IPython and Jupyter.

In [None]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

### Parallelize with the `dask.delayed` decorator

Those two increment calls *could* be called in parallel, because they are totally independent of one another.

We'll make the `inc` and `add` functions lazy using the `dask.delayed` decorator. When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet, which is why the cell execution finishes very quickly.
Instead, a *delayed object* is made, which keeps track of the function to call and the arguments to pass to it.
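
One way to see this laziness for yourself is to record every real function call. In this sketch the hypothetical `traced_inc` and `traced_add` functions append to a list instead of sleeping; the list stays empty until `compute` is called. We pass `scheduler="sync"` so the calls happen in this Python process, where the list lives.

```python
import dask

call_log = []  # every real function call is appended here


@dask.delayed
def traced_inc(x):
    call_log.append(("inc", x))
    return x + 1


@dask.delayed
def traced_add(x, y):
    call_log.append(("add", x, y))
    return x + y


lazy_total = traced_add(traced_inc(1), traced_inc(2))
log_before = list(call_log)        # building the graph ran nothing
value = lazy_total.compute(scheduler="sync")
print(log_before, value, call_log)
```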


In [None]:
import dask


@dask.delayed
def inc(x):
    sleep(1)
    return x + 1


@dask.delayed
def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time
# This runs immediately, all it does is build a graph

x = inc(1)
y = inc(2)
z = add(x, y)

In [None]:
z

This ran immediately, since nothing has really happened yet.

To get the result, call `compute`. Notice that this runs faster than the original code.

In [None]:
%%time
# This actually runs our computation, using the distributed client created earlier

z.compute()

## What just happened?

The `z` object is a lazy `Delayed` object. This object holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationship to one another. We can evaluate the result with `.compute()` as above, or we can visualize the task graph for this value with `.visualize()`.
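
If Graphviz isn't available, you can still inspect the graph as plain data: a delayed object's graph (from the `__dask_graph__()` collection protocol) is a mapping from task keys to tasks, and by default each key starts with the name of the function it calls. A small sketch using hypothetical `inc_demo` and `add_demo` functions without the `sleep`:

```python
import dask


@dask.delayed
def inc_demo(x):
    return x + 1


@dask.delayed
def add_demo(x, y):
    return x + y


z_demo = add_demo(inc_demo(1), inc_demo(2))

graph = dict(z_demo.__dask_graph__())               # key -> task mapping
prefixes = sorted(key.split("-")[0] for key in graph)
print(prefixes)        # ['add_demo', 'inc_demo', 'inc_demo']
print(z_demo.key)      # the final task, named after add_demo
print(z_demo.compute(scheduler="sync"))
```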

In [None]:
z

In [None]:
# Look at the task graph for `z`
z.visualize()

Notice that this includes the names of the functions from before, and the logical flow of the outputs of the `inc` functions to the inputs of `add`.

### Some questions to consider:

-  Why did we go from 3s to 2s?  Why weren't we able to parallelize down to 1s?
-  What would have happened if the inc and add functions didn't include the `sleep(1)`?  Would Dask still be able to speed up this code?
-  What if we have multiple outputs or also want to get access to x or y?

## Exercise: Parallelize a for loop

`for` loops are one of the most common things that we want to parallelize.  Use `dask.delayed` on `inc` and `sum` to parallelize the computation below:

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
%%time
# Sequential code


def inc(x):
    sleep(1)
    return x + 1


results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)

In [None]:
total

In [None]:
%%time
# Your parallel code here...

In [None]:
# Solution

@dask.delayed
def inc(x):
    sleep(1)
    return x + 1

@dask.delayed
def delay_sum(x):
    return sum(x)

results = []
for x in data:
    y = inc(x)
    results.append(y)

total = delay_sum(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

In [None]:
@dask.delayed
def inc(x):
    sleep(1)
    return x + 1

results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

In [None]:
total.visualize()

How does the graph visualization above, built with the plain `sum` function rather than one wrapped with `delayed`, compare with the one from the given solution? Can you explain why they are different? You might find the graph visualizations of the following expressions illuminating:

```python
inc(1) + inc(2)
# and
sum([inc(1), inc(2)])
```

In [None]:
(inc(1) + inc(2)).visualize()

In [None]:
(sum([inc(1), inc(2)])).visualize()

## Exercise: Parallelize a for-loop code with control flow

Often we want to delay only *some* functions, running a few of them immediately.  This is especially helpful when those functions are fast and help us to determine what other slower functions we should call.  This decision, to delay or not to delay, is usually where we need to be thoughtful when using `dask.delayed`.

In the example below we iterate through a list of inputs.  If that input is even then we want to call `inc`.  If the input is odd then we want to call `double`.  This `is_even` decision to call `inc` or `double` has to be made immediately (not lazily) in order for our graph-building Python code to proceed.
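
To see why `is_even` cannot simply be delayed here, note that Python's `if` statement needs a real `True` or `False` while the graph is being built, whereas a delayed call only produces a `Delayed` placeholder. Dask refuses to guess: using a `Delayed` as a boolean raises a `TypeError`. A minimal sketch with a hypothetical `is_even_lazy` function:

```python
import dask


@dask.delayed
def is_even_lazy(x):
    return not x % 2


flag = is_even_lazy(4)            # a Delayed, not a bool
try:
    if flag:                      # Python's `if` needs a concrete bool right now
        branch = "double"
except TypeError as err:
    branch = None
    print("TypeError:", err)

print(flag.compute(scheduler="sync"))   # the real value, once computed
```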

In [None]:
def double(x):
    sleep(1)
    return 2 * x


def is_even(x):
    return not x % 2


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
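%%time
# Sequential code

# `inc` is still the delayed version from the previous exercise,
# so redefine a plain version to keep this loop truly sequential
def inc(x):
    sleep(1)
    return x + 1


results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)

total = sum(results)
print(total)

In [None]:
%%time
# Your parallel code here...
# TODO: parallelize the sequential code above using dask.delayed
# You will need to delay some functions, but not all

In [None]:
# Solution
@dask.delayed
def inc(x):
    sleep(1)
    return x + 1


@dask.delayed
def double(x):
    sleep(1)
    return 2 * x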


results = []
for x in data:
    if is_even(x):  # even
        y = double(x)
    else:  # odd
        y = inc(x)
    results.append(y)

total = dask.delayed(sum)(results)
total.compute()

In [None]:
total.visualize()

### Some questions to consider:

-  What are other examples of control flow where we can't use delayed?
-  What would have happened if we had delayed the evaluation of `is_even(x)` in the example above?
-  What are your thoughts on delaying `sum`? This function does some computation, but it is also fast to run.

## Exercise: Parallelize a Pandas Groupby Reduction

In this exercise we read several CSV files and perform a groupby operation in parallel. We are given sequential code that does this, and we will parallelize it with `dask.delayed`.

The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data.  We will do this by using `dask.delayed` together with `pandas`.  In a future section we will do this same exercise with `dask.dataframe`.

### Inspect data

In [None]:
import os

sorted(os.listdir(os.path.join("data", "nycflights")))

### Read one file with `pandas.read_csv` and compute mean departure delay

In [None]:
import pandas as pd

df = pd.read_csv(os.path.join("data", "nycflights", "1990.csv"))
df.head()

In [None]:
# What is the schema?
df.dtypes

In [None]:
# What originating airports are in the data?
df.Origin.unique()

In [None]:
# Mean departure delay per-airport for one year
df.groupby("Origin").DepDelay.mean()

### Sequential code: Mean Departure Delay Per Airport

The above cell computes the mean departure delay per-airport for one year. Here we expand that to all years using a sequential for loop.

In [None]:
from glob import glob

filenames = sorted(glob(os.path.join("data", "nycflights", "*.csv")))

In [None]:
%%time

sums = []
counts = []
for fn in filenames:
    # Read in file
    df = pd.read_csv(fn)

    # Groupby origin airport
    by_origin = df.groupby("Origin")

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

In [None]:
mean
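
Why does the sequential code carry sums and counts instead of averaging each year's mean? Averaging per-year means weights every year equally, regardless of how many flights it had, so it generally differs from the mean over all flights. The toy data below is made up (two hypothetical "years" with a handful of flights) and only illustrates the arithmetic:

```python
import pandas as pd

# Made-up flights for two hypothetical years (not the nycflights data)
year1 = pd.DataFrame({"Origin": ["EWR", "EWR", "JFK"], "DepDelay": [10.0, 20.0, 5.0]})
year2 = pd.DataFrame({"Origin": ["EWR", "JFK", "JFK", "JFK"], "DepDelay": [30.0, 1.0, 2.0, 3.0]})
years = [year1, year2]

# Same reduction as the loop above: per-year sums and counts, combined at the end
year_sums = [df.groupby("Origin").DepDelay.sum() for df in years]
year_counts = [df.groupby("Origin").DepDelay.count() for df in years]
combined = sum(year_sums) / sum(year_counts)

# Averaging the per-year means instead gives a different (wrong) answer
mean_of_means = sum(df.groupby("Origin").DepDelay.mean() for df in years) / len(years)

# Reference: the mean over all flights at once
pooled = pd.concat(years).groupby("Origin").DepDelay.mean()
print(combined, mean_of_means, pooled, sep="\n\n")
```

Note that adding Series aligns on the index, so an airport missing from one year would produce `NaN`; `Series.add(other, fill_value=0)` is one way to guard against that.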

### Parallelize the code above

Use `dask.delayed` to parallelize the code above. Here are a few extra things you will need to know:

1.  Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.
    
2.  Calling the `.compute()` method works well when you have a single output.  When you have multiple outputs you might want to use the `dask.compute` function. This way Dask can share the intermediate values.
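
Both points can be seen in a small sketch. The hypothetical `load_record` function stands in for an expensive loading step and logs each real call (with `scheduler="sync"`, so the log is kept in this process). Indexing and method calls on the delayed result stay lazy, and a single `dask.compute` call runs the shared `load_record` task once for both outputs, while two separate `.compute()` calls run it twice:

```python
import dask

load_calls = []


@dask.delayed
def load_record(n):
    # Stand-in for an expensive load step; logs each real call
    load_calls.append(n)
    return {"name": "demo", "values": list(range(n))}


record = load_record(5)

# Point 1: indexing, method calls and arithmetic on a Delayed stay lazy
upper_name = record["name"].upper()                  # delayed getitem, then .upper()
doubled = dask.delayed(sum)(record["values"]) * 2    # delayed arithmetic

# Point 2: one dask.compute call shares the load_record intermediate
shared_out = dask.compute(upper_name, doubled, scheduler="sync")
calls_once = list(load_calls)
print(shared_out, calls_once)

# Separate .compute() calls each run load_record again
load_calls.clear()
upper_name.compute(scheduler="sync")
doubled.compute(scheduler="sync")
print(load_calls)
```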
    
So your goal is to parallelize the sequential code above using `dask.delayed`. You may also want to visualize a bit of the computation to see if you're doing it correctly.

In [None]:
%%time
# your code here

In [None]:
# Solution

# This is just one possible solution, there are
# several ways to do this using `dask.delayed`


@dask.delayed
def read_file(filename):
    # Read in file
    return pd.read_csv(filename)


sums = []
counts = []
for fn in filenames:
    # Delayed read in file
    df = read_file(fn)

    # Groupby origin airport
    by_origin = df.groupby("Origin")

    # Sum of all departure delays by origin
    total = by_origin.DepDelay.sum()

    # Number of flights by origin
    count = by_origin.DepDelay.count()

    # Save the intermediates
    sums.append(total)
    counts.append(count)

# Combine intermediates to get total mean-delay-per-origin
total_delays = sum(sums)
n_flights = sum(counts)
mean, *_ = dask.compute(total_delays / n_flights)

In [None]:
(total_delays / n_flights).visualize(engine="cytoscape")

In [None]:
# ensure the results still match
mean

### Some questions to consider:

- How much speedup did you get? Is this how much speedup you'd expect?
- Experiment with where to call `compute`. What happens when you call it on `sums` and `counts`? What happens if you wait and call it on `mean`?
- Experiment with delaying the call to `sum`. What does the graph look like if `sum` is delayed? What does the graph look like if it isn't?
- Can you think of any reason why you'd want to do the reduction one way over the other?

### Learn More

Visit the [Delayed documentation](https://docs.dask.org/en/latest/delayed.html). In particular, this [delayed screencast](https://www.youtube.com/watch?v=SHqFmynRxVU) will reinforce the concepts you learned here and the [delayed best practices](https://docs.dask.org/en/latest/delayed-best-practices.html) document collects advice on using `dask.delayed` well.

# Distributed - spread your data and computation across a cluster

As we covered at the beginning, Dask has the ability to run work on multiple machines using the distributed scheduler.

Until now we have actually been using the distributed scheduler for our work, but just on a single machine.

When we instantiate a `Client()` object with no arguments, it will attempt to locate a Dask cluster. It will check your local Dask config and environment variables to see if connection information has been specified. If not, it will create an instance of `LocalCluster` and use that.
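
As a sketch of where that connection information lives: as far as we know, `Client()` consults the `scheduler-address` config key, which can also be set through the `DASK_SCHEDULER_ADDRESS` environment variable. The address below is made up, and the snippet only reads and sets config; it does not connect anywhere.

```python
import dask

# Usually unset (None), in which case Client() falls back to a LocalCluster
print(dask.config.get("scheduler-address", None))

# A hypothetical address, e.g. one an administrator might preconfigure
with dask.config.set({"scheduler-address": "tcp://10.0.0.5:8786"}):
    # Inside this block, Client() with no arguments would try this address
    configured = dask.config.get("scheduler-address")
    print(configured)
```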

*Specifying connection information in config is useful for system administrators to provide access to their users. We do this in the [Dask Helm Chart for Kubernetes](https://github.com/dask/helm-chart/blob/master/dask/templates/dask-jupyter-deployment.yaml#L46-L48); the chart installs a multi-node Dask cluster and a Jupyter server on a Kubernetes cluster, and Jupyter is preconfigured to discover the distributed cluster.*

## Local Cluster

Let's explore the `LocalCluster` object ourselves and see what it is doing.

In [None]:
from dask.distributed import LocalCluster, Client

In [None]:
# Remove our original client + cluster
client.shutdown()
del client

cluster = LocalCluster()
cluster

Creating a cluster object will create a Dask scheduler and a number of Dask workers. If no arguments are specified, it will autodetect the number of CPU cores and the amount of memory your system has and create workers to make appropriate use of them.

You can also specify these arguments yourself. Let's have a look at the docstring to see the options we have available.

*These arguments can also be passed to `Client` and in the case where it creates a `LocalCluster` they will just be passed on down the line.*

In [None]:
?LocalCluster
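
For example, a few of those options could be combined like this. The values are arbitrary, `processes=False` keeps the workers as threads in this process so the demo starts quickly, and the names `demo_cluster` and `demo_client` are chosen so this doesn't disturb the `cluster` used in the following cells.

```python
from dask.distributed import Client, LocalCluster

demo_cluster = LocalCluster(
    n_workers=2,             # number of workers
    threads_per_worker=1,    # threads inside each worker
    memory_limit="1GiB",     # per-worker memory limit
    processes=False,         # workers run as threads in this process (quick demo)
    dashboard_address=":0",  # any free port, so it won't clash with another dashboard
)
demo_client = Client(demo_cluster, set_as_default=False)
n_demo_workers = len(demo_client.scheduler_info()["workers"])
print(n_demo_workers)

demo_client.close()
demo_cluster.close()
```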

Our cluster object has attributes and methods which we can use to access information about our cluster. For instance, we can get the log output from the scheduler and all the workers with the `get_logs()` method.

In [None]:
cluster.get_logs()

We can also access the URL at which the Dask dashboard is being hosted.

In [None]:
cluster.dashboard_link

In order for Dask to use our cluster we still need to create a `Client` object, but as we have already created a cluster we can pass that directly to our client.

In [None]:
client = Client(cluster)
client

In [None]:
# Shutdown for next example
client.shutdown()
del client, cluster

## Remote clusters via SSH

A common way to distribute your work onto multiple machines is via SSH. Dask has a cluster manager called `SSHCluster` that handles creating SSH connections for you.

```python
from dask.distributed import SSHCluster
```

When constructing this cluster manager we need to pass a list of addresses, either hostnames or IP addresses, which we will SSH into and attempt to start a Dask scheduler or worker on.

```python
cluster = SSHCluster(["localhost", "hostA", "hostB"])
cluster
```

Here we have given `SSHCluster` a list of three hostnames.

The first host in the list will be used as the scheduler, and all other hosts will be used as workers. If you're on the same network, it is reasonable to use your local machine as the scheduler and the other machines as workers.

If your servers are remote to you, in the cloud for instance, you may want the scheduler to be a remote machine too to avoid network bottlenecks.

## Scalable clusters

Both of the clusters we have seen so far are fixed size clusters. We are either running locally and using all the resources in our machine, or we are using an explicit number of other machines via SSH.

With some cluster managers it is possible to increase and decrease the number of workers, either by calling `cluster.scale(n)` in your code, where `n` is the desired number of workers, or by letting Dask do this dynamically with `cluster.adapt(minimum=1, maximum=100)`, where `minimum` and `maximum` are the limits you want Dask to stay within.

It is always good to keep your minimum at 1 or more, as Dask will start running work on a single worker in order to profile how long things take and extrapolate how many additional workers it thinks it needs. Getting new workers may take time depending on your setup, so keeping the minimum at 1 or above means this profiling can start immediately.
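
`LocalCluster` also supports `scale` and `adapt`, so it can stand in for a scalable cluster manager in a quick sketch. The worker counts here are arbitrary, and `wait_for_workers` blocks until the requested workers have joined:

```python
from dask.distributed import Client, LocalCluster

# LocalCluster stands in for a scalable cluster manager in this sketch
scale_cluster = LocalCluster(n_workers=0, processes=False, dashboard_address=":0")
scale_client = Client(scale_cluster, set_as_default=False)

scale_cluster.scale(2)              # ask for exactly two workers
scale_client.wait_for_workers(2)    # block until both have joined
n_scaled = len(scale_client.scheduler_info()["workers"])
print(n_scaled)

# Hand control to Dask: from now on it adds or removes workers within [1, 4]
adaptive = scale_cluster.adapt(minimum=1, maximum=4)

scale_client.close()
scale_cluster.close()
```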

Dask currently has cluster managers for [Kubernetes](https://kubernetes.dask.org/en/latest/), [Hadoop/Yarn](https://yarn.dask.org/en/latest/), [cloud platforms](https://cloudprovider.dask.org/en/latest/) and [batch systems including PBS, SLURM and SGE](http://jobqueue.dask.org/en/latest/).

These cluster managers allow users who have access to resources such as these to bootstrap Dask clusters on to them. If an institution wishes to provide a central service that users can request Dask clusters from, there is also [Dask Gateway](https://gateway.dask.org/).

In [None]:
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=8,
                       memory="16GB",
                       walltime="01:00:00",
                       queue="normal",
                       interface="ib0")

In [None]:
cluster

### Scaling
You may have noticed that we have zero workers. Let's ask for one worker. This will give us the resources we requested in the `SLURMCluster` call.

In [None]:
cluster.scale(1)
client = Client(cluster)
client

In [None]:
# We could also do this adaptively with `cluster.adapt`
cluster.adapt?

Dask will automatically write and submit a SLURM job script for you; here is what it looks like:

In [None]:
print(cluster.job_script())

As an example, let's compute the same mean of the large array from earlier using our SLURM cluster:

In [None]:
%%time
import dask.array as da

xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()

# Futures - non-blocking distributed calculations

Submit arbitrary functions for computation in a parallelized, eager, and non-blocking way. 

The `futures` interface (derived from the built-in `concurrent.futures`) provides fine-grained real-time execution for custom situations. We can submit individual functions for evaluation with one set of inputs using `submit()`, or evaluate them over a sequence of inputs using `map()`. The call returns immediately, giving one or more *futures*, whose status begins as "pending" and later becomes "finished". There is no blocking of the local Python session.

This is the important difference between futures and delayed. Both can be used to support arbitrary task scheduling, but delayed is lazy (it just constructs a graph) whereas futures are eager. With futures, as soon as the inputs are available and there is compute available, the computation starts. 
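
Since the interface mirrors the standard library, the eager behaviour can be illustrated with `concurrent.futures` alone (this is plain Python, not Dask): `submit` starts work immediately and hands back a future, and `result()` blocks until the value is ready. One difference worth noting is that Dask's `client.map` returns a list of futures, whereas `Executor.map` returns an iterator of results.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_inc(x):
    time.sleep(0.5)
    return x + 1


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    demo_future = pool.submit(slow_inc, 1)       # returns at once; work has started
    print(demo_future.done())                    # very likely False: still sleeping
    results_iter = pool.map(slow_inc, range(4))  # all four calls submitted eagerly
    first = demo_future.result()                 # blocks until this value is ready
    incremented = list(results_iter)             # blocks until the rest are ready
elapsed = time.perf_counter() - start
print(first, incremented, f"{elapsed:.1f}s")
```

Five half-second calls on four threads finish in roughly one second rather than 2.5, because the work started as soon as it was submitted.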

**Related Documentation**

* [Futures documentation](https://docs.dask.org/en/latest/futures.html)
* [Futures screencast](https://www.youtube.com/watch?v=07EiCpdhtDE)
* [Futures examples](https://examples.dask.org/futures.html)

In [None]:
# Reuse client
client

## A Typical Workflow

This is the same workflow that we saw in the delayed notebook. It is for-loopy and the data is not necessarily an array or a dataframe. The following example outlines a read-transform-write:

```python
def process_file(filename):
    data = read_a_file(filename)
    data = do_a_transformation(data)
    destination = f"results/{filename}"
    write_out_data(data, destination)
    return destination

futures = []
for filename in filenames:
    future = client.submit(process_file, filename)
    futures.append(future)
    
futures
```

## Basics

Just like we did in the delayed notebook, let's make some toy functions, `inc` and `add`, that sleep for a while to simulate work. We'll then time running these functions normally.

In [None]:
from time import sleep


def inc(x):
    sleep(1)
    return x + 1


def double(x):
    sleep(2)
    return 2 * x


def add(x, y):
    sleep(1)
    return x + y

We can run these locally:

In [None]:
inc(1)

Or we can submit them to run remotely with Dask. This immediately returns a future that points to the ongoing computation, and eventually to the stored result.

In [None]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

If you wait a second, and then check on the future again, you’ll see that it has finished.

In [None]:
future

You can block on the computation and gather the result with the `.result()` method.

In [None]:
future.result()

#### Other ways to wait for a future
```python
from dask.distributed import wait, progress
progress(future)
```

shows a progress bar in *this* notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn't block the execution of other code in the meanwhile.

```python
wait(future)
```
blocks and forces the notebook to wait until the computation pointed to by `future` is done. However, note that if the result of `inc()` is sitting in the cluster, it would take **no time** to execute the computation now, because Dask notices that we are asking for the result of a computation it already knows about. More on this later.

#### Other ways to gather results
```python
client.gather(futures)
```

gathers results from more than one future.

## `client.compute`

Generally, any Dask operation that is executed using `.compute()` or `dask.compute()` can be submitted for asynchronous execution using `client.compute()` instead.

Here is an example from the delayed notebook:

In [None]:
import dask


@dask.delayed
def inc(x):
    sleep(1)
    return x + 1


@dask.delayed
def add(x, y):
    sleep(1)
    return x + y


x = inc(1)
y = inc(2)
z = add(x, y)

So far we have a regular `dask.delayed` output. When we pass `z` to `client.compute` we get a future back and Dask starts evaluating the task graph. 

In [None]:
# notice the difference from z.compute()
# notice that this cell completes immediately
future = client.compute(z)
future

In [None]:
future.result()  # waits until result is ready

When using futures, the *computation moves to the data* rather than the other way around, and the client, in the local Python session, need never see the intermediate values.

## `client.submit`

`client.submit` takes a function and its arguments, pushes them to the cluster, and returns a `Future` representing the result to be computed. The function is passed to a worker process for evaluation. This looks a lot like doing `client.compute()`, above, except now we are passing the function and arguments directly to the cluster.

In [None]:
def inc(x):
    sleep(1)
    return x + 1


future_x = client.submit(inc, 1)
future_y = client.submit(inc, 2)
future_z = client.submit(sum, [future_x, future_y])
future_z

In [None]:
future_z.result()  # waits until result is ready

The arguments to `client.submit` can be regular Python functions and objects, futures from other submit operations, or `dask.delayed` objects.

### How does it work?

Each future represents a result held, or being evaluated, by the cluster. Thus we can control caching of intermediate values: when a future is no longer referenced, its value is forgotten. In the example above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.

We can explicitly pass data from our local session into the cluster using `client.scatter()`, but usually it is better to construct functions that do the loading of data within the workers themselves, so that there is no need to serialize and communicate the data. Most of the loading functions within Dask, such as `dd.read_csv`, work this way. Similarly, we normally don't want to `gather()` results that are too big in memory.
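
The two approaches side by side, in a sketch on a throwaway local client: `scatter` ships a locally built list to a worker, while the hypothetical `load_numbers` function builds the same list on the worker, so nothing large has to travel from the local session. Note that `scatter` on a list scatters each element separately, so the list is wrapped in another list to send it as a single object.

```python
from dask.distributed import Client

scatter_client = Client(processes=False, dashboard_address=":0", set_as_default=False)


def load_numbers(n):
    # Hypothetical loader: builds the data on whichever worker runs it
    return list(range(n))


# Option 1: build the data locally, then ship it to the cluster
local_numbers = list(range(1_000))
[remote_numbers] = scatter_client.scatter([local_numbers])  # one future per element
total_scattered = scatter_client.submit(sum, remote_numbers)

# Option 2 (usually preferable): the data is created on a worker
remote_loaded = scatter_client.submit(load_numbers, 1_000)
total_loaded = scatter_client.submit(sum, remote_loaded)

scattered_sum, loaded_sum = total_scattered.result(), total_loaded.result()
print(scattered_sum, loaded_sum)
scatter_client.close()
```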

## Example: Sporadically failing task

Let's imagine a task that sometimes fails. You might encounter this when dealing with input data where sometimes a file is malformed, or maybe a request times out.

In [None]:
import numpy as np


def flaky_inc(i):
    val = np.random.random()  # uniform float in [0, 1)
    print(val)
    if val < 0.5:
        raise ValueError(f"You hit the error {val=}!")
    return i + 1

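If you run this function over and over again, it will sometimes fail with a `ValueError`. The traceback below is illustrative; its source lines and message differ slightly from the current version of `flaky_inc`: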

```python
>>> flaky_inc(2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [65], in <cell line: 1>()
----> 1 flaky_inc(2)

Input In [61], in flaky_inc(i)
      3 def flaky_inc(i):
      4     if random() < 0.5:
----> 5         raise ValueError("You hit the error!")
      6     return i + 1

ValueError: You hit the error!
```

We can run this function on a range of inputs using `client.map`.

In [None]:
futures = client.map(flaky_inc, range(10))

Notice how the cell returned even though some of the computations failed. We can inspect these futures one by one and find the ones that failed:

In [None]:
for i, future in enumerate(futures):
    print(i, future.status)

You can rerun the futures that failed to try to get their tasks to complete successfully:

In [None]:
for future in futures:
    if future.status == "error":
        future.retry()

In [None]:
for i, future in enumerate(futures):
    print(i, future.status)

A more concise way of handling sporadic failures is to set the `retries=` argument of the `client.compute`, `client.submit` or `client.map` method.

**Note**: In this example we also need to set `pure=False` to let Dask know that the arguments to the function do not totally determine the output.
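
Purity matters because it determines the task key. With the default `pure=True`, identical calls get identical keys and Dask reuses the existing task; with `pure=False` every call gets a new key. This is why, without `pure=False`, repeating the earlier `client.map(flaky_inc, range(10))` call would reuse the same keys as before. A small sketch with a throwaway client and a hypothetical `add_one` function:

```python
from dask.distributed import Client

keys_client = Client(processes=False, dashboard_address=":0", set_as_default=False)


def add_one(x):
    return x + 1


pure_1 = keys_client.submit(add_one, 1)
pure_2 = keys_client.submit(add_one, 1)                # default pure=True: same key
impure_1 = keys_client.submit(add_one, 1, pure=False)
impure_2 = keys_client.submit(add_one, 1, pure=False)  # pure=False: fresh key each time

same_pure = pure_1.key == pure_2.key
same_impure = impure_1.key == impure_2.key
values = [f.result() for f in (pure_1, pure_2, impure_1, impure_2)]
print(same_pure, same_impure, values)
keys_client.close()
```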

In [None]:
futures = client.map(flaky_inc, range(10), retries=5, pure=False)
future_z = client.submit(sum, futures)
future_z.result()

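You will see a lot of warnings, but the computation will usually succeed: with `retries=5` each task gets six attempts, so at a 50% failure rate per attempt a task only fails outright if all six attempts fail, which happens with probability 1/64 per task.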

In [None]:
for i, future in enumerate(futures):
    print(i, future.status)

## Why use Futures?

The futures API offers a work submission style that can easily emulate the map/reduce paradigm. If that is familiar to you, then futures might be the simplest entry point into Dask.

The other big benefit of futures is that intermediate results, represented by futures, can be passed to new tasks without having to pull data locally from the cluster. New operations can be set up to work on the output of previous jobs that haven't even begun yet.
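
As an illustration of that second point, here is a sketch of a pairwise (tree) reduction with futures. Each round of `slow_add` tasks is submitted as soon as the previous round's futures exist, without waiting for or gathering their values; only the final result comes back to the local session. The client and functions are throwaway examples.

```python
from time import sleep

from dask.distributed import Client


def slow_inc(x):
    sleep(0.1)
    return x + 1


def slow_add(x, y):
    sleep(0.1)
    return x + y


tree_client = Client(processes=False, dashboard_address=":0", set_as_default=False)

level = tree_client.map(slow_inc, range(8))   # leaves hold 1, 2, ..., 8
while len(level) > 1:
    # Pair up neighbours; each slow_add is submitted before its inputs finish
    next_level = [tree_client.submit(slow_add, a, b)
                  for a, b in zip(level[::2], level[1::2])]
    if len(level) % 2:                # carry an unpaired future into the next round
        next_level.append(level[-1])
    level = next_level

tree_total = level[0].result()        # only the final value is brought back
print(tree_total)
tree_client.close()
```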