# Dask DataFrame

## Notebook Objectives
* **Download NYC Yellow Taxi Cab Dataset for 2019**.
* **Reading and working with tabular data using pandas**, a popular library for data analysis.
* **Reading and working with tabular data using Dask DataFrame** - an interface to scale pandas code, and a look at **Dask Dashboards** for real-time visualization of the state of your cluster.
* **Scaling Dask computation to the Cloud** using Coiled, a deployment-as-a-service library for scaling Python. (Optional)
* **Limitations of Dask DataFrame**.
* **References** for further reading.

## Download NYC Yellow Taxi Cab Dataset for 2019

A typical data science workflow starts with some data that needs to be understood. The first steps are usually data cleaning and exploratory analysis to find interesting details and patterns.

In this notebook, we will be working with the [New York City Yellow Taxi Trips Dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) for 2019.

## Reading and working with tabular data using **pandas**

### Reading data

pandas has a `read_parquet` function to import data stored in the Parquet format into your workspace. We use it to read the taxi data for January 2019 directly from Amazon S3.

`%%time` is a [magic function](https://ipython.readthedocs.io/en/stable/interactive/magics.html) in IPython that reports the execution time of the code in a cell.

pandas reads data in the form of a 'dataframe': a structured format consisting of rows and columns, along with some metadata about the values.

```python
%%time

import pandas as pd

df = pd.read_parquet("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.parquet")
df
```

```
CPU times: user 3.16 s, sys: 1.66 s, total: 4.82 s
Wall time: 53.5 s
```

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1.0 | 1.50 | 1.0 | N | 151 | 239 | 1 | 7.00 | 0.50 | 0.5 | 1.65 | 0.00 | 0.3 | 9.95 | | |
| 1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1.0 | 2.60 | 1.0 | N | 239 | 246 | 1 | 14.00 | 0.50 | 0.5 | 1.00 | 0.00 | 0.3 | 16.30 | | |
| 2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3.0 | 0.00 | 1.0 | N | 236 | 236 | 1 | 4.50 | 0.50 | 0.5 | 0.00 | 0.00 | 0.3 | 5.80 | | |
| 3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5.0 | 0.00 | 1.0 | N | 193 | 193 | 2 | 3.50 | 0.50 | 0.5 | 0.00 | 0.00 | 0.3 | 7.55 | | |
| 4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5.0 | 0.00 | 2.0 | N | 193 | 193 | 2 | 52.00 | 0.00 | 0.5 | 0.00 | 0.00 | 0.3 | 55.55 | | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 7696612 | 2 | 2019-01-31 23:37:20 | 2019-02-01 00:10:43 | | 10.24 | | | 142 | 95 | 0 | 0.00 | 2.75 | 0.0 | 0.00 | 5.76 | 0.3 | 0.00 | | |
| 7696613 | 2 | 2019-01-31 23:28:00 | 2019-01-31 23:50:50 | | 12.43 | | | 48 | 213 | 0 | 48.80 | 5.50 | 0.0 | 0.00 | 0.00 | 0.3 | 54.60 | | |
| 7696614 | 2 | 2019-01-31 23:11:00 | 2019-01-31 23:46:00 | | 9.14 | | | 159 | 246 | 0 | 51.05 | 2.75 | 0.5 | 0.00 | 0.00 | 0.3 | 54.60 | | |
| 7696615 | 2 | 2019-01-31 23:03:00 | 2019-01-31 23:14:00 | | 0.00 | | | 265 | 265 | 0 | 0.00 | 0.00 | 0.5 | 9.82 | 0.00 | 0.3 | 0.00 | | |


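Note the time taken: about 5 seconds of CPU time and nearly a minute of wall-clock time in our case, most likely spent waiting on the download from S3. pandas has loaded all the data for January into memory, taking the datatypes for each column from the Parquet file's schema. The `.info()` method gives a concise summary of the dataframe.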

```python
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7696617 entries, 0 to 7696616
Data columns (total 19 columns):
 #   Column                 Dtype
---  ------                 -----
 0   VendorID               int64
 1   tpep_pickup_datetime   datetime64[ns]
 2   tpep_dropoff_datetime  datetime64[ns]
 3   passenger_count        float64
 4   trip_distance          float64
 5   RatecodeID             float64
 6   store_and_fwd_flag     object
 7   PULocationID           int64
 8   DOLocationID           int64
 9   payment_type           int64
 10  fare_amount            float64
 11  extra                  float64
 12  mta_tax                float64
 13  tip_amount             float64
 14  tolls_amount           float64
 15  improvement_surcharge  float64
 16  total_amount           float64
 17  congestion_surcharge   float64
 18  airport_fee
```

### Working with the data

After importing the data, the next step is working on the data to find some useful information.

In the following block, we calculate the mean tip amount for each passenger count.

In pandas, `groupby()` splits the rows into groups based on the values of a column, and `mean()` then computes the mean within each group.
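A toy illustration of the same pattern, using a small made-up DataFrame (not the taxi data) that borrows the taxi column names: `groupby()` forms one group per distinct `passenger_count`, and `mean()` averages `tip_amount` within each group.

```python
import pandas as pd

# Made-up trips with taxi-like column names
trips = pd.DataFrame({
    "passenger_count": [1.0, 1.0, 2.0, 2.0, 2.0],
    "tip_amount": [1.0, 3.0, 0.0, 1.5, 3.0],
})

# One group per passenger_count value, then the mean tip within each group
mean_tip = trips.groupby("passenger_count").tip_amount.mean()
print(mean_tip)  # 1.0 -> 2.0 (mean of 1.0, 3.0); 2.0 -> 1.5 (mean of 0.0, 1.5, 3.0)
```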

```python
%%time

df.groupby("passenger_count").tip_amount.mean()
```

```
CPU times: user 124 ms, sys: 17.1 ms, total: 141 ms
Wall time: 138 ms
```

```
passenger_count
0.0    1.786901
1.0    1.828352
2.0    1.833932
3.0    1.795589
4.0    1.702710
5.0    1.869868
6.0    1.856830
7.0    6.542632
8.0    6.480690
9.0    3.116667
Name: tip_amount, dtype: float64
```

### Limitation in pandas

pandas is the most popular library for exploratory data analysis, but it has a limitation: it loads the entire dataset into memory. This works well as long as the data fits in RAM, but with larger datasets pandas can fail with a `MemoryError`. This is where Dask comes in.
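Before loading a large dataset into pandas, it helps to know how much memory a DataFrame occupies. `DataFrame.memory_usage(deep=True)` reports bytes per column (plus the index), and `deep=True` also counts the Python string objects held in `object` columns. A minimal sketch on a made-up frame; on the January data above, `df.memory_usage(deep=True).sum()` would give the corresponding figure for one month.

```python
import numpy as np
import pandas as pd

n_rows = 100_000
# Made-up frame: one numeric column and one object (string) column
frame = pd.DataFrame({
    "tip_amount": np.zeros(n_rows, dtype="float64"),
    "store_and_fwd_flag": pd.Series(["N"] * n_rows, dtype=object),
})

# Bytes per column, including the index; deep=True inspects each string object
per_column = frame.memory_usage(deep=True)
total_mb = per_column.sum() / 1e6
print(per_column)
print(f"total: {total_mb:.1f} MB")
```

A float64 column costs 8 bytes per row, while an object column costs a pointer per row plus the string objects themselves, which is why text-heavy columns tend to dominate memory use.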

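Optional: Uncomment and run the following code block to read the entire 2019 dataset in pandas. Depending on how much memory your machine has, this may fail or be very slow.

```python
# Uncomment to read all twelve monthly files for 2019 into one pandas DataFrame
# files = [
#     f"s3://nyc-tlc/trip data/yellow_tripdata_2019-{month:02d}.parquet"
#     for month in range(1, 13)
# ]
# df = pd.concat(map(pd.read_parquet, files))
# df
```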

## Reading and working with tabular data using **Dask DataFrame**

### Reading data

Dask can be used to scale pandas to larger datasets. A Dask DataFrame is made up of many smaller pandas DataFrames, called partitions, and Dask's DataFrame API mirrors much of the pandas API (though not all of it; see the limitations below). This makes Dask code familiar and easy to use.

First, spin up a cluster! 

```python
from dask.distributed import Client

client = Client(n_workers=4)
client
```

| Property | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | `distributed.LocalCluster` |
| Dashboard | http://127.0.0.1:8787/status |
| Workers | 4 |
| Total threads | 8 (2 per worker) |
| Total memory | 16.00 GiB (4.00 GiB per worker) |
| Status | running (using processes) |


Open the Dask Dashboard in JupyterLab, or at the dashboard link shown above, and look at the Cluster Map, Task Stream, and Dask Workers panels:

* **Cluster map** (also called the pew-pew map) visualizes interactions between the scheduler and the workers.
* **Task stream** shows tasks performed by each worker in real-time.
* **Dask workers** displays CPU and memory being used by each worker.

Now perform the same reading operation with Dask, but this time read the complete 2019 dataset: data for all twelve months.

```python
%%time

import dask.dataframe as dd

df = dd.read_parquet("s3://nyc-tlc/trip data/yellow_tripdata_2019-*.parquet")
df
```

```
CPU times: user 443 ms, sys: 151 ms, total: 594 ms
Wall time: 2.05 s
```

| npartitions=12 | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | int64 | datetime64[ns] | datetime64[ns] | float64 | float64 | float64 | object | int64 | int64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | object |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


That took about 2 seconds of wall time (and under 600 ms of CPU time) because Dask hasn't actually loaded the data. It has only read the Parquet metadata, created 12 partitions (`npartitions=12`), and recorded the datatypes of each column.

Let's look at the first few rows with `head()`, just as in pandas. Dask only needs to load the first partition to produce them.

```python
df.head()
```

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1.0 | 1.5 | 1.0 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.95 | | |
| 1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1.0 | 2.6 | 1.0 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 16.3 | | |
| 2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3.0 | 0.0 | 1.0 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | | |
| 3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5.0 | 0.0 | 1.0 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 7.55 | | |
| 4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5.0 | 0.0 | 2.0 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 55.55 | | |


To look at the last few rows, use `tail()`, which only needs the last partition.

```python
df.tail()
```

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 6896312 | 2 | 2019-12-31 23:56:29 | 2020-01-01 00:11:17 | | 2.82 | | | 143 | 141 | 0 | 18.95 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 22.0 | | |
| 6896313 | 2 | 2019-12-31 23:11:53 | 2019-12-31 23:30:56 | | 3.75 | | | 148 | 246 | 0 | 22.45 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 25.5 | | |
| 6896314 | 2 | 2019-12-31 23:57:21 | 2020-01-01 00:23:34 | | 6.46 | | | 197 | 205 | 0 | 34.86 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 37.91 | | |
| 6896315 | 2 | 2019-12-31 23:37:29 | 2020-01-01 00:28:21 | | 5.66 | | | 90 | 74 | 0 | 36.45 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 39.5 | | |
| 6896316 | 2 | 2019-12-31 23:09:00 | 2019-12-31 23:54:00 | | -15.5 | | | 142 | 149 | 0 | 53.03 | 2.75 | 0.5 | 0.0 | 6.12 | 0.3 | 62.7 | | |


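This is different from pandas, which loaded the complete January file into memory before we could look at it. Dask only read the Parquet metadata up front and loads the rows a result needs when you ask for it, which is what makes larger-than-memory datasets workable.

Parquet files store the datatype of each column, so nothing has to be guessed here. With CSV files (`dd.read_csv`), Dask instead infers the datatypes from a small sample of the data to stay efficient, and the computation fails if later rows disagree (for example, an integer column with missing values further down). A good practice for CSV input is therefore to specify datatypes during the function call with `dtype=`.

*Note that Dask also provides a helpful error message to diagnose this issue, including a suggested `dtype` mapping.*

```python
# Parquet files carry their own schema, so read_parquet takes no `dtype` argument
df = dd.read_parquet("s3://nyc-tlc/trip data/yellow_tripdata_2019-*.parquet")
# To use different datatypes, cast after reading, e.g.:
# df = df.astype({"VendorID": "float64", "payment_type": "float64"})

# Repartition into ~100MB partitions for faster computations,
# and persist them so they stay in the cluster's memory
df = df.repartition(partition_size="100MB").persist()
```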

```python
df.head()
```

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1.0 | 1.5 | 1.0 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.95 | | |
| 1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1.0 | 2.6 | 1.0 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 16.3 | | |
| 2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3.0 | 0.0 | 1.0 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | | |
| 3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5.0 | 0.0 | 1.0 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 7.55 | | |
| 4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5.0 | 0.0 | 2.0 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 55.55 | | |


```python
df.tail()
```

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 6896312 | 2 | 2019-12-31 23:56:29 | 2020-01-01 00:11:17 | | 2.82 | | | 143 | 141 | 0 | 18.95 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 22.0 | | |
| 6896313 | 2 | 2019-12-31 23:11:53 | 2019-12-31 23:30:56 | | 3.75 | | | 148 | 246 | 0 | 22.45 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 25.5 | | |
| 6896314 | 2 | 2019-12-31 23:57:21 | 2020-01-01 00:23:34 | | 6.46 | | | 197 | 205 | 0 | 34.86 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 37.91 | | |
| 6896315 | 2 | 2019-12-31 23:37:29 | 2020-01-01 00:28:21 | | 5.66 | | | 90 | 74 | 0 | 36.45 | 2.75 | 0.0 | 0.0 | 0.0 | 0.3 | 39.5 | | |
| 6896316 | 2 | 2019-12-31 23:09:00 | 2019-12-31 23:54:00 | | -15.5 | | | 142 | 149 | 0 | 53.03 | 2.75 | 0.5 | 0.0 | 6.12 | 0.3 | 62.7 | | |


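The repartitioned DataFrame returns the same rows as before, now served from partitions persisted in the cluster's memory.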

### Working with the data

The same computation (the mean tip amount for each passenger count) is now performed on the entire 2019 dataset using Dask DataFrame.

*Note that Dask code is similar to pandas code.*

```python
%%time

mean_tip_amount = df.groupby("passenger_count").tip_amount.mean()
mean_tip_amount
```

```
CPU times: user 5.54 ms, sys: 1.69 ms, total: 7.24 ms
Wall time: 6.61 ms
```

```
Dask Series Structure:
npartitions=1
    float64
        ...
Name: tip_amount, dtype: float64
Dask Name: truediv, 6 graph layers
```

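Like the Delayed API we saw in the previous notebook, Dask DataFrame builds a task graph instead of computing results immediately, so evaluation here is lazy too. That is why the cell above returned in a few milliseconds: it only recorded the work to be done.

Call `compute()` to run the computation and get the result as a pandas object.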

```python
%%time

mean_tip_amount.compute()
```

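When a computation finishes, Dask releases intermediate results, such as the pandas DataFrame loaded from each file. This lets us handle datasets that are larger than memory, but it also means that repeated computations have to load all of the data again each time.

You can use `persist()` to compute a collection and keep its results in the cluster's memory for future use. This is why `df` was persisted after repartitioning above; the same works for a result such as `mean_tip_amount`: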

```python
mean_tip_persist = mean_tip_amount.persist()
```

### Checkpoint

**Question:** Compute the standard deviation for tip_amount as a function of passenger_count for the entire dataset.

```python
# your answer here
```

```python
# Solution 1

std_tip = df.groupby("passenger_count").tip_amount.std().compute()
```

### Sharing intermediate outputs

Sometimes individual computations are related to each other and can benefit from sharing intermediate results. For example, the minimum and maximum of a column both need to pass over the same data, so computing them together lets Dask do that work once.

In pandas (and therefore in Dask DataFrame), you can use `min()`, `max()`, and `median()` to compute the minimum, maximum, and median respectively.
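Why computing results together helps can be shown with `dask.delayed` and a hypothetical `load` function that records how often it runs (a stand-in for reading the data). Computed separately, the maximum and minimum each trigger their own `load`; passed together to `dask.compute`, the two task graphs are merged and the shared `load` task runs only once.

```python
import dask

load_calls = []

def load():
    # Hypothetical stand-in for reading the tip_amount column
    load_calls.append(1)
    return [3.5, 0.0, 12.25, 1.0]

data = dask.delayed(load)()
max_tip = dask.delayed(max)(data)
min_tip = dask.delayed(min)(data)

# Separately: each compute() call runs load again
max_tip.compute()
min_tip.compute()
separate_loads = len(load_calls)

# Together: both results depend on the same load task, which runs once
load_calls.clear()
largest, smallest = dask.compute(max_tip, min_tip)
shared_loads = len(load_calls)

print(separate_loads, shared_loads, largest, smallest)  # 2 1 12.25 0.0
```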

```python
max_tip_amount = df.tip_amount.max()
min_tip_amount = df.tip_amount.min()
median_tip_amount = df.tip_amount.median()
```

### Without Sharing

```python
%%time
max_tip = max_tip_amount.compute()
min_tip = min_tip_amount.compute()
median_tip = median_tip_amount.compute()
```

```
CPU times: user 1min 16s, sys: 2.13 s, total: 1min 18s
Wall time: 1min 33s
```


### With Sharing

```python
import dask
```

```python
%%time
max_tip, min_tip = dask.compute(max_tip_amount, min_tip_amount)
```

```
CPU times: user 47.6 s, sys: 1.21 s, total: 48.8 s
Wall time: 51.8 s
```


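The shared computation is faster, although the comparison above is not quite like-for-like: the cell without sharing also computed the median, which accounts for part of its extra time.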

### Checkpoint

**Question:** Compute the mean and standard deviation for total amount by sharing intermediate results.

```python
# your answer here
```

```python
# Solution 2

import dask

mean_total = df.total_amount.mean()
std_total = df.total_amount.std()

# Compute both together so they share intermediate results
dask.compute(mean_total, std_total)
```

```python
client.close()
```

## Scaling to the Cloud (Optional)

We can now scale our Dask workflow to the cloud. There are many different ways to do this, but here we'll use [Coiled](https://www.coiled.io/). Coiled allows us to stay in this same notebook and makes the process much easier (see the [Coiled documentation](https://docs.coiled.io/user_guide/index.html)).

1. Sign in to [cloud.coiled.io](https://cloud.coiled.io/)
2. In your terminal (or command prompt in Windows) run `coiled login`
3. Set up Coiled with your cloud provider account by running `coiled setup wizard`

*Coiled is free to start!*

That's it! Now, in the same notebook, let's create a Coiled cluster and connect to it.

```python
import coiled

cluster = coiled.Cluster(
    name="talkpython",
    n_workers=10,
    # uncomment if you're running on binder
    # scheduler_port=443
)
```

```python
from dask.distributed import Client

client = Client(cluster)
```

```python
import dask.dataframe as dd
```

```python
df = dd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.parquet"
)
df = df.repartition(partition_size="100MB").persist()
```

```python
df.groupby("passenger_count").tip_amount.mean().compute()
```

```
passenger_count
0.0    2.122789
1.0    2.206793
2.0    2.214356
3.0    2.137791
4.0    2.023801
5.0    2.235441
6.0    2.221106
7.0    6.675962
8.0    7.111625
9.0    7.377822
Name: tip_amount, dtype: float64
```

```python
# Close the client first, so it does not try to reconnect to a closed cluster
client.close()

# Close the cluster
# (it will also close automatically after 20 minutes of inactivity)
cluster.close()
```

## Limitations of Dask DataFrame

The Dask DataFrame API does not implement the complete pandas interface, because some pandas operations are not suited to a parallel and distributed environment.

### Data Shuffling

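A Dask DataFrame consists of multiple pandas DataFrames, each with its own index; as the `tail()` output above suggests, the index here restarts in every file. Operations such as `set_index` need the data to be sorted by the new index, which requires a lot of time-consuming shuffling of data between partitions, so they are slower in Dask. (`reset_index`, by contrast, resets each partition independently, so the new index restarts at zero in every partition.) Hence, it's good practice to presort the data by the intended index and to choose partitions that match how the data will be queried.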


## References

* [Dask DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [Dask DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [Dask DataFrame examples](https://examples.dask.org/dataframe.html)
* [Dask Tutorial - DataFrames](https://github.com/pavithraes/dask-tutorial/blob/master/04_dataframe.ipynb)