# Lesson 2: Dask DataFrames

## Creating `dask` `DataFrames`

* read_table/read_csv
* read_parquet
* read_json
* from_pandas
* read_sql_table


* from_delayed
* from_dask_array
* `Bag.to_dataframe`

If you're using Dask, it's likely your data does not fit in memory.

## Lazily read the taxi data

In [1]:
import dask.dataframe as dd

In [2]:
from dask.distributed import Client
client = Client()

In [3]:
trips = dd.read_parquet("data/taxi-small/trips.parq/")

Check the `read_parquet` parameters from time to time. Due to ongoing `pyarrow` improvements, parquet functionality is changing rapidly!

In [4]:
trips.npartitions

31

In [5]:
trips.head(compute=False)

| | medallion_id | hack_license_id | rate_code | passenger_count | trip_time_in_secs | trip_distance | pickup_longitude | pickup_latitude |
|---|---|---|---|---|---|---|---|---|
| npartitions=1 | | | | | | | | |
| 2013-12-01 | uint16 | uint16 | uint8 | uint32 | uint32 | float64 | float64 | float64 |
| 2013-12-02 | ... | ... | ... | ... | ... | ... | ... | ... |


We now have 32 tasks (31 partitions + 1 `head` task) that have not yet been evaluated.

## Selecting data

### Selection on the index is fast

In [6]:
xmas = trips.loc["2013-12-25", :]

In [7]:
%time xmas_df = xmas.compute()

CPU times: user 23.8 ms, sys: 5.2 ms, total: 29 ms
Wall time: 443 ms


In [8]:
xmas['trip_distance'].mean().compute()

3.154237343317597

In [9]:
first_half = trips.loc["2013-12-01":"2013-12-15", :]
first_half["trip_distance"].mean().compute()

2.9317822294093703

### Filter conditions parallelized across partitions

In [10]:
trips[trips.passenger_count > 4].shape[0].compute()

33249

In [None]:
# TODO: Show how to plot the result once it has been brought back to the driver

### `iloc` doesn't work

Dask does not keep track of partition lengths, so positional row indexing with `iloc` is not supported [(link)](https://docs.dask.org/en/latest/dataframe-indexing.html#positional-indexing).

In [None]:
try:
    trips.iloc[:5, :]
except NotImplementedError as e:
    print(e)

## Groupby/Apply

Depending on how you're grouping, dask can take a slow or fast path.

Dask can efficiently perform split-apply-combine operations under the following conditions:

* group-on-index -> aggregate or apply
  * group-on-index + other columns -> aggregate or apply
* group -> aggregate

### Fast Groupby Aggregate/Apply

In [11]:
trips.index.name

'pickup_datetime'

In [49]:
hack_avg = trips.groupby('hack_license_id')['trip_distance'].mean().compute()

hack_license_id
6     2.368889
7     2.989091
8     1.785000
15    4.947143
16    3.150667
Name: trip_distance, dtype: float64

Grouping on the index (or on a value computed from the index) is fast.

In [26]:
%time trips_1d_v1 = trips.groupby(trips.index.dt.date).size().compute()

CPU times: user 89.8 ms, sys: 7.26 ms, total: 97.1 ms
Wall time: 258 ms


In [16]:
%time trips_1d_v2 = trips.resample('1D').size().compute()

CPU times: user 110 ms, sys: 9.81 ms, total: 120 ms
Wall time: 228 ms


### Index + other columns is still fast

In [35]:
%%time
# note: `weekofyear` was deprecated in pandas 1.1 and removed in 2.0
trips_1w = (trips.groupby([trips.index.dt.weekofyear, 'rate_code'])
    .mean().compute())

CPU times: user 197 ms, sys: 15 ms, total: 212 ms
Wall time: 425 ms


In [37]:
# you can switch the order in this case and perf is the same
# trips.groupby(['rate_code', trips.index.dt.weekofyear]).mean().compute()

TODO: Show `pd.Grouper`

### Fast apply

In [43]:
(trips.groupby(trips.index.dt.weekofyear)
    .apply(lambda df: df[['trip_time_in_secs', 'trip_distance']].corr())
    .compute())

Running this without `meta` first emits a warning that suggests passing `meta` explicitly:

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result

The call then fails with:

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.

### Slow aggregation

In [None]:
# slow: non-index keys force a shuffle ('pickup', 'x', and func are placeholders)
trips.groupby(['pickup', 'x']).apply(func)

## Using the `index`

In [None]:
# fast
dd.merge(df1, df2, left_index=True, right_index=True)

In [None]:
# fast
dd.merge(df1, df2, on=['idx', 'x']) # idx is index for both

## Querying the data

## Merge/Join

In [None]:
# join against another DataFrame
dd.merge(df1, df2, on='id')

## General computations

In [None]:
df1.x + df2.y # fast

In [None]:
df.rolling(window=3).mean()  # window=3 is an arbitrary example size

In [None]:
import numpy as np
df.where(df.x > 5, np.nan)  # keep rows where x > 5; everything else becomes NaN
df.mask(df.x < 5)           # rows where x < 5 become NaN

## Optimizing

Use categorical dtypes where possible.

## Don't do the shuffle

In [None]:
df.set_index(df.x)  # slow: sorting by a new index moves data between partitions

In [None]:
dd.merge(df1, df2, on='not_index')  # slow: joining on a non-index column generally requires a shuffle

## Reshape/Pivot

* get_dummies
* pivot_table
* melt

## Dask Specific

df.map_partitions(

In [None]:
df.repartition(npartitions=10)  # 10 is arbitrary; or pass one of divisions, freq, partition_size instead

In [None]:
df.random_split([0.8, 0.2])

In [None]:
df.rolling.map_overlap

# https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.rolling.map_overlap

### Series

In [None]:
map_overlap

In [None]:
nunique_approx

`split_every`: Group partitions into groups of this size while performing a tree-reduction. If set to False, no tree-reduction will be used. Default is 8.

## Save DataFrames

* to_csv
* to_parquet
* to_hdf
* to_json

### From Dask DataFrames
* to_delayed
* to_records
* to_bag
