# Analytics with Dask Dataframe

In [None]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='700MiB')
client = Client(cluster)
client

Next, we'll use Dask dataframe to access the data

In [None]:
import dask.dataframe as ddf

loans = ddf.read_parquet('data/checkouts-micro')
loans

__This looks a bit different from a Pandas dataframe ... so:__

## What *is* a Dask dataframe?

A Dask dataframe is a (virtual) collection of Pandas dataframes, divided along the index. You can picture it like this:

<img src='images/dask-dataframe.svg' width=400>

The smaller Pandas dataframes that make up the larger, virtual Dask dataframe are called *partitions*.

So, at the top of the following output, the label __npartitions=__ refers to the number of constituent Pandas dataframes. You'll notice that Dask automatically chose a number of partitions to use, although you can customize that if you want to.

In [None]:
loans

You won't often need to interact with individual partitions, but you can if you need to:

In [None]:
loans.partitions[2]

Wait ... I thought you said the partition was a __Pandas__ dataframe!

It is ... but we haven't computed it yet.

In order to minimize extra computation, data movement, memory, and time, Dask's data structures try to be *lazy*

This allows them to optimize their operation: for example, maybe you end up needing just 2 columns out of a 900-column-wide table ... in that case, it makes sense to see what's really needed before loading all of the data

When we want to materialize a local Python object, we add `.compute()` to the API call

So, to tell Dask that we want to load up that partition locally, we could type

In [None]:
loans.partitions[2].compute()

That looks a lot like Pandas output, but we can check to be sure:

In [None]:
type(loans.partitions[2].compute())

We can use some "regular" Python operators as well

In [None]:
len(loans)

## Wait, wait ... I thought you just said I needed `.compute()`!

#### __When do I need `.compute()` and how can I tell?__

__Do call__ `.compute()` when you want a full Pandas object -- Dataframe or Series -- calculated for you *and* you want it loaded up in your local Python process (where your `Client` object lives).

This is typical for small, report-type outputs, like we'll do later in this notebook.

__Don't__ call `.compute()` on a huge Dask dataframe, because it likely won't fit in local memory anyway

__Don't__ call `.compute()` if the goal is to write out a large dataset (perhaps one that you've transformed) to disk. There are APIs for doing that directly from the cluster, in parallel, so that your local process doesn't have to deal with all that data.

__Don't__ call `.compute()` if there are simpler APIs designed for human, interactive consumption that might be more efficient, like `.head()` or `len()`

Having a local result is convenient, but if we are generating large results, we may want (or need) to produce output in parallel to the filesystem, instead. 

There are writing counterparts to read methods which we can use:

- `read_csv` / `to_csv`
- `read_hdf` / `to_hdf`
- `read_json` / `to_json`
- `read_parquet` / `to_parquet`
- and more ... you can bookmark https://docs.dask.org/en/latest/dataframe-api.html for the full API

## What can I do with Dask dataframe and how do I do it?

If you're used to Pandas, it takes a little adjustment to get used to working with data without seeing all those nice rows and columns on the screen. But most of the operations you're used to -- selecting and transforming columns, filtering rows, grouping and aggregating -- still work.

In our first library task, we need to throw out the __Publisher__ column as well as "old" data.

First, let's drop the column

In [None]:
loans2 = loans.drop(columns=['Publisher'])
loans2

But what is "old" data? Let's find all of the years in the dataset

In [None]:
loans2['CheckoutYear'].unique()

Hmm... this is a "lazy" Dask Series, but we really want the actual, concrete Series of unique years.
We know this will be a small collection, so __it's time for `.compute()`__

In [None]:
loans2['CheckoutYear'].unique().compute()

In [None]:
loans2['CheckoutType'].unique().compute()

Let's keep records from 2010 onward, but omit the 2020 data since it's ... anomalous.

So we can filter that whole dataset using a Pandas-style filter

In [None]:
loans3 = loans2[(loans2['CheckoutYear'] >= 2010) & (loans2['CheckoutYear'] < 2020)]
len(loans3)

But it can be more *memory* efficient to use the Pandas-style `query` method

In [None]:
loans3 = loans2.query('CheckoutYear >= 2010 and CheckoutYear < 2020')
len(loans3)

Next, we've been asked to *drop all incomplete records* and then write out the cleaned, post-2010 dataset in Apache Parquet format, *partitioned by CheckoutYear*.

>
> __Aside: Why Apache Parquet?__
>
> Apache Parquet is one of the most popular, efficient, and performant formats for large-scale structured data. 
>
> Why? Because Parquet is a compressed, self-describing, binary *columnar* data format, which means that each column is stored apart from the others. So when we need to query just a few columns in a wide table, we can physically access just the ones we need on disk. The fastest data to process is the data you never load in the first place!
>
> Moreover, if we know what sorts of queries we will need to perform, we can *partition* by those values on disk as well. In our case, since we're partitioning by CheckoutYear, if we subsequently need records from 2016, we can access those and only those directly from the disk.
>

Let's `dropna()` and write out our data

In [None]:
loans3.dropna().to_parquet('cleaned-loans', partition_on=['CheckoutYear'], write_index=False)

__Progress Dashboard__

That seems to take a little while (well, a few seconds, at least). Let's open another dashboard view that will let us track progress.

From the Dask dashboard palette, click `Progress` and drag that to snap at the bottom of the JupyterLab window.

<img src='images/progress.png' width=901>

You should see several colored progress bars. The colors correspond to specific functions being run (when those are functions you've defined, they'll show your function names; in this case, they're function names from the Dask dataframe library).

Let's look at trends in physical vs. digital loans ("UsageClass")

In [None]:
report = ddf.read_parquet('cleaned-loans/').groupby(['CheckoutYear', 'UsageClass']).agg({'Checkouts': 'sum'}).compute()
report

That may have stalled ... when Dask just stalls, it is usually a memory starvation issue, and the first things to try are:
* smaller partitions
* more memory for your workers

In [None]:
report = ddf.read_parquet('cleaned-loans/').repartition(npartitions=50).groupby(['CheckoutYear', 'UsageClass']).agg({'Checkouts': 'sum'}).compute()
report

And since this is just a Pandas dataframe, we can plot it

In [None]:
report.unstack(level=1).plot()

__Task Stream Dashboard__

Let's look at one more dashboard. From the palette, choose Task Stream, and snap it somewhere convenient.

With the Task Stream open, re-run the previous reports. Zoom in to where you can see individual tasks across your cluster cores -- color coded to match the other views like Progress -- as well as time spent transferring data (the red "overlay" boxes).

<img src='images/taskstream.png' width=901>

This sort of X-ray vision into what's happening in the cluster makes tuning and troubleshooting a lot easier than doing so with log messages and summary stats.

## Additional key features of Dask and Dask dataframe

### Caching

One benefit of using a cluster is having more processing power (cores). But equally valuable is having an expanded pool of memory: for example, most of us don't have 250GB of RAM in our laptop, while even a small cluster is likely to have that much memory available.

To materialize a Dask dataframe (or any other Dask collection) in the distributed RAM of the cluster, we use the `.persist()` API

`.persist` is not lazy: it starts working immediately ... but it returns a new Dask dataframe right away, even though the work is not done yet. So we still get a token or handle. And, actually, a token is what we want: the whole point is that we want the big data in the cluster, not in our local Python runtime!

In [None]:
loans_cached = ddf.read_parquet('cleaned-loans').persist()

Now we can run some queries or transformations over the data in memory... or can we?
How do we know if the data is loaded up yet?

There are several ways!

First, we can look at the __Graph Dashboard__: from the dashboard palette, click "Graph"

Each Task (delayed Python function) gets a little square, and the key explains the color coding: red boxes are tasks whose result is stored in memory. 

For a big job (and a huge graph), we can watch the boxes turn red in real time ... a sort of RAM-storage progress bar.

We can also access the information programmatically.

In [None]:
from dask.distributed import futures_of

futures_of(loans_cached)

A Future is another kind of handle (similar to Promises in some languages) representing a task that was started but may not have finished (or may have failed altogether). In this example, we can see that each of the Futures is `finished`. 

We can also wait for all of them:

In [None]:
from dask.distributed import wait

wait(loans_cached)

Our queries should be faster with the data in memory

In [None]:
%%time

loans_cached.mean().compute()

Compare to the non-cached timing:

In [None]:
%%time

ddf.read_parquet('cleaned-loans').mean().compute()

In practice, your speedups will depend on how expensive the I/O is relative to the computation. 

* The slower, larger, or more distant the source data is (that is, the more expensive it is to load), the more of an improvement you'll see.

* On the other hand, the more expensive and complex your computation is, the less improvement you'll see.

### Custom Functions with `.apply`

Often, you'll want to apply your own logic to data in a Dask dataframe. Like Pandas, Dask supports the `.apply` method to run your own code over rows of data.

In [None]:
def my_custom_length(field):
    return len(field)

loans_cached.Title.apply(my_custom_length)

It runs, but Dask warns that it had to guess the output type, and suggests we add some schema information (`meta`) to help out.

In [None]:
loans_cached.Title.apply(my_custom_length, meta=('special_feature', 'int64')).head()

We can also apply to rows, allowing us to perform calculations or transformations depending on multiple columns

In [None]:
def my_combo_length(fields):
    # Look fields up by column label; positional access on a labeled row is fragile
    return len(fields['Title']) + len(fields['Subjects'])

loans_cached[['Title', 'Subjects']].dropna().apply(my_combo_length, axis=1, meta=(None, 'int64')).head()

Like Pandas, we can assign arbitrary columns to our dataframe:

In [None]:
my_special_feature = loans_cached.Title.apply(my_custom_length, meta=('special_feature', 'int64'))
loans_cached['my_special_feature'] = my_special_feature

loans_cached.head()

Custom functions are also supported for aggregations and rolling ("window") computations.

### Quirks and limitations

As you've probably noticed, Dask dataframe implements a lot of the Pandas API. At the same time, there are also some quirks to get used to (e.g., the schema hints we just provided) as well as functionality that is not implemented ... at least not yet.

You can refer to the docs to see which APIs are implemented differently (or not at all). But another approach is to try your planned computation (based on Pandas knowledge) on a small subset of your data -- in a non-destructive way -- and see if it runs and the results check out. Different users will likely prefer one or the other of these tactics.

### Best practices

Some additional best practices for working with Dask dataframe as well as patterns/anti-patterns are documented here
* https://docs.dask.org/en/latest/dataframe-best-practices.html
* https://docs.dask.org/en/latest/dataframe.html#common-uses-and-anti-uses

Common scenarios are explained in the docs, including...
* Shuffles https://docs.dask.org/en/latest/dataframe-groupby.html
* Joins https://docs.dask.org/en/latest/dataframe-joins.html
* Categorical types https://docs.dask.org/en/latest/dataframe-categoricals.html

And if you're curious about how it all works, a design description of the internals is at https://docs.dask.org/en/latest/dataframe-design.html ... from there you can take a look at the source if you'd still like to see more.

### Swappable partition dataframe implementations and RAPIDS cuDF

Since Dask dataframe is architected around proxying to Pandas dataframes ...
and Python allows us to swap in alternative objects, provided they implement the same protocol or interface ("duck typing") ...
we can use Dask with other dataframe implementations.

Most notably, this supports scalable GPU-based dataframes by placing Dask on top of cuDF dataframes in NVIDIA RAPIDS
* https://docs.rapids.ai/api/cudf/stable/10min.html#
* https://docs.rapids.ai/api/cudf/stable/dask-cudf.html

In [None]:
client.close()
cluster.close()