


**Author:** Steffen Schober



## Acknowledgment



This notebook is based on the Dask tutorial.



## Prepare Data



Prepare the data. Make sure that `prep.py` is in the same directory as this notebook.



In [1]:
%run prep.py -d flights

In [2]:
%run prep.py -d accounts

## Setup



In [3]:
import dask
from dask.distributed import Client

client = Client(n_workers=4)
client

Connection method: Cluster object, Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 4, Total threads: 12, Total memory: 15.61 GiB
Status: running, Using processes: True
Scheduler comm: tcp://127.0.0.1:45545, Started: Just now
Worker comms: tcp://127.0.0.1:44977, tcp://127.0.0.1:44765, tcp://127.0.0.1:45443, tcp://127.0.0.1:37467
Per worker: 3 threads, 3.90 GiB memory


You can open the dashboard in your web browser. The link can also be printed:



In [4]:
print(client.cluster.dashboard_link)

http://127.0.0.1:8787/status


Explore the dashboard; it contains a lot of useful information.
Note that under `Info` you can find the TCP endpoint of the scheduler
(you can use it to connect to the cluster via the `Client`).
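As a minimal sketch of connecting by address, the cell below starts its own small in-process `LocalCluster` (a stand-in, assumed here only so the example is self-contained) and then connects a second `Client` using nothing but `cluster.scheduler_address`. With the notebook's cluster you would pass the address shown under `Info` instead. The name `remote_client` is illustrative.

```python
from dask.distributed import Client, LocalCluster

# Stand-in cluster: one in-process worker, TCP transport, no dashboard
with LocalCluster(n_workers=1, threads_per_worker=1, processes=False,
                  protocol="tcp://", dashboard_address=None) as cluster:
    address = cluster.scheduler_address  # e.g. "tcp://127.0.0.1:<port>"
    # Connect a separate client using only the scheduler address
    with Client(address) as remote_client:
        total = remote_client.submit(sum, [1, 2, 3]).result()

print(address, total)
```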



## First Example: Loading CSV Files



In [5]:
import os
import dask
filename = os.path.join('data', 'accounts.*.csv')
filename

'data/accounts.*.csv'

In [6]:
import dask.dataframe as dd
df = dd.read_csv(filename)
df.head()


In [None]:
# load and count number of rows
len(df)

## Flights Data Set



In [None]:
# load the flights data lazily (only a sample is read at this point)
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df

Notice that the representation of the dataframe object contains no data: Dask has only read enough of the start of the first file to infer the column names and dtypes.
We enforce the dtype for three columns because their first rows do not contain representative data, so type inference from that sample fails
(you can check this by omitting the `dtype` argument in `read_csv()`).



In [None]:
df.dtypes

In [None]:
df.head()

Unlike `pandas.read_csv`, which reads the entire file before inferring datatypes,
`dask.dataframe.read_csv` only reads a sample from the beginning of the file (or of the first file, if using a glob).
These inferred datatypes are then enforced when reading all partitions.



### Some Analysis



We compute the maximum of the `DepDelay` column. With just pandas, we would loop over each file to find the individual maxima, then find the final maximum over all the individual maxima:

    import glob
    import os
    import pandas as pd

    filenames = sorted(glob.glob(os.path.join('data', 'nycflights', '*.csv')))
    maxes = []
    for fn in filenames:
        df = pd.read_csv(fn)
        maxes.append(df.DepDelay.max())

    final_max = max(maxes)

We could wrap that `pd.read_csv` with `dask.delayed` so that it runs in parallel.
Regardless, we're still having to think about loops, intermediate results (one per file), and the final reduction (max of the intermediate maxes).
This is just noise around the real task, which for a single file pandas solves with:

    df = pd.read_csv(filename, dtype=dtype)
    df.DepDelay.max()
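For comparison, the `dask.delayed` variant mentioned above might look like the sketch below. The three small CSV files and the helper `file_max` are hypothetical. The loop and the explicit final reduction are still there; they now build a task graph whose per-file tasks can run in parallel.

```python
import os
import tempfile

import dask
import pandas as pd

# Hypothetical input: three small CSV files with a DepDelay column
tmpdir = tempfile.mkdtemp()
filenames = []
for i, values in enumerate([[5, -2, 30], [12, 45], [0, 7]]):
    fn = os.path.join(tmpdir, f"flights-{i}.csv")
    pd.DataFrame({"DepDelay": values}).to_csv(fn, index=False)
    filenames.append(fn)

@dask.delayed
def file_max(fn):
    # One task per file: read it and reduce it to a single number
    return pd.read_csv(fn).DepDelay.max()

maxes = [file_max(fn) for fn in filenames]  # lazy, nothing is read yet
final_max = dask.delayed(max)(maxes)         # final reduction over per-file maxima
result = final_max.compute()
print(result)
```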

`dask.dataframe` lets us write pandas-like code that operates on larger-than-memory datasets in parallel.
Here we compute the max of `DepDelay`:



In [None]:
%time df.DepDelay.max().compute()

Let's visualize the graph:



In [None]:
# notice the parallelism (rendering the graph requires graphviz)
df.DepDelay.max().visualize()

## Exercises



Try to answer the following questions:

1.  How many rows are in our dataset?
2.  In total, how many non-cancelled flights were taken?
3.  In total, how many non-cancelled flights were taken from each airport?
4.  What day of the week has the worst average departure delay?

Hint for the third question:
use `groupby` with the aggregate function `count`.
See [https://pandas.pydata.org/pandas-docs/stable/groupby.html](https://pandas.pydata.org/pandas-docs/stable/groupby.html).



## Sharing Intermediate Results



When computing all of the above, we sometimes performed the same operation more than once.
For most operations, `dask.dataframe` hashes the arguments, which allows duplicate computations to be shared and computed only once.
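The hashing relies on deterministic tokens. A minimal sketch using `dask.base.tokenize`, which Dask uses when deriving task names: equal inputs give equal tokens, so identical sub-computations end up with identical keys. The small frame is hypothetical.

```python
import pandas as pd
from dask.base import tokenize

pdf = pd.DataFrame({"DepDelay": [5, 12, 40]})

# Same content and same arguments -> same token, even for a copied object
same = tokenize(pdf, "max") == tokenize(pdf.copy(), "max")
# Different arguments -> different token
different = tokenize(pdf, "max") == tokenize(pdf, "min")
print(same, different)
```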

For example, let's compute the mean and standard deviation of the departure delay of all non-cancelled flights.
Since Dask operations are lazy, those values aren't the final results yet; they're just the recipe required to get the results.

If we compute them with two separate calls to `compute`, no intermediate computations are shared.



In [None]:
non_cancelled = df[~df.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

In [None]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

Now let's pass both to a single `dask.compute` call instead.



In [None]:
%%time
mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

The task graphs for both results are merged when calling `dask.compute`, allowing shared operations to only be done once instead of twice.
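The effect can be made visible with a call counter. In the sketch below the hypothetical `load` task is a shared dependency of two results; computed separately it runs twice, while a single `dask.compute` call runs it once. For brevity it assumes `dask.delayed` with the default local scheduler rather than the notebook's dataframe.

```python
import dask

calls = []  # one entry per execution of load()

@dask.delayed
def load():
    calls.append(1)
    return list(range(10))

@dask.delayed
def mean(xs):
    return sum(xs) / len(xs)

@dask.delayed
def spread(xs):
    return max(xs) - min(xs)

data = load()
m, s = mean(data), spread(data)

m.compute()
s.compute()
separate_runs = len(calls)  # load() ran once per compute() call

calls.clear()
m_res, s_res = dask.compute(m, s)
shared_runs = len(calls)    # merged graph: load() ran only once
print(separate_runs, shared_runs, m_res, s_res)
```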

