
Dask Dataframes on NYC Taxi Data
================================

<img src="http://pandas.pydata.org/_static/pandas_logo.png"
     align="left"
     width="30%"
     alt="Pandas logo">
     <img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

In this section we will learn how to ...

-  use Dask Dataframe to scale Pandas workloads
-  call `.compute` and `.persist` to trigger computation
-  start and scale a Dask cluster on Kubernetes
-  interpret dashboard plots


In [1]:
import warnings

warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")

## We have several CSV files in cloud storage

In [2]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem()

sorted(gcs.glob('anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv'))

['anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-02.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-03.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-04.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-05.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-06.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-07.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-08.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-09.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-10.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-11.csv',
 'anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-12.csv']

## Read a subset with Pandas

The full dataset is too big to fit in memory on a single machine, so we read only the first million rows of one file to get a first impression.

In [3]:
import pandas as pd

with gcs.open('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=1000000, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [None]:
df

## Investigate the subset as normal

In [None]:
# How many passengers total?
df.passenger_count.sum()

In [None]:
# The average trip distance for rides with a single passenger
df2 = df[df.passenger_count == 1]
df2.trip_distance.mean()

In [None]:
# The average trip distance grouped by passenger counts
df.groupby(df.passenger_count).trip_distance.mean()  

## Start a Dask Cluster

Your notebook is conveniently attached to a Kubernetes cluster, so you can start a Dask cluster using the [dask-kubernetes](https://kubernetes.dask.org/en/latest/) project.

For more information on deploying Dask on different cluster technologies, see [Dask's deployment documentation](https://docs.dask.org/en/latest/setup.html).

In [4]:
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=20)
cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [5]:
from dask.distributed import Client

client = Client(cluster)

## Create Dask dataframe around all of the data

Previously, we loaded only a subset of one CSV file.  Now let's use Dask dataframe to read all of the files.

For more information, you can read [Dask's documentation for creating dataframes](http://docs.dask.org/en/latest/dataframe-create.html).

In [6]:
import dask.dataframe as dd

df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = df.persist()

Dask dataframes look like Pandas dataframes, and support most of the common Pandas methods.

In [8]:
df.passenger_count.sum().compute()

245566747

## Investigate laziness and use the `.compute()` method

Note that an expression like `df.passenger_count.sum()` does not execute on its own; we had to call `.compute()` above to get a number.  Dask dataframes are *lazy* by default, so they only evaluate when we tell them to.

There are two ways to trigger computation:

-  `result = result.compute()`: triggers computation and stores the result into local memory as a Pandas object.  

    You should use this with *small* results that will fit into memory.
-  `result = result.persist()`: triggers computation and stores the result into distributed memory, returning another Dask dataframe object.  

    You should use this with *large* results that you want to stage in distributed memory for repeated computation.

#### *Exercise*: Run the Pandas computations above with Dask dataframe

In [None]:
# How many passengers total?


In [None]:
# The average trip distance for rides with a single passenger



In [None]:
# The average trip distance grouped by passenger counts


#### *Question*: When is it safe to call compute?

Recall that calling `.compute()` on a Dask DataFrame returns a Pandas result in your local memory.  This can be dangerous if the size of the result is large.  In which of the following situations is calling `.compute()` ok?

-  `df.sum()`
-  `df[df.passenger_count == 1]`
-  `df[df.passenger_count == 10]`
-  `df.groupby(df.passenger_count).trip_distance.mean()`
-  `df.groupby(df.pickup_latitude).trip_distance.mean()`

## Persist data in memory

When we started this notebook we ran the following lines to create our dataframe.

```python
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = df.persist()
```

In particular, we called `df = df.persist()` to load all of the CSV data into distributed memory.  Having this data in memory made our subsequent computations fast.  

In this section we're going to reset our cluster and run the same computations, but without persisting our data in memory.  What happens to our computation times?  Why?

In [None]:
client.restart()

In [None]:
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df

In [None]:
# How many passengers total?
df.passenger_count.sum().compute()  

In [None]:
# The average trip distance for rides with a single passenger
df2 = df[df.passenger_count == 1]  
df2.trip_distance.mean().compute()

In [9]:
# The average trip distance grouped by passenger counts
df.groupby(df.passenger_count).trip_distance.mean().compute()

passenger_count
0     2.279183
1    15.541413
2    11.815871
3     1.620052
4     7.481066
5     3.066019
6     2.977158
9     5.459763
7     3.303054
8     3.866298
Name: trip_distance, dtype: float64

In [56]:
#
# IMPORTANT
#
# Run the following in an interactive terminal
# to authenticate yourself to your GCP project:
#

# gcloud auth application-default login

In [55]:
#verify that we have local credentials
import os
import google.auth

auth_info = google.auth.default()
credentials = auth_info[0]
print("Credentials stored in", os.environ['GOOGLE_APPLICATION_CREDENTIALS'])


Credentials stored in /notebooks/adc.json


In [60]:
import pandas_gbq

sql = """
SELECT 
    passenger_count, 
    AVG(trip_distance) as mean_trip_distance 
FROM [bigquery-public-data.new_york.tlc_yellow_trips_2015]
GROUP BY passenger_count 
ORDER BY passenger_count
"""
# Use a separate name so the Dask dataframe `df` is not overwritten
bq_df = pandas_gbq.read_gbq(sql, 
                            project_id="jsp-work",
                            dialect="legacy",
                            credentials=credentials)
bq_df

   passenger_count  mean_trip_distance
0                0            2.279183
1                1           15.541413
2                2           11.815871
3                3            1.620052
4                4            7.481066
5                5            3.066019
6                6            2.977158
7                7            3.303054
8                8            3.866298
9                9            5.459763


#### *Exercise*: What did our workers spend their time doing?

To answer this question look at the Task Stream dashboard plot.  It will tell you the activity on each core of your cluster (y-axis) over time (x-axis).  You can hover over each rectangle of this plot to determine what kind of task it was.  What kinds of tasks are most common and take up the most time?

*Extra*: if you're ahead of the group, you might also want to look at the Profile dashboard plot.  You can access this by selecting the orange Dask icon on the left side of your JupyterLab page.  The profile plot is an interactive [flame graph](http://www.brendangregg.com/FlameGraphs/cpuflamegraphs.html).

In [None]:
df = df.persist()  # we persist our data again, just to make future sections faster

## Dask DataFrame Design

We briefly discuss the design of Dask dataframes.  Then we follow this section with exercises that dive into this design.

<img src="http://docs.dask.org/en/latest/_images/dask-dataframe.svg"
     width="50%">
     
Dask dataframes are composed of many *partitions*, split along the index.  Each partition is a Pandas dataframe or Series.  You can see the number of partitions in the rendering of a Dask Dataframe.

In [None]:
df

You can also check the type of each partition using the `map_partitions` method.

In [None]:
type(df)

In [None]:
df.map_partitions(type).compute()

### Divisions and the Index

Just like Pandas, Dask Dataframe has an *index*, a special column that indexes the rows of our dataframe.  In Dask this index has an additional purpose: it serves as a sorted partitioning of our data.  This makes some algorithms more efficient.  In this section, we'll sort our data by time and dive into the index a bit more deeply.

First, notice that our index is not particularly informative.  This is common when you load a dataset from CSV data, which generally doesn't store index or sorting information.

Let's set a new index to be the pickup time.  Sorting in parallel is hard, so this is an expensive operation.

In [None]:
df2 = df.set_index('tpep_pickup_datetime').persist()

In [None]:
df2

In [None]:
df2.head()

In [None]:
df2.tail()

Our dataframe is split into roughly as many partitions as before, but now we know the time range of each partition.  Internally, the boundaries between partitions are stored in the `divisions` attribute.

In [None]:
df2.divisions

### Question: What took up the most time in the operation above?

What colors are most prominent in the task stream plot?

When you hover over some of these bars, what do they say?

### Fast operations along the index

Having a sorted dataframe allows for fast operations, like random access lookup and timeseries operations.

In [None]:
df2.loc['2015-05-05'].compute()  # pick out one day of data

In [None]:
df2.passenger_count.resample('1h').mean().compute().plot()

### Exercises if you are done early

Explore timeseries operations like `resample` and `rolling`.

You may want to look at the [DataFrame API](http://docs.dask.org/en/latest/dataframe-api.html).

## Close things when you're done

Before you move on to the next notebook, please close down your current cluster.

Alternatively, you can restart this notebook's kernel by pressing the `"0"` key twice.

In [62]:
cluster.close();