
Dask Dataframes on NYC Taxi Data
================================

<img src="http://pandas.pydata.org/_static/pandas_logo.png"
     align="left"
     width="30%"
     alt="Pandas logo">
     <img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

In this section we will learn how to ...

-  use Dask Dataframe to scale Pandas workloads
-  call `.compute` and `.persist` to trigger computation
-  start and scale a Dask cluster on Kubernetes
-  interpret dashboard plots


In [None]:
import warnings

warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")

## We have several CSV files in cloud storage

In [None]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem()

sorted(gcs.glob('anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv'))

## Read a subset with Pandas

The full dataset is too big to fit in memory on a single machine, so we read only the first million rows of one file to get a first impression.

In [None]:
import pandas as pd

with gcs.open('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=1000000, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [None]:
df

## Investigate the subset as normal
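Typical Pandas queries on the subset include the total number of passengers, the mean trip distance for single-passenger rides, and the mean trip distance per passenger count. The sketch below is self-contained, so it builds a tiny hypothetical stand-in dataframe with made-up values in place of the one-million-row `df`, keeping only the two columns it uses. In the notebook you would run the same expressions on `df`.

```python
import pandas as pd

# Hypothetical stand-in for the one-million-row subset: made-up values,
# only the two columns used below.
sample = pd.DataFrame({
    "passenger_count": [1, 1, 2, 3, 1, 2],
    "trip_distance": [1.5, 2.5, 4.0, 6.0, 2.0, 3.0],
})

# How many passengers total?
total_passengers = sample.passenger_count.sum()

# The average trip distance for rides with a single passenger
single_mean = sample[sample.passenger_count == 1].trip_distance.mean()

# The average trip distance grouped by passenger counts
by_count = sample.groupby(sample.passenger_count).trip_distance.mean()

print(total_passengers)
print(single_mean)
print(by_count)
```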

In [None]:
! wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip

In [None]:
! unzip ngrok-stable-linux-amd64.zip

## Start a Dask Cluster

Your notebook is conveniently attached to a Kubernetes cluster, so you can start a Dask cluster using the [dask-kubernetes](https://kubernetes.dask.org/en/latest/) project.

For more information on deploying Dask on other cluster technologies, see [Dask's deployment documentation](https://docs.dask.org/en/latest/setup.html).

In [None]:
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker.yml')
cluster

In [None]:
from dask.distributed import Client

client = Client(cluster)
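The cells above start a cluster but don't resize it. Dask's cluster managers, including `KubeCluster`, offer `cluster.scale(n)` to request a fixed number of workers and `cluster.adapt(minimum=..., maximum=...)` to scale automatically with load. The sketch below shows the scaling pattern with `LocalCluster` as a stand-in, since it runs without Kubernetes; the worker counts are arbitrary, and `set_as_default=False` keeps this extra client from replacing the notebook's `client`.

```python
from dask.distributed import Client, LocalCluster

# Local stand-in for the Kubernetes cluster: start with one in-process worker.
# dashboard_address=":0" picks a random free port for the dashboard.
local_cluster = LocalCluster(
    n_workers=1, threads_per_worker=1, processes=False, dashboard_address=":0"
)
local_client = Client(local_cluster, set_as_default=False)

# Ask for two workers, then block until both have joined the scheduler.
local_cluster.scale(2)
local_client.wait_for_workers(n_workers=2, timeout=30)
n_workers = len(local_client.scheduler_info()["workers"])
print(n_workers)

# Shut the stand-in down so it doesn't compete with the real cluster.
local_client.close()
local_cluster.close()
```

In the notebook itself, the equivalent call would be `cluster.scale(...)` on the `KubeCluster` created above.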

## Create a Dask dataframe around all of the data

Before, we loaded only a subset of one CSV file.  Now let's use Dask dataframe to read all of the files.

For more information, see [Dask's documentation for creating dataframes](http://docs.dask.org/en/latest/dataframe-create.html).

In [None]:
import dask.dataframe as dd

# The glob matches every 2015 yellow-cab file, not just January
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = df.persist()

Dask dataframes look like Pandas dataframes, and support most of the common Pandas methods.

In [None]:
df.passenger_count.sum().compute()

## Investigate laziness and use the `.compute()` method

Note that `df.passenger_count.sum()` by itself does not execute anything.  Dask dataframes are *lazy* by default, so they only evaluate when we tell them to; in the cell above, it is the trailing `.compute()` call that triggers the work.

There are two ways to trigger computation:

-  `result = result.compute()`: triggers computation and stores the result in local memory as a concrete object: a Pandas dataframe or series, or a single value for reductions such as `.sum()`.

    Use this for *small* results that fit comfortably in local memory.
-  `result = result.persist()`: triggers computation and stores the result in distributed memory, returning another Dask dataframe object backed by the computed partitions.  With a distributed `Client`, `.persist()` returns immediately while the work continues in the background.

    Use this for *large* results that you want to keep in distributed memory for repeated computation.

#### *Exercise*: Run the Pandas computations above with Dask dataframe

In [None]:
# Example usage: a quick check that the cluster runs computations
import dask.array as da

# `client`, created above, already connects Dask to the cluster,
# so there is no need to create a second Client here.

# Create an array of ones and calculate its mean on the cluster
array = da.ones((1000, 1000, 100), chunks=(100, 100, 10))
print(array.mean().compute())  # Should print 1.0

In [None]:
# The average trip distance for rides with a single passenger



In [None]:
# The average trip distance grouped by passenger counts


## Persist data in memory

When we started this notebook we ran the following lines to create our dataframe.

```python
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = df.persist()
```

In particular, we called `df = df.persist()` to load all of the CSV data into distributed memory.  Having this data in memory made our subsequent computations fast.  

In this section we're going to restart the workers in our cluster, which clears their memory, and run the same computations without persisting our data.  What happens to our computation times?  Why?

In [None]:
client.restart()

In [None]:
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv', 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df

In [None]:
# How many passengers total?
df.passenger_count.sum().compute()  

In [None]:
# The average trip distance for rides with a single passenger
df2 = df[df.passenger_count == 1]  
df2.trip_distance.mean().compute()

In [None]:
# The average trip distance grouped by passenger counts
df.groupby(df.passenger_count).trip_distance.mean().compute()

#### *Exercise*: What did our workers spend their time doing?

To answer this question, look at the Task Stream dashboard plot.  It shows the activity on each core of your cluster (y-axis) over time (x-axis).  You can hover over each rectangle in this plot to see what kind of task it was.  Which kinds of tasks are most common, and which take up the most time?

*Extra*: if you're ahead of the group, you might also want to look at the Profile dashboard plot.  You can access it by selecting the orange Dask icon on the left side of your JupyterLab page.  The profile plot is an interactive [Flame graph](http://www.brendangregg.com/FlameGraphs/cpuflamegraphs.html).

In [None]:
df = df.persist()  # we persist our data again, just to make future sections faster

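A Dask dataframe is made up of many Pandas dataframes, called partitions.  We can check the type of the Dask dataframe itself, and the type of each partition using the `map_partitions` method.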

In [None]:
type(df)

In [None]:
df.map_partitions(type).compute()

### Divisions and the Index

Just like Pandas, Dask Dataframe has an *index*, a special column that indexes the rows of our dataframe.  In Dask, this index has an additional purpose: it serves as a sorted partitioning of our data.  When Dask knows the index range covered by each partition, some algorithms become more efficient; for example, a `.loc` lookup or a join on the index only needs to touch the relevant partitions.  In this section, we'll sort our data by time and dive into the index a bit more deeply.

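First, notice that our index is not particularly informative.  This is common when you load a dataset from CSV files, which generally don't store index or sorting information: each partition gets its own default integer index starting at 0, and Dask doesn't know how the partitions are ordered.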

Let's set the pickup time as the new index.  Sorting in parallel is hard: Dask has to estimate boundaries between partitions and then shuffle rows between them, so this is an expensive operation.

In [None]:
df2 = df.set_index('tpep_pickup_datetime').persist()

In [None]:
df2

In [None]:
df2.head()

In [None]:
df2.tail()

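Our dataframe is split into roughly as many partitions as before, but now we know the time range of each partition.  Internally, the boundaries between partitions are stored in the `divisions` attribute: a tuple of `npartitions + 1` values holding the first index value of each partition plus the last index value of the final partition.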

In [None]:
df2.divisions
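Known divisions let Dask skip partitions it doesn't need: for a `.loc` lookup, it only has to touch the partition(s) whose range can contain the requested label. The sketch below is a simplified, pure-Python model of that idea, not Dask's actual implementation, with made-up boundaries. Partition `i` holds index values from `divisions[i]` up to but not including `divisions[i + 1]`, except the last partition, which also includes its upper bound.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical divisions for three partitions of pickup times
# (npartitions + 1 boundaries).
divisions = (
    datetime(2015, 1, 1),
    datetime(2015, 1, 11),
    datetime(2015, 1, 21),
    datetime(2015, 1, 31, 23, 59, 59),
)

def partition_for(value, divisions):
    """Return the index of the partition that may contain `value`, or None."""
    if value < divisions[0] or value > divisions[-1]:
        return None  # outside the range of the whole dataframe
    # bisect_right counts the boundaries <= value; subtract one for the partition.
    i = bisect_right(divisions, value) - 1
    # A value equal to the final boundary belongs to the last partition.
    return min(i, len(divisions) - 2)

print(partition_for(datetime(2015, 1, 5), divisions))                # 0
print(partition_for(datetime(2015, 1, 11), divisions))               # 1
print(partition_for(datetime(2015, 1, 31, 23, 59, 59), divisions))   # 2
print(partition_for(datetime(2015, 2, 1), divisions))                # None
```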

## Close things when you're done

Before you move onto the next notebook, please close down your current cluster.

Alternatively, you can restart this notebook's kernel by pressing the `0` key twice.

In [None]:
cluster.close();