**Combine the CSV files into a single HDF5 dataset, adding location coordinates, weather (snow/rain), and a holiday flag.**

In [0]:
!pip install -U tables
!pip install pandarallel
!pip install holidays
import pandas as pd
import numpy as np
import json
import os
import pandarallel

Requirement already up-to-date: tables in /home/ubuntu/anaconda3/lib/python3.6/site-packages (3.6.1)


In [0]:
# from google.colab import drive
# drive.mount('/content/gdrive')

In [0]:
# Root folder for all inputs read by this notebook (manifest.txt, the CSV
# files it lists, cx.json, cy.json) and for the data.h5 output.
dataset_root = os.path.expanduser('~/TaxiData')

In [0]:
# Load the location-ID -> coordinate lookups. The context managers guarantee
# the file handles are closed as soon as the JSON has been parsed.
with open(os.path.join(dataset_root, 'cx.json')) as cx_file:
    cx_map = json.load(cx_file)
with open(os.path.join(dataset_root, 'cy.json')) as cy_file:
    cy_map = json.load(cy_file)

# JSON object keys are always strings; convert them to int so the maps can be
# indexed directly with the PULocationID / DOLocationID values.
cx_map = {int(k): v for k, v in cx_map.items()}
cy_map = {int(k): v for k, v in cy_map.items()}

In [0]:
# Central coordinates: mean x, then mean y, over the location IDs below 264.
# NOTE(review): IDs >= 264 look like placeholder zones that reuse this
# centroid -- confirm against the zone lookup.
for coord_map in (cx_map, cy_map):
    known_values = [value for zone_id, value in coord_map.items() if zone_id < 264]
    print(np.mean(known_values))

-73.93129805178539
40.72586769315081


In [0]:
def load_all(root):
  """Read every CSV listed in ``<root>/manifest.txt`` into one DataFrame.

  The manifest holds one file name per line, relative to ``root``. Blank
  lines (for example extra newlines at the end of the file) are skipped
  instead of being turned into the directory path and handed to
  ``pd.read_csv``.

  Args:
    root: Directory containing ``manifest.txt`` and the CSV files it lists.

  Returns:
    One DataFrame holding the rows of all listed files in manifest order,
    with a fresh 0..n-1 index.

  Raises:
    ValueError: If the manifest lists no files.
  """
  with open(os.path.join(root, 'manifest.txt')) as f:
    names = [line.strip() for line in f]
  paths = [os.path.join(root, name) for name in names if name]
  if not paths:
    # pd.concat would also raise ValueError here, but with a message that
    # does not point at the manifest.
    raise ValueError('manifest.txt in %r lists no files' % (root,))
  frames = [pd.read_csv(path) for path in paths]
  return pd.concat(frames, ignore_index=True)

In [0]:
dataset = load_all(dataset_root)
print(dataset)
# Checkpoint the raw, not-yet-cleaned concatenation. `key` is passed by
# keyword: positional use is deprecated in recent pandas releases, and the
# keyword form matches the final save later in the notebook, which writes to
# the same file and key.
dataset.to_hdf(os.path.join(dataset_root, 'data.h5'), key='df')

          VendorID tpep_pickup_datetime tpep_dropoff_datetime  \
0                1  2017-07-30 00:20:56   2017-07-30 00:48:20   
1                1  2017-07-30 00:30:28   2017-07-30 00:48:10   
2                2  2017-07-30 00:20:08   2017-07-30 00:23:58   
3                2  2017-07-30 00:29:00   2017-07-30 00:32:40   
4                2  2017-07-30 00:41:55   2017-07-30 01:08:30   
5                2  2017-07-30 00:20:09   2017-07-30 00:24:50   
6                2  2017-07-30 00:29:11   2017-07-30 00:47:57   
7                2  2017-07-30 00:07:08   2017-07-30 00:13:03   
8                2  2017-07-30 00:20:49   2017-07-30 00:32:17   
9                2  2017-07-30 00:34:52   2017-07-30 00:38:17   
10               2  2017-07-30 00:41:34   2017-07-30 00:48:15   
11               2  2017-07-30 00:49:44   2017-07-30 01:22:53   
12               1  2017-07-30 00:19:46   2017-07-30 00:37:33   
13               1  2017-07-30 00:38:41   2017-07-30 00:58:02   
14               1  2017-

In [0]:
# Parse both trip timestamp columns into pandas datetimes.
for column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    dataset[column] = pd.to_datetime(dataset[column])

In [0]:
# Remove exact duplicate rows and rows with any missing value.
dataset.drop_duplicates(inplace=True)
dataset.dropna(inplace=True)

# Remove trips whose pickup is not strictly before the dropoff.
non_positive_duration = dataset['tpep_pickup_datetime'] >= dataset['tpep_dropoff_datetime']
dataset.drop(dataset.index[non_positive_duration.values], inplace=True)

In [0]:
# Drop rows whose pickup or dropoff falls in a year before 2017
# (pickup is filtered first, then dropoff on what remains).
for column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    before_2017 = dataset[column].dt.year < 2017
    dataset.drop(dataset.index[before_2017.values], inplace=True)

In [0]:
# Sanity check: earliest and latest timestamp, pickup first, then dropoff.
for column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    print(dataset[column].min())
    print(dataset[column].max())

2017-01-01 00:00:00
2017-07-31 00:00:00
2017-01-01 00:00:00
2017-07-31 23:54:24


In [0]:
# (new column, ID column, lookup) triples; the columns are created in this order.
coordinate_columns = (
    ('PULocationX', 'PULocationID', cx_map),
    ('PULocationY', 'PULocationID', cy_map),
    ('DOLocationX', 'DOLocationID', cx_map),
    ('DOLocationY', 'DOLocationID', cy_map),
)
for target, source, lookup in coordinate_columns:
    # Indexing the dict (rather than passing it to .map) raises KeyError for
    # an ID that has no coordinates instead of silently producing NaN.
    dataset[target] = dataset[source].map(lambda zone_id: lookup[zone_id])

In [0]:
# Resume point: load the saved frame from disk only when no `dataset` is in
# memory (e.g. a fresh kernel where the loading/cleaning cells were skipped).
# During a top-to-bottom run the in-memory frame already carries the cleaning
# and coordinate columns, which an unconditional reload of the raw checkpoint
# written earlier would throw away.
if 'dataset' not in globals():
    dataset = pd.read_hdf(os.path.join(dataset_root, 'data.h5'))

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,PULocationID,DOLocationID,payment_type,PULocationX,PULocationY,DOLocationX,DOLocationY,snow,rain,isHoliday
0,1,2017-07-30 00:20:56,2017-07-30 00:48:20,1,138,265,2,-73.873629,40.774376,-73.931298,40.725868,0.0,0.0,0
1,1,2017-07-30 00:30:28,2017-07-30 00:48:10,1,161,7,1,-73.977698,40.758028,-73.919694,40.761493,0.0,0.0,0
2,2,2017-07-30 00:20:08,2017-07-30 00:23:58,5,164,161,2,-73.985157,40.748575,-73.977698,40.758028,0.0,0.0,0
3,2,2017-07-30 00:29:00,2017-07-30 00:32:40,5,229,233,1,-73.965146,40.756729,-73.970443,40.749914,0.0,0.0,0
4,2,2017-07-30 00:41:55,2017-07-30 01:08:30,5,137,244,2,-73.976494,40.740439,-73.941399,40.841709,0.0,0.0,0
5,2,2017-07-30 00:20:09,2017-07-30 00:24:50,1,48,43,2,-73.989845,40.762253,-73.965554,40.782478,0.0,0.0,0
6,2,2017-07-30 00:29:11,2017-07-30 00:47:57,1,163,158,1,-73.977569,40.764421,-74.008984,40.735035,0.0,0.0,0
7,2,2017-07-30 00:07:08,2017-07-30 00:13:03,5,234,137,1,-73.990458,40.740337,-73.976494,40.740439,0.0,0.0,0
8,2,2017-07-30 00:20:49,2017-07-30 00:32:17,5,234,186,2,-73.990458,40.740337,-73.992438,40.748497,0.0,0.0,0
9,2,2017-07-30 00:34:52,2017-07-30 00:38:17,5,186,164,1,-73.992438,40.748497,-73.985157,40.748575,0.0,0.0,0


In [0]:
import holidays

# US holiday calendar from the `holidays` package, shared by is_holiday().
us_holidays = holidays.UnitedStates()


def is_holiday(x):
  """Return 1 when ``x`` is contained in ``us_holidays``, otherwise 0."""
  return int(x in us_holidays)


In [0]:
# Initialise pandarallel with a progress bar; the `parallel_apply` call
# further down depends on this having run.
import pandarallel
pandarallel.pandarallel.initialize(progress_bar=True)

INFO: Pandarallel will run on 48 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [0]:
weather_df = pd.read_csv('/home/ubuntu/weather.csv', parse_dates=['date_time'])

# Lookup keyed by (year, month, day, hour) -> (snow, rain), built from the
# 'totalSnow_cm' and 'precipMM' columns. If two rows share the same hour the
# later row wins.
weather_dict = {
    (stamp.year, stamp.month, stamp.day, stamp.hour): (snow, rain)
    for stamp, snow, rain in zip(
        weather_df['date_time'],
        weather_df['totalSnow_cm'],
        weather_df['precipMM'],
    )
}
    


In [0]:
def get_weather(x):
  """Look up the ``weather_dict`` entry for the hour that contains ``x``.

  ``x`` must expose ``year``, ``month``, ``day`` and ``hour`` attributes.
  Raises ``KeyError`` when that hour has no entry in ``weather_dict``.
  """
  hour_key = (x.year, x.month, x.day, x.hour)
  return weather_dict[hour_key]
    
def parallel_func(x):
  """Return ``(snow, rain, holiday_flag)`` for a single timestamp ``x``.

  The first two values come from ``get_weather(x)`` and the flag from
  ``is_holiday(x)``.
  """
  snow_amount, rain_amount = get_weather(x)
  holiday_flag = is_holiday(x)
  return snow_amount, rain_amount, holiday_flag


In [0]:
# pandarallel.pandarallel.initialize(progress_bar=True)
# parallel_func returns one (snow, rain, isHoliday) tuple per pickup time;
# zip(*...) transposes those per-row tuples into three column-wise sequences,
# which are assigned to the three new columns in a single statement.
dataset['snow'], dataset['rain'], dataset['isHoliday'] = zip(*dataset['tpep_pickup_datetime'].parallel_apply(parallel_func))


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=1400767), Label(value='0 / 1400767…

In [0]:
# Save the processed frame to disk: written to data.h5 under key 'df', the
# same file and key used for the raw checkpoint earlier in the notebook.
dataset.to_hdf(os.path.join(dataset_root, 'data.h5'), key='df')

In [0]:
# upload hdf5 file to S3
import boto3

# No credentials in the notebook: boto3 resolves them through its default
# chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, the
# shared credentials file, or an attached IAM role).
s3 = boto3.client('s3')
# Stream data.h5 to bucket "project" under the object key "data.h5".
with open(os.path.join(dataset_root, 'data.h5'), "rb") as f:
    s3.upload_fileobj(f, "project", "data.h5")

In [0]:
# Load the frame back from data.h5 (the file written by the save cell above),
# replacing the in-memory `dataset`.
dataset = pd.read_hdf(os.path.join(dataset_root, 'data.h5'))

In [0]:
# Draw a fixed-seed random subset so that re-running the notebook yields the
# same rows. The size is capped at the number of available rows because
# sample() without replacement raises when n exceeds len(dataset).
SAMPLE_SIZE = 1000000
SAMPLE_SEED = 42
subset = dataset.sample(n=min(SAMPLE_SIZE, len(dataset)), random_state=SAMPLE_SEED)

In [0]:
# Show the sampled dataset; as the last expression in the cell it is rendered
# with the DataFrame's rich display.
subset

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,PULocationID,DOLocationID,payment_type,PULocationX,PULocationY,DOLocationX,DOLocationY,snow,rain,isHoliday
35144951,1,2017-04-20 11:07:57,2017-04-20 11:17:24,1,164,161,1,-73.985157,40.748575,-73.977698,40.758028,0.0,0.0,0
39747208,2,2017-05-01 19:42:59,2017-05-01 19:45:17,1,234,113,2,-73.990458,40.740337,-73.994305,40.732579,0.0,0.0,0
52921370,1,2017-06-14 00:09:25,2017-06-14 00:15:24,2,161,230,1,-73.977698,40.758028,-73.984196,40.759818,0.0,0.0,0
45911450,1,2017-05-20 15:34:14,2017-05-20 15:40:24,1,125,211,1,-74.007486,40.726290,-74.001538,40.723888,0.0,0.0,0
8067041,1,2017-01-26 17:56:07,2017-01-26 18:03:48,2,100,230,2,-73.988786,40.753513,-73.984196,40.759818,0.0,0.0,0
6704448,1,2017-01-14 15:08:56,2017-01-14 15:12:35,1,236,237,1,-73.957012,40.780436,-73.965634,40.768615,0.9,0.3,0
62423487,2,2017-07-13 14:10:28,2017-07-13 14:32:44,1,161,79,2,-73.977698,40.758028,-73.985937,40.727620,0.0,1.5,0
50009581,2,2017-06-01 16:24:09,2017-06-01 16:37:04,2,68,211,1,-73.999917,40.748427,-74.001538,40.723888,0.0,0.0,0
50029427,1,2017-06-01 17:35:47,2017-06-01 17:43:09,2,151,239,1,-73.968168,40.797962,-73.978633,40.783962,0.0,0.0,0
41838662,1,2017-05-07 18:55:17,2017-05-07 19:09:19,1,261,170,2,-74.013024,40.709139,-73.978492,40.747746,0.0,0.0,0
