In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import dask.dataframe as dd
import pickle
from scipy import stats
from dask.diagnostics import ProgressBar
from tabulate import tabulate
import dask.array as da
from dask.distributed import Client,progress
import calendar
import time
# for faster processing
import dask.multiprocessing
#dask.config.set(get=dask.multiprocessing.get)  # set processes as default

In [2]:
dask.config.set(scheduler='multiprocessing')

<dask.config.set at 0x1f1a44c1828>

In [3]:
# # from dask.distributed import Client, progress
# # c = Client(processes=False)
# # c
c = Client()  # create processes and set as default

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


### Load Parquet Files

In [4]:
scolumns = ['tpep_pickup_datetime',
            'passenger_count', 
            'trip_distance',
            'PULocationID',
            'fare_amount',
           'duration',
           'speed']


y2017_ddf_final = dd.read_parquet("taxi_data/y2017_ddf_final.parquet", columns=scolumns)

In [5]:
y2017_ddf_final.head()

Unnamed: 0,tpep_pickup_datetime,passenger_count,trip_distance,PULocationID,fare_amount,duration,speed
0,2017-05-30 15:34:55,1.0,1.3,68.0,9.0,773.0,0.001682
1,2017-05-30 15:34:55,1.0,2.2,186.0,18.0,1790.0,0.001229
2,2017-05-30 15:34:55,1.0,1.13,231.0,9.0,741.0,0.001525
3,2017-05-30 15:34:55,5.0,1.08,161.0,7.0,525.0,0.002057
4,2017-05-30 15:34:55,1.0,1.96,239.0,14.0,1244.0,0.001576


Set the DataFrame index (row labels) using an existing column

This realigns the dataset to be sorted by a new column. This can have a significant impact on performance, because joins, groupbys, lookups, etc. are all much faster on that column. 

In [6]:
# set index
y2017_ddf_final = y2017_ddf_final.set_index("tpep_pickup_datetime", drop=True)

In [7]:
y2017_ddf_final.head()

Unnamed: 0_level_0,passenger_count,trip_distance,PULocationID,fare_amount,duration,speed
tpep_pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017-01-01 00:00:02,1.0,0.5,48.0,4.0,228.0,0.002193
2017-01-01 00:00:02,4.0,7.75,186.0,22.0,2360.0,0.003284
2017-01-01 00:00:03,1.0,0.8,162.0,6.0,415.0,0.001928
2017-01-01 00:00:05,2.0,0.9,48.0,7.0,508.0,0.001772
2017-01-01 00:00:05,5.0,1.76,140.0,7.0,299.0,0.005886


In [8]:
# check nulls
y2017_ddf_final.isnull().sum().compute()

passenger_count    0
trip_distance      0
PULocationID       0
fare_amount        0
duration           0
speed              0
dtype: int64

In [9]:
# the data is divided by time stamps
y2017_ddf_final.divisions[0:5]

(Timestamp('2017-01-01 00:00:02'),
 Timestamp('2017-01-03 15:41:37'),
 Timestamp('2017-01-05 21:28:22.311316736'),
 Timestamp('2017-01-07 22:59:35'),
 Timestamp('2017-01-10 11:54:42.066666752'))

In [10]:
y2017_ddf_final.get_partition(1).index

Dask Index Structure:
npartitions=1
2017-01-03 15:41:37.000000000    datetime64[ns]
2017-01-05 21:28:22.311316736               ...
Name: tpep_pickup_datetime, dtype: datetime64[ns]
Dask Name: get-partition, 6400 tasks

In [11]:
y2017_ddf_final.get_partition(1).index

Dask Index Structure:
npartitions=1
2017-01-03 15:41:37.000000000    datetime64[ns]
2017-01-05 21:28:22.311316736               ...
Name: tpep_pickup_datetime, dtype: datetime64[ns]
Dask Name: get-partition, 6400 tasks

In [12]:
len(y2017_ddf_final)

105886773

In [13]:
y2017_ddf_final.get_partition(1).index.compute()

DatetimeIndex(['2017-01-03 15:41:37', '2017-01-03 15:41:37',
               '2017-01-03 15:41:37', '2017-01-03 15:41:37',
               '2017-01-03 15:41:37', '2017-01-03 15:41:37',
               '2017-01-03 15:41:37', '2017-01-03 15:41:37',
               '2017-01-03 15:41:38', '2017-01-03 15:41:38',
               ...
               '2017-01-05 21:28:21', '2017-01-05 21:28:21',
               '2017-01-05 21:28:21', '2017-01-05 21:28:22',
               '2017-01-05 21:28:22', '2017-01-05 21:28:22',
               '2017-01-05 21:28:22', '2017-01-05 21:28:22',
               '2017-01-05 21:28:22', '2017-01-05 21:28:22'],
              dtype='datetime64[ns]', name='tpep_pickup_datetime', length=644749, freq=None)

### Group, Aggregate, Resample Data

The dask dataframe is grouped by location and using pandas to do the operations within each group.

In [14]:
meta={'PULocationID':'float64',
    'passenger_count': 'float64', 
      'trip_distance': 'float64',
      'fare_amount':'float64',
      'duration':'float64', 
      'speed':'float64'
     }
def per_group(blk):
    return blk.resample('H').mean()

y2017_as_pd_resampled = y2017_ddf_final.groupby("PULocationID").apply(per_group).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  # This is added back by InteractiveShellApp.init_path()


In [15]:
y2017_as_pd_resampled.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,passenger_count,trip_distance,PULocationID,fare_amount,duration,speed
PULocationID,tpep_pickup_datetime,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
87.0,2017-01-01 00:00:00,1.567568,4.133851,87.0,15.736486,976.445946,0.004297
87.0,2017-01-01 01:00:00,1.609053,4.746872,87.0,17.598765,1095.666667,0.004307
87.0,2017-01-01 02:00:00,1.680982,4.488344,87.0,16.588957,1006.503067,0.019612
87.0,2017-01-01 03:00:00,1.674603,4.507937,87.0,15.821429,843.539683,0.005046
87.0,2017-01-01 04:00:00,1.695652,5.572174,87.0,18.913043,933.231884,0.007549


In [18]:
y2017_as_pd_resampled = y2017_as_pd_resampled.drop("PULocationID", axis=1)

In [21]:
#conda install python-snappy
y2017_as_pd_resampled.to_parquet("taxi_data/y2017_as_pd_resampled.parquet", compression="GZIP")
y2017_as_pd_resampled.to_csv("taxi_data/y2017_as_pd_resampled.csv")

In [29]:
y2017_as_pd_resampled = pd.read_csv("taxi_data/y2017_as_pd_resampled.csv", index_col=1)

In [30]:
y2017_as_pd_resampled.head()

Unnamed: 0_level_0,PULocationID,passenger_count,trip_distance,fare_amount,duration,speed
tpep_pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017-01-01 00:00:00,87.0,1.567568,4.133851,15.736486,976.445946,0.004297
2017-01-01 01:00:00,87.0,1.609053,4.746872,17.598765,1095.666667,0.004307
2017-01-01 02:00:00,87.0,1.680982,4.488344,16.588957,1006.503067,0.019612
2017-01-01 03:00:00,87.0,1.674603,4.507937,15.821429,843.539683,0.005046
2017-01-01 04:00:00,87.0,1.695652,5.572174,18.913043,933.231884,0.007549
