This file is for merging the yellow, green, fhv and hvfhv taxi data

In [1]:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os

In [2]:
# Get list of each filename
yellow_files = [filename for filename in os.listdir('.') if filename.startswith("yellow") and filename.endswith(".parquet")]
green_files = [filename for filename in os.listdir('.') if filename.startswith("green") and filename.endswith(".parquet")] 
fhv_files = [filename for filename in os.listdir('.') if filename.startswith("fhv_") and filename.endswith(".parquet")] #Need underscore to ensure it doesn't pick up fhvhv
fhvhv_files = [filename for filename in os.listdir('.') if filename.startswith("fhvhv") and filename.endswith(".parquet")]

In [3]:
def merge_files(filenames, output_name):
    # merge multiple files into one parquet file
    schema = pq.ParquetFile(filenames[0]).schema_arrow
    with pq.ParquetWriter(output_name, schema=schema) as writer:
        for file in filenames:
            table = pq.read_table(file, schema=schema)
            writer.write_table(table)
            print("Read", file)

In [9]:
print(pq.ParquetFile(fhvhv_files[0]).schema_arrow)

hvfhs_license_num: string
dispatching_base_num: string
originating_base_num: string
request_datetime: timestamp[us]
on_scene_datetime: timestamp[us]
pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int64
DOLocationID: int64
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
airport_fee: double
tips: double
driver_pay: double
shared_request_flag: string
shared_match_flag: string
access_a_ride_flag: string
wav_request_flag: string
wav_match_flag: string
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 3168


In [5]:
files_to_merge = [yellow_files] # [yellow_files, green_files, fhv_files, fhvhv_files]
output_names = ["all_yellow_tripdata.parquet"] # ["all_yellow_tripdata.parquet", "all_green_tripdata.parquet", "all_fhv_tripdata.parquet", "all_fhvhv_tripdata.parquet"] # start name with all to ensure 
# NOTE: FHVHV files have to be done separately (below) due to malloc issues
for filenames, output_name in zip(files_to_merge, output_names):
    merge_files(filenames, output_name)
    print(output_name, "merged")

Read fhvhv_tripdata_2021-01.parquet
Read fhvhv_tripdata_2021-02.parquet
Read fhvhv_tripdata_2021-03.parquet
Read fhvhv_tripdata_2021-04.parquet
Read fhvhv_tripdata_2021-05.parquet
Read fhvhv_tripdata_2021-06.parquet
Read fhvhv_tripdata_2021-07.parquet
Read fhvhv_tripdata_2021-08.parquet
Read fhvhv_tripdata_2021-09.parquet
Read fhvhv_tripdata_2021-10.parquet
Read fhvhv_tripdata_2021-11.parquet
Read fhvhv_tripdata_2021-12.parquet
Read fhvhv_tripdata_2022-01.parquet
Read fhvhv_tripdata_2022-02.parquet
Read fhvhv_tripdata_2022-03.parquet
Read fhvhv_tripdata_2022-04.parquet
Read fhvhv_tripdata_2022-05.parquet
Read fhvhv_tripdata_2022-06.parquet
Read fhvhv_tripdata_2022-07.parquet
Read fhvhv_tripdata_2022-08.parquet
Read fhvhv_tripdata_2022-09.parquet
Read fhvhv_tripdata_2022-10.parquet
Read fhvhv_tripdata_2022-11.parquet
Read fhvhv_tripdata_2022-12.parquet
Read fhvhv_tripdata_2023-01.parquet


ArrowMemoryError: malloc of size 44040256 failed

In [10]:
print(pq.ParquetFile(fhvhv_files[-1]).schema_arrow)

hvfhs_license_num: large_string
dispatching_base_num: large_string
originating_base_num: large_string
request_datetime: timestamp[us]
on_scene_datetime: timestamp[us]
pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int32
DOLocationID: int32
trip_miles: double
trip_time: int64
base_passenger_fare: double
tolls: double
bcf: double
sales_tax: double
congestion_surcharge: double
airport_fee: double
tips: double
driver_pay: double
shared_request_flag: large_string
shared_match_flag: large_string
access_a_ride_flag: large_string
wav_request_flag: large_string
wav_match_flag: large_string


In [21]:
table = pq.read_table("fhvhv_tripdata_2023-01.parquet")
table = table.drop(['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
       'request_datetime', 'on_scene_datetime', 'trip_miles',
       'trip_time', 'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
       'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
       'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag',
       'wav_request_flag', 'wav_match_flag'])
filtered_schema = table.schema
df = table.to_pandas()
df.columns

Index(['pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID'], dtype='object')

In [36]:
# merge multiple files into one parquet file
schema = pq.ParquetFile(fhvhv_files[0]).schema_arrow
with pq.ParquetWriter("all_fhvhv_tripdata.parquet", schema=filtered_schema) as writer:
    for file in fhvhv_files:
        table = pq.read_table(file)
        table = table.drop(['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
        'request_datetime', 'on_scene_datetime', 'trip_miles',
        'trip_time', 'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
        'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
        'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag',
        'wav_request_flag', 'wav_match_flag'])
        table = table.cast(filtered_schema)
        writer.write_table(table)
        print("Read", file)

Read fhvhv_tripdata_2021-01.parquet
Read fhvhv_tripdata_2021-02.parquet
Read fhvhv_tripdata_2021-03.parquet
Read fhvhv_tripdata_2021-04.parquet
Read fhvhv_tripdata_2021-05.parquet
Read fhvhv_tripdata_2021-06.parquet
Read fhvhv_tripdata_2021-07.parquet
Read fhvhv_tripdata_2021-08.parquet
Read fhvhv_tripdata_2021-09.parquet
Read fhvhv_tripdata_2021-10.parquet
Read fhvhv_tripdata_2021-11.parquet
Read fhvhv_tripdata_2021-12.parquet
Read fhvhv_tripdata_2022-01.parquet
Read fhvhv_tripdata_2022-02.parquet
Read fhvhv_tripdata_2022-03.parquet
Read fhvhv_tripdata_2022-04.parquet
Read fhvhv_tripdata_2022-05.parquet
Read fhvhv_tripdata_2022-06.parquet
Read fhvhv_tripdata_2022-07.parquet
Read fhvhv_tripdata_2022-08.parquet
Read fhvhv_tripdata_2022-09.parquet
Read fhvhv_tripdata_2022-10.parquet
Read fhvhv_tripdata_2022-11.parquet
Read fhvhv_tripdata_2022-12.parquet
Read fhvhv_tripdata_2023-01.parquet
Read fhvhv_tripdata_2023-02.parquet
Read fhvhv_tripdata_2023-03.parquet
Read fhvhv_tripdata_2023-04.

In [35]:
# merge multiple files into one parquet file
schema = pq.ParquetFile(fhvhv_files[0]).schema_arrow
with pq.ParquetWriter("all_fhvhv_tripdata.parquet", schema=filtered_schema) as writer:

    table = pq.read_table("fhvhv_tripdata_2023-02.parquet")
    table = table.drop(['hvfhs_license_num', 'dispatching_base_num', 'originating_base_num',
    'request_datetime', 'on_scene_datetime', 'trip_miles',
    'trip_time', 'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
    'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay',
    'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag',
    'wav_request_flag', 'wav_match_flag'])
    table = table.cast(filtered_schema)
    writer.write_table(table)
    print("Read", file)

pickup_datetime: timestamp[us]
dropoff_datetime: timestamp[us]
PULocationID: int64
DOLocationID: int64
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 3168
Read fhvhv_tripdata_2023-02.parquet


In [8]:
for file in fhvhv_files:
    print(pq.ParquetFile(fhvhv_files[0]).schema_arrow == pq.ParquetFile(file).schema_arrow, file)

True fhvhv_tripdata_2021-01.parquet
True fhvhv_tripdata_2021-02.parquet
True fhvhv_tripdata_2021-03.parquet
True fhvhv_tripdata_2021-04.parquet
True fhvhv_tripdata_2021-05.parquet
True fhvhv_tripdata_2021-06.parquet
True fhvhv_tripdata_2021-07.parquet
True fhvhv_tripdata_2021-08.parquet
True fhvhv_tripdata_2021-09.parquet
True fhvhv_tripdata_2021-10.parquet
True fhvhv_tripdata_2021-11.parquet
True fhvhv_tripdata_2021-12.parquet
True fhvhv_tripdata_2022-01.parquet
True fhvhv_tripdata_2022-02.parquet
True fhvhv_tripdata_2022-03.parquet
True fhvhv_tripdata_2022-04.parquet
True fhvhv_tripdata_2022-05.parquet
True fhvhv_tripdata_2022-06.parquet
True fhvhv_tripdata_2022-07.parquet
True fhvhv_tripdata_2022-08.parquet
True fhvhv_tripdata_2022-09.parquet
True fhvhv_tripdata_2022-10.parquet
True fhvhv_tripdata_2022-11.parquet
True fhvhv_tripdata_2022-12.parquet
True fhvhv_tripdata_2023-01.parquet
False fhvhv_tripdata_2023-02.parquet
False fhvhv_tripdata_2023-03.parquet
False fhvhv_tripdata_2023-

# Sample of each table

In [None]:
table1 = pq.read_table('green_tripdata_2024-01.parquet')
table1.to_pandas()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2024-01-01 00:46:55,2024-01-01 00:58:25,N,1.0,236,239,1.0,1.98,12.80,1.0,0.5,3.61,0.0,,1.0,21.66,1.0,1.0,2.75
1,2,2024-01-01 00:31:42,2024-01-01 00:52:34,N,1.0,65,170,5.0,6.54,30.30,1.0,0.5,7.11,0.0,,1.0,42.66,1.0,1.0,2.75
2,2,2024-01-01 00:30:21,2024-01-01 00:49:23,N,1.0,74,262,1.0,3.08,19.80,1.0,0.5,3.00,0.0,,1.0,28.05,1.0,1.0,2.75
3,1,2024-01-01 00:30:20,2024-01-01 00:42:12,N,1.0,74,116,1.0,2.40,14.20,1.0,1.5,0.00,0.0,,1.0,16.70,2.0,1.0,0.00
4,2,2024-01-01 00:32:38,2024-01-01 00:43:37,N,1.0,74,243,1.0,5.14,22.60,1.0,0.5,6.28,0.0,,1.0,31.38,1.0,1.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
56546,2,2024-01-31 20:46:00,2024-01-31 20:55:00,,,33,25,,0.00,11.58,0.0,0.0,3.14,0.0,,1.0,15.72,,,
56547,2,2024-01-31 21:06:00,2024-01-31 21:11:00,,,72,72,,0.49,11.58,0.0,0.0,0.00,0.0,,1.0,12.58,,,
56548,2,2024-01-31 21:36:00,2024-01-31 21:40:00,,,72,72,,0.52,11.58,0.0,0.0,2.52,0.0,,1.0,15.10,,,
56549,2,2024-01-31 22:45:00,2024-01-31 22:51:00,,,41,42,,1.17,14.22,0.0,0.0,0.00,0.0,,1.0,15.22,,,


In [None]:
table2 = pq.read_table('fhv_tripdata_2024-01.parquet')
table2.to_pandas()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00053,2024-01-01 00:15:00,2024-01-01 02:13:00,,,,B00014
1,B00111,2024-01-01 00:30:00,2024-01-01 02:37:00,,,,B00111
2,B00112,2024-01-01 00:27:24,2024-01-01 01:12:05,,14.0,,B00112
3,B00112,2024-01-01 00:10:09,2024-01-01 00:25:39,,133.0,,B00112
4,B00112,2024-01-01 00:57:07,2024-01-01 01:05:04,,14.0,,B00112
...,...,...,...,...,...,...,...
1290111,B03492,2024-01-31 23:51:21,2024-01-31 23:57:09,,14.0,,B03492
1290112,B03492,2024-01-31 23:05:06,2024-01-31 23:20:13,,21.0,,B03492
1290113,B03492,2024-01-31 23:49:07,2024-02-01 00:16:25,,14.0,,B03492
1290114,B03505,2024-01-31 23:04:46,2024-01-31 23:18:22,,76.0,,B03505


In [None]:
table3 = pq.read_table('fhvhv_tripdata_2024-01.parquet')
table3.to_pandas()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,...,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
0,HV0003,B03404,B03404,2024-01-01 00:21:47,2024-01-01 00:25:06,2024-01-01 00:28:08,2024-01-01 01:05:39,161,158,2.83,...,4.05,2.75,0.0,0.00,40.18,N,N,N,N,N
1,HV0003,B03404,B03404,2024-01-01 00:10:56,2024-01-01 00:11:08,2024-01-01 00:12:53,2024-01-01 00:20:05,137,79,1.57,...,0.89,2.75,0.0,0.00,6.12,N,N,N,N,N
2,HV0003,B03404,B03404,2024-01-01 00:20:04,2024-01-01 00:21:51,2024-01-01 00:23:05,2024-01-01 00:35:16,79,186,1.98,...,1.60,2.75,0.0,0.00,9.47,N,N,N,N,N
3,HV0003,B03404,B03404,2024-01-01 00:35:46,2024-01-01 00:39:59,2024-01-01 00:41:04,2024-01-01 00:56:34,234,148,1.99,...,1.52,2.75,0.0,0.00,11.35,N,N,N,N,N
4,HV0003,B03404,B03404,2024-01-01 00:48:19,2024-01-01 00:56:23,2024-01-01 00:57:21,2024-01-01 01:10:02,148,97,2.65,...,3.43,2.75,0.0,0.00,28.63,N,N,N,N,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19663925,HV0003,B03404,B03404,2024-01-31 23:24:46,2024-01-31 23:26:11,2024-01-31 23:28:08,2024-01-31 23:32:13,79,113,0.65,...,0.81,2.75,0.0,1.00,5.39,N,N,N,N,N
19663926,HV0003,B03404,B03404,2024-01-31 23:33:02,2024-01-31 23:34:07,2024-01-31 23:34:19,2024-02-01 00:07:53,113,248,13.32,...,3.19,2.75,0.0,0.00,36.43,N,N,N,N,N
19663927,HV0003,B03404,B03404,2024-01-31 23:28:59,2024-01-31 23:30:51,2024-01-31 23:31:14,2024-01-31 23:38:18,161,50,1.31,...,0.89,2.75,0.0,0.00,5.71,N,N,N,N,N
19663928,HV0003,B03404,B03404,2024-01-31 23:39:00,2024-01-31 23:41:03,2024-01-31 23:41:45,2024-01-31 23:52:40,246,163,1.57,...,1.62,2.75,0.0,4.62,8.54,N,N,N,N,N


In [None]:
print(table.shape[1])
print(table1.shape[1])
print(table2.shape[1])
print(table3.shape[1])

19
20
7
24
