In [15]:
import pandas as pd
import json
import re
import os
from bs4 import BeautifulSoup
import numpy as np

## DataFusion runtimes

Total runtimes of binary joins (DF-Bin) and Shredded Yannakakis (SYA), aggregated per query (median over all runs).

In [16]:
# Raw per-run timings; durations are stored in microseconds.
timings_raw = pd.read_csv('timings_revision.csv')

df = (
    timings_raw
    .assign(
        # total time = optimization time + execution time (in seconds); not available here
        total_time=np.nan,
        # microseconds -> seconds
        execution_time=timings_raw['duration(µs)'] / 1_000_000,
    )
    .drop(columns=['variant', 'duration(µs)'])
    # Use the short method labels everywhere downstream.
    .replace("BinaryJoin", "DF-Bin")
    .replace("Yannakakis", "SYA")
)

# Median over all runs of each (query, method) pair.
df_agg = df.groupby(["query", "method"]).aggregate("median").reset_index()
# df_agg.columns = ['_'.join(col) for col in df_agg.columns]
# df_agg.to_csv('timings_agg.csv', index=False) # , mode='a'
df_agg

Unnamed: 0,query,method,total_time,execution_time
0,dblp_acyclic_201_00,DF-Bin,,1.395854
1,dblp_acyclic_201_00,SYA,,0.783645
2,dblp_acyclic_201_01,DF-Bin,,0.236780
3,dblp_acyclic_201_01,SYA,,0.191551
4,dblp_acyclic_201_02,DF-Bin,,0.276294
...,...,...,...,...
3183,yago_acyclic_tree_6_77,SYA,,0.007968
3184,yago_acyclic_tree_6_78,DF-Bin,,0.208485
3185,yago_acyclic_tree_6_78,SYA,,0.248917
3186,yago_acyclic_tree_6_79,DF-Bin,,0.008972


## DataFusion detailed metrics

In [17]:
def replace_utf8_string(text):
    """Escape the quotes of every `value: Utf8("...")` literal found in `text`.

    Each occurrence of value: Utf8("SomeString") is rewritten to
    value: Utf8(\\"SomeString\\"), and any whitespace following `value:` is
    collapsed to a single space. Text without such a literal is returned
    unchanged.
    """
    # Matches 'value: Utf8("SomeString")' and captures SomeString.
    utf8_literal = re.compile(r'value:\s*Utf8\("(.*?)"\)')
    # Re-emits the literal with backslash-escaped quotes around the captured text.
    return utf8_literal.sub(r'value: Utf8(\"\1\")', text)

metrics_file = "output_revision/metrics.txt"

# Every line of the metrics file is parsed as one JSON document; lines that
# fail to parse are reported and skipped.
parsed_entries = []
with open(metrics_file) as handle:
    for line_no, raw_line in enumerate(handle):
        try:
            entry = json.loads(replace_utf8_string(raw_line))
        except json.JSONDecodeError:
            print(f"Error in line {line_no}")
        else:
            parsed_entries.append(entry)

# Drop 2NSA plan metrics (we're now analyzing binary joins)
metrics = list(filter(lambda entry: entry["params"]["method"] == "BinaryJoin", parsed_entries))
print(len(metrics)) # should be nr_of_queries * 10 (10 repetitions)

15940


In [18]:
def get_metric(metric_name, metrics):
    """Return the first entry of `metrics` whose "name" equals `metric_name`.

    Returns None when no entry matches.
    """
    return next((entry for entry in metrics if entry["name"] == metric_name), None)

def collect_metrics(metrics):
    """Sum the per-operator timings of one recorded plan.

    `metrics` is one parsed metrics entry with "params" (holding "method" and
    "query") and "plan" (a tree of nodes with "operator", "metrics" and
    "children"). Returns a flat dict with the method, the query and one summed
    timing (converted from nanoseconds to seconds) per tracked operator kind.
    """
    # (operator prefix, output key, metric holding the timing), checked in order.
    tracked_operators = (
        ("AggregateExec", "aggregate_time", "elapsed_compute"),
        ("FilterExec", "filter_time", "elapsed_compute"),
        ("ProjectionExec", "projection_time", "elapsed_compute"),
        # memoryexec does not contain timing metrics
        ("ParquetExec", "parquet_time", "time_elapsed_processing"),
        ("CoalesceBatchesExec", "coalesce_batches_time", "elapsed_compute"),
    )

    summary = {
        "method": metrics["params"]["method"],
        "query": metrics["params"]["query"],
    }
    totals_ns = {key: 0 for _, key, _ in tracked_operators}

    def accumulate(node):
        # Add this node's timing to its bucket (if tracked), then recurse.
        operator = node["operator"]
        for prefix, key, metric_name in tracked_operators:
            if operator.startswith(prefix):
                totals_ns[key] += get_metric(metric_name, node["metrics"])["value"]
                break
        for child in node["children"]:
            accumulate(child)

    accumulate(metrics["plan"])

    # all timings are in nanoseconds, convert to seconds
    for key, total in totals_ns.items():
        summary[key] = total / 1_000_000_000
    return summary

# One row of summed operator timings per binary-join run, relabelled as DF-Bin.
bin_runs = pd.DataFrame(list(map(collect_metrics, metrics))).assign(method="DF-Bin")

# Median per query, joined onto the median end-to-end timings of DF-Bin.
bin_medians = bin_runs.groupby(["query", "method"]).aggregate("median").reset_index()
df_bin = (
    df_agg[df_agg["method"] == "DF-Bin"]
    .merge(bin_medians, on=["query", "method"])
    .assign(
        # hashjoin time = total time - aggregate time - filter time - projection time - parquet time - coalesce_batches_time
        hashjoin_time=lambda t: (
            t["execution_time"]
            - t["aggregate_time"]
            - t["filter_time"]
            - t["projection_time"]
            - t["parquet_time"]
            - t["coalesce_batches_time"]
        )
    )
)
df_bin

Unnamed: 0,query,method,total_time,execution_time,aggregate_time,filter_time,projection_time,parquet_time,coalesce_batches_time,hashjoin_time
0,dblp_acyclic_201_00,DF-Bin,,1.395854,0.001248,0.019759,0.000000,0.096954,0.000271,1.277622
1,dblp_acyclic_201_01,DF-Bin,,0.236780,0.000628,0.008491,0.000000,0.096510,0.000115,0.131036
2,dblp_acyclic_201_02,DF-Bin,,0.276294,0.000311,0.017391,0.000000,0.071704,0.000169,0.186719
3,dblp_acyclic_201_03,DF-Bin,,0.927781,0.000882,0.019077,0.000000,0.082849,0.000261,0.824711
4,dblp_acyclic_201_04,DF-Bin,,0.458164,0.000732,0.024164,0.000000,0.102556,0.000242,0.330471
...,...,...,...,...,...,...,...,...,...,...
1589,yago_acyclic_tree_6_75,DF-Bin,,0.265963,0.000182,0.012960,0.000186,0.119761,0.002254,0.130620
1590,yago_acyclic_tree_6_76,DF-Bin,,0.056429,0.000010,0.003232,0.000002,0.020310,0.000620,0.032254
1591,yago_acyclic_tree_6_77,DF-Bin,,0.026384,0.000009,0.000295,0.000002,0.002030,0.000040,0.024009
1592,yago_acyclic_tree_6_78,DF-Bin,,0.208485,0.000088,0.026775,0.000001,0.087275,0.003761,0.090585


In [19]:
parquet_seconds = df_bin["parquet_time"]

# Time (sec) spent on ParquetExec
print(parquet_seconds.describe())

# Percentage of time spent on ParquetExec
parquet_share_pct = parquet_seconds / df_bin["execution_time"] * 100
print(parquet_share_pct.describe())

count    1594.000000
mean        0.036543
std         0.040698
min         0.000266
25%         0.003627
50%         0.022487
75%         0.051494
max         0.240365
Name: parquet_time, dtype: float64
count    1594.000000
mean       21.050752
std        14.991005
min         0.046357
25%         7.140708
50%        19.337665
75%        33.164695
max        63.217879
dtype: float64


In [20]:
# Keep only the columns shared by all systems, then start the combined CSV
# (written with a header; later cells append to this file).
operator_columns = ["aggregate_time", "filter_time", "projection_time", "parquet_time", "coalesce_batches_time"]
df_bin = df_bin.drop(columns=operator_columns)
df_bin.to_csv('timings_agg_revision.csv', index=False)
df_bin

Unnamed: 0,query,method,total_time,execution_time,hashjoin_time
0,dblp_acyclic_201_00,DF-Bin,,1.395854,1.277622
1,dblp_acyclic_201_01,DF-Bin,,0.236780,0.131036
2,dblp_acyclic_201_02,DF-Bin,,0.276294,0.186719
3,dblp_acyclic_201_03,DF-Bin,,0.927781,0.824711
4,dblp_acyclic_201_04,DF-Bin,,0.458164,0.330471
...,...,...,...,...,...
1589,yago_acyclic_tree_6_75,DF-Bin,,0.265963,0.130620
1590,yago_acyclic_tree_6_76,DF-Bin,,0.056429,0.032254
1591,yago_acyclic_tree_6_77,DF-Bin,,0.026384,0.024009
1592,yago_acyclic_tree_6_78,DF-Bin,,0.208485,0.090585


In [21]:
metrics_file = "output_revision/metrics.txt"

# Re-read the metrics file: one JSON document per line, unparsable lines are
# reported and skipped.
metrics = []
with open(metrics_file) as handle:
    for line_no, raw_line in enumerate(handle):
        try:
            parsed = json.loads(replace_utf8_string(raw_line))
        except json.JSONDecodeError:
            print(f"Error in line {line_no}")
        else:
            metrics.append(parsed)

# Collects one row of summed operator timings per run.
projection = []

def filter_time(filternode):
    """Return the "elapsed_compute" value of a filter node, or 0 if it has none.

    A missing metric means the filter node was never executed; this can be due
    to early stopping in case of a multisemijoin with >=2 children.
    """
    return next(
        (m["value"] for m in filternode["metrics"] if m["name"] == "elapsed_compute"),
        0,
    )

def projection_time(projectionnode):
    """Return the "elapsed_compute" value of a projection node, or 0 if absent.

    A missing metric means the projection node was never executed; this can be
    due to early stopping in case of a multisemijoin with >=2 children.
    """
    first_hit = next(
        filter(lambda m: m["name"] == "elapsed_compute", projectionnode["metrics"]),
        None,
    )
    if first_hit is None:
        return 0
    return first_hit["value"]

def aggregate_time(aggregatenode):
    """Return the "elapsed_compute" value of an aggregate node.

    Raises:
        ValueError: if the node has no "elapsed_compute" metric.
    """
    not_found = object()
    elapsed = next(
        (m["value"] for m in aggregatenode["metrics"] if m["name"] == "elapsed_compute"),
        not_found,
    )
    if elapsed is not_found:
        raise ValueError("aggregate_time metric not found")
    return elapsed

def parquet_time(parquetnode):
    """Return the "time_elapsed_processing" value of a parquet node, or 0 if absent.

    A missing metric means the ParquetExec was never executed; this can be due
    to early stopping in case of a multisemijoin with >=2 children.
    """
    processing_times = (
        m["value"]
        for m in parquetnode["metrics"]
        if m["name"] == "time_elapsed_processing"
    )
    return next(processing_times, 0)

def coalesce_batches_time(coalescenode):
    """Return the "elapsed_compute" value of a coalesce-batches node, or 0 if absent.

    A missing metric means the CoalesceBatchesExec was never executed; this can
    be due to early stopping in case of a multisemijoin with >=2 children.
    """
    for candidate in coalescenode["metrics"]:
        if candidate["name"] == "elapsed_compute":
            break
    else:
        # Loop finished without a match (also covers an empty metrics list).
        return 0
    return candidate["value"]
        
def collect_timings(node, timings: dict):
    """Add the timings of `node` and all of its descendants to `timings`.

    `timings` is updated in place and must already contain the keys
    "filter_time", "projection_time", "aggregate_time", "parquet_time" and
    "coalesce_batches_time". Nodes whose operator matches none of the tracked
    prefixes contribute nothing themselves, but their children are still visited.
    """
    # (operator prefix, bucket in `timings`, extractor), checked in this order.
    extractors = (
        ("FilterExec", "filter_time", filter_time),
        ("ProjectionExec", "projection_time", projection_time),
        ("Aggregate", "aggregate_time", aggregate_time),
        ("ParquetExec", "parquet_time", parquet_time),
        ("CoalesceBatchesExec", "coalesce_batches_time", coalesce_batches_time),
    )
    operator = node["operator"]
    for prefix, bucket, extract in extractors:
        if operator.startswith(prefix):
            timings[bucket] += extract(node)
            break

    for descendant in node["children"]:
        collect_timings(descendant, timings)


# Build one row of summed operator timings (in seconds) per non-BinaryJoin run.
for entry in metrics:
    method = entry["params"]["method"]
    if method == "BinaryJoin":  # skip binaryjoin, we're analyzing 2NSA now
        continue

    query = entry["params"]["query"]

    # Fresh accumulator for this run. It is deliberately NOT called `metrics`:
    # rebinding that name inside the loop would replace the list of parsed
    # entries being iterated with a dict once the cell has finished.
    timings = {"filter_time": 0, "projection_time": 0, "aggregate_time": 0, "parquet_time": 0, "coalesce_batches_time": 0}
    collect_timings(entry["plan"], timings)

    row = {
        "method": method,
        "query": query,
    }
    # already convert all timings from ns to s
    row.update({key: value / 1_000_000_000 for key, value in timings.items()})
    projection.append(row)

# Median operator timings per query, relabelled as SYA.
sya_medians = (
    pd.DataFrame(projection)
    .assign(method="SYA")
    .groupby(["query", "method"])
    .aggregate("median")
    .reset_index()
)

# Join onto the median end-to-end timings of SYA and derive the join time.
yann_metrics = (
    df_agg[df_agg["method"] == "SYA"]
    .merge(sya_medians, on=["query", "method"])
    .assign(
        # join_time = total_time - filter_time - projection_time - aggregate_time - parquet_time - coalesce_batches_time
        hashjoin_time=lambda t: (
            t["execution_time"]
            - t["aggregate_time"]
            - t["filter_time"]
            - t["projection_time"]
            - t["parquet_time"]
            - t["coalesce_batches_time"]
        )
    )
)
yann_metrics

Unnamed: 0,query,method,total_time,execution_time,filter_time,projection_time,aggregate_time,parquet_time,coalesce_batches_time,hashjoin_time
0,dblp_acyclic_201_00,SYA,,0.783645,0.019637,0.000000e+00,0.000522,0.096673,0.000278,0.666536
1,dblp_acyclic_201_01,SYA,,0.191551,0.008359,0.000000e+00,0.000348,0.095876,0.000120,0.086848
2,dblp_acyclic_201_02,SYA,,0.238054,0.017475,0.000000e+00,0.000212,0.071614,0.000167,0.148587
3,dblp_acyclic_201_03,SYA,,0.561408,0.018958,0.000000e+00,0.000370,0.081908,0.000266,0.459905
4,dblp_acyclic_201_04,SYA,,0.305297,0.023994,0.000000e+00,0.000362,0.101333,0.000229,0.179379
...,...,...,...,...,...,...,...,...,...,...
1589,yago_acyclic_tree_6_75,SYA,,0.297645,0.012891,1.832740e-04,0.000157,0.122635,0.002218,0.159561
1590,yago_acyclic_tree_6_76,SYA,,0.052425,0.003153,1.348000e-06,0.000152,0.020434,0.000618,0.028067
1591,yago_acyclic_tree_6_77,SYA,,0.007968,0.000274,1.151000e-06,0.000010,0.002098,0.000041,0.005543
1592,yago_acyclic_tree_6_78,SYA,,0.248917,0.026662,3.580000e-06,0.000273,0.090494,0.003961,0.127522


In [22]:
# Keep only the columns shared by all systems and append them (without a
# header) to the combined CSV.
operator_columns = ["filter_time", "projection_time", "aggregate_time", "parquet_time", "coalesce_batches_time"]
yann_metrics = yann_metrics.drop(columns=operator_columns)
yann_metrics.to_csv('timings_agg_revision.csv', mode='a', header=False, index=False)
yann_metrics

Unnamed: 0,query,method,total_time,execution_time,hashjoin_time
0,dblp_acyclic_201_00,SYA,,0.783645,0.666536
1,dblp_acyclic_201_01,SYA,,0.191551,0.086848
2,dblp_acyclic_201_02,SYA,,0.238054,0.148587
3,dblp_acyclic_201_03,SYA,,0.561408,0.459905
4,dblp_acyclic_201_04,SYA,,0.305297,0.179379
...,...,...,...,...,...
1589,yago_acyclic_tree_6_75,SYA,,0.297645,0.159561
1590,yago_acyclic_tree_6_76,SYA,,0.052425,0.028067
1591,yago_acyclic_tree_6_77,SYA,,0.007968,0.005543
1592,yago_acyclic_tree_6_78,SYA,,0.248917,0.127522


## DuckDB runtimes

In [23]:
def total_mark_join_time(html):
    """Sum the reported times of all HASH JOIN operators of type MARK in `html`.

    A join is counted when its bold title `HASH JOIN (<time>s)` is directly
    followed by a paragraph starting with `MARK`. Hash joins followed by
    anything else are ignored.

    Args:
        html: content of an HTML query plan, as a string.

    Returns:
        The sum of the matched times as a float, or 0 if nothing matches.
    """
    # The time must be a plain decimal number (`12`, `0.5`, `5.` or `.5`).
    # The former character class `[\d|\.]` also accepted a literal `|`, so
    # malformed text could reach float() and raise ValueError.
    pattern = r"<b>\s*HASH JOIN\s*\((\d+(?:\.\d*)?|\.\d+)s\)\s*<\/b>\s*</p>\s*<p>\s*MARK"
    return sum(float(match.group(1)) for match in re.finditer(pattern, html))

def extract_times_from_timing_table(html_file_path: str):
    """Read the timings listed in the table rows of one HTML query plan.

    Every `<tr>` with at least two `<td>` cells is treated as a
    (label, time) pair. Recognised labels are parsed as floats; when a label
    occurs more than once, the last occurrence wins. Labels that never occur
    are reported as None. "markjoin_time" is computed separately from the raw
    HTML by `total_mark_join_time`.

    Returns a dict with the keys total_time, execution_time, seq_scan_time,
    hashjoin_time, markjoin_time, aggregate_time, projection_time, filter_time.
    """
    with open(html_file_path, 'r') as file:
        content = file.read()

    # Label in the first table cell -> key in the returned dict.
    label_to_key = {
        "TOTAL TIME": 'total_time',
        'Execution Time': 'execution_time',
        'SEQ_SCAN': 'seq_scan_time',
        "HASH_JOIN": 'hashjoin_time',
        "UNGROUPED_AGGREGATE": 'aggregate_time',
        "PROJECTION": 'projection_time',
        "FILTER": 'filter_time',
    }

    # Pre-filled with None, in the order of the returned dict.
    times = dict.fromkeys(
        (
            'total_time',
            'execution_time',
            'seq_scan_time',
            'hashjoin_time',
            "markjoin_time",
            'aggregate_time',
            'projection_time',
            'filter_time',
        )
    )

    soup = BeautifulSoup(content, 'html.parser')
    for row in soup.find_all('tr'):
        cells = row.find_all('td')
        if len(cells) <= 1:
            continue

        phase = cells[0].get_text(strip=True)
        time = cells[1].get_text(strip=True)
        if phase in label_to_key:
            times[label_to_key[phase]] = float(time)

    times["markjoin_time"] = total_mark_join_time(content)
    return times

def build_table(duckdb_plans: str):
    """
    Build Pandas DataFrame with timings reported by DuckDB html query plans.
    `duckdb_plans` is the path to the folder containing the html files.

    Each sub-folder of `duckdb_plans` is taken as one query and each `.html`
    file inside it as one run of that query; other entries are skipped. The
    result has one row per run, with the file name (without extension) in
    the "run" column.
    """
    # Timing columns, in the order they appear in the resulting frame.
    timing_columns = (
        "total_time",
        "execution_time",
        "aggregate_time",
        "hashjoin_time",
        "markjoin_time",
        "projection_time",
        "filter_time",
        "seq_scan_time",
    )
    rows = []

    for query_folder in os.listdir(duckdb_plans):
        query_folder_path = os.path.join(duckdb_plans, query_folder)
        if os.path.isdir(query_folder_path):
            for query_file in os.listdir(query_folder_path):
                if query_file.endswith('.html'):
                    times = extract_times_from_timing_table(
                        os.path.join(query_folder_path, query_file)
                    )
                    row = {
                        "query": query_folder,
                        "run": os.path.splitext(query_file)[0],
                    }
                    for column in timing_columns:
                        row[column] = times[column]
                    rows.append(row)

    return pd.DataFrame(rows)


duckdb_plans = "../../query_plans/ce_duckdb/2_original_with_aliases"

# One row per run, labelled with the method name used in the combined CSV.
duckdb_runs = build_table(duckdb_plans).drop(columns=["run"]).assign(method="DuckDB-Bin")

# Time (sec) spent on SequentialScan
seq_scan_seconds = duckdb_runs["seq_scan_time"]
print(seq_scan_seconds.describe())

# Percentage of time spent on SequentialScan
print((seq_scan_seconds / duckdb_runs["execution_time"] * 100).describe())

duckdb_df = (
    duckdb_runs
    # subtract markjoin time from hashjoin time to get the actual time spent in computing inner hashjoins
    .assign(**{"hashjoin_time(s)": duckdb_runs["hashjoin_time"] - duckdb_runs["markjoin_time"]})
    .drop(columns=["aggregate_time", "hashjoin_time", "markjoin_time", "projection_time", "filter_time", "seq_scan_time"])
    .rename(columns={"hashjoin_time(s)": "hashjoin_time"})
    # Median over all runs of each query.
    .groupby(["query", "method"])
    .aggregate("median")
    .reset_index()
)
duckdb_df.to_csv('timings_agg_revision.csv', mode='a', header=False, index=False)
duckdb_df

count    16070.000000
mean         0.014703
std          0.015192
min          0.000048
25%          0.001344
50%          0.011369
75%          0.022864
max          0.521192
Name: seq_scan_time, dtype: float64
count    16070.000000
mean        11.477615
std         12.301813
min          0.001269
25%          2.324188
50%          7.044001
75%         17.400995
max         92.375257
dtype: float64


Unnamed: 0,query,method,total_time,execution_time,hashjoin_time
0,dblp_acyclic_201_00,DuckDB-Bin,1.118311,0.859715,0.824105
1,dblp_acyclic_201_01,DuckDB-Bin,0.166843,0.147681,0.125715
2,dblp_acyclic_201_02,DuckDB-Bin,0.234484,0.205610,0.179452
3,dblp_acyclic_201_03,DuckDB-Bin,0.826783,0.590417,0.561459
4,dblp_acyclic_201_04,DuckDB-Bin,0.420088,0.350535,0.313139
...,...,...,...,...,...
1602,yago_acyclic_tree_6_75,DuckDB-Bin,0.258555,0.248308,0.204962
1603,yago_acyclic_tree_6_76,DuckDB-Bin,0.055133,0.052372,0.043944
1604,yago_acyclic_tree_6_77,DuckDB-Bin,0.039601,0.027772,0.025055
1605,yago_acyclic_tree_6_78,DuckDB-Bin,0.200243,0.193142,0.144882


## Umbra runtimes

In [24]:
def parse_umbra_timings(file: str, benchmark: str, method: str) -> pd.DataFrame:
    """Load the timings of one benchmark from an Umbra result CSV.

    The CSV must provide the columns `name`, `compilation_time_median` and
    `execution_time_median`. The part of `name` before the first ":" is taken
    as the benchmark; the part between the first ":" and the next "." is taken
    as the query.

    Args:
        file: path of the CSV file (anything `pd.read_csv` accepts).
        benchmark: only rows belonging to this benchmark are kept.
        method: label stored in the `method` column of every returned row.

    Returns:
        A DataFrame with the columns query, method, total_time
        (compilation + execution), execution_time and hashjoin_time (always
        NaN, as no join timing is read from the file). Rows keep the index
        they had in the CSV.
    """
    raw = pd.read_csv(file)
    benchmark_of_row = raw["name"].apply(lambda name: name.split(":")[0])
    selected = raw[benchmark_of_row == benchmark]

    # Build a new frame instead of assigning columns on the filtered slice:
    # that avoids pandas' chained-assignment (SettingWithCopy) pitfalls, and
    # the query name is only parsed for rows of the requested benchmark.
    return pd.DataFrame(
        {
            "query": selected["name"].apply(lambda name: name.split(":")[1].split(".")[0]),
            "method": method,
            "total_time": selected["compilation_time_median"] + selected["execution_time_median"],
            "execution_time": selected["execution_time_median"],
            "hashjoin_time": np.nan,
        },
        index=selected.index,
    )

In [25]:
# Umbra (default configuration) on the "ce" benchmark; appended to the combined CSV.
benchmark = "ce"
method, file = "Umbra-Default", "../../umbra/results/benchmark_umbra_default.csv"

umbra = parse_umbra_timings(file=file, benchmark=benchmark, method=method)
umbra.to_csv('timings_agg_revision.csv', mode='a', header=False, index=False)
umbra

Unnamed: 0,query,method,total_time,execution_time,hashjoin_time
259,watdiv_acyclic_209_17,Umbra-Default,0.111498,0.076962,
260,job_acyclic_108_66,Umbra-Default,0.645627,0.625655,
261,yago_acyclic_chain_9_33,Umbra-Default,0.055163,0.012504,
262,job_acyclic_105_23,Umbra-Default,0.194489,0.174164,
263,yago_acyclic_chain_6_36,Umbra-Default,0.049708,0.024409,
...,...,...,...,...,...
1861,epinions_acyclic_205_18,Umbra-Default,0.030523,0.002348,
1862,job_acyclic_103_24,Umbra-Default,0.164101,0.148505,
1863,watdiv_acyclic_208_09,Umbra-Default,0.060914,0.027938,
1864,epinions_acyclic_210_12,Umbra-Default,0.039679,0.006494,


In [26]:
# Umbra-L&E results; reuses the `benchmark` value set in an earlier cell.
method, file = "Umbra-L&E", "../../umbra/results/benchmark_umbra_le.csv"

umbra = parse_umbra_timings(file=file, benchmark=benchmark, method=method)
umbra.to_csv('timings_agg_revision.csv', mode='a', header=False, index=False)
umbra

Unnamed: 0,query,method,total_time,execution_time,hashjoin_time
256,watdiv_acyclic_209_17,Umbra-L&E,0.109684,0.053971,
257,job_acyclic_108_66,Umbra-L&E,0.152284,0.121531,
258,yago_acyclic_chain_9_33,Umbra-L&E,0.080188,0.009172,
259,job_acyclic_105_23,Umbra-L&E,0.092846,0.061791,
260,yago_acyclic_chain_6_36,Umbra-L&E,0.045349,0.006167,
...,...,...,...,...,...
1858,epinions_acyclic_205_18,Umbra-L&E,0.048367,0.001513,
1859,job_acyclic_103_24,Umbra-L&E,0.085505,0.061851,
1860,watdiv_acyclic_208_09,Umbra-L&E,0.075833,0.020177,
1861,epinions_acyclic_210_12,Umbra-L&E,0.058673,0.003569,


In [27]:
# method = "Umbra-Interpreted"
# file = "../../umbra/results/benchmark_umbra_interpreted.csv"

# umbra = parse_umbra_timings(file, benchmark, method)
# umbra.to_csv('timings_agg_revision.csv', index=False, header=False, mode='a')
# umbra

In [28]:
# method = "Umbra-Chained"
# file = "../../umbra/results/benchmark_umbra_chained.csv"

# umbra = parse_umbra_timings(file, benchmark, method)
# umbra.to_csv('timings_agg_revision.csv', index=False, header=False, mode='a')
# umbra