In [4]:
import os

import datetime
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns

In [5]:
# Cluster configurations to read results for. Each name is used as a
# sub-directory of RESULTS_PATH_PREFIX and as the "cluster_size" column value.
CLUSTER_SIZES = ["spark-worker-m", "spark-worker-l"]

# Size assigned to each "databio" data suite, listed in ascending order.
# NOTE(review): the unit is not stated in this notebook (presumably a row or
# interval count) -- confirm against the data-suite generator.
DATABIO_SIZES = {
    "databio-s-1-2": 54_343,
    "databio-s-2-7": 274_266,
    "databio-s-1-0": 321_138,
    "databio-m-7-0": 2_764_185,
    "databio-m-7-3": 4_410_928,
    "databio-l-0-8": 164_214_743,
    "databio-l-4-8": 227_869_400,
    "databio-l-7-8": 307_298_107,
    "databio-xl-3-0": 1_087_646_273,
}

# Root directory of the benchmark results, relative to the notebook location.
RESULTS_PATH_PREFIX = "../results"

In [6]:
def load_benchmarks_interval_join(directories: list[str]) -> list[pd.DataFrame]:
    """Load the successful interval-join benchmark runs for every cluster size.

    For each entry of ``directories`` and each name in ``CLUSTER_SIZES``, all
    ``*.csv`` files in ``RESULTS_PATH_PREFIX/<cluster size>/<directory>`` are
    concatenated, rows whose ``result`` is not ``"success"`` are dropped, and
    the following columns are added:

    * ``_directory`` / ``cluster_size`` -- where the rows were loaded from;
    * ``data_suite_type`` -- ``"databio"`` when ``data_suite`` starts with
      that word, ``"synthetic"`` otherwise;
    * ``data_suite_name`` -- the full ``data_suite`` for databio suites, the
      part before the last ``-`` for synthetic ones (synthetic suites are
      expected to be named ``<name>-<size>``);
    * ``data_suite_name_short`` -- ``data_suite`` without ``"databio-"`` for
      databio suites, ``data_suite_name`` otherwise;
    * ``data_suite_size`` -- looked up in ``DATABIO_SIZES`` for databio
      suites, parsed from the text after the last ``-`` for synthetic ones;
    * ``elapsed_time_s`` -- ``elapsed_time`` floor-divided by 1000
      (presumably milliseconds to seconds -- TODO confirm the source unit).

    Args:
        directories: Result sub-directory names to read below every cluster
            directory.

    Returns:
        One DataFrame per (directory, cluster size) pair whose directory
        contains at least one CSV file.

    Raises:
        KeyError: If a databio suite has no entry in ``DATABIO_SIZES``.
        ValueError: If a synthetic suite size cannot be parsed as a number
            (the integer cast rejects the resulting NaN).
    """
    datasets = []

    for directory in directories:
        for cluster_size in CLUSTER_SIZES:
            data_files_path = "/".join([RESULTS_PATH_PREFIX, cluster_size, directory])
            print(f"Loading benchmark results from {data_files_path}.")

            # os.listdir() gives no ordering guarantee; sort so that repeated
            # runs load (and later plot) the rows in the same order.
            data_files = sorted(file for file in os.listdir(data_files_path) if file.endswith('.csv'))
            if not data_files:
                # pd.concat() raises on an empty sequence; skip directories
                # without results, like load_benchmarks_preprocessing() does.
                continue

            dataset = pd.concat(
                [pd.read_csv(f"{data_files_path}/{data_file}", on_bad_lines="warn") for data_file in data_files],
                ignore_index=True,
            )
            # Take an explicit copy: the columns added below must be written
            # to an independent frame, not to a filtered selection.
            dataset = dataset[dataset["result"] == "success"].copy()

            for column_name in ["elapsed_time"]:
                dataset[column_name] = pd.to_numeric(dataset[column_name], errors="coerce").astype("Int64")

            suite = dataset["data_suite"]
            is_databio = suite.str.startswith("databio", na=False)

            # Synthetic suites: split "<name>-<size>" once, at the last dash.
            name_and_size = suite.str.rsplit("-", n=1)
            synthetic_name = name_and_size.str[0]
            synthetic_size = pd.to_numeric(name_and_size.str[-1], errors="coerce")

            unknown_suites = sorted(set(suite[is_databio]) - set(DATABIO_SIZES))
            if unknown_suites:
                raise KeyError(f"No entry in DATABIO_SIZES for data suite(s): {unknown_suites}")

            dataset["_directory"] = directory
            dataset["data_suite_type"] = is_databio.map({True: "databio", False: "synthetic"})
            dataset["data_suite_name"] = suite.where(is_databio, synthetic_name)
            dataset["data_suite_name_short"] = suite.str.replace("databio-", "", regex=False).where(is_databio, synthetic_name)
            dataset["data_suite_size"] = suite.map(DATABIO_SIZES).where(is_databio, synthetic_size).astype(int)
            dataset["elapsed_time_s"] = dataset["elapsed_time"] // 1000
            dataset["cluster_size"] = cluster_size

            datasets.append(dataset)

    return datasets

In [8]:
def plot_all_joins_synthetic_benchmark(join_names = None, directories = ["interval-join"]):
    """Plot elapsed time of the interval joins on the synthetic data suites.

    Draws a grid of bar plots with one row per synthetic data suite and one
    column per entry of ``CLUSTER_SIZES``. Every subplot shows
    ``elapsed_time_s`` against ``data_suite_size``, coloured by ``join_name``.

    Args:
        join_names: Join implementations to include. ``None`` (default)
            includes every join present in the loaded results.
        directories: Result sub-directories passed on to
            ``load_benchmarks_interval_join``. The default list is only read,
            never modified.

    Returns:
        None. The figure is shown with ``plt.show()``; when there is nothing
        to plot a message is printed instead.
    """
    datasets = load_benchmarks_interval_join(directories)
    if not datasets:
        print("No benchmark results found; nothing to plot.")
        return

    # ignore_index avoids duplicated row labels coming from the per-directory frames.
    synthetic_data = pd.concat(datasets, ignore_index=True)
    synthetic_data = synthetic_data[synthetic_data["data_suite_type"] == "synthetic"]

    if join_names is None:
        join_names = sorted(synthetic_data["join_name"].unique())
    else:
        join_names = list(join_names)
        synthetic_data = synthetic_data[synthetic_data["join_name"].isin(join_names)]

    if synthetic_data.empty:
        print("No synthetic benchmark results match the selection; nothing to plot.")
        return

    data_sizes = sorted(synthetic_data["data_suite_size"].unique())
    data_suites = sorted(synthetic_data["data_suite_name"].unique())

    # squeeze=False keeps plot_axes two-dimensional even for a single data
    # suite (or a single cluster size), so plot_axes[i, j] is always valid.
    _, plot_axes = plt.subplots(
        len(data_suites),
        len(CLUSTER_SIZES),
        figsize=(16, 4 * len(data_suites)),
        sharex=True,
        sharey="all",
        squeeze=False,
    )
    plt.ticklabel_format(useOffset=False)

    for (j, cluster_size) in enumerate(CLUSTER_SIZES):
        cluster_data = synthetic_data[synthetic_data["cluster_size"] == cluster_size]

        for (i, data_suite_name) in enumerate(data_suites):
            ax = plot_axes[i, j]
            subplot_data = cluster_data[cluster_data["data_suite_name"] == data_suite_name]

            # The x axis is shared, so every subplot must use the same
            # category order; a fixed hue order keeps each join's colour
            # identical across subplots.
            sns.barplot(
                x="data_suite_size",
                y="elapsed_time_s",
                hue="join_name",
                order=data_sizes,
                hue_order=join_names,
                data=subplot_data,
                ax=ax,
            )
            ax.legend()
            ax.set_title(f"Cluster: {cluster_size}, dataset: {data_suite_name}.")

            # Shared axes hide the inner tick labels; show them on every subplot.
            ax.xaxis.set_tick_params(labelbottom=True)
            ax.yaxis.set_tick_params(labelleft=True)

    plt.show()
    

In [9]:
def plot_all_joins_databio_benchmark(join_names = None, directories = ["interval-join"]):
    """Plot elapsed time of the interval joins on the databio data suites.

    Draws one bar plot per entry of ``CLUSTER_SIZES`` in a single row. Every
    subplot shows ``elapsed_time_s`` against ``data_suite_name_short``,
    coloured by ``join_name``. Suites are ordered by ``data_suite_size``.

    Args:
        join_names: Join implementations to include. ``None`` (default)
            includes every join present in the loaded results.
        directories: Result sub-directories passed on to
            ``load_benchmarks_interval_join``. The default list is only read,
            never modified.

    Returns:
        None. The figure is shown with ``plt.show()``; when there is nothing
        to plot a message is printed instead.
    """
    datasets = load_benchmarks_interval_join(directories)
    if not datasets:
        print("No benchmark results found; nothing to plot.")
        return

    # ignore_index avoids duplicated row labels coming from the per-directory frames.
    databio_data = pd.concat(datasets, ignore_index=True)
    databio_data = databio_data[databio_data["data_suite_type"] == "databio"]

    if join_names is None:
        join_names = sorted(databio_data["join_name"].unique())
    else:
        join_names = list(join_names)
        databio_data = databio_data[databio_data["join_name"].isin(join_names)]

    if databio_data.empty:
        print("No databio benchmark results match the selection; nothing to plot.")
        return

    # One x position per suite, smallest suite first. The x axis is shared,
    # so all subplots have to agree on this order.
    suite_order = (
        databio_data[["data_suite_name_short", "data_suite_size"]]
        .drop_duplicates("data_suite_name_short")
        .sort_values("data_suite_size")["data_suite_name_short"]
        .tolist()
    )

    # squeeze=False keeps plot_axes two-dimensional even when CLUSTER_SIZES
    # has a single entry, so plot_axes[0, j] is always valid.
    _, plot_axes = plt.subplots(1, len(CLUSTER_SIZES), figsize=(16, 4), sharex=True, sharey="all", squeeze=False)
    plt.ticklabel_format(useOffset=False)

    for (j, cluster_size) in enumerate(CLUSTER_SIZES):
        ax = plot_axes[0, j]
        plot_data = databio_data[databio_data["cluster_size"] == cluster_size]

        sns.barplot(
            x="data_suite_name_short",
            y="elapsed_time_s",
            hue="join_name",
            order=suite_order,
            hue_order=join_names,
            data=plot_data,
            ax=ax,
        )
        ax.legend()
        ax.set_title(f"Cluster: {cluster_size}.")

        # Shared axes hide the inner tick labels; show them on every subplot.
        # Rotate the suite names on each subplot: plt.xticks() would only
        # affect the current axes.
        ax.xaxis.set_tick_params(labelbottom=True, labelrotation=45)
        ax.yaxis.set_tick_params(labelleft=True)

    plt.show()

In [13]:
def load_benchmarks_preprocessing() -> list[pd.DataFrame]:
    """Load the preprocessing benchmark results for every cluster size.

    For each name in ``CLUSTER_SIZES``, all ``*.csv`` files in
    ``RESULTS_PATH_PREFIX/<cluster size>/preprocessing`` are concatenated,
    ``lhs_rows_count`` / ``rhs_rows_count`` are converted to nullable
    integers, and the following columns are added:

    * ``data_suite_type`` -- ``"databio"`` when ``data_suite`` starts with
      that word, ``"synthetic"`` otherwise;
    * ``data_suite_name`` -- the full ``data_suite`` for databio suites, the
      part before the last ``-`` for synthetic ones (synthetic suites are
      expected to be named ``<name>-<size>``);
    * ``data_suite_name_short`` -- ``data_suite`` without ``"databio-"`` for
      databio suites, ``data_suite_name`` otherwise;
    * ``data_suite_size`` -- looked up in ``DATABIO_SIZES`` for databio
      suites, parsed from the text after the last ``-`` for synthetic ones;
    * ``cluster_size`` -- the cluster directory the rows were loaded from.

    Returns:
        One DataFrame per cluster size whose directory contains at least one
        CSV file.

    Raises:
        KeyError: If a databio suite has no entry in ``DATABIO_SIZES``.
        ValueError: If a synthetic suite size cannot be parsed as a number
            (the integer cast rejects the resulting NaN).
    """
    datasets = []

    for cluster_size in CLUSTER_SIZES:
        data_files_path = "/".join([RESULTS_PATH_PREFIX, cluster_size, "preprocessing"])
        print(f"Loading benchmark results from {data_files_path}.")

        # os.listdir() gives no ordering guarantee; sort so that repeated
        # runs load (and later plot) the rows in the same order.
        data_files = sorted(file for file in os.listdir(data_files_path) if file.endswith('.csv'))
        if not data_files:
            # pd.concat() raises on an empty sequence; skip empty directories.
            continue

        dataset = pd.concat(
            [pd.read_csv(f"{data_files_path}/{data_file}", on_bad_lines="warn") for data_file in data_files],
            ignore_index=True,
        )

        for column_name in ["lhs_rows_count", "rhs_rows_count"]:
            dataset[column_name] = pd.to_numeric(dataset[column_name], errors="coerce").astype("Int64")

        suite = dataset["data_suite"]
        is_databio = suite.str.startswith("databio", na=False)

        # Synthetic suites: split "<name>-<size>" once, at the last dash.
        name_and_size = suite.str.rsplit("-", n=1)
        synthetic_name = name_and_size.str[0]
        synthetic_size = pd.to_numeric(name_and_size.str[-1], errors="coerce")

        unknown_suites = sorted(set(suite[is_databio]) - set(DATABIO_SIZES))
        if unknown_suites:
            raise KeyError(f"No entry in DATABIO_SIZES for data suite(s): {unknown_suites}")

        dataset["data_suite_type"] = is_databio.map({True: "databio", False: "synthetic"})
        dataset["data_suite_name"] = suite.where(is_databio, synthetic_name)
        dataset["data_suite_name_short"] = suite.str.replace("databio-", "", regex=False).where(is_databio, synthetic_name)
        dataset["data_suite_size"] = suite.map(DATABIO_SIZES).where(is_databio, synthetic_size).astype(int)
        dataset["cluster_size"] = cluster_size

        datasets.append(dataset)

    return datasets

In [None]:
def plot_preprocessing_synthetic_benchmark(preprocessors = None):
    """Plot the row counts left by each preprocessor on the synthetic suites.

    Draws a grid of bar plots with one row per synthetic data suite and one
    column per count (``lhs_rows_count``, ``rhs_rows_count``). Every subplot
    shows the count on a logarithmic y axis against ``data_suite_size``,
    coloured by ``preprocessor``. Results of all cluster sizes are plotted
    together.

    Args:
        preprocessors: Preprocessors to include. ``None`` (default) includes
            every preprocessor present in the loaded results.

    Returns:
        None. The figure is shown with ``plt.show()``; when there is nothing
        to plot a message is printed instead.
    """
    Y_AXES = ["lhs_rows_count", "rhs_rows_count"]

    datasets = load_benchmarks_preprocessing()
    if not datasets:
        print("No benchmark results found; nothing to plot.")
        return

    # ignore_index avoids duplicated row labels coming from the per-cluster frames.
    synthetic_data = pd.concat(datasets, ignore_index=True)
    synthetic_data = synthetic_data[synthetic_data["data_suite_type"] == "synthetic"]

    if preprocessors is None:
        preprocessors = sorted(synthetic_data["preprocessor"].unique())
    else:
        preprocessors = list(preprocessors)
        # Filter the data frame; indexing the `preprocessors` list itself
        # with a boolean mask raises a TypeError.
        synthetic_data = synthetic_data[synthetic_data["preprocessor"].isin(preprocessors)]

    if synthetic_data.empty:
        print("No synthetic benchmark results match the selection; nothing to plot.")
        return

    data_sizes = sorted(synthetic_data["data_suite_size"].unique())
    data_suites = sorted(synthetic_data["data_suite_name"].unique())

    # squeeze=False keeps plot_axes two-dimensional even for a single data
    # suite, so plot_axes[i, j] is always valid.
    _, plot_axes = plt.subplots(
        len(data_suites),
        len(Y_AXES),
        figsize=(16, 4 * len(data_suites)),
        sharex=True,
        sharey="all",
        squeeze=False,
    )
    plt.ticklabel_format(useOffset=False)

    for (j, values_to_plot) in enumerate(Y_AXES):
        for (i, data_suite_name) in enumerate(data_suites):
            ax = plot_axes[i, j]
            subplot_data = synthetic_data[synthetic_data["data_suite_name"] == data_suite_name]

            # The x axis is shared, so every subplot must use the same
            # category order; a fixed hue order keeps each preprocessor's
            # colour identical across subplots.
            sns.barplot(
                x="data_suite_size",
                y=values_to_plot,
                hue="preprocessor",
                order=data_sizes,
                hue_order=preprocessors,
                data=subplot_data,
                ax=ax,
            )
            ax.set_yscale('log')
            ax.legend()
            ax.set_title(f"Count: {values_to_plot}, dataset: {data_suite_name}.")

            # Shared axes hide the inner tick labels; show them on every subplot.
            ax.xaxis.set_tick_params(labelbottom=True)
            ax.yaxis.set_tick_params(labelleft=True)

    plt.show()
    

In [None]:
def plot_preprocessing_databio_benchmark(preprocessors = None):
    """Plot the row counts left by each preprocessor on the databio suites.

    Draws one bar plot per count (``lhs_rows_count``, ``rhs_rows_count``) in
    a single row. Every subplot shows the count on a logarithmic y axis
    against ``data_suite_name_short``, coloured by ``preprocessor``. Suites
    are ordered by ``data_suite_size`` and results of all cluster sizes are
    plotted together.

    Args:
        preprocessors: Preprocessors to include. ``None`` (default) includes
            every preprocessor present in the loaded results.

    Returns:
        None. The figure is shown with ``plt.show()``; when there is nothing
        to plot a message is printed instead.
    """
    Y_AXES = ["lhs_rows_count", "rhs_rows_count"]

    datasets = load_benchmarks_preprocessing()
    if not datasets:
        print("No benchmark results found; nothing to plot.")
        return

    # ignore_index avoids duplicated row labels coming from the per-cluster frames.
    databio_data = pd.concat(datasets, ignore_index=True)
    databio_data = databio_data[databio_data["data_suite_type"] == "databio"]

    if preprocessors is None:
        preprocessors = sorted(databio_data["preprocessor"].unique())
    else:
        preprocessors = list(preprocessors)
        # Filter the data frame; indexing the `preprocessors` list itself
        # with a boolean mask raises a TypeError.
        databio_data = databio_data[databio_data["preprocessor"].isin(preprocessors)]

    if databio_data.empty:
        print("No databio benchmark results match the selection; nothing to plot.")
        return

    # One x position per suite, smallest suite first. The x axis is shared,
    # so all subplots have to agree on this order.
    suite_order = (
        databio_data[["data_suite_name_short", "data_suite_size"]]
        .drop_duplicates("data_suite_name_short")
        .sort_values("data_suite_size")["data_suite_name_short"]
        .tolist()
    )

    # squeeze=False keeps plot_axes two-dimensional regardless of the number
    # of columns, so plot_axes[0, j] is always valid.
    _, plot_axes = plt.subplots(1, len(Y_AXES), figsize=(16, 4), sharex=True, sharey="all", squeeze=False)
    plt.ticklabel_format(useOffset=False)

    for (j, values_to_plot) in enumerate(Y_AXES):
        ax = plot_axes[0, j]

        sns.barplot(
            x="data_suite_name_short",
            y=values_to_plot,
            hue="preprocessor",
            order=suite_order,
            hue_order=preprocessors,
            data=databio_data,
            ax=ax,
        )
        ax.set_yscale('log')
        ax.legend()
        ax.set_title(f"Count: {values_to_plot}.")

        # Shared axes hide the inner tick labels; show them on every subplot.
        ax.xaxis.set_tick_params(labelbottom=True)
        ax.yaxis.set_tick_params(labelleft=True)

    plt.show()
    