In [None]:
from nyc_taxi_eta_expected_time_of_arrival.configs.settings import DATA_DIR

import glob
import re
import polars as pl
from tqdm.notebook import tqdm

TAXI_DATA_DIR = DATA_DIR / "nyc_tlc_yellow_taxi_data"

yellow_taxi_files = glob.glob(str(TAXI_DATA_DIR / "*.parquet"))
yellow_taxi_files.sort()

# Filter files to include only those from 2011 onwards
yellow_taxi_files = [
    f for f in yellow_taxi_files
    if (match := re.search(r"(\d{4})[-_]", f)) and int(match.group(1)) > 2010
]

data: list[pl.LazyFrame] = []
for file in tqdm(yellow_taxi_files):
    try:
        df = pl.scan_parquet(file)
        data.append(df)
    except Exception as e:
        print(f"Error reading {file}: {e}")

  0%|          | 0/168 [00:00<?, ?it/s]

In [2]:
def read_parquet(file: str) -> pl.LazyFrame | None:
    """Read a Parquet file and return a LazyFrame."""
    try:
        return pl.scan_parquet(file)
    except Exception as e:
        print(f"Error reading {file}: {e}")
        return None
def process_dataframe(lazy_df: pl.LazyFrame) -> tuple[float, int] | None:
    """Process a LazyFrame to get estimated size and number of rows."""
    try:
        df = lazy_df.collect()
        size = df.estimated_size('gb')
        num_rows = df.height
        return size, num_rows
    except Exception as e:
        print(f"Error processing dataframe: {e}")
        return None


rows = 0
estimated_size = 0
for lazy_df in tqdm(data, desc="Processing LazyFrames", unit="file"):
    result = process_dataframe(lazy_df)
    if result:
        size, num_rows = result
        estimated_size += size
        rows += num_rows

Processing LazyFrames:   0%|          | 0/168 [00:00<?, ?file/s]

In [3]:
rows.__format__('_'), estimated_size

('1_438_340_823', 175.92419571895152)

In [None]:
import polars as pl
from tqdm import tqdm

sample_dfs_list: list[pl.DataFrame] = []

# Define target schema
target_schema : dict[str, pl.datatypes] = {
    "VendorID": pl.Int32,
    "tpep_pickup_datetime": pl.Datetime("ns", time_zone="America/New_York"),
    "tpep_dropoff_datetime": pl.Datetime("ns", time_zone="America/New_York"),
    "passenger_count": pl.Int64,
    "trip_distance": pl.Float64,
    "RatecodeID": pl.Int64,
    "store_and_fwd_flag": pl.String,
    "PULocationID": pl.Int32,
    "DOLocationID": pl.Int32,
    "payment_type": pl.Int64,
    "fare_amount": pl.Float64,
    "extra": pl.Float64,
    "mta_tax": pl.Float64,
    "tip_amount": pl.Float64,
    "tolls_amount": pl.Float64,
    "improvement_surcharge": pl.Float64,
    "total_amount": pl.Float64,
    "congestion_surcharge": pl.Float64,
    "Airport_fee": pl.Float64
}

for file in tqdm(yellow_taxi_files, desc="Reading Parquet Files", unit="file"):
    try:
        df = pl.read_parquet(file, n_rows=10)

        # Standardize column name if needed
        if 'airport_fee' in df.columns and 'Airport_fee' not in df.columns:
            df = df.rename({"airport_fee": "Airport_fee"})

        # Cast to target schema
        df = df.select([
            pl.col(col).cast(dtype).alias(col)
            for col, dtype in target_schema.items()
            if col in df.columns
        ])

        sample_dfs_list.append(df)

    except Exception as e:
        print(f"Error reading {file}: {e}")

# Concatenate with consistent schema
sample_dfs = pl.concat(sample_dfs_list)
sample_dfs

Reading Parquet Files: 100%|██████████| 168/168 [00:05<00:00, 28.00file/s]


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
i32,"datetime[ns, America/New_York]","datetime[ns, America/New_York]",i64,f64,i64,str,i32,i32,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64
2,2010-12-31 19:10:00 EST,2010-12-31 19:12:00 EST,4,0.0,1,,145,145,1,2.9,0.5,0.5,0.28,0.0,0.0,4.18,,
2,2010-12-31 19:04:00 EST,2010-12-31 19:13:00 EST,4,0.0,1,,264,264,1,5.7,0.5,0.5,0.24,0.0,0.0,6.94,,
2,2010-12-31 19:14:00 EST,2010-12-31 19:16:00 EST,4,0.0,1,,264,264,1,2.9,0.5,0.5,1.11,0.0,0.0,5.01,,
2,2010-12-31 19:04:00 EST,2010-12-31 19:06:00 EST,5,0.0,1,,146,146,1,2.9,0.5,0.5,0.0,0.0,0.0,3.9,,
2,2010-12-31 19:08:00 EST,2010-12-31 19:08:00 EST,5,0.0,1,,146,146,1,2.5,0.5,0.5,0.11,0.0,0.0,3.61,,
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
1,2024-11-30 19:21:17 EST,2024-11-30 19:37:22 EST,1,4.3,1,"""N""",249,141,1,20.5,3.5,0.5,5.1,0.0,1.0,30.6,2.5,0.0
2,2024-11-30 19:04:53 EST,2024-11-30 19:31:03 EST,1,7.66,1,"""N""",186,52,1,35.2,1.0,0.5,8.04,0.0,1.0,48.24,2.5,0.0
1,2024-11-30 19:15:28 EST,2024-11-30 19:20:13 EST,1,0.3,1,"""N""",148,148,3,5.8,3.5,0.5,0.0,0.0,1.0,10.8,2.5,0.0
1,2024-11-30 19:38:54 EST,2024-11-30 20:03:46 EST,1,9.4,1,"""N""",234,244,1,39.4,3.5,0.5,0.0,0.0,1.0,44.4,2.5,0.0
