<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

DataFrames on a Cluster
=======================

<img src="http://pandas.pydata.org/_static/pandas_logo.png"
     align="left"
     width="50%"
     alt="Pandas logo">


### Read a single DataFrame from S3 with Pandas

In [2]:
# s3fs exposes S3 buckets through a filesystem-style interface.
from s3fs import S3FileSystem

# anon=True: connect anonymously, i.e. without AWS credentials.
s3 = S3FileSystem(anon=True)
# List the objects stored under the 2015 NYC taxi prefix.
s3.ls('dask-data/nyc-taxi/2015/')

['dask-data/nyc-taxi/2015/',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-02.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-03.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-04.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-05.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-06.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-07.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-08.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-09.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-10.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-11.csv',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-12.csv',
 'dask-data/nyc-taxi/2015/parquet.gz',
 'dask-data/nyc-taxi/2015/parquet',
 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.parq']

In [None]:
import pandas as pd

# Peek at the January file with plain Pandas: only the first five rows are
# parsed, and the two trip timestamps are converted to datetimes.
january_csv = 'dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv'
timestamp_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

with s3.open(january_csv) as remote_file:
    df = pd.read_csv(remote_file, nrows=5, parse_dates=timestamp_columns)
df

### Parallelize Pandas with Dask.dataframe


In [3]:
from dask.distributed import Client, progress
# processes=False: the recorded output of this cell shows an in-process
# (inproc://) scheduler with a single 4-core worker.
client = Client(processes=False)
# Alternative: connect to an already-running remote scheduler by address.
#client = Client('18.222.238.200:8786')
# Display the client summary (scheduler address, workers, cores, memory).
client

0,1
Client  Scheduler: inproc://192.168.254.105/21066/1,Cluster  Workers: 1  Cores: 4  Memory: 8.26 GB


In [4]:
import dask.dataframe as dd

# The '2015-1*' glob matches the October, November and December files
# (yellow_tripdata_2015-10/11/12.csv). The bucket is read anonymously and the
# two trip timestamps are parsed as datetimes.
df = dd.read_csv(
    's3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-1*.csv',
    storage_options={'anon': True},
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
)

In [6]:
# Lazy: the recorded output is a dd.Scalar placeholder, not a number.
# Nothing is read until .compute() is called on it.
df.size

dd.Scalar<size-ag..., dtype=int64>

In [7]:
# Ask the cluster to start loading the data behind df (Client.persist) and
# rebind df to the persisted collection so later cells reuse that work.
df = client.persist(df)
# Show a progress display while the persisted tasks run.
progress(df)

Function:  execute_task
args:      ((<function check_meta at 0x7f268239db70>, (<function apply at 0x7f269a9f0b70>, <function pandas_read_text at 0x7f267eaa1d90>, [<function _make_parser_function.<locals>.parser_f at 0x7f2683f25b70>, (<function read_block_from_file at 0x7f2682e1d8c8>, <dask.bytes.core.OpenFile object at 0x7f267e8986a0>, 1152000000, 64000000, b'\n'), b'VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\r\n', (<class 'dict'>, [['parse_dates', ['tpep_pickup_datetime', 'tpep_dropoff_datetime']]]), (<class 'dict'>, [['VendorID', dtype('int64')], ['tpep_pickup_datetime', dtype('<M8[ns]')], ['tpep_dropoff_datetime', dtype('<M8[ns]')], ['passenger_count', dtype('int64')], ['trip_distance', dtype('float64')], ['pickup_longitude', dtype('float64')], ['picku

Function:  execute_task
args:      ((<function check_meta at 0x7f268239db70>, (<function apply at 0x7f269a9f0b70>, <function pandas_read_text at 0x7f267eaa1d90>, [<function _make_parser_function.<locals>.parser_f at 0x7f2683f25b70>, (<function read_block_from_file at 0x7f2682e1d8c8>, <dask.bytes.core.OpenFile object at 0x7f267e898860>, 1408000000, 64000000, b'\n'), b'VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\r\n', (<class 'dict'>, [['parse_dates', ['tpep_pickup_datetime', 'tpep_dropoff_datetime']]]), (<class 'dict'>, [['VendorID', dtype('int64')], ['tpep_pickup_datetime', dtype('<M8[ns]')], ['tpep_dropoff_datetime', dtype('<M8[ns]')], ['passenger_count', dtype('int64')], ['trip_distance', dtype('float64')], ['pickup_longitude', dtype('float64')], ['picku


Dask DataFrames
---------------

*  Coordinate many Pandas DataFrames across a cluster
*  Faithfully implement a subset of the Pandas API
*  Use Pandas under the hood (for speed and maturity)

In [None]:
# Display the Dask DataFrame object itself (no .compute() is requested here).
df

In [None]:
# Column names and their dtypes.
df.dtypes

In [None]:
# Preview the first few rows.
df.head()

In [None]:
# Time how long it takes to count the rows of the whole dataset.
%time len(df)

In [None]:
# Time a full reduction: total number of passengers across all trips.
# .compute() turns the lazy sum into a concrete value.
%time df.passenger_count.sum().compute()

In [None]:
# Average trip distance for each passenger count.
# Build the lazy aggregation first, then materialize it for display.
mean_distance_by_passengers = df.groupby(df.passenger_count).trip_distance.mean()
mean_distance_by_passengers.compute()

### Tip Fraction, grouped by day-of-week and hour-of-day

In [None]:
# Keep only trips with a positive tip and a positive fare, so the ratio below
# is well defined, then attach the tip as a fraction of the fare.
has_tip_and_fare = (df.tip_amount > 0) & (df.fare_amount > 0)
tipped_trips = df[has_tip_and_fare]
df2 = tipped_trips.assign(
    tip_fraction=tipped_trips.tip_amount / tipped_trips.fare_amount
)

In [None]:
# Mean tip fraction per pickup day-of-week and per pickup hour-of-day.
pickup_time = df2.tpep_pickup_datetime
dayofweek = df2.groupby(pickup_time.dt.dayofweek).tip_fraction.mean()
hour = df2.groupby(pickup_time.dt.hour).tip_fraction.mean()

# Start both aggregations on the cluster together and watch their progress.
dayofweek, hour = client.persist([dayofweek, hour])
progress(dayofweek, hour)

### Plot results

In [None]:
from bokeh.plotting import figure, output_notebook, show
output_notebook()

# Materialize the small per-hour result once, rather than computing the index
# and the values in two separate round trips. A line glyph connects points in
# the order given, so sort by hour to guarantee a left-to-right line.
hour_result = hour.compute().sort_index()

fig = figure(title='Tip Fraction',
             x_axis_label='Hour of day',
             y_axis_label='Tip Fraction',
             height=300)
fig.line(x=hour_result.index, y=hour_result, line_width=3)
# Anchor the y axis at zero so the tip fractions are shown to scale.
fig.y_range.start = 0

show(fig)