# NYC Taxi full dataset ~40 GB - BigData with DuckDB and Polars

In [None]:
#https://github.com/gerardrbentley/uber-nyc-pickups-duckdb/blob/main/01_duck_streamlit.py

In [None]:
!pip install boto3 -q

In [3]:
import time
import boto3
import os
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ResponseStreamingError

# Module-level S3 resource used by download_s3_folder. Requests are sent
# unsigned (signature_version=UNSIGNED), i.e. without AWS credentials.
s3 = boto3.resource("s3", config=Config(signature_version=UNSIGNED))


def download_s3_folder(bucket_name, s3_folder, local_dir=None):
    """Download every object under an S3 prefix, skipping files already on disk.

    Args:
        bucket_name: Name of the S3 bucket to read from.
        s3_folder: Key prefix to download; ``""`` selects every object.
        local_dir: Local directory that receives the files, laid out by their
            path relative to ``s3_folder``. When ``None`` the object key
            itself is used as the local path.

    Raises:
        ResponseStreamingError: If a download fails again after the single
            retry.
    """
    bucket = s3.Bucket(bucket_name)
    for obj in bucket.objects.filter(Prefix=s3_folder):
        target = (
            obj.key
            if local_dir is None
            else os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))
        )
        if os.path.exists(target):
            print(f"skipping {target}")
            continue
        # A key without any "/" has an empty dirname when local_dir is None;
        # os.makedirs("") would raise, so only create a parent when there is one.
        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)
        # Keys ending in "/" are folder placeholders, not downloadable files.
        if obj.key.endswith("/"):
            continue
        print(f"downloading {target}")
        try:
            bucket.download_file(obj.key, target)
        except ResponseStreamingError:
            # Wait a minute and retry once; a second failure propagates.
            time.sleep(60)
            bucket.download_file(obj.key, target)
        print(f"downloaded {target}")

if __name__ == "__main__":
    # An empty prefix selects every object in the bucket; files land under
    # the local "nyc-taxi" directory.
    print("BEGIN DOWNLOAD!")
    download_s3_folder(
        bucket_name="ursa-labs-taxi-data",
        s3_folder="",
        local_dir="nyc-taxi",
    )
    print("DONE DOWNLOAD!!!")

BEGIN DOWNLOAD!
downloading nyc-taxi/2009/01/data.parquet
downloaded nyc-taxi/2009/01/data.parquet
downloading nyc-taxi/2009/02/data.parquet
downloaded nyc-taxi/2009/02/data.parquet
downloading nyc-taxi/2009/03/data.parquet
downloaded nyc-taxi/2009/03/data.parquet
downloading nyc-taxi/2009/04/data.parquet
downloaded nyc-taxi/2009/04/data.parquet
downloading nyc-taxi/2009/05/data.parquet
downloaded nyc-taxi/2009/05/data.parquet
downloading nyc-taxi/2009/06/data.parquet
downloaded nyc-taxi/2009/06/data.parquet
downloading nyc-taxi/2009/07/data.parquet
downloaded nyc-taxi/2009/07/data.parquet
downloading nyc-taxi/2009/08/data.parquet
downloaded nyc-taxi/2009/08/data.parquet
downloading nyc-taxi/2009/09/data.parquet
downloaded nyc-taxi/2009/09/data.parquet
downloading nyc-taxi/2009/10/data.parquet
downloaded nyc-taxi/2009/10/data.parquet
downloading nyc-taxi/2009/11/data.parquet
downloaded nyc-taxi/2009/11/data.parquet
downloading nyc-taxi/2009/12/data.parquet
downloaded nyc-taxi/2009/12/d

In [5]:
import duckdb
import pyarrow.dataset as ds

# Open the local Parquet tree; the two directory levels below "nyc-taxi/"
# are exposed as the "year" and "month" columns.
nyc = ds.dataset("nyc-taxi/", partitioning=["year", "month"])

con = duckdb.connect()
# "nyc" in the SQL text refers to the Python variable defined above.
query = con.execute("SELECT total_amount FROM nyc")

# Stream the result as Arrow record batches rather than one big table.
record_batch_reader = query.fetch_record_batch()

total_rows = 0
print('chunk total', total_rows)
for chunk in record_batch_reader:
    # Each chunk is a pyarrow.lib.RecordBatch; process it here.
    total_rows += len(chunk)
    print('chunk total', total_rows)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

chunk total 0
chunk total 1000000
chunk total 2000000
chunk total 3000000
chunk total 4000000
chunk total 5000000
chunk total 6000000
chunk total 7000000
chunk total 8000000
chunk total 9000000
chunk total 10000000
chunk total 11000000
chunk total 12000000
chunk total 13000000
chunk total 14000000
chunk total 15000000
chunk total 16000000
chunk total 17000000
chunk total 18000000
chunk total 19000000
chunk total 20000000
chunk total 21000000
chunk total 22000000
chunk total 23000000
chunk total 24000000
chunk total 25000000
chunk total 26000000
chunk total 27000000
chunk total 28000000
chunk total 29000000
chunk total 30000000
chunk total 31000000
chunk total 32000000
chunk total 33000000
chunk total 34000000
chunk total 35000000
chunk total 36000000
chunk total 37000000
chunk total 38000000
chunk total 39000000
chunk total 40000000
chunk total 41000000
chunk total 42000000
chunk total 43000000
chunk total 44000000
chunk total 45000000
chunk total 46000000
chunk total 47000000
chunk to

## NYC Taxi with DuckDB and Polars

In [9]:
import duckdb
import pyarrow as pa
import pyarrow.dataset as ds
import polars as pl
from time import perf_counter as timer

def analyze_nyc_taxi(min_year=2014, min_amount=100):
    """Filter the NYC taxi dataset with DuckDB and return it as a Polars LazyFrame.

    The Parquet files under ``nyc-taxi/`` are queried through DuckDB, the
    result is streamed as Arrow record batches, and the batches are combined
    into a single in-memory Polars DataFrame that is returned lazily.

    Args:
        min_year: Keep rows whose ``year`` is greater than or equal to this.
        min_amount: Keep rows whose ``total_amount`` is strictly greater than
            this. Both values are interpolated into the SQL text, so pass
            numbers only.

    Returns:
        A ``(lf, runtime)`` tuple: the LazyFrame over the filtered rows and
        the elapsed wall-clock seconds, including building the DataFrame.
    """
    print("Starting analysis...")
    start_time = timer()

    # Open dataset using year,month folder partition.
    # NOTE: `nyc` looks unused, but the SQL below refers to it by name.
    nyc = ds.dataset("nyc-taxi/", partitioning=["year", "month"])

    query = f"""
        SELECT
            total_amount,
            passenger_count,
            year,
            pickup_at,
            pickup_longitude as lon,
            pickup_latitude as lat
        FROM nyc
        WHERE total_amount > {min_amount}
            AND year >= {min_year}
            AND lat is not null
            AND lon is not null
    """

    con = duckdb.connect()
    batches = []
    total_rows = 0
    try:
        # Stream the result as Arrow record batches.
        record_batch_reader = con.execute(query).fetch_record_batch()
        # Keep the schema so an empty result still yields a typed frame.
        schema = record_batch_reader.schema
        for chunk in record_batch_reader:
            batches.append(chunk)
            total_rows += chunk.num_rows
            print(f"Processed chunk, total rows: {total_rows}")
    finally:
        con.close()

    # Build one Arrow table from all batches and convert it once. Passing the
    # schema makes this work with zero batches, where concatenating an empty
    # list of frames would raise.
    table = pa.Table.from_batches(batches, schema=schema)
    df = pl.from_arrow(table, rechunk=False)

    runtime = timer() - start_time

    # Create a LazyFrame for further processing
    lf = df.lazy()

    print(f"\nAnalysis completed in {runtime:.2f} seconds")
    print(f"Total rows processed: {total_rows}")

    return lf, runtime

def get_summary_stats(lf):
    """Collect fare/passenger aggregates together with per-year trip counts.

    The selection mixes scalar aggregates (mean/min/max of ``total_amount``,
    mean of ``passenger_count``) with ``value_counts`` on ``year``. The
    collected frame therefore has one row per distinct year, with the scalar
    values repeated on each row.
    """
    fare = pl.col("total_amount")
    summary_exprs = [
        fare.mean().alias("avg_fare"),
        fare.min().alias("min_fare"),
        fare.max().alias("max_fare"),
        pl.col("passenger_count").mean().alias("avg_passengers"),
        pl.col("year").value_counts().alias("trips_by_year"),
    ]
    return lf.select(summary_exprs).collect()

In [10]:
%%time

# Full run: rows with year >= 2009 and total_amount > 35.
lf, runtime = analyze_nyc_taxi(min_year=2009, min_amount=35) # took ~3.5 min with min_year=2009, min_amount=35

Starting analysis...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processed chunk, total rows: 1000000
Processed chunk, total rows: 2000000
Processed chunk, total rows: 3000000
Processed chunk, total rows: 4000000
Processed chunk, total rows: 5000000
Processed chunk, total rows: 6000000
Processed chunk, total rows: 7000000
Processed chunk, total rows: 8000000
Processed chunk, total rows: 9000000
Processed chunk, total rows: 10000000
Processed chunk, total rows: 11000000
Processed chunk, total rows: 12000000
Processed chunk, total rows: 13000000
Processed chunk, total rows: 14000000
Processed chunk, total rows: 15000000
Processed chunk, total rows: 16000000
Processed chunk, total rows: 17000000
Processed chunk, total rows: 18000000
Processed chunk, total rows: 19000000
Processed chunk, total rows: 20000000
Processed chunk, total rows: 21000000
Processed chunk, total rows: 22000000
Processed chunk, total rows: 23000000
Processed chunk, total rows: 24000000
Processed chunk, total rows: 25000000
Processed chunk, total rows: 26000000
Processed chunk, tota

In [13]:
# Echo the LazyFrame as the cell result (notebook display only).
lf

In [14]:
# Materialize the LazyFrame into an eager Polars DataFrame and display it
df = lf.collect()
df

total_amount,passenger_count,year,pickup_at,lon,lat
f32,i8,i32,datetime[μs],f32,f32
35.450001,1,2009,2009-01-21 14:38:05,-73.997314,40.736099
38.220001,1,2009,2009-01-06 14:51:53,-73.862762,40.769268
45.0,2,2009,2009-01-03 16:27:10,-73.984085,40.756809
45.0,1,2009,2009-01-03 16:48:23,-74.006645,40.716652
51.75,1,2009,2009-01-07 15:22:59,-73.998528,40.723438
…,…,…,…,…,…
58.34,1,2016,2016-06-21 22:08:36,-73.784401,40.648598
36.84,1,2016,2016-06-21 21:50:55,-73.863342,40.769947
36.34,1,2016,2016-06-21 21:50:55,-73.871033,40.773754
73.300003,1,2016,2016-06-21 22:01:39,-73.783401,40.64875


In [15]:
# Compute the summary table, then print it under a heading.
stats = get_summary_stats(lf)
print("Summary Statistics:", stats, sep="\n")

Summary Statistics:
shape: (8, 5)
┌───────────┬───────────┬───────────┬────────────────┬─────────────────┐
│ avg_fare  ┆ min_fare  ┆ max_fare  ┆ avg_passengers ┆ trips_by_year   │
│ ---       ┆ ---       ┆ ---       ┆ ---            ┆ ---             │
│ f32       ┆ f32       ┆ f32       ┆ f64            ┆ struct[2]       │
╞═══════════╪═══════════╪═══════════╪════════════════╪═════════════════╡
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2009,4980036}  │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2012,8158817}  │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2013,10895158} │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2016,5458275}  │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2014,11332771} │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2011,6534332}  │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2010,4908528}  │
│ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       ┆ {2015,11209226} │
└───────────┴────

In [16]:
# Mean fare and number of trips for each year, sorted by year
yearly_stats = (
    lf.group_by("year")
    .agg(
        pl.col("total_amount").mean().alias("avg_fare"),
        pl.len().alias("trip_count"),
    )
    .sort("year")
    .collect()
)
print("\nYearly Statistics:", yearly_stats, sep="\n")


Yearly Statistics:
shape: (8, 3)
┌──────┬───────────┬────────────┐
│ year ┆ avg_fare  ┆ trip_count │
│ ---  ┆ ---       ┆ ---        │
│ i32  ┆ f64       ┆ u32        │
╞══════╪═══════════╪════════════╡
│ 2009 ┆ 48.593525 ┆ 4980036    │
│ 2010 ┆ 48.843003 ┆ 4908528    │
│ 2011 ┆ 49.288672 ┆ 6534332    │
│ 2012 ┆ 49.862339 ┆ 8158817    │
│ 2013 ┆ 51.888556 ┆ 10895158   │
│ 2014 ┆ 52.842987 ┆ 11332771   │
│ 2015 ┆ 54.677651 ┆ 11209226   │
│ 2016 ┆ 54.179889 ┆ 5458275    │
└──────┴───────────┴────────────┘


## With plain Polars (2x slower than DuckDB)

In [2]:
import polars as pl
import pyarrow.dataset as ds
from time import perf_counter as timer
from pathlib import Path

def create_lazy_dataset(data_path: str) -> pl.LazyFrame:
    """Open the partitioned dataset at ``data_path`` as a Polars LazyFrame.

    The dataset is opened through PyArrow with ``year`` and ``month`` as the
    partition fields, then handed to Polars' PyArrow dataset scanner.
    """
    partitioned = ds.dataset(data_path, partitioning=["year", "month"])
    return pl.scan_pyarrow_dataset(partitioned)

def build_analysis_query(lf: pl.LazyFrame, min_year: int = 2014, min_amount: float = 100) -> pl.LazyFrame:
    """Project, filter and alias the columns used by the analysis (lazily).

    Keeps rows with ``total_amount > min_amount``, ``year >= min_year`` and
    non-null pickup coordinates, then adds ``lon``/``lat`` as copies of the
    pickup longitude/latitude columns. Nothing is executed here.
    """
    wanted_columns = [
        "total_amount",
        "passenger_count",
        "year",
        "month",
        "pickup_at",
        "pickup_longitude",
        "pickup_latitude",
    ]
    keep_row = (
        (pl.col("total_amount") > min_amount)
        & (pl.col("year") >= min_year)
        & pl.col("pickup_latitude").is_not_null()
        & pl.col("pickup_longitude").is_not_null()
    )
    projected = lf.select(wanted_columns)
    filtered = projected.filter(keep_row)
    return filtered.with_columns(
        pl.col("pickup_longitude").alias("lon"),
        pl.col("pickup_latitude").alias("lat"),
    )

def get_basic_stats(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Return a lazy single-row summary: row count, fare mean/min/max, mean passengers."""
    fare = pl.col("total_amount")
    return lf.select(
        pl.len().alias("total_rows"),
        fare.mean().alias("avg_fare"),
        fare.min().alias("min_fare"),
        fare.max().alias("max_fare"),
        pl.col("passenger_count").mean().alias("avg_passengers"),
    )

def get_yearly_stats(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Return a lazy per-year table of mean fare, trip count and summed fares, sorted by year."""
    fare = pl.col("total_amount")
    per_year = lf.group_by("year").agg(
        fare.mean().alias("avg_fare"),
        pl.len().alias("trip_count"),
        fare.sum().alias("total_revenue"),
    )
    return per_year.sort("year")

def get_monthly_patterns(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Return a lazy per-(year, month) table of trip count and mean fare, sorted by year then month."""
    period = ["year", "month"]
    per_month = lf.group_by(period).agg(
        pl.len().alias("trip_count"),
        pl.col("total_amount").mean().alias("avg_fare"),
    )
    return per_month.sort(period)

def analyze_taxi_data(data_path: str, min_year: int = 2014, min_amount: float = 100):
    """Run the basic, yearly and monthly analyses over the dataset at ``data_path``.

    All queries are built lazily and then collected in one ``pl.collect_all``
    call instead of three separate ``.collect()`` calls, so Polars is handed
    all three plans at once rather than executing them one after another.

    Args:
        data_path: Root directory of the year/month partitioned dataset.
        min_year: Keep rows whose ``year`` is greater than or equal to this.
        min_amount: Keep rows whose ``total_amount`` is strictly greater than
            this.

    Returns:
        A ``(analysis_lf, results, runtime)`` tuple: the still-lazy filtered
        query, a dict of collected DataFrames keyed ``'basic_stats'``,
        ``'yearly_stats'`` and ``'monthly_patterns'``, and the elapsed
        wall-clock seconds.
    """
    print("Starting analysis...")
    start_time = timer()

    # Create base LazyFrame
    base_lf = create_lazy_dataset(data_path)

    # Build main analysis query
    analysis_lf = build_analysis_query(base_lf, min_year, min_amount)

    # Prepare all analysis queries (still lazy)
    basic_stats_lf = get_basic_stats(analysis_lf)
    yearly_stats_lf = get_yearly_stats(analysis_lf)
    monthly_patterns_lf = get_monthly_patterns(analysis_lf)

    # Collect everything together; results come back in input order.
    basic_stats, yearly_stats, monthly_patterns = pl.collect_all(
        [basic_stats_lf, yearly_stats_lf, monthly_patterns_lf]
    )
    results = {
        'basic_stats': basic_stats,
        'yearly_stats': yearly_stats,
        'monthly_patterns': monthly_patterns,
    }
    runtime = timer() - start_time

    print(f"\nAnalysis completed in {runtime:.2f} seconds")
    print(f"Total rows processed: {results['basic_stats']['total_rows'][0]}")

    return analysis_lf, results, runtime

In [15]:
# Run the analysis
%%time
data_path = "nyc-taxi/"
lf, results, runtime = analyze_taxi_data(data_path=data_path,min_year=2009,min_amount=35) # circa 9 min con min_year=2009,min_amount=35

Starting analysis...

Analysis completed in 530.88 seconds
Total rows processed: 63477143


In [16]:
# Show the single-row summary table under its heading
print("\nBasic Statistics:", results['basic_stats'], sep="\n")


Basic Statistics:
shape: (1, 5)
┌────────────┬───────────┬───────────┬───────────┬────────────────┐
│ total_rows ┆ avg_fare  ┆ min_fare  ┆ max_fare  ┆ avg_passengers │
│ ---        ┆ ---       ┆ ---       ┆ ---       ┆ ---            │
│ u32        ┆ f32       ┆ f32       ┆ f32       ┆ f64            │
╞════════════╪═══════════╪═══════════╪═══════════╪════════════════╡
│ 63477143   ┆ 51.979137 ┆ 35.009998 ┆ 3950611.5 ┆ 1.727499       │
└────────────┴───────────┴───────────┴───────────┴────────────────┘


In [17]:
# Show the per-year table under its heading
print("\nYearly Statistics:", results['yearly_stats'], sep="\n")


Yearly Statistics:
shape: (8, 4)
┌──────┬───────────┬────────────┬───────────────┐
│ year ┆ avg_fare  ┆ trip_count ┆ total_revenue │
│ ---  ┆ ---       ┆ ---        ┆ ---           │
│ i32  ┆ f64       ┆ u32        ┆ f32           │
╞══════╪═══════════╪════════════╪═══════════════╡
│ 2009 ┆ 48.593525 ┆ 4980036    ┆ 2.41997504e8  │
│ 2010 ┆ 48.843003 ┆ 4908528    ┆ 2.39747248e8  │
│ 2011 ┆ 49.288672 ┆ 6534332    ┆ 3.22068544e8  │
│ 2012 ┆ 49.862339 ┆ 8158817    ┆ 4.06817696e8  │
│ 2013 ┆ 51.888556 ┆ 10895158   ┆ 5.65334016e8  │
│ 2014 ┆ 52.842987 ┆ 11332771   ┆ 5.98857472e8  │
│ 2015 ┆ 54.677651 ┆ 11209226   ┆ 6.12894144e8  │
│ 2016 ┆ 54.179889 ┆ 5458275    ┆ 2.95728736e8  │
└──────┴───────────┴────────────┴───────────────┘


In [18]:
# Show the per-month table under its heading
print("\nMonthly Patterns:", results['monthly_patterns'], sep="\n")


Monthly Patterns:
shape: (89, 4)
┌──────┬───────┬────────────┬───────────┐
│ year ┆ month ┆ trip_count ┆ avg_fare  │
│ ---  ┆ ---   ┆ ---        ┆ ---       │
│ i32  ┆ i32   ┆ u32        ┆ f64       │
╞══════╪═══════╪════════════╪═══════════╡
│ 2009 ┆ 1     ┆ 324094     ┆ 48.835026 │
│ 2009 ┆ 2     ┆ 318258     ┆ 48.758812 │
│ 2009 ┆ 3     ┆ 394992     ┆ 48.837511 │
│ 2009 ┆ 4     ┆ 385753     ┆ 48.78389  │
│ 2009 ┆ 5     ┆ 425945     ┆ 48.743326 │
│ …    ┆ …     ┆ …          ┆ …         │
│ 2016 ┆ 2     ┆ 810478     ┆ 54.23402  │
│ 2016 ┆ 3     ┆ 940763     ┆ 54.49375  │
│ 2016 ┆ 4     ┆ 928302     ┆ 53.778141 │
│ 2016 ┆ 5     ┆ 1019728    ┆ 54.236428 │
│ 2016 ┆ 6     ┆ 954911     ┆ 56.020123 │
└──────┴───────┴────────────┴───────────┘


In [None]:
# Mean fare and number of trips for each year, sorted by year
yearly_stats = (
    lf.group_by("year")
    .agg(
        pl.col("total_amount").mean().alias("avg_fare"),
        pl.len().alias("trip_count"),
    )
    .sort("year")
    .collect()
)
print("\nYearly Statistics:", yearly_stats, sep="\n")