# Simple Examples of Polars, Dask, and DuckDB

## Setup

Install the dependencies if you haven't already.

In [1]:
# Uncomment the line below to install the notebook's dependencies.
# NOTE(review): `%pip install -r requirements.txt` is generally preferred over
# `!pip`, because it installs into the environment of the running kernel.
# !pip install -r requirements.txt

In [2]:
from datetime import date

import polars as pl
import dask.dataframe as dd
import duckdb

## Example 1: Read And Query Parquet files

References:

- Polars: https://pola-rs.github.io/polars/user-guide/io/parquet/
- Dask: https://docs.dask.org/en/latest/dataframe-parquet.html
- DuckDB: https://duckdb.org/docs/data/parquet/overview.html

### Polars

Read and query Parquet files

In [3]:
# Eagerly read every Parquet file matching the glob into a single DataFrame.
pl_df = pl.read_parquet("./data/parquet/*.parquet")

# Select the rows dated 2023-12-22 whose close is at least 1.5.
is_target_date = pl.col("date") == pl.lit(date(2023, 12, 22))
has_min_close = pl.col("close") >= 1.5
pl_df.filter(is_target_date & has_min_close)

date,ticker,close
date,str,f64
2023-12-22,"""intu""",1.595353
2023-12-22,"""phm""",2.2171


Lazy evaluation using Polars

In [4]:
# Build a lazy query over the Parquet files; .collect() below runs it.
pl_lazy_df = pl.scan_parquet("./data/parquet/*.parquet")

# Filter on date 2023-12-22 and close >= 1.5, then materialise the result.
pl_lazy_filtered = pl_lazy_df.filter(
    (pl.col("date") == pl.lit(date(2023, 12, 22))) & (pl.col("close") >= 1.5)
)
pl_lazy_filtered.collect()

date,ticker,close
date,str,f64
2023-12-22,"""intu""",1.595353
2023-12-22,"""phm""",2.2171


### Dask

Read and query Parquet files

In [5]:
# Open the Parquet files as a Dask DataFrame.
dd_df = dd.read_parquet("./data/parquet/*.parquet")

# Boolean masks for the target date and the minimum close.
dd_date_mask = dd_df["date"] == date(2023, 12, 22)
dd_close_mask = dd_df["close"] >= 1.5

# .compute() materialises the filtered rows for display.
dd_df[dd_date_mask & dd_close_mask].compute()

Unnamed: 0,date,ticker,close
245,2023-12-22,intu,1.595353
245,2023-12-22,phm,2.2171


### DuckDB

Read and query Parquet files

In [6]:
# Parquet glob that the query reads from directly in its FROM clause.
TABLE = "./data/parquet/*.parquet"

# Rows dated 2023-12-22 whose close is at least 1.5.
filter_sql = f"""SELECT * FROM '{TABLE}' WHERE date = '2023-12-22' AND close >= 1.5"""
duckdb.query(filter_sql)

┌────────────┬─────────┬────────────────────┐
│    date    │ ticker  │       close        │
│    date    │ varchar │       double       │
├────────────┼─────────┼────────────────────┤
│ 2023-12-22 │ phm     │  2.217099567099567 │
│ 2023-12-22 │ intu    │ 1.5953525231351298 │
└────────────┴─────────┴────────────────────┘

## Example 2: Pivot and Aggregate

### Polars

Pivot and aggregate

In [7]:
pl_df = pl.read_parquet("./data/parquet/*.parquet")

# Wide layout: one row per ticker, one column per date, holding the close.
pl_pivot_df = pl_df.pivot(index="ticker", columns="date", values="close")

# Gather every non-ticker column into a single list column named "closes".
closes_expr = pl.concat_list(pl.all().exclude("ticker")).alias("closes")
pl_final_df = pl_pivot_df.with_columns(closes_expr)

pl_final_df.write_parquet("./tmp/polars.parquet")

### Dask

Pivot and aggregate

In [8]:
dd_df = dd.read_parquet("./data/parquet/*.parquet")

# pivot_table needs the pivoted column to be a categorical with known
# categories, so format the dates as strings and convert them.
date_labels = dd_df["date"].dt.strftime("%Y-%m-%d")
dd_df["date"] = date_labels.astype("category").cat.as_known()

# Wide table: one row per ticker, one column per date (materialised here).
dd_pivot_df = dd_df.pivot_table(index="ticker", columns="date", values="close").compute()

# Per-ticker list of closes, built after materialising the frame.
dd_agg_df = dd_df.compute().groupby("ticker").agg({"close": list})

# Join the pivoted columns with the aggregated lists and name the list column "closes".
dd_final_df = dd_pivot_df.merge(dd_agg_df, on="ticker").rename(columns={"close": "closes"})

dd_final_df.to_parquet("./tmp/dask.parquet")

**NOTE: Object data type conversion issue**

The example above works as expected: each value in the "closes" column is a Python list of floats.

In [9]:
# Python type of the first row's "closes" value.
type(dd_final_df["closes"].iloc[0])

list

In [10]:
# Python type of the first element inside the first row's "closes" value.
type(dd_final_df["closes"].iloc[0][0])

float

However, if you keep the whole pipeline lazy and call `.compute()` only at the end, after the merge, the values in the "closes" column are converted to strings.

In [11]:
dd_dtissue_df = dd.read_parquet("./data/parquet/*.parquet")

# pivot_table needs the pivoted column to be a categorical with known
# categories, so format the dates as strings and convert them.
dd_dtissue_df.date = dd_dtissue_df.date.dt.strftime("%Y-%m-%d").astype("category").cat.as_known()

# Both the pivot and the aggregation stay lazy here (no .compute() yet).
dd_dtissue_pivot_df = dd_dtissue_df.pivot_table(index='ticker', columns='date', values='close')
dd_dtissue_agg_df = dd_dtissue_df.groupby('ticker').agg({"close": list})

# Merge lazily, materialise only the final result, then name the list column "closes".
dd_dtissue_final_df = (
    dd_dtissue_pivot_df.merge(dd_dtissue_agg_df, on="ticker")
    .compute()
    .rename(columns={"close": "closes"})
)

# Write to a separate file: this frame carries the string-typed "closes"
# column, so it must not overwrite the correct output in ./tmp/dask.parquet.
dd_dtissue_final_df.to_parquet("./tmp/dask_dtissue.parquet")

In [12]:
# First 120 characters of the first row's "closes" value.
dd_dtissue_final_df["closes"].iloc[0][:120]

'[1.0, 1.0108637696614236, 1.0137963209810719, 0.9842042122100773, 0.98287123433751, 1.0345907757931219, 1.05418555051986'

The Dask community is aware of this issue; see the links below for more details.

- [[FEEDBACK] User experience with arrow strings](https://github.com/dask/dask/issues/10139)
- [Issues with `convert-string` label](https://github.com/dask/dask/issues?q=label%3Aconvert-string+)

### DuckDB

Pivot and aggregate

In [13]:
# Parquet glob used as the source in both subqueries.
TABLE = "./data/parquet/*.parquet"

# One statement: pivot the closes by date per ticker, collect each ticker's
# closes into a date-ordered list, and join the two on ticker into the
# temporary table final_t.
create_final_t_sql = f"""
CREATE OR REPLACE TEMPORARY TABLE final_t AS (
    SELECT pivot_t.*, agg_t.closes
    FROM (
        PIVOT '{TABLE}' ON date USING first(close) GROUP BY ticker
    ) AS pivot_t
    INNER JOIN (
        SELECT ticker, list(close ORDER BY date ASC) AS closes 
        FROM '{TABLE}' 
        GROUP BY ticker
    ) AS agg_t ON pivot_t.ticker = agg_t.ticker
)
"""
duckdb.sql(create_final_t_sql)

In [14]:
# Preview the first rows of final_t as a DataFrame.
final_t_rel = duckdb.sql("SELECT * FROM final_t")
final_t_rel.to_df().head()

Unnamed: 0,ticker,2023-01-03,2023-01-04,2023-01-05,2023-01-06,2023-01-09,2023-01-10,2023-01-11,2023-01-12,2023-01-13,...,2023-12-12,2023-12-13,2023-12-14,2023-12-15,2023-12-18,2023-12-19,2023-12-20,2023-12-21,2023-12-22,closes
0,xyl,1.0,0.998465,0.970116,1.004605,1.009299,1.019682,1.037017,1.049025,1.055706,...,0.97454,0.985735,1.001535,0.999639,0.999549,1.007403,0.990159,1.008397,1.016974,"[1.0, 0.9984651498736006, 0.9701155651859877, ..."
1,intu,1.0,1.000997,0.960223,0.988087,1.010634,0.993609,1.010558,1.014111,1.005189,...,1.518943,1.559947,1.536837,1.555575,1.580935,1.58868,1.573981,1.587479,1.595353,"[1.0, 1.0009969834858632, 0.9602229152819673, ..."
2,etr,1.0,1.000733,0.949876,0.986713,0.993861,0.987171,0.996884,0.979016,0.97645,...,0.930908,0.965821,0.946303,0.924402,0.930358,0.930542,0.912948,0.913864,0.918354,"[1.0, 1.0007330706496838, 0.9498762943278659, ..."
3,sbux,1.0,1.036001,1.035704,1.058118,1.038778,1.051274,1.05425,1.049787,1.063473,...,0.973421,0.973024,0.970346,0.959536,0.957651,0.969156,0.939304,0.945453,0.944957,"[1.0, 1.0360011901219874, 1.0357036596251117, ..."
4,pru,1.0,1.017089,0.999598,1.01538,0.987736,0.998693,1.005227,1.001206,1.015279,...,1.027141,1.046844,1.0576,1.044029,1.037093,1.048854,1.030458,1.037696,1.040812,"[1.0, 1.0170888620828307, 0.9995979091274627, ..."


Export to Parquet

In [15]:
# Destination Parquet file for the final_t table.
OUTPUT = "./tmp/duckdb.parquet"

copy_sql = f"COPY final_t TO '{OUTPUT}' (FORMAT PARQUET)"
duckdb.sql(copy_sql)