In [1]:
import polars as pl

# Source data links (test month)

### August 2024 data
* Yellow Taxi Trip Records
  URL: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet
* Green Taxi Trip Records
  URL: https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-08.parquet
* For-Hire Vehicle Trip Records
  URL: https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2024-08.parquet
* High Volume For-Hire Vehicle Trip Records
  URL: https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2024-08.parquet

# Location ID → Borough lookup

In [2]:
# Zone lookup table: one row per taxi-zone LocationID with its Borough.
# Only these two columns are loaded, with explicit dtypes, so LocationID is
# Int32 like the PULocationID/DOLocationID columns it is joined against below.
# NOTE(review): this path is relative to "../data" while later cells use
# "../../data" — confirm both resolve from the notebook's working directory.
df_loc_dict = pl.read_csv(
    "../data/data dictionary/taxi_zone_lookup.csv",
    columns=["LocationID", "Borough"],
    schema_overrides={"LocationID": pl.Int32, "Borough": pl.Utf8},
)

# Yellow Taxi

In [3]:
# Yellow Taxi: load the August 2024 trips, tag the ride type, attach boroughs,
# and aggregate to one row per hour x pickup zone x dropoff zone.
ride_type = "Yellow Taxi Trip Records".replace(" Trip Records", "")
timestamp_col = 'tpep_pickup_datetime'
# Month covered by this extract; rows dated outside it are discarded below.
target_year, target_month = 2024, 8
schema_overrides = {
        timestamp_col: pl.Datetime,
        'PULocationID': pl.Int32,
        'DOLocationID': pl.Int32,
        'payment_type': pl.Int32,
        'total_amount': pl.Float64
    }

df = pl.read_parquet(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet",
    columns=[timestamp_col, "PULocationID", "DOLocationID", "payment_type", "total_amount"],
)
df = df.with_columns([pl.col(col).cast(dtype) for col, dtype in schema_overrides.items()])
# Drop payment_type 6 (presumably "voided trip" per the TLC data dictionary —
# confirm). `ne_missing` keeps rows whose payment_type is null: a plain `!= 6`
# evaluates to null for them and `filter` would silently drop those trips.
df = df.filter(pl.col("payment_type").ne_missing(6))

# Ride type column = Yellow Taxi
df = df.with_columns(pl.lit(ride_type).alias("ride_type"))

# Hour bucket, calendar date and hour-of-day from the pickup timestamp. The ns
# cast is built once and reused; it keeps timestamp_hour as datetime[ns].
trip_ts = pl.col(timestamp_col).cast(pl.Datetime("ns"))
df = df.with_columns(
    trip_ts.dt.truncate('1h').alias('timestamp_hour'),
    trip_ts.dt.date().alias('txn_date'),
    trip_ts.dt.hour().alias('txn_hour')
)

# Attach pickup and dropoff boroughs. Left joins keep every trip; a location ID
# with no match in the lookup leaves the borough null.
df = (
    df
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="PULocationID", right_on="LocationID")
    .rename({"Borough": "PUBorough"})
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="DOLocationID", right_on="LocationID")
    .rename({"Borough": "DOBorough"})
)

# Keep only the target month. Matching year AND month also removes rows dated in
# another year; comparing the month name alone would keep August of any year.
# txn_date is a grouping key, so filtering before aggregating gives the same rows.
df = df.filter(
    (pl.col("txn_date").dt.year() == target_year)
    & (pl.col("txn_date").dt.month() == target_month)
)

# Aggregate: trip count and mean total_amount per hour x ride type x zone pair.
df = (
    df
    .group_by(["txn_date", "txn_hour", "timestamp_hour", "ride_type",
               "PUBorough", "DOBorough", "PULocationID", "DOLocationID"])
    .agg(
        pl.len().alias("num_txns"),
        pl.col("total_amount").mean().alias("total_amount"),
    )
)

# Arrange the columns
df = df.select(['txn_date', 'txn_hour', 'timestamp_hour',
                'PULocationID', 'PUBorough',
                'DOLocationID', 'DOBorough',
                'ride_type',
                'num_txns', 'total_amount'])
display(df.head())

### Missing-hours check: dates with fewer than 24 distinct hours (empty = none)
(
    df
    .group_by("txn_date")
    .agg(pl.col("txn_hour").n_unique().alias("num_hrs"))
    .filter(pl.col("num_hrs") < 24)
    .sort("txn_date")
)

txn_date,txn_hour,timestamp_hour,PULocationID,PUBorough,DOLocationID,DOBorough,ride_type,num_txns,total_amount
date,i8,datetime[ns],i32,str,i32,str,str,u32,f64
2024-08-18,20,2024-08-18 20:00:00,238,"""Manhattan""",68,"""Manhattan""","""Yellow Taxi""",2,30.02
2024-08-26,21,2024-08-26 21:00:00,163,"""Manhattan""",143,"""Manhattan""","""Yellow Taxi""",7,17.168571
2024-08-11,16,2024-08-11 16:00:00,132,"""Queens""",124,"""Queens""","""Yellow Taxi""",2,35.96
2024-08-13,7,2024-08-13 07:00:00,170,"""Manhattan""",49,"""Brooklyn""","""Yellow Taxi""",1,31.0
2024-08-02,12,2024-08-02 12:00:00,140,"""Manhattan""",166,"""Manhattan""","""Yellow Taxi""",2,37.175


txn_date,num_hrs
date,u32


# Green Taxi

In [4]:
# Green Taxi: load the August 2024 trips, normalise dtypes, tag the ride type
# and drop voided trips.
timestamp_col = "lpep_pickup_datetime"
ride_type = "Green Taxi Trip Records".replace(" Trip Records", "")
schema_overrides = {
        timestamp_col: pl.Datetime,
        'PULocationID': pl.Int32,
        'DOLocationID': pl.Int32,
        'payment_type': pl.Int32,
        'total_amount': pl.Float64
    }

df = pl.read_parquet(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-08.parquet",
    columns=[timestamp_col, "PULocationID", "DOLocationID", "payment_type", "total_amount"],
)
df = df.with_columns([pl.col(col).cast(dtype) for col, dtype in schema_overrides.items()])
df = df.with_columns(pl.lit(ride_type).alias("ride_type"))
# Drop payment_type 6 (presumably "voided trip" per the TLC data dictionary —
# confirm). `ne_missing` keeps rows whose payment_type is null: a plain `!= 6`
# evaluates to null for them and `filter` would silently drop those trips.
df = df.filter(pl.col("payment_type").ne_missing(6))

In [5]:
# Green Taxi: derive time buckets, attach boroughs, keep August 2024 only and
# aggregate to one row per hour x pickup zone x dropoff zone.
target_year, target_month = 2024, 8

# Hour bucket, calendar date and hour-of-day from the pickup timestamp. The ns
# cast is built once and reused; it keeps timestamp_hour as datetime[ns].
trip_ts = pl.col(timestamp_col).cast(pl.Datetime("ns"))
df = df.with_columns(
    trip_ts.dt.truncate('1h').alias('timestamp_hour'),
    trip_ts.dt.date().alias('txn_date'),
    trip_ts.dt.hour().alias('txn_hour')
)

# Attach pickup and dropoff boroughs. Left joins keep every trip; a location ID
# with no match in the lookup leaves the borough null.
df = (
    df
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="PULocationID", right_on="LocationID")
    .rename({"Borough": "PUBorough"})
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="DOLocationID", right_on="LocationID")
    .rename({"Borough": "DOBorough"})
)

# Keep only the target month. Matching year AND month also removes rows dated in
# another year; comparing the month name alone would keep August of any year.
# txn_date is a grouping key, so filtering before aggregating gives the same rows.
df = df.filter(
    (pl.col("txn_date").dt.year() == target_year)
    & (pl.col("txn_date").dt.month() == target_month)
)

# Aggregate: trip count and mean total_amount per hour x ride type x zone pair.
df = (
    df
    .group_by(["txn_date", "txn_hour", "timestamp_hour", "ride_type",
               "PUBorough", "DOBorough", "PULocationID", "DOLocationID"])
    .agg(
        pl.len().alias("num_txns"),
        pl.col("total_amount").mean().alias("total_amount"),
    )
)

# Arrange the columns
df = df.select(['txn_date', 'txn_hour', 'timestamp_hour',
                'PULocationID', 'PUBorough',
                'DOLocationID', 'DOBorough',
                'ride_type',
                'num_txns', 'total_amount'])
display(df.head())

### Missing-hours check: dates with fewer than 24 distinct hours (empty = none)
(
    df
    .group_by("txn_date")
    .agg(pl.col("txn_hour").n_unique().alias("num_hrs"))
    .filter(pl.col("num_hrs") < 24)
    .sort("txn_date")
)

txn_date,txn_hour,timestamp_hour,PULocationID,PUBorough,DOLocationID,DOBorough,ride_type,num_txns,total_amount
date,i8,datetime[ns],i32,str,i32,str,str,u32,f64
2024-08-12,19,2024-08-12 19:00:00,75,"""Manhattan""",239,"""Manhattan""","""Green Taxi""",1,19.75
2024-08-29,9,2024-08-29 09:00:00,29,"""Brooklyn""",210,"""Brooklyn""","""Green Taxi""",1,19.0
2024-08-26,6,2024-08-26 06:00:00,74,"""Manhattan""",141,"""Manhattan""","""Green Taxi""",1,23.82
2024-08-03,2,2024-08-03 02:00:00,265,"""N/A""",265,"""N/A""","""Green Taxi""",1,261.3
2024-08-26,10,2024-08-26 10:00:00,75,"""Manhattan""",41,"""Manhattan""","""Green Taxi""",2,11.6


txn_date,num_hrs
date,u32


# FHV

In [6]:
# For-Hire Vehicle: load the August 2024 trips and align the schema with the
# taxi frames. No fare column is read from this file, so total_amount is an
# all-null Float64 placeholder.
timestamp_col = "pickup_datetime"
ride_type = "For-Hire Vehicle Trip Records".replace(" Trip Records", "")

schema_overrides = {
        timestamp_col: pl.Datetime,
        'PULocationID': pl.Int32,
        'DOLocationID': pl.Int32,
        'total_amount': pl.Float64
    }

df = (
    pl.read_parquet(
        "https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2024-08.parquet",
        columns=[timestamp_col, "PUlocationID", "DOlocationID"],
    )
    # This file spells the location-ID columns with a lower-case "l".
    .rename({"PUlocationID": "PULocationID", "DOlocationID": "DOLocationID"})
    # Blank fare column so the frame matches the other ride types.
    .with_columns(pl.lit(None).cast(pl.Float64).alias("total_amount"))
    .with_columns([pl.col(col).cast(dtype) for col, dtype in schema_overrides.items()])
    .with_columns(pl.lit(ride_type).alias("ride_type"))
)

In [7]:
# For-Hire Vehicle: derive time buckets, attach boroughs, keep August 2024 only
# and aggregate to one row per hour x pickup zone x dropoff zone.
target_year, target_month = 2024, 8

# Hour bucket, calendar date and hour-of-day from the pickup timestamp. The ns
# cast is built once and reused; it keeps timestamp_hour as datetime[ns].
trip_ts = pl.col(timestamp_col).cast(pl.Datetime("ns"))
df = df.with_columns(
    trip_ts.dt.truncate('1h').alias('timestamp_hour'),
    trip_ts.dt.date().alias('txn_date'),
    trip_ts.dt.hour().alias('txn_hour')
)

# Attach pickup and dropoff boroughs. Left joins keep every trip; a null or
# unmatched location ID leaves the borough null.
df = (
    df
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="PULocationID", right_on="LocationID")
    .rename({"Borough": "PUBorough"})
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="DOLocationID", right_on="LocationID")
    .rename({"Borough": "DOBorough"})
)

# Keep only the target month. Matching year AND month also removes rows dated in
# another year; comparing the month name alone would keep August of any year.
# txn_date is a grouping key, so filtering before aggregating gives the same rows.
df = df.filter(
    (pl.col("txn_date").dt.year() == target_year)
    & (pl.col("txn_date").dt.month() == target_month)
)

# Aggregate: trip count and mean total_amount per hour x ride type x zone pair
# (total_amount is all-null for this feed, so its mean stays null).
df = (
    df
    .group_by(["txn_date", "txn_hour", "timestamp_hour", "ride_type",
               "PUBorough", "DOBorough", "PULocationID", "DOLocationID"])
    .agg(
        pl.len().alias("num_txns"),
        pl.col("total_amount").mean().alias("total_amount"),
    )
)

# Arrange the columns
df = df.select(['txn_date', 'txn_hour', 'timestamp_hour',
                'PULocationID', 'PUBorough',
                'DOLocationID', 'DOBorough',
                'ride_type',
                'num_txns', 'total_amount'])
display(df.head())

### Missing-hours check: dates with fewer than 24 distinct hours (empty = none)
(
    df
    .group_by("txn_date")
    .agg(pl.col("txn_hour").n_unique().alias("num_hrs"))
    .filter(pl.col("num_hrs") < 24)
    .sort("txn_date")
)

txn_date,txn_hour,timestamp_hour,PULocationID,PUBorough,DOLocationID,DOBorough,ride_type,num_txns,total_amount
date,i8,datetime[ns],i32,str,i32,str,str,u32,f64
2024-08-03,15,2024-08-03 15:00:00,251.0,"""Staten Island""",156,"""Staten Island""","""For-Hire Vehicle""",1,
2024-08-17,12,2024-08-17 12:00:00,,,4,"""Manhattan""","""For-Hire Vehicle""",2,
2024-08-13,22,2024-08-13 22:00:00,,,49,"""Brooklyn""","""For-Hire Vehicle""",1,
2024-08-15,9,2024-08-15 09:00:00,41.0,"""Manhattan""",92,"""Queens""","""For-Hire Vehicle""",1,
2024-08-23,14,2024-08-23 14:00:00,214.0,"""Staten Island""",156,"""Staten Island""","""For-Hire Vehicle""",1,


txn_date,num_hrs
date,u32


# HVFHV

In [8]:
# High-volume FHV licence number -> company name; used to fill ride_type.
hvfhs_mapping = dict(
    HV0002="Juno",
    HV0003="Uber",
    HV0004="Via",
    HV0005="Lyft",
)

In [9]:
# High-Volume FHV: load the August 2024 trips, build total_amount from the fare
# components, and label each trip with its company via hvfhs_mapping.
# NOTE(review): this section keys on request_datetime, while the other ride
# types use a pickup timestamp — confirm this is intended.
timestamp_col = 'request_datetime'
df = pl.read_parquet(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2024-08.parquet",
    columns=["hvfhs_license_num", timestamp_col, "PULocationID", "DOLocationID",
             "base_passenger_fare", "tolls", "bcf", "sales_tax",
             "congestion_surcharge", "airport_fee", "tips"],
)

df = (
    df
    .with_columns(
        # Plain `+`, so total_amount is null whenever any component is null.
        (
            pl.col("base_passenger_fare")
            + pl.col("tolls")
            + pl.col("bcf")
            + pl.col("sales_tax")
            + pl.col("congestion_surcharge")
            + pl.col("airport_fee")
            + pl.col("tips")
        ).alias("total_amount"),
        # Licence codes missing from hvfhs_mapping are kept as-is.
        pl.col("hvfhs_license_num").replace(hvfhs_mapping).alias("ride_type"),
    )
    # The components and the raw licence code are no longer needed.
    .drop(["hvfhs_license_num", "base_passenger_fare", "tolls", "bcf",
           "sales_tax", "congestion_surcharge", "airport_fee", "tips"])
)

In [10]:
# High-Volume FHV: derive time buckets, attach boroughs, keep August 2024 only
# and aggregate to one row per hour x company x pickup zone x dropoff zone.
target_year, target_month = 2024, 8

# Hour bucket, calendar date and hour-of-day from the timestamp column. The ns
# cast is built once and reused; it keeps timestamp_hour as datetime[ns].
trip_ts = pl.col(timestamp_col).cast(pl.Datetime("ns"))
df = df.with_columns(
    trip_ts.dt.truncate('1h').alias('timestamp_hour'),
    trip_ts.dt.date().alias('txn_date'),
    trip_ts.dt.hour().alias('txn_hour')
)

# Attach pickup and dropoff boroughs. Left joins keep every trip; a location ID
# with no match in the lookup leaves the borough null.
df = (
    df
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="PULocationID", right_on="LocationID")
    .rename({"Borough": "PUBorough"})
    .join(df_loc_dict[["Borough", "LocationID"]],
          how="left",
          left_on="DOLocationID", right_on="LocationID")
    .rename({"Borough": "DOBorough"})
)

# Keep only the target month. Matching year AND month also removes rows dated in
# another year; comparing the month name alone would keep August of any year.
# txn_date is a grouping key, so filtering before aggregating gives the same rows.
df = df.filter(
    (pl.col("txn_date").dt.year() == target_year)
    & (pl.col("txn_date").dt.month() == target_month)
)

# Aggregate: trip count and mean total_amount per hour x ride type x zone pair.
df = (
    df
    .group_by(["txn_date", "txn_hour", "timestamp_hour", "ride_type",
               "PUBorough", "DOBorough", "PULocationID", "DOLocationID"])
    .agg(
        pl.len().alias("num_txns"),
        pl.col("total_amount").mean().alias("total_amount"),
    )
)

# Arrange the columns
df = df.select(['txn_date', 'txn_hour', 'timestamp_hour',
                'PULocationID', 'PUBorough',
                'DOLocationID', 'DOBorough',
                'ride_type',
                'num_txns', 'total_amount'])
display(df.head())

### Missing-hours check: dates with fewer than 24 distinct hours (empty = none)
(
    df
    .group_by("txn_date")
    .agg(pl.col("txn_hour").n_unique().alias("num_hrs"))
    .filter(pl.col("num_hrs") < 24)
    .sort("txn_date")
)

txn_date,txn_hour,timestamp_hour,PULocationID,PUBorough,DOLocationID,DOBorough,ride_type,num_txns,total_amount
date,i8,datetime[ns],i32,str,i32,str,str,u32,f64
2024-08-13,10,2024-08-13 10:00:00,189,"""Brooklyn""",62,"""Brooklyn""","""Uber""",1,14.96
2024-08-18,21,2024-08-18 21:00:00,75,"""Manhattan""",53,"""Queens""","""Lyft""",1,48.88
2024-08-28,9,2024-08-28 09:00:00,61,"""Brooklyn""",249,"""Manhattan""","""Uber""",1,53.88
2024-08-07,18,2024-08-07 18:00:00,236,"""Manhattan""",140,"""Manhattan""","""Lyft""",1,20.09
2024-08-02,3,2024-08-02 03:00:00,219,"""Queens""",203,"""Queens""","""Uber""",2,9.45


txn_date,num_hrs
date,u32


# Combining all parquet files in a directory

In [29]:
import os

# Root directory holding the monthly aggregate files; searched recursively.
root_dir = '../../data/monthly_aggregates'
# Resolved form of root_dir that the walk below uses. It must be defined in this
# cell; referencing it without a definition raises NameError on a fresh kernel.
absolute_path = os.path.abspath(root_dir)

# Store the polars dataframes
dfs = []

for dirpath, dirnames, filenames in os.walk(absolute_path):
    # Sorting dirnames in place (os.walk is top-down) and filenames makes the
    # order of `dfs` reproducible across runs and filesystems.
    dirnames.sort()
    for filename in sorted(filenames):
        if filename.endswith('.parquet.gz'):
            # Read the parquet file and append it to the list
            dfs.append(pl.read_parquet(os.path.join(dirpath, filename)))

# Reading the output

In [43]:
# Load the concatenated 2024 output and preview its first rows.
df = pl.read_parquet(
    r"../../data/concatenated/2024/2024 Taxi and Ride Hailing Records.parquet.gz"
)
df.head()

txn_date,txn_hour,timestamp_hour,PULocationID,PUBorough,DOLocationID,DOBorough,ride_type,num_txns,total_amount
date,i32,datetime[μs],i32,str,i32,str,str,i32,f64
2024-03-21,20,2024-03-21 20:00:00,191.0,"""Queens""",198,"""Queens""","""For-Hire Vehicle Trip Records""",1,
2024-03-18,8,2024-03-18 08:00:00,95.0,"""Queens""",92,"""Queens""","""For-Hire Vehicle Trip Records""",1,
2024-03-18,10,2024-03-18 10:00:00,,,66,"""Brooklyn""","""For-Hire Vehicle Trip Records""",4,
2024-03-26,18,2024-03-26 18:00:00,,,209,"""Manhattan""","""For-Hire Vehicle Trip Records""",2,
2024-03-18,12,2024-03-18 12:00:00,156.0,"""Staten Island""",214,"""Staten Island""","""For-Hire Vehicle Trip Records""",3,


In [45]:
# Number of distinct transaction dates in the loaded output.
df["txn_date"].n_unique()

31