Skip to content

Commit

Permalink
add fireducks_queries and q8-q22 for pandas and modin
Browse files Browse the repository at this point in the history
  • Loading branch information
k-ishizaka committed Feb 6, 2024
1 parent c306449 commit c5b1487
Show file tree
Hide file tree
Showing 56 changed files with 2,728 additions and 3 deletions.
Empty file added fireducks_queries/__init__.py
Empty file.
26 changes: 26 additions & 0 deletions fireducks_queries/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
import shutil
import sys
from subprocess import run
from linetimer import CodeTimer


def execute_all(solution: str):
    """Run query scripts q1..q22 of ``{solution}_queries`` one after another.

    Each script is started in a fresh interpreter (``sys.executable``) through
    the ``fireducks.imhook`` module. ``check=True`` makes a non-zero exit
    status raise ``subprocess.CalledProcessError``, which aborts the loop.

    After each query, if the ``FIREDUCKS_FLAGS`` environment variable contains
    ``--trace`` and a ``trace.json`` file exists in the working directory, that
    file is copied to ``trace_qNN.json`` (zero-padded query number).

    Args:
        solution: Prefix of the query package; scripts are looked up under
            ``{solution}_queries/``.
    """
    num_queries = 22
    package_name = f"{solution}_queries"
    trace_file = "trace.json"

    with CodeTimer(name=f"Overall execution of ALL {solution} queries", unit="s"):
        for query_no in range(1, num_queries + 1):
            command = [
                sys.executable,
                "-m",
                "fireducks.imhook",
                f"{package_name}/q{query_no}.py",
            ]
            run(command, check=True)

            # Keep a per-query copy of the trace, if tracing was requested.
            tracing = "--trace" in os.environ.get("FIREDUCKS_FLAGS", "")
            if tracing and os.path.isfile(trace_file):
                shutil.copyfile(trace_file, f"trace_q{query_no:02}.json")


if __name__ == "__main__":
    # Script entry point: run the whole "fireducks" query set.
    execute_all("fireducks")
40 changes: 40 additions & 0 deletions fireducks_queries/q1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from datetime import datetime
from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 1


def q():
    """Run query 1: per-(l_returnflag, l_linestatus) totals over line items.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load once up front so the data is cached in case IO time is not
    # included in the measurement.
    utils.get_line_item_ds()

    def query():
        """Return sums, means and a row count per return-flag/line-status pair."""
        line_items = utils.get_line_item_ds()

        group_cols = ["l_returnflag", "l_linestatus"]
        ship_cutoff = datetime(1998, 9, 2)

        shipped = line_items[line_items["l_shipdate"] <= ship_cutoff]

        # charge builds on disc_price, so the two columns are added in sequence.
        discounted = shipped.assign(
            disc_price=lambda df: df["l_extendedprice"] * (1 - df["l_discount"])
        )
        charged = discounted.assign(
            charge=lambda df: df["disc_price"] * (1 + df["l_tax"])
        )

        # Output column -> (source column, aggregation).
        aggregations = {
            "sum_qty": ("l_quantity", "sum"),
            "sum_base_price": ("l_extendedprice", "sum"),
            "sum_disc_price": ("disc_price", "sum"),
            "sum_charge": ("charge", "sum"),
            "avg_qty": ("l_quantity", "mean"),
            "avg_price": ("l_extendedprice", "mean"),
            "avg_disc": ("l_discount", "mean"),
            "count_order": ("l_returnflag", "count"),
        }
        summary = charged.groupby(group_cols, as_index=False).agg(**aggregations)

        return summary.sort_values(group_cols)

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
59 changes: 59 additions & 0 deletions fireducks_queries/q10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from fireducks_queries import utils
from datetime import datetime

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 10


def q():
    """Run query 10: top customers by revenue from returned line items.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_customer_ds()
    utils.get_orders_ds()
    utils.get_line_item_ds()
    utils.get_nation_ds()

    def query():
        """Return the 20 customers with the highest revenue, best first."""
        customer = utils.get_customer_ds()
        orders = utils.get_orders_ds()
        lineitem = utils.get_line_item_ds()
        nation = utils.get_nation_ds()

        window_start = datetime(1993, 10, 1)
        window_end = datetime(1994, 1, 1)
        returned_flag = "R"
        top_n = 20
        group_keys = [
            "c_custkey",
            "c_name",
            "c_acctbal",
            "c_phone",
            "n_name",
            "c_address",
            "c_comment",
        ]

        returned_items = lineitem[lineitem["l_returnflag"] == returned_flag]

        # Half-open date window: window_start <= o_orderdate < window_end.
        in_window = (orders["o_orderdate"] < window_end) & (
            orders["o_orderdate"] >= window_start
        )
        recent_orders = orders[in_window]

        joined = (
            recent_orders.merge(customer, left_on="o_custkey", right_on="c_custkey")
            .merge(nation, left_on="c_nationkey", right_on="n_nationkey")
            .merge(returned_items, left_on="o_orderkey", right_on="l_orderkey")
        )
        with_volume = joined.assign(
            volume=lambda df: df["l_extendedprice"] * (1 - df["l_discount"])
        )

        revenue = with_volume.groupby(group_keys, as_index=False, sort=False).agg(
            revenue=("volume", "sum")
        )
        ranked = revenue.sort_values(by="revenue", ascending=False).reset_index(
            drop=True
        )

        return ranked.head(top_n)

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
40 changes: 40 additions & 0 deletions fireducks_queries/q11.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 11


def q():
    """Run query 11: parts whose stock value in one nation exceeds a threshold.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_part_supp_ds()
    utils.get_supplier_ds()
    utils.get_nation_ds()

    def query():
        """Return per-part stock value for GERMANY suppliers above the cut-off."""
        partsupp = utils.get_part_supp_ds()
        supplier = utils.get_supplier_ds()
        nation = utils.get_nation_ds()

        target_nation = "GERMANY"
        # https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v3.0.1.pdf
        # FRACTION is chosen as 0.0001 / SF, and SF*10,000 supplier keys are
        # populated, so the scale factor is derived from the supplier row count.
        supplier_rows = supplier.shape[0]
        fraction = 0.0001 / (supplier_rows / 10000)

        in_nation = nation[nation["n_name"] == target_nation]
        stock = in_nation.merge(
            supplier, left_on="n_nationkey", right_on="s_nationkey"
        ).merge(partsupp, left_on="s_suppkey", right_on="ps_suppkey")
        stock = stock.assign(
            value=lambda df: df["ps_supplycost"] * df["ps_availqty"]
        )

        per_part = stock.groupby("ps_partkey", as_index=False, sort=False).agg(
            {"value": "sum"}
        )

        # Keep parts whose value exceeds the given fraction of the grand total.
        threshold = per_part["value"].sum() * fraction
        significant = per_part[per_part["value"] > threshold]

        return significant.sort_values(
            by=["value", "ps_partkey"], ascending=[False, True]
        ).reset_index(drop=True)

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
46 changes: 46 additions & 0 deletions fireducks_queries/q12.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from datetime import datetime

from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 12


def q():
    """Run query 12: high/low priority line counts per ship mode.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_line_item_ds()
    utils.get_orders_ds()

    def query():
        """Return, per l_shipmode, how many selected lines are high/low priority."""
        lineitem = utils.get_line_item_ds()
        orders = utils.get_orders_ds()

        receipt_start = datetime(1994, 1, 1)
        receipt_end = datetime(1995, 1, 1)
        ship_modes = ["MAIL", "SHIP"]
        urgent_levels = ["1-URGENT", "2-HIGH"]

        # Selected lines: shipped before commit date, committed before receipt,
        # received inside [receipt_start, receipt_end), via one of ship_modes.
        selected = (
            (lineitem["l_shipmode"].isin(ship_modes))
            & (lineitem["l_shipdate"] < lineitem["l_commitdate"])
            & (lineitem["l_commitdate"] < lineitem["l_receiptdate"])
            & (lineitem["l_receiptdate"] >= receipt_start)
            & (lineitem["l_receiptdate"] < receipt_end)
        )
        joined = lineitem[selected].merge(
            orders, left_on="l_orderkey", right_on="o_orderkey"
        )

        # Boolean flags; summing them per group counts the matching rows.
        is_urgent = joined["o_orderpriority"].isin(urgent_levels)
        flagged = joined.assign(
            high_line_count=is_urgent,
            low_line_count=~is_urgent,
        )

        counts = flagged.groupby("l_shipmode", as_index=False, sort=True).agg(
            {"high_line_count": "sum", "low_line_count": "sum"}
        )
        return counts.astype({"high_line_count": int, "low_line_count": int})

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
37 changes: 37 additions & 0 deletions fireducks_queries/q13.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 13


def q():
    """Run query 13: distribution of customers by number of orders.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_customer_ds()
    utils.get_orders_ds()

    def query():
        """Return (c_count, custdist) rows, most common order count first."""
        customer = utils.get_customer_ds()
        orders = utils.get_orders_ds()

        excluded_words = ["special", "requests"]
        # The words must appear in this order with anything in between.
        comment_regex = r".*".join(excluded_words)

        is_excluded = orders["o_comment"].str.contains(comment_regex, regex=True)
        kept_orders = orders[~is_excluded]

        # Left join keeps customers without orders; count() skips their
        # missing o_orderkey, so they end up with c_count == 0.
        orders_per_customer = (
            customer.merge(
                kept_orders, how="left", left_on="c_custkey", right_on="o_custkey"
            )
            .groupby("c_custkey", as_index=False, sort=False)
            .agg(c_count=("o_orderkey", "count"))
        )

        distribution = orders_per_customer.groupby(
            "c_count", as_index=False, sort=False
        ).agg(custdist=("c_custkey", "count"))

        return distribution.sort_values(
            by=["custdist", "c_count"], ascending=[False, False]
        ).reset_index(drop=True)

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
43 changes: 43 additions & 0 deletions fireducks_queries/q14.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from datetime import datetime

import pandas as pd
from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 14


def q():
    """Run query 14: percentage of revenue coming from PROMO parts.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_line_item_ds()
    utils.get_part_ds()

    def query():
        """Return a one-row frame with ``promo_revenue`` rounded to 2 decimals."""
        lineitem = utils.get_line_item_ds()
        part = utils.get_part_ds()

        ship_start = datetime(1995, 9, 1)
        ship_end = datetime(1995, 10, 1)
        promo_prefix = "PROMO"

        # Half-open shipping window: ship_start <= l_shipdate < ship_end.
        in_window = (lineitem["l_shipdate"] < ship_end) & (
            lineitem["l_shipdate"] >= ship_start
        )
        joined = lineitem[in_window].merge(
            part, left_on="l_partkey", right_on="p_partkey"
        )

        with_revenue = joined.assign(
            revenue=lambda df: df["l_extendedprice"] * (1 - df["l_discount"])
        )
        # where() leaves revenue only on promo rows; the rest become missing
        # values, which sum() skips by default.
        with_promo = with_revenue.assign(
            promo_revenue=lambda df: df["revenue"].where(
                df["p_type"].str.startswith(promo_prefix)
            )
        )

        totals = with_promo[["revenue", "promo_revenue"]].sum()
        promo_share = 100.0 * totals["promo_revenue"] / totals["revenue"]

        return pd.DataFrame({"promo_revenue": [promo_share]}).round(2)

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
46 changes: 46 additions & 0 deletions fireducks_queries/q15.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from datetime import datetime

from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 15


def q():
    """Run query 15: supplier(s) with the highest revenue in a quarter.

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_supplier_ds()
    utils.get_line_item_ds()

    def query():
        """Return supplier details and total_revenue for the top supplier(s)."""
        supplier = utils.get_supplier_ds()
        lineitem = utils.get_line_item_ds()

        output_cols = ["s_suppkey", "s_name", "s_address", "s_phone", "total_revenue"]

        # Half-open shipping window: 1996-01-01 <= l_shipdate < 1996-04-01.
        in_quarter = (lineitem["l_shipdate"] >= datetime(1996, 1, 1)) & (
            lineitem["l_shipdate"] < datetime(1996, 4, 1)
        )
        per_supplier = (
            lineitem[in_quarter]
            .assign(
                total_revenue=lambda df: df["l_extendedprice"]
                * (1 - df["l_discount"])
            )
            .groupby("l_suppkey", as_index=False)
            .agg({"total_revenue": "sum"})
        )

        # Ties are kept: every supplier matching the maximum is returned.
        # Rounding happens only after the comparison against the maximum.
        best_revenue = per_supplier["total_revenue"].max()
        top_suppliers = per_supplier[
            per_supplier["total_revenue"] == best_revenue
        ].round(2)

        joined = supplier.merge(
            top_suppliers,
            left_on="s_suppkey",
            right_on="l_suppkey",
        )

        return joined[output_cols].sort_values("s_suppkey", ignore_index=True)

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
    # Script entry point: execute this query when the file is run directly.
    q()
43 changes: 43 additions & 0 deletions fireducks_queries/q16.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from fireducks_queries import utils

# Query number; handed to utils.run_query together with the query callable.
Q_NUM = 16


def q():
    """Run query 16: distinct supplier counts per (p_brand, p_type, p_size).

    Builds the query as a closure and hands it, with ``Q_NUM``, to
    ``utils.run_query``.
    """
    # Load every input once before the query is run (presumably to warm a
    # cache, as noted in q1.py -- confirm in utils).
    utils.get_supplier_ds()
    utils.get_part_supp_ds()
    utils.get_part_ds()

    def query():
        """Return supplier_cnt per part group, largest counts first."""
        supplier = utils.get_supplier_ds()
        partsupp = utils.get_part_supp_ds()
        part = utils.get_part_ds()

        group_cols = ["p_brand", "p_type", "p_size"]

        # Part filter: not Brand#45, type not starting with MEDIUM POLISHED,
        # and size within the listed set.
        other_brand = part["p_brand"] != "Brand#45"
        other_type = ~(part["p_type"].str.startswith("MEDIUM POLISHED"))
        wanted_size = part["p_size"].isin([49, 14, 23, 45, 19, 3, 36, 9])
        wanted_parts = part[other_brand & other_type & wanted_size]

        # Drop suppliers whose comment matches the complaint pattern.
        has_complaint = supplier["s_comment"].str.contains(
            ".*Customer.*Complaints.*"
        )
        clean_suppliers = supplier[~has_complaint]

        joined = partsupp.merge(
            clean_suppliers, left_on="ps_suppkey", right_on="s_suppkey"
        ).merge(wanted_parts, left_on="ps_partkey", right_on="p_partkey")

        counts = joined.groupby(group_cols, as_index=False).agg(
            supplier_cnt=("ps_suppkey", "nunique")
        )

        return counts.sort_values(
            ["supplier_cnt", "p_brand", "p_type", "p_size"],
            ascending=[False, True, True, True],
        )

    utils.run_query(Q_NUM, query)


if __name__ == "__main__":
q()
Loading

0 comments on commit c5b1487

Please sign in to comment.