<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     width="30%" 
     align=right
     alt="Dask logo">

Distributed DataFrames with Dask
--------------------------------

In [11]:
from dask.distributed import Executor, progress
# Connect to the distributed scheduler listening at this host:port.
e = Executor('cluster.demo.continuum.io:8786')
# Bare expression: show the executor's repr (scheduler address, processes,
# cores) as the cell output.
e

<Executor: scheduler=cluster.demo.continuum.io:8786 processes=24 cores=24>

In [9]:
# Restart the executor before loading data; the recorded cell output is the
# executor's repr.
# NOTE(review): presumably this resets worker state and drops any previously
# persisted results -- confirm against the dask.distributed docs.
e.restart()

<Executor: scheduler=cluster.demo.continuum.io:8786 processes=24 cores=24>

In [12]:
import dask
# Install the executor's `get` as dask's global `get` option.
# NOTE(review): presumably this makes later `.compute()` calls run on the
# cluster rather than locally -- confirm for the dask version in use.
dask.set_options(get=e.get)

<dask.context.set_options at 0x7f31e4611b50>

In [13]:
import dask.dataframe as dd

# Read every 2015 yellow-cab trip CSV matched by the glob on HDFS into a single
# Dask dataframe, parsing the pickup/dropoff columns as datetimes.
df = dd.read_csv('hdfs:///user/ubuntu/nyc-taxi/yellow_tripdata_2015-*.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 header='infer')

# The last header field is read as 'total_amount\r' (presumably because the
# files use CRLF line endings), which breaks `df.total_amount` and
# `df['total_amount']`.  Strip stray whitespace from every column name before
# anything downstream depends on them.  Outputs recorded before this fix still
# show the '\r' suffix in the dtypes listing.
df = df.rename(columns={name: name.strip() for name in df.columns})

# Hand the dataframe to the executor to persist.
# NOTE(review): presumably this starts loading the data into cluster memory in
# the background -- confirm against the dask.distributed docs.
df = e.persist(df)

In [14]:
# Report progress on the persisted dataframe.
# NOTE(review): presumably renders a live progress bar while the cluster loads
# the data -- confirm against the dask.distributed docs.
progress(df)

In [17]:
# Bare expression: display the Dask dataframe's repr.  The recorded output
# shows the partition count rather than any rows.
df

dd.DataFrame<from-de..., npartitions=91>

In [18]:
# List the dtype of each column of the loaded dataframe.
df.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                   int64
trip_distance                   float64
pickup_longitude                float64
pickup_latitude                 float64
RateCodeID                      float64
store_and_fwd_flag               object
dropoff_longitude               float64
dropoff_latitude                float64
payment_type                    float64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
total_amount\r                  float64
dtype: object

In [19]:
# Preview the first rows of the dataframe (the recorded output shows five).
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount
0,2,2015-01-08 22:44:09,2015-01-08 22:50:56,1,1.55,-73.987686,40.724251,1.0,N,-73.973763,40.743378,2.0,7.5,0.5,0.5,0.0,0.0,8.8
1,1,2015-01-08 22:44:09,2015-01-08 22:51:17,3,1.2,-73.99157,40.726933,1.0,N,-74.004105,40.721081,2.0,7.0,0.5,0.5,0.0,0.0,8.3
2,1,2015-01-08 22:44:10,2015-01-08 22:55:27,1,2.4,-73.981918,40.783443,1.0,N,-73.952354,40.798199,2.0,10.5,0.5,0.5,0.0,0.0,11.8
3,1,2015-01-08 22:44:10,2015-01-08 22:58:09,1,7.3,-73.973122,40.743553,1.0,N,-73.919571,40.832001,2.0,21.5,0.5,0.5,0.0,0.0,22.8
4,1,2015-01-08 22:44:12,2015-01-08 22:46:16,1,0.4,-73.982948,40.766209,1.0,N,-73.98439,40.764053,2.0,3.5,0.5,0.5,0.0,0.0,4.8


## Familiar experience for Pandas users

Dask DataFrames mirror the Pandas API loved by data scientists.

In [20]:
# Total passenger count across all trips: build the lazy reduction first, then
# compute it so the number is displayed as the cell output.
passenger_total = df['passenger_count'].sum()
passenger_total.compute()

129332965

In [24]:
# Mean trip distance for each passenger count.
by_passengers = df.groupby(df.passenger_count)
by_passengers['trip_distance'].mean().compute()

passenger_count
0     2.313313
1    19.017508
2    17.851579
3     0.120515
4    11.545251
5     3.013812
6     2.922961
7     4.224845
8     3.842568
9     5.302687
Name: trip_distance, dtype: float64

In [25]:
# Two boolean flags per trip: payment_type equal to 2, and a tip of exactly 0.
is_payment_2 = df.payment_type == 2
has_no_tip = df.tip_amount == 0

# Attach both flags and keep only those two columns.
df2 = df.assign(payment_2=is_payment_2, no_tip=has_no_tip)
df2 = df2[['no_tip', 'payment_2']]
df2.head()

Unnamed: 0,no_tip,payment_2
0,True,True
1,True,True
2,True,True
3,True,True
4,True,True


In [26]:
# Cast the boolean flags to integers, then compute their correlation matrix.
flags_as_int = df2.astype(int)
flags_as_int.corr().compute()

Unnamed: 0,no_tip,payment_2
no_tip,1.0,0.944811
payment_2,0.944811,1.0


## Look at tips collected by hour of day

New Yorkers tip surprisingly well at 4am.  Can you guess why?

In [27]:
# Keep trips whose payment_type is not 2 (the flag that correlated ~0.94 with
# "no tip" above) and whose fare is strictly positive, so the division below
# never sees a zero or negative denominator.
not_payment_2 = df.payment_type != 2
positive_fare = df.fare_amount > 0
df2 = df[not_payment_2 & positive_fare]

# tip_fraction is the ratio of tip to fare for each remaining trip.
df2 = df2.assign(tip_fraction=df2['tip_amount'] / df2['fare_amount'])

In [28]:
# Mean tip fraction grouped by the hour component of the pickup timestamp.
pickup_hour = df2.tpep_pickup_datetime.dt.hour
hour = e.persist(df2.groupby(pickup_hour).tip_fraction.mean())

# Report progress on the persisted hourly series.
progress(hour)

## Plot results with Bokeh

In [24]:
from bokeh.plotting import figure, output_notebook, show
output_notebook()

# Compute the hourly means once and reuse the result for both axes.  Calling
# .compute() separately on the index and on the values runs the computation
# twice and relies on both results coming back in the same order.  Sorting by
# hour makes sure the line is drawn left to right.
hourly_tip = hour.compute().sort_index()

fig = figure(title='Tip Fraction',
             x_axis_label='Hour of day',
             y_axis_label='Tip Fraction')
fig.line(x=hourly_tip.index, y=hourly_tip, line_width=3)
# Anchor the y axis at zero so the tip fractions are shown on an absolute scale.
fig.y_range.start = 0

show(fig)