In [1]:
import pandas as pd
import os
from tqdm import tqdm_notebook as tqdm 
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
ProgressBar().register()
import multiprocessing
nCPU = multiprocessing.cpu_count()
from datetime import datetime, timedelta

  import pandas.util.testing as tm


In [2]:
# Get paths to files under the pwd, returned to get_df()
def get_paths(c_path, extension, yyyymmdd, limitation_keyword):
  l = []
  for pathname, dirnames, filenames in os.walk(c_path):
      for filename in filenames: 
          if yyyymmdd in filename.split('.')[0] and filename.split('.')[1] == extension:
              formal_path = os.path.join(pathname, filename)
              if limitation_keyword == '': # when no keyword is specified
                  l.append(formal_path)
              else:
                  if limitation_keyword in formal_path.split('/'): # when keyword specified
                      l.append(formal_path) 
  return l

def get_date(x): 
    t = datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
    date = str(t.year) + '-' + str("%02d" % t.month) + '-' + str("%02d" % t.day)
    if t.year != 2017: # 2016 data is by accident included
        return None
    else:
        return date

def get_timeperiod(x): 
    t = datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
    if t.hour in [22, 23, 0, 1, 2, 3]:
        tp = 'latenight'
    elif t.hour in [4, 5, 6, 7, 8, 9]:
        tp = 'morning'
    elif t.hour in [10, 11, 12, 13, 14, 15]:
        tp = 'midday' 
    else:
        tp = 'evening'
    return tp 

In [3]:
def aggregate_taxidata(datadir, dataname):
    df_out = pd.DataFrame(index=[], columns=['DOLocationID', 'Date', 'Time_Period'])
    for i in tqdm(get_paths('./data/' + datadir, 'csv', dataname, '')):
        df = pd.read_csv(i, dtype=str)
        
        if datadir == 'yellow':
            dropField = ['VendorID', 'tpep_pickup_datetime', 'passenger_count', 'trip_distance', 'PULocationID', 'RatecodeID', 'store_and_fwd_flag', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'improvement_surcharge', 'tip_amount', 'tolls_amount', 'total_amount']
            df = df.drop(dropField, axis='columns')
        elif datadir == 'green':
            dropField = ['VendorID', 'lpep_pickup_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type']
            df = df.drop(dropField, axis='columns')
            df.rename(inplace=True, columns={"lpep_dropoff_datetime": "tpep_dropoff_datetime", "DOLocationID": "DOLocationID"})
        elif datadir == 'fhv':
            dropField = ['Dispatching_base_num', 'Pickup_DateTime', 'PUlocationID']
            df = df.drop(dropField, axis='columns')
            df = df.dropna(subset=['DropOff_datetime']) 
            df = df.dropna(subset=['DOlocationID']) 
            df.rename(inplace=True, columns={"DropOff_datetime": "tpep_dropoff_datetime", "DOlocationID": "DOLocationID"})
    
        df = dd.from_pandas(df, npartitions=nCPU)
        meta = df['tpep_dropoff_datetime'].head(1).apply(lambda x: get_date(x))
        res = df['tpep_dropoff_datetime'].apply(lambda x: get_date(x), meta=meta)
        df['Date'] = res.compute(scheduler='processes') 
        meta = df['tpep_dropoff_datetime'].head(1).apply(lambda x: get_timeperiod(x))
        res = df['tpep_dropoff_datetime'].apply(lambda x: get_timeperiod(x), meta=meta)
        df['Time_Period'] = res.compute(scheduler='processes')
        df = df.compute()

        df = df.dropna(subset=['Date'])
        df = df.drop(['tpep_dropoff_datetime'], axis='columns')
        df_out = pd.concat([df_out, df])

    df_out.reset_index(inplace=True, drop=True)
    df_agg_out = pd.DataFrame(index=[], columns=['Date','DOLocationID','Time_Period','Volume'])
    for i in tqdm(df_out.Date.unique().tolist()):
        df_tmp = df_out.query('Date == @i')

        tmp = df_tmp.groupby('DOLocationID').Time_Period.value_counts().to_frame(name="Volume")
        tmp['Date'] = i
        tmp.reset_index(inplace=True)
        tmp = tmp[['Date','DOLocationID','Time_Period','Volume']]

        df_agg_out = pd.concat([df_agg_out, tmp])
    df_agg_out.rename(inplace=True, columns={"Date": "Date", "DOLocationID": "Taxi_Zone", "Time_Period": "Time_Period", "Volume": "Volume"})
    df_agg_out.to_csv(dataname + '_out.csv', index=False)

In [4]:
aggregate_taxidata('yellow', 'yellow_tripdata_2017')

HBox(children=(IntProgress(value=0, max=12), HTML(value='')))

[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1min  7.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  8min 31.4s
[########################################] | 100% Completed |  1.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1min 12.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  9min  3.6s
[########################################] | 100% Completed |  1.3s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1min  4.7s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  8min 21.2s
[###########

HBox(children=(IntProgress(value=0, max=365), HTML(value='')))




In [5]:
aggregate_taxidata('green', 'green_tripdata_2017')

HBox(children=(IntProgress(value=0, max=12), HTML(value='')))

[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  6.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed | 44.1s
[########################################] | 100% Completed |  0.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  6.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed | 44.4s
[########################################] | 100% Completed |  0.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  7.8s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed | 53.6s
[########################################] | 100

HBox(children=(IntProgress(value=0, max=365), HTML(value='')))




In [6]:
aggregate_taxidata('fhv', 'fhv_tripdata_2017')

HBox(children=(IntProgress(value=0, max=12), HTML(value='')))

[########################################] | 100% Completed |  0.1s




[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100



[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  2min 15.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed | 15min  1.7s
[########################################] | 100% Completed |  2.9s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1min 39.9s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed | 11min 47.3s
[########################################] | 100% Completed |  1.8s
[#######################

HBox(children=(IntProgress(value=0, max=216), HTML(value='')))


