This is a short demo of how duckdb works together with pyarrow or pandas.

Sample datasets from https://github.com/streamlit/demo-uber-nyc-pickups and s3://voltrondata-labs-datasets/nyc-taxi/

For a performance-related discussion, see:
https://tech.gerardbentley.com/python/data/intermediate/2022/04/26/holy-duck.html

In [None]:
# Uncomment to install the dependencies into the running kernel's environment.
# %pip (rather than !pip) targets the kernel; the pins match the versions displayed below.
# %pip install duckdb==0.8.1
# %pip install pyarrow==13.0.0
# %pip install pandas==2.1.0

In [1]:
import duckdb
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyarrow import csv
import pandas as pd

In [83]:
# Library versions this notebook was last executed with (duckdb, pyarrow, pandas)
(duckdb.__version__, pa.__version__, pd.__version__)

('0.8.1', '13.0.0', '2.1.0')

# pyarrow

## Read

Ref  - https://arrow.apache.org/docs/python/generated/pyarrow.Table.html

In [54]:
# Load the gzipped Uber pickups CSV into a pyarrow.Table.
# Timestamps are written month/day/year, so the parser is given an explicit format.
uber_convert_options = csv.ConvertOptions(timestamp_parsers=["%m/%d/%Y %H:%M:%S"])
csv_table = csv.read_csv("uber-raw-data-sep14.csv.gz", convert_options=uber_convert_options)
# Replace the file's headers with short snake_case column names
csv_table = csv_table.rename_columns(["date_time", "lat", "lon", "base"])
csv_table.schema

date_time: timestamp[s]
lat: double
lon: double
base: string

In [55]:
# Read the parquet data under "sample_dataset" into a pyarrow.Table.
# To load only some columns, pass e.g. columns=['date_time', 'lat', 'lon'] to pq.read_table.
parque_table = pq.read_table("sample_dataset")
parque_table.schema

date_time: timestamp[ms]
lat: double
lon: double
base: string

In [None]:
# # read from json (disabled example; uncomment to run)
# from pyarrow import json
# # pyarrow cannot parse the JSON string "4444" as the float 4444 directly; use the workaround below:
# # read the field as a string first, then cast the table to the target schema
# raw_schema = pa.schema([
#     pa.field('d', pa.string())
# ])
# processed_schema = pa.schema([
#     pa.field('d', pa.float32())
# ])

# raw_table = pa.json.read_json('sample.json', parse_options=json.ParseOptions(explicit_schema=raw_schema))
# table = raw_table.cast(processed_schema)

## Read a multi-file dataset

In [56]:
# A directory of parquet files can be opened as a single dataset and read in one go
dataset = pq.ParquetDataset("sample_dataset")
table = dataset.read()
# ...or use pq.read_table as a shortcut on the same directory
table = pq.read_table("sample_dataset")

# duckdb/pyarrow Examples

In [57]:
# Open one month of the nyc-taxi sample dataset straight from remote storage (S3)
nyc_taxi_uri = "s3://voltrondata-labs-datasets/nyc-taxi/year=2022/month=2"
dataset = ds.dataset(nyc_taxi_uri)
dataset.files

['voltrondata-labs-datasets/nyc-taxi/year=2022/month=2/part-0.parquet']

In [58]:
# Materialise the dataset as an in-memory pyarrow.Table
table = dataset.to_table()

In [59]:
# Inspect the column names and types of the loaded table
table.schema

vendor_name: string
pickup_datetime: timestamp[ms]
dropoff_datetime: timestamp[ms]
passenger_count: int64
trip_distance: double
pickup_longitude: double
pickup_latitude: double
rate_code: string
store_and_fwd: string
dropoff_longitude: double
dropoff_latitude: double
payment_type: string
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
total_amount: double
improvement_surcharge: double
congestion_surcharge: double
pickup_location_id: int64
dropoff_location_id: int64

In [60]:
# Table dimensions as (rows, columns)
(table.num_rows, table.num_columns)

(2979367, 22)

In [31]:
# SQL that adds date parts and the trip duration to every row.
# "nyc_taxi_202202" is the view name the relation is given in .query() below.
sql = """
select *,
date_trunc('day', pickup_datetime) as pickup_date, 
date_part('year', pickup_datetime) as year,
date_part('month', pickup_datetime) as month,
date_part('day', pickup_datetime) as day,
EXTRACT(DOW FROM pickup_datetime) AS weekday,
dropoff_datetime - pickup_datetime as duration
from nyc_taxi_202202 
"""

# Wrap the arrow table in a DuckDB relation and run the SQL against it
rel = duckdb.from_arrow(table)
rel_enriched = rel.query("nyc_taxi_202202", sql)

In [35]:
# Preview the enriched relation. Applying the limit inside DuckDB converts only
# five rows to pandas, instead of materialising all ~3M rows just to call .head().
rel_enriched.limit(5).fetchdf()

Unnamed: 0,vendor_name,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd,dropoff_longitude,...,improvement_surcharge,congestion_surcharge,pickup_location_id,dropoff_location_id,pickup_date,year,month,day,weekday,duration
0,VTS,2022-02-02 06:48:20,2022-02-02 07:12:35,1.0,18.76,,,JFK,No,,...,0.3,2.5,262,132,2022-02-02,2022,2,2,3,0 days 00:24:15
1,VTS,2022-02-02 06:42:59,2022-02-02 06:55:06,1.0,3.11,,,Standard rate,No,,...,0.3,2.5,263,137,2022-02-02,2022,2,2,3,0 days 00:12:07
2,VTS,2022-02-02 06:58:49,2022-02-02 07:04:46,1.0,1.41,,,Standard rate,No,,...,0.3,2.5,233,161,2022-02-02,2022,2,2,3,0 days 00:05:57
3,CMT,2022-02-02 06:22:57,2022-02-02 06:46:17,1.0,6.9,,,Standard rate,No,,...,0.3,2.5,238,144,2022-02-02,2022,2,2,3,0 days 00:23:20
4,CMT,2022-02-02 06:48:08,2022-02-02 06:56:53,1.0,2.1,,,Standard rate,No,,...,0.3,2.5,144,186,2022-02-02,2022,2,2,3,0 days 00:08:45


In [34]:
# Total passengers per day of month, computed on top of the enriched relation.
# GROUP BY alone does not guarantee row order, so sort explicitly to keep the
# displayed result reproducible between runs.
sql2 = """
select day, sum(passenger_count) as passenger_count
from nyc_taxi_202202_enriched
group by 1
order by 1
"""
# The first argument registers rel_enriched under the view name used in the SQL
rel_enriched.query("nyc_taxi_202202_enriched", sql2).fetchdf()

Unnamed: 0,day,passenger_count
0,1,119797.0
1,2,119151.0
2,3,134026.0
3,4,145515.0
4,5,161795.0
5,6,122389.0
6,7,115354.0
7,8,133031.0
8,9,137409.0
9,10,146735.0


## Write to a dataset

In [None]:
# Write a pyarrow.Table out as parquet files.
# "sample_dataset" is the directory this notebook reads from, so it is not empty;
# the default existing_data_behavior="error" would refuse to write there.
# "overwrite_or_ignore" replaces files with the same name and leaves others untouched.
ds.write_dataset(
    parque_table,
    "sample_dataset",
    format="parquet",
    existing_data_behavior="overwrite_or_ignore",
)
# OR use pq.write_to_dataset, as in the next cell

In [None]:
# Alternative writer: pyarrow.parquet.write_to_dataset.
output_dir = 'sample_dataset'
pq.write_to_dataset(
    table=parque_table,
    root_path=output_dir,
    filesystem=None,  # Use None for the default local filesystem
    partition_cols=None,  # Optional: Specify partition columns if needed
    # By default each call writes a new uniquely named file, so re-running this cell
    # would append a second copy of the rows to the directory the notebook reads from.
    # A fixed template makes a re-run overwrite the previous output instead.
    basename_template="part-{i}.parquet",
)

# Other duckdb basics

In [48]:
# Build a pyarrow.Table from the Uber CSV, parsing timestamps with an explicit format
uber_options = csv.ConvertOptions(timestamp_parsers=["%m/%d/%Y %H:%M:%S"])
pa_table = csv.read_csv("uber-raw-data-sep14.csv.gz", convert_options=uber_options)
pa_table = pa_table.rename_columns(["date_time", "lat", "lon", "base"])

# Wrap the pyarrow table in a DuckDB relation
rel = duckdb.from_arrow(pa_table)
# .query(view_name, sql): view_name is how the SQL text refers to this relation.
# fetchone() returns a single result row as a tuple.
rel.query("uber_raw", "SELECT AVG(lat) as avg_lat, AVG(lon) as avg_lon FROM uber_raw").fetchone()

(40.73922135729234, -73.97181687636774)

In [40]:
# fetchall() returns the result rows as a list of tuples
first_rows_sql = "SELECT * FROM uber_raw limit 10"
rel.query("uber_raw", first_rows_sql).fetchall()

[(datetime.datetime(2014, 9, 1, 0, 1), 40.2201, -74.0021, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 1), 40.75, -74.0027, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 3), 40.7559, -73.9864, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 6), 40.745, -73.9889, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 11), 40.8145, -73.9444, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 12), 40.6735, -73.9918, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 15), 40.7471, -73.6472, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 16), 40.6613, -74.2691, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 32), 40.3745, -73.9999, 'B02512'),
 (datetime.datetime(2014, 9, 1, 0, 33), 40.7633, -73.9773, 'B02512')]

In [41]:
# Same query, two ways of getting a pandas DataFrame back
hourly_sql = "SELECT *, hour(date_time) as hr_id FROM uber_raw limit 10"
# to_df() converts the query result to a DataFrame
df = rel.query("uber_raw", hourly_sql).to_df()
# fetchdf() is a shortcut for to_df(); both return the same result
df = rel.query("uber_raw", hourly_sql).fetchdf()
df

Unnamed: 0,date_time,lat,lon,base,hr_id
0,2014-09-01 00:01:00,40.2201,-74.0021,B02512,0
1,2014-09-01 00:01:00,40.75,-74.0027,B02512,0
2,2014-09-01 00:03:00,40.7559,-73.9864,B02512,0
3,2014-09-01 00:06:00,40.745,-73.9889,B02512,0
4,2014-09-01 00:11:00,40.8145,-73.9444,B02512,0
5,2014-09-01 00:12:00,40.6735,-73.9918,B02512,0
6,2014-09-01 00:15:00,40.7471,-73.6472,B02512,0
7,2014-09-01 00:16:00,40.6613,-74.2691,B02512,0
8,2014-09-01 00:32:00,40.3745,-73.9999,B02512,0
9,2014-09-01 00:33:00,40.7633,-73.9773,B02512,0


In [66]:
# Parameterised query: keep only pickups whose hour of day equals `hr`.
# The value is interpolated into the SQL text before the query is run.
hr = 10
query = f'SELECT "date_time", "lat", "lon" FROM uber_raw where hour("date_time") >= {hr} and hour("date_time") < {hr} + 1 limit 10'
rel_hourly = rel.query("uber_raw", query)

In [67]:
# Convert the query result back into a pyarrow.Table and inspect its schema
result_as_arrow = rel_hourly.arrow()
result_as_arrow.schema

date_time: timestamp[s]
lat: double
lon: double

### Check https://github.com/duckdb/duckdb/blob/main/examples/python/duckdb-python.py for more

# duckdb/pandas

In [47]:
# DuckDB can query a pandas DataFrame directly: the SQL refers to the Python
# variable `df` by name.
df = pd.read_csv("uber-raw-data-sep14.csv.gz")
# Parse with the same explicit month/day/year format used for the pyarrow reader,
# rather than relying on pandas to infer it.
df["Date/Time"] = pd.to_datetime(df["Date/Time"], format="%m/%d/%Y %H:%M:%S")
# Limit inside DuckDB so only the previewed rows are converted back to pandas
duckdb.query("""SELECT *, hour("Date/Time") as hour_id FROM df""").limit(5).fetchdf()

Unnamed: 0,Date/Time,Lat,Lon,Base,hour_id
0,2014-09-01 00:01:00,40.2201,-74.0021,B02512,0
1,2014-09-01 00:01:00,40.75,-74.0027,B02512,0
2,2014-09-01 00:03:00,40.7559,-73.9864,B02512,0
3,2014-09-01 00:06:00,40.745,-73.9889,B02512,0
4,2014-09-01 00:11:00,40.8145,-73.9444,B02512,0
