In [None]:
import duckdb
import matplotlib.pyplot as plt
import os
import pandas as pd
import seaborn as sns
import sys

directory_path = os.path.abspath(os.path.join('../utils/'))
if directory_path not in sys.path:
    sys.path.append(directory_path)
from functions import *
from constant import *

# Experiment parameters -- set these before running the notebook.
# EXPERIMENT_ID and EXPERIMENT_START_TIME are parallel lists: one start time per experiment id.
EXPERIMENT_ID = [
    "spark_del_sf_1000",
]
EXPERIMENT_START_TIME = [
    "2023-06-29T14:39:03.335772Z",
]

# Phases to include. Alternative selection that also covers the optimize phases:
# PHASE_IDS = ["single_user_1_data_maintenance_1", "single_user_2_optimize_1",
#              "single_user_2o_data_maintenance_2", "single_user_3_optimize_2",
#              "single_user_3o_data_maintenance_3", "single_user_4_optimize_3"]
PHASE_IDS = [
    "single_user_1_data_maintenance_1",
    "single_user_2o_data_maintenance_2",
    "single_user_3o_data_maintenance_3",
]

# Task type to analyse: exactly one prefix, such as 'single_user' or 'data_maintenance'.
# Execution times of all tasks with this prefix within a phase are summed up.
TASK_PREFIX = "single_user"
TASK_ABBRV = "SU"  # short form used when building task labels, e.g. "SU-1"

In [None]:
# --- Check input validity and create DB connection --- #
# Every experiment id must be paired with exactly one start time.
assert len(EXPERIMENT_START_TIME) == len(EXPERIMENT_ID)

# Open the DuckDB database at DUCKDB_PATH read-only
# (DUCKDB_PATH presumably comes from the ../utils `constant` star-import -- verify).
con = duckdb.connect(read_only=True, database=DUCKDB_PATH)

In [None]:
# --- Data manipulations --- #

# Retrieve relevant data, tagging each experiment's rows with its own id.
# The label is set on the per-experiment frame *before* concatenation: assigning it
# to the accumulated frame would overwrite the labels of every previously retrieved
# experiment with the current id.
experiment_frames = []
for exp_id, exp_start_time in zip(EXPERIMENT_ID, EXPERIMENT_START_TIME):
    exp_frame = retrieve_grouped_event_df(con, exp_id, exp_start_time, PHASE_IDS)
    experiment_frames.append(exp_frame.assign(exp_name=exp_id))
# Concatenate once (not inside the loop); fall back to an empty frame if no experiments are configured.
EXP_DATA = pd.concat(experiment_frames) if experiment_frames else pd.DataFrame()
EXP_DATA = filterByEventType(EXP_DATA, "EXEC_TASK")
EXP_DATA = filterByEventPrefix(EXP_DATA, TASK_PREFIX)

# Create labels for tasks: "<TASK_ABBRV>-<n>", numbered 1.. by first appearance of each group_name.
task_labels = {
    group_name: f"{TASK_ABBRV}-{position}"
    for position, group_name in enumerate(EXP_DATA['group_name'].unique(), start=1)
}
# assign() returns a new frame, avoiding chained-assignment warnings on filtered slices.
EXP_DATA = EXP_DATA.assign(task_id=EXP_DATA['group_name'].map(task_labels))

# Calculate latency for each element.
EXP_DATA['time_diff_in_mins'] = EXP_DATA.apply(lambda x: time_diff_in_minutes(x['event_start_time'], x['event_end_time']), axis=1)

In [None]:
def get_name(str):
    """Map an experiment id to its display label.

    Matching is case-sensitive substring search, checked in order:
    'Del', 'DBX' or 'del' -> 'Delta'; otherwise the first rule whose engine token
    ('ib' / 'hudi') and table-type token ('cow' / 'mor') both occur wins.
    Returns None when no rule matches.
    """
    if any(marker in str for marker in ('Del', 'DBX', 'del')):
        return 'Delta'

    ordered_rules = (
        ('ib', 'cow', 'Iceberg (CoW)'),
        ('ib', 'mor', 'Iceberg (MoR)'),
        ('hudi', 'cow', 'Hudi (CoW)'),
        ('hudi', 'mor', 'Hudi (MoR)'),
    )
    for engine_token, table_token, label in ordered_rules:
        if engine_token in str and table_token in str:
            return label
    return None

def get_dashes_val(str):
    """Return the seaborn dash pattern for a display label produced by ``get_name``.

    'Delta' and the CoW variants are drawn solid ``(1, 0)``; the MoR variants use
    ``(1, 1)`` (equal segment and gap). Labels are matched by substring in the order
    listed below. Returns None for an unrecognised label.
    """
    # Each full label already implies its table type, so no extra 'CoW'/'MoR' check is needed.
    dash_by_label = (
        ('Delta', (1, 0)),
        ('Iceberg (CoW)', (1, 0)),
        ('Iceberg (MoR)', (1, 1)),
        ('Hudi (CoW)', (1, 0)),
        ('Hudi (MoR)', (1, 1)),
    )
    for label, dash in dash_by_label:
        if label in str:
            return dash
    return None

def get_markers_val(str):
    """Return the matplotlib marker for a display label produced by ``get_name``.

    'Delta' and the CoW variants use 'X'; the MoR variants use 'P'. Labels are
    matched by substring in the order listed below. Returns None for an
    unrecognised label.
    """
    # Each full label already implies its table type, so no extra 'CoW'/'MoR' check is needed.
    marker_by_label = (
        ('Delta', 'X'),
        ('Iceberg (CoW)', 'X'),
        ('Iceberg (MoR)', 'P'),
        ('Hudi (CoW)', 'X'),
        ('Hudi (MoR)', 'P'),
    )
    for label, marker in marker_by_label:
        if label in str:
            return marker
    return None

# Map each experiment id to its display label (e.g. 'Delta', 'Iceberg (MoR)').
EXP_DATA['ExpLabel'] = EXP_DATA['exp_name'].map(get_name)
# get_name returns None for ids it does not recognise; those rows would be silently
# dropped by the later groupby, so fail loudly instead.
unlabeled_exps = EXP_DATA.loc[EXP_DATA['ExpLabel'].isna(), 'exp_name'].unique()
assert len(unlabeled_exps) == 0, f"No display label for experiment(s): {list(unlabeled_exps)}"

# Colour per table format; CoW vs MoR is distinguished by dashes/markers below.
custom_palette = {
    'Delta': 'red',
    'Hudi (CoW)': 'green',
    'Hudi (MoR)': 'green',
    'Iceberg (CoW)': 'blue',
    'Iceberg (MoR)': 'blue',
}

# Key the style attributes by label. seaborn maps a *list* of dashes/markers onto the
# style levels in their order of appearance in the plotted data (and warns/truncates
# when the list is longer than the number of levels), which generally differs from
# the palette key order. A dict pins each label to its own dash pattern and marker.
dashes = {label: get_dashes_val(label) for label in custom_palette}
markers = {label: get_markers_val(label) for label in custom_palette}

In [None]:
# Group data appropriately: total latency per task and experiment label.
grouped_df = EXP_DATA.groupby(['task_id', 'ExpLabel'], as_index=False)['time_diff_in_mins'].sum()
# groupby sorts the string ids lexicographically ("SU-10" before "SU-2"). seaborn places
# string x values in order of first appearance, so re-sort rows by the numeric suffix
# to keep the x axis in task order.
grouped_df = grouped_df.sort_values(
    'task_id',
    key=lambda ids: ids.str.rsplit('-', n=1).str[-1].astype(int),
    kind='stable',
).reset_index(drop=True)

# --- Plot the data --- #
# One call equivalent to the previous set(rc=...) / set(font_scale=...) / set_style(...) sequence.
sns.set_theme(style="whitegrid", font_scale=1.2, rc={'figure.figsize': (8, 3)})

fig, ax = plt.subplots()
# Plot latency (in order of ids). The legend is suppressed; pass legend='auto' to show it.
sns.lineplot(
    data=grouped_df,
    x='task_id',
    y='time_diff_in_mins',
    hue='ExpLabel',
    style='ExpLabel',
    palette=custom_palette,
    dashes=dashes,
    markers=markers,
    errorbar=None,
    linewidth=2,
    markersize=10,
    legend=False,
    ax=ax,
)
ax.set_ylabel("Latency (mins)")
ax.set_xlabel("Task ID")
plt.show()