In [None]:
!pip uninstall apache-beam -y && pip install -U pandas polars pyarrow narwhals>=0.9.5 ibis-framework 

In [None]:
import pandas as pd
import polars as pl

pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True

In [None]:
from typing import Any
from datetime import date

def q5_pandas_native(
    region_ds: Any,
    nation_ds: Any,
    customer_ds: Any,
    line_item_ds: Any,
    orders_ds: Any,
    supplier_ds: Any,
) -> Any:
    """Compute TPC-H query 5 (revenue per nation for one region and year) with pandas.

    The six tables are merged in the order region -> nation -> customer ->
    orders -> line item -> supplier, the merged frame is then restricted to
    region "ASIA" and to order dates in [1994-01-01, 1995-01-01), and
    ``l_extendedprice * (1 - l_discount)`` is summed per ``n_name``.

    Args:
        region_ds: Frame with ``r_regionkey`` and ``r_name``.
        nation_ds: Frame with ``n_nationkey``, ``n_regionkey`` and ``n_name``.
        customer_ds: Frame with ``c_custkey`` and ``c_nationkey``.
        line_item_ds: Frame with ``l_orderkey``, ``l_suppkey``,
            ``l_extendedprice`` and ``l_discount``.
        orders_ds: Frame with ``o_orderkey``, ``o_custkey`` and
            ``o_orderdate``; the dates are compared against ``datetime.date``
            values.
        supplier_ds: Frame with ``s_suppkey`` and ``s_nationkey``.

    Returns:
        A frame with columns ``n_name`` and ``revenue``, sorted by ``revenue``
        in descending order. None of the input frames is modified.
    """
    var1 = "ASIA"
    var2 = date(1994, 1, 1)
    var3 = date(1995, 1, 1)

    jn1 = region_ds.merge(nation_ds, left_on="r_regionkey", right_on="n_regionkey")
    jn2 = jn1.merge(customer_ds, left_on="n_nationkey", right_on="c_nationkey")
    jn3 = jn2.merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
    jn4 = jn3.merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
    # The supplier must belong to the same nation as the customer, hence the
    # two-column join key.
    jn5 = jn4.merge(
        supplier_ds,
        left_on=["l_suppkey", "n_nationkey"],
        right_on=["s_suppkey", "s_nationkey"],
    )

    # Filters are applied one after the other, after the joins, so the amount
    # of work being benchmarked stays the same as before.
    jn5 = jn5[jn5["r_name"] == var1]
    jn5 = jn5[(jn5["o_orderdate"] >= var2) & (jn5["o_orderdate"] < var3)]

    # `.assign` returns a new frame instead of writing a column into the result
    # of a boolean filter, so this no longer depends on the global
    # copy-on-write option to stay free of SettingWithCopyWarning.
    jn5 = jn5.assign(revenue=jn5["l_extendedprice"] * (1.0 - jn5["l_discount"]))

    gb = jn5.groupby("n_name", as_index=False)["revenue"].sum()
    result_df = gb.sort_values("revenue", ascending=False)

    return result_df

In [None]:
from typing import Any
from datetime import datetime
import narwhals as nw

def q5(
    region_ds_raw: Any,
    nation_ds_raw: Any,
    customer_ds_raw: Any,
    lineitem_ds_raw: Any,
    orders_ds_raw: Any,
    supplier_ds_raw: Any,
) -> Any:
    """Compute TPC-H query 5 through Narwhals on whatever frames are passed in.

    Each raw frame is wrapped with ``nw.from_native``, the six tables are
    joined, rows are kept for region "ASIA" with an order date in
    [1994-01-01, 1995-01-01), and ``l_extendedprice * (1 - l_discount)`` is
    summed per ``n_name``. The result is sorted by revenue, largest first, and
    handed back through ``nw.to_native``.
    """
    region_name = "ASIA"
    window_start = datetime(1994, 1, 1)
    window_end = datetime(1995, 1, 1)

    # Wrap the native frames in the same order as the parameters.
    region_ds, nation_ds, customer_ds, line_item_ds, orders_ds, supplier_ds = (
        nw.from_native(raw)
        for raw in (
            region_ds_raw,
            nation_ds_raw,
            customer_ds_raw,
            lineitem_ds_raw,
            orders_ds_raw,
            supplier_ds_raw,
        )
    )

    joined = region_ds.join(nation_ds, left_on="r_regionkey", right_on="n_regionkey")
    joined = joined.join(customer_ds, left_on="n_nationkey", right_on="c_nationkey")
    joined = joined.join(orders_ds, left_on="c_custkey", right_on="o_custkey")
    joined = joined.join(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
    # Supplier and customer have to share a nation, so join on both keys.
    joined = joined.join(
        supplier_ds,
        left_on=["l_suppkey", "n_nationkey"],
        right_on=["s_suppkey", "s_nationkey"],
    )

    in_region = nw.col("r_name") == region_name
    # closed="left" keeps the start of the window and excludes its end.
    in_window = nw.col("o_orderdate").is_between(window_start, window_end, closed="left")
    revenue = (nw.col("l_extendedprice") * (1 - nw.col("l_discount"))).alias("revenue")

    result = (
        joined.filter(in_region, in_window)
        .with_columns(revenue)
        .group_by("n_name")
        .agg([nw.sum("revenue")])
        .sort(by="revenue", descending=True)
    )

    return nw.to_native(result)

In [None]:
from typing import Any
from datetime import datetime
import narwhals as nw
import ibis

def q5_ibis(
    region: Any,
    nation: Any,
    customer: Any,
    lineitem: Any,
    orders: Any,
    supplier: Any,
    *,
    tool: str,
) -> Any:
    """Compute TPC-H query 5 as an ibis expression and materialise it.

    The six ibis tables are joined, rows are kept for region "ASIA" with an
    order date in [1994-01-01, 1995-01-01), and
    ``l_extendedprice * (1 - l_discount)`` is summed per ``n_name`` and ordered
    by revenue, largest first.

    Args:
        region, nation, customer, lineitem, orders, supplier: ibis tables.
        tool: ``'pandas'`` to return ``to_pandas()``, ``'polars'`` to return
            ``to_polars()``.

    Raises:
        ValueError: if ``tool`` is neither ``'pandas'`` nor ``'polars'``.
    """
    region_name = "ASIA"
    window_start = datetime(1994, 1, 1)
    window_end = datetime(1995, 1, 1)

    # Deferred reference, used below for columns of the expression built so far.
    this = ibis._

    joined = region.join(nation, region["r_regionkey"] == nation["n_regionkey"])
    joined = joined.join(customer, this["n_nationkey"] == customer["c_nationkey"])
    joined = joined.join(orders, this["c_custkey"] == orders["o_custkey"])
    joined = joined.join(lineitem, this["o_orderkey"] == lineitem["l_orderkey"])
    # Supplier and customer have to share a nation, so match on both keys.
    supplier_match = (this["l_suppkey"] == supplier["s_suppkey"]) & (
        this["n_nationkey"] == supplier["s_nationkey"]
    )
    joined = joined.join(supplier, supplier_match)

    in_window = (this["o_orderdate"] >= window_start) & (this["o_orderdate"] < window_end)
    filtered = joined.filter(this["r_name"] == region_name).filter(in_window)

    revenue = lineitem["l_extendedprice"] * (1 - lineitem["l_discount"])
    q_final = (
        filtered.mutate(revenue=revenue)
        .group_by("n_name")
        .agg(revenue=this["revenue"].sum())
        .order_by(ibis.desc("revenue"))
    )

    if tool not in ("pandas", "polars"):
        raise ValueError("expected pandas or polars")
    # Only the requested conversion method is looked up and called.
    return q_final.to_pandas() if tool == "pandas" else q_final.to_polars()

In [None]:
# Directory holding the TPC-H parquet files; every table is stored as
# `<table>.parquet` directly inside it.
dir_ = "/kaggle/input/tpc-h-data-parquet-s-2/"
region, nation, customer, lineitem, orders, supplier, part, partsupp = (
    f"{dir_}{table}.parquet"
    for table in (
        "region",
        "nation",
        "customer",
        "lineitem",
        "orders",
        "supplier",
        "part",
        "partsupp",
    )
)

In [None]:
import ibis

# ibis backend connections used by the two ibis-based loaders below.
con_pd = ibis.pandas.connect()
con_pl = ibis.polars.connect()


def _read_pandas(path):
    """Read a parquet file with pandas using the pyarrow engine."""
    return pd.read_parquet(path, engine='pyarrow')


def _read_pandas_pyarrow(path):
    """Read a parquet file with pandas, keeping pyarrow-backed dtypes."""
    return pd.read_parquet(path, engine='pyarrow', dtype_backend='pyarrow')


def _read_pandas_pyarrow_ibis(path):
    """Read a parquet file through the ibis pandas connection."""
    return con_pd.read_parquet(path, engine='pyarrow', dtype_backend='pyarrow')


def _read_polars_eager(path):
    """Read a parquet file into a polars frame with `pl.read_parquet`."""
    return pl.read_parquet(path)


def _read_polars_lazy(path):
    """Open a parquet file with `pl.scan_parquet`."""
    return pl.scan_parquet(path)


def _read_polars_lazy_ibis(path):
    """Read a parquet file through the ibis polars connection."""
    return con_pl.read_parquet(path)


# Benchmark label -> loader taking a parquet path.
IO_FUNCS = {
    'pandas': _read_pandas,
    'pandas[pyarrow]': _read_pandas_pyarrow,
    'pandas[pyarrow][ibis]': _read_pandas_pyarrow_ibis,
    'polars[eager]': _read_polars_eager,
    'polars[lazy]': _read_polars_lazy,
    'polars[lazy][ibis]': _read_polars_lazy_ibis,
}

In [None]:
# Collects the timings of every benchmark below, keyed by benchmark label.
results = dict()

## Polars, lazy, via ibis

In [None]:
# Time the ibis query on the polars backend. The loader calls sit inside the
# timed expression, so every run re-reads all six tables before the query is
# built and materialised with `to_polars()` (tool='polars').
tool = 'polars[lazy][ibis]'
fn = IO_FUNCS[tool]
# `-o` makes %timeit return its result object instead of only printing it.
timings = %timeit -o q5_ibis(fn(region), fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier), tool='polars')
# NOTE(review): `all_runs` is the total time of each repeat, not divided by the
# number of loops %timeit chose — confirm per-call times are not wanted here.
results[tool] = timings.all_runs

## pandas, pyarrow dtypes, native

In [None]:
# Time the hand-written pandas query on frames read with pyarrow-backed dtypes.
# The loader calls sit inside the timed expression, so every run re-reads all
# six tables.
tool = 'pandas[pyarrow]'
fn = IO_FUNCS[tool]
# `-o` makes %timeit return its result object instead of only printing it.
timings = %timeit -o q5_pandas_native(fn(region), fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
# Stored under a '[native]' suffix so it gets its own key, separate from the
# Narwhals run that uses the same 'pandas[pyarrow]' loader.
results[tool+'[native]'] = timings.all_runs

## pandas via Narwhals

In [None]:
# Time the Narwhals query on pandas frames read with the default dtypes.
# The loader calls sit inside the timed expression, so every run re-reads all
# six tables.
tool = 'pandas'
fn = IO_FUNCS[tool]
# `-o` makes %timeit return its result object instead of only printing it.
timings = %timeit -o q5(fn(region), fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
results[tool] = timings.all_runs

## pandas, pyarrow dtypes, via Narwhals

In [None]:
# Time the Narwhals query on pandas frames read with pyarrow-backed dtypes.
# The loader calls sit inside the timed expression, so every run re-reads all
# six tables.
tool = 'pandas[pyarrow]'
fn = IO_FUNCS[tool]
# `-o` makes %timeit return its result object instead of only printing it.
timings = %timeit -o q5(fn(region), fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
results[tool] = timings.all_runs

## Polars read_parquet

In [None]:
# Time the Narwhals query on polars frames loaded with `pl.read_parquet`.
# The loader calls sit inside the timed expression, so every run re-reads all
# six tables.
tool = 'polars[eager]'
fn = IO_FUNCS[tool]
# `-o` makes %timeit return its result object instead of only printing it.
timings = %timeit -o q5(fn(region), fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
results[tool] = timings.all_runs

## Polars scan_parquet

In [None]:
# Time the Narwhals query on polars frames opened with `pl.scan_parquet`.
# The trailing `.collect()` is part of the timed expression, so the measurement
# includes executing the query, not just building it.
tool = 'polars[lazy]'
fn = IO_FUNCS[tool]
# `-o` makes %timeit return its result object instead of only printing it.
timings = %timeit -o q5(fn(region), fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier)).collect()
results[tool] = timings.all_runs

## Save

In [None]:
import json
# Persist the collected timings as JSON in the working directory.
with open('results.json', mode='w') as handle:
    json.dump(results, fp=handle)
