<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

DataFrames on a Cluster
=======================

<img src="http://pandas.pydata.org/_static/pandas_logo.png"
     align="left"
     width="30%"
     alt="Pandas logo">



This notebook needs the [gcsfs library](https://gcsfs.readthedocs.io).

    pip install gcsfs

## Read single dataframe from the cloud with Pandas

In [None]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem(token='anon')

gcs.ls('anaconda-public-data/nyc-taxi/csv/2015/')

In [None]:
import pandas as pd

with gcs.open('anaconda-public-data/nyc-taxi/csv/2015/yellow_tripdata_2015-01.csv') as f:
    df = pd.read_csv(f, nrows=5, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df

## Parallelize Pandas with Dask.dataframe


In [None]:
import dask
from dask.distributed import Client, progress
client = Client()
client

In [None]:
import dask.dataframe as dd

df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv',
                 storage_options={'token': 'anon'}, 
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [None]:
df = df.persist()
progress(df)


Dask DataFrames
---------------

*  Coordinate many Pandas DataFrames across a cluster
*  Faithfully implement a subset of the Pandas API
*  Use Pandas under the hood (for speed and maturity)

In [None]:
df

In [None]:
df.dtypes

In [None]:
df.head()

In [None]:
%time len(df)

In [None]:
%time df.passenger_count.sum().compute()

In [None]:
# Compute average trip distance grouped by passenger count
df.groupby(df.passenger_count).trip_distance.mean().compute()

### Tip Fraction, grouped by day-of-week and hour-of-day

In [None]:
df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)]
df2['tip_fraction'] = df2.tip_amount / df2.fare_amount

In [None]:
# Group df.tpep_pickup_datetime by dayofweek and hour
dayofweek = df2.groupby(df2.tpep_pickup_datetime.dt.dayofweek).tip_fraction.mean() 
hour = df2.groupby(df2.tpep_pickup_datetime.dt.hour).tip_fraction.mean()

dayofweek, hour = dask.persist(dayofweek, hour)
progress(dayofweek, hour)

### Plot results

This requires matplotlib to be installed

In [None]:
%matplotlib inline

In [None]:
hour.compute().plot(figsize=(10, 6), title='Tip Fraction by Hour')