In [1]:
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client

In [2]:
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:63071  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 17.18 GB


In [3]:
fare_dtypes = {
    "vendor_id": "category",
    "payment_type": "category",
    "fare_amount": 'float64',
    'surcharge': 'float64',
    'mta_tax': 'float64',
    'tip_amount': 'float64',
    'tolls_amount': "float64",
    "total_amount": "float64"
}

cat_cols = ['medallion', 'hack_license']

In [4]:
fares_fn = "/Volumes/ssd-t3/data/nyc-opendata/taxi/trip_fare_12.csv"

In [5]:
# read column names
with open(fares_fn, 'r') as f:
    fares_cols = [x.strip() for x in f.readline().split(',')]
fares_cols

['medallion',
 'hack_license',
 'vendor_id',
 'pickup_datetime',
 'payment_type',
 'fare_amount',
 'surcharge',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'total_amount']

In [6]:
fares = dd.read_csv(
    fares_fn, names=fares_cols, header=0, parse_dates=['pickup_datetime'],
    dtype=fare_dtypes, infer_datetime_format=True
)

In [7]:
fares = fares.persist()

In [15]:
fares.dtypes

medallion                  object
hack_license               object
vendor_id                category
pickup_datetime    datetime64[ns]
payment_type             category
fare_amount               float64
surcharge                 float64
mta_tax                   float64
tip_amount                float64
tolls_amount              float64
total_amount              float64
dtype: object

## Create foreign key tables

In [10]:
# how many medallions?
fares.medallion.nunique().compute()

13460

In [8]:
medallions = fares.medallion.unique().compute()

In [11]:
medallions = (medallions
    .sort_values()
    .to_frame()
    .assign(medallion_id=lambda df: df.index + 1)
    .astype({'medallion_id': np.uint16})
)

In [13]:
# how many licenses?
fares.hack_license.nunique().compute()

33381

In [12]:
hack_license = fares.hack_license.unique().compute()

In [14]:
hack_license = (hack_license
    .sort_values()
    .to_frame()
    .assign(hack_license_id=lambda df: df.index + 1)
    .astype({'hack_license_id': np.uint16})
)

## More exploratory analysis

In [17]:
len(fares)

13971118

In [None]:
fares.npartitions

In [16]:
fares.vendor_id.nunique().compute()

2

In [21]:
"%d" % fares.medallion.nunique_approx().compute()

'13459'

In [22]:
# get values for partitions
fares_dts = sorted(
    fares.pickup_datetime.dt.normalize().unique().compute().array)

In [26]:
fares_cols = [
    'medallion_id', 'hack_license_id',
    'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount', 
    'tolls_amount', 'total_amount', 'pickup_datetime'
]
fares2 = (fares
    .merge(medallions, on='medallion')
    .merge(hack_license, on='hack_license')
    [fares_cols]
    .set_index('pickup_datetime', divisions=fares_dts)
    .persist())

In [27]:
fares2.head()

Unnamed: 0_level_0,medallion_id,hack_license_id,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2013-12-01,229,245,CRD,8.5,0.5,0.5,2.0,0.0,11.5
2013-12-01,2406,4173,CSH,11.5,0.5,0.5,0.0,0.0,12.5
2013-12-01,3172,1400,CRD,16.0,0.5,0.5,3.3,0.0,20.3
2013-12-01,4430,7333,CRD,23.0,0.5,0.5,4.7,0.0,28.7
2013-12-01,1910,3714,UNK,19.0,0.5,0.5,4.88,0.0,24.88


In [None]:
# fares.index.max().compute() # slow, don't do this

In [28]:
fares2.known_divisions

True

In [29]:
fares2.divisions[:5]

(Timestamp('2013-12-01 00:00:00'),
 Timestamp('2013-12-02 00:00:00'),
 Timestamp('2013-12-03 00:00:00'),
 Timestamp('2013-12-04 00:00:00'),
 Timestamp('2013-12-05 00:00:00'))

In [30]:
min(fares2.divisions), max(fares2.divisions)

(Timestamp('2013-12-01 00:00:00'), Timestamp('2013-12-31 00:00:00'))

In [31]:
fares_fout = 'data/taxi-small/fares.parq'
(fares2
    .sample(frac=0.025)
    .to_parquet(fares_fout, compression="brotli")
)

In [32]:
del fares, fares2

# Load the trips data

In [33]:
trips_fn = "/Volumes/ssd-t3/data/nyc-opendata/taxi/trip_data_12.csv"

In [34]:
# read column names
with open(trips_fn, 'r') as f:
    trips_cols = [x.strip() for x in f.readline().split(',')]
trips_cols

['medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'store_and_fwd_flag',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude']

In [39]:
trips = dd.read_csv(trips_fn, header=0, names=trips_cols).persist()

In [40]:
trips.head()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,D7D598CD99978BD012A87A76A7C891B7,82F90D5EFE52FDFD2FDEC3EAD6D5771D,VTS,1,,2013-12-01 00:13:00,2013-12-01 00:31:00,1,1080,3.9,-73.97934,40.776653,-73.981865,40.73428
1,5455D5FF2BD94D10B304A15D4B7F2735,177B80B867CEC990DA166BA1D0FCAF82,VTS,1,,2013-12-01 00:40:00,2013-12-01 00:48:00,6,480,3.2,-73.939674,40.726154,-73.98558,40.718075
2,93D6821F86A12B537C5EADBDFB432CA7,28B0AA10202F83FEB0F4E69340CA8F86,VTS,1,,2013-12-01 02:21:00,2013-12-01 02:30:00,5,540,3.28,-73.958755,40.768078,-73.958755,40.768078
3,0C107B532C1207A74F0D8609B9E092FF,66C2CECD93E395CB9B875E9B382DB5D9,VTS,1,,2013-12-01 02:14:00,2013-12-01 02:22:00,1,480,1.84,-73.978836,40.724194,-73.979736,40.743412
4,801C69A08B51470871A8110F8B0505EE,91A07EEF642E8590C2EFD631C3DF89C9,VTS,1,,2013-12-01 04:45:00,2013-12-01 04:50:00,1,300,1.02,-73.991364,40.735073,-73.978943,40.7346


In [41]:
trips.dtypes

medallion              object
hack_license           object
vendor_id              object
rate_code               int64
store_and_fwd_flag     object
pickup_datetime        object
dropoff_datetime       object
passenger_count         int64
trip_time_in_secs       int64
trip_distance         float64
pickup_longitude      float64
pickup_latitude       float64
dropoff_longitude     float64
dropoff_latitude      float64
dtype: object

In [42]:
trips[['rate_code', 'passenger_count', 'trip_time_in_secs']].max().compute()

rate_code              210
passenger_count          9
trip_time_in_secs    10800
dtype: int64

In [43]:
trips.store_and_fwd_flag.unique().compute()

0    NaN
1      N
2      Y
Name: store_and_fwd_flag, dtype: object

In [44]:
trips_dtypes = {
    "vendor_id": "category",
    "rate_code": np.uint8,
    "store_and_fwd_flag": "category",
    "passenger_count": np.uint32,
    "trip_time_in_secs": np.uint32
}

trips_dates = ['pickup_datetime', 'dropoff_datetime']

In [45]:
trips = dd.read_csv(
    trips_fn, names=trips_cols, header=0, 
    parse_dates=trips_dates,
    dtype=trips_dtypes, infer_datetime_format=True
).persist()

In [46]:
# calculate divisions for trips
trip_divs = sorted(
    trips.pickup_datetime.dt.normalize().unique().compute().array)

In [47]:
trip_cols = [
    'medallion_id', 'hack_license_id',
    'rate_code', 'passenger_count', 'trip_time_in_secs', 'trip_distance', 
    'pickup_longitude', 'pickup_latitude', 'pickup_datetime'
]

trips = (trips
    .merge(medallions, on='medallion')
    .merge(hack_license, on='hack_license')
    [trip_cols]
    .set_index('pickup_datetime', divisions=trip_divs)
    .persist())

In [48]:
trips.head()

Unnamed: 0_level_0,medallion_id,hack_license_id,rate_code,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude
pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2013-12-01,2796,4624,1,1,1080,3.57,-73.988686,40.736572
2013-12-01,3737,12864,1,1,840,3.72,-73.99308,40.76284
2013-12-01,229,245,1,1,480,1.83,-74.00824,40.726112
2013-12-01,4622,11904,1,1,1320,6.07,-74.009712,40.722065
2013-12-01,6620,11716,1,2,600,1.86,-73.992783,40.737125


In [51]:
fares3 = dd.read_parquet(fares_fout, columns=["medallion_id"]).persist()
trips2 = dd.merge(
    trips, fares3, on=["pickup_datetime", "medallion_id"]).persist()

In [52]:
len(trips2)

349278

In [53]:
trips2.head()

Unnamed: 0_level_0,medallion_id,hack_license_id,rate_code,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude
pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2013-12-01 00:00:00,3306,9702,1,1,360,2.07,-73.988358,40.723461
2013-12-01 00:00:00,3540,6736,1,1,300,1.21,-73.947563,40.784515
2013-12-01 00:00:00,5986,14096,1,3,840,3.85,-73.999901,40.728611
2013-12-01 00:00:07,8899,18541,1,2,333,1.2,-73.951279,40.772842
2013-12-01 00:00:08,7171,15110,1,1,331,1.0,-73.996834,40.753414


In [54]:
assert trips2.known_divisions

In [56]:
trips_fout = "data/taxi-small/trips.parq"
trips2.to_parquet(trips_fout, compression="brotli")

In [57]:
client.close()