# Analyzing the logs

In [None]:
# Run from the top of the repository (assumes this notebook lives one level
# below it), since the cells below use relative paths such as calkit.yaml,
# logs/, results/ and paper/
%cd ..

In [None]:
import pandas as pd
import polars as pl
import yaml
import itertools
import os

# Read the Calkit project file; its benchmark stages define the parameter
# sweeps whose logs are analyzed below
with open("calkit.yaml") as config_file:
    calkit_config = yaml.safe_load(config_file)

pipeline_stages = calkit_config["pipeline"]["stages"]

# Load the BOOM and Kowalski configurations as {argument name: list of values}
boom_params = {
    entry["arg_name"]: entry["values"]
    for entry in pipeline_stages["benchmark-boom"]["iterate_over"]
}
kowalski_params = {
    entry["arg_name"]: entry["values"]
    for entry in pipeline_stages["benchmark-kowalski"]["iterate_over"]
}

def _parse_boom_timestamp(log_line):
    """Return the timestamp stored in the third field of a BOOM log line.

    Fields are split on whitespace; the ANSI dim/reset escape codes wrapped
    around the timestamp are stripped before ``pd.to_datetime`` parses it.
    """
    stamp = log_line.split()[2]
    stamp = stamp.replace("\x1b[2m", "").replace("\x1b[0m", "")
    return pd.to_datetime(stamp)


# Form the matrix of parameters: the full product of the "benchmark-boom"
# values plus the explicit combinations listed under "benchmark-boom-more"
param_names = list(boom_params.keys())
param_vals = list(boom_params.values())
vals_product = list(itertools.product(*param_vals))
# NOTE(review): assumes each extra combination lists its values in the same
# order as param_names -- confirm against calkit.yaml
extra_combos = calkit_config["pipeline"]["stages"]["benchmark-boom-more"][
    "iterate_over"
][0]["values"]
vals_product.extend(tuple(combo) for combo in extra_combos)
results_boom = []

for vals in vals_product:
    current_params = dict(zip(param_names, vals))
    boom_config = (
        f"na={current_params['n_alert_workers']}-"
        f"nml={current_params['n_ml_workers']}-"
        f"nf={current_params['n_filter_workers']}"
    )
    boom_consumer_log_fpath = f"logs/boom-{boom_config}/consumer.log"
    boom_scheduler_log_fpath = f"logs/boom-{boom_config}/scheduler.log"
    # Both logs are required, so skip (with a warning) any configuration that
    # is missing either one instead of aborting the whole sweep
    missing_logs = [
        fpath
        for fpath in (boom_consumer_log_fpath, boom_scheduler_log_fpath)
        if not os.path.isfile(fpath)
    ]
    if missing_logs:
        for fpath in missing_logs:
            print(f"WARNING: {fpath} does not exist")
        continue
    # To calculate BOOM wall time, take first timestamp from the consumer log
    # as the start and the last timestamp of the scheduler as the end
    with open(boom_consumer_log_fpath) as f:
        t1_b = _parse_boom_timestamp(f.readline())
    with open(boom_scheduler_log_fpath) as f:
        # The end time is read from the third-to-last line; presumably the
        # final two lines carry no usable timestamp -- TODO confirm
        t2_b = _parse_boom_timestamp(f.readlines()[-3])
    current_params["start_time"] = t1_b
    current_params["end_time"] = t2_b
    results_boom.append(current_params)

# One row per configuration; wall time is the end/start difference in seconds
df_boom = pl.DataFrame(results_boom).with_columns(
    (pl.col("end_time") - pl.col("start_time"))
    .dt.total_seconds()
    .alias("wall_time_s")
)

def _first_marker_timestamp(log_lines, marker):
    """Return the timestamp of the first line containing ``marker``.

    The timestamp is built from the first two whitespace-separated fields of
    the line, joined with "T", and parsed by ``pd.to_datetime``. Returns
    ``None`` when no line contains ``marker``.
    """
    for log_line in log_lines:
        if marker in log_line:
            return pd.to_datetime("T".join(log_line.split()[:2]))
    return None


# Kowalski only has one parameter name, so we don't need the product
param_name = next(iter(kowalski_params))
param_vals = kowalski_params[param_name]
results_kowalski = []

for val in param_vals:
    res = {param_name: val}
    kowalski_config = f"n={val}"
    log_fpath = f"logs/kowalski-{kowalski_config}/supervisord.log"
    if not os.path.isfile(log_fpath):
        print(f"WARNING: {log_fpath} does not exist")
        continue
    # To calculate Kowalski wall time, just use the supervisord logs: the
    # start is when the alert broker enters RUNNING and the end is when
    # SIGTERM is received
    with open(log_fpath) as f:
        lines = f.readlines()
    t1_k = _first_marker_timestamp(
        lines, "alert-broker-ztf entered RUNNING state"
    )
    t2_k = _first_marker_timestamp(
        lines, "received SIGTERM indicating exit request"
    )
    # Without this check a log lacking either marker would silently reuse the
    # timestamps parsed from the previous configuration's log
    if t1_k is None or t2_k is None:
        print(f"WARNING: {log_fpath} is missing a start or end marker")
        continue
    res["start_time"] = t1_k
    res["end_time"] = t2_k
    results_kowalski.append(res)

# One row per configuration; wall time is the end/start difference in seconds
df_kowalski = pl.DataFrame(results_kowalski).with_columns(
    (pl.col("end_time") - pl.col("start_time"))
    .dt.total_seconds()
    .alias("wall_time_s")
)

# Save both result tables as CSV for later analysis
os.makedirs("results", exist_ok=True)
df_boom.write_csv("results/boom.csv")
df_kowalski.write_csv("results/kowalski.csv")

# Reference configuration of each system used for the throughput factor
boom_reference = (
    (pl.col("n_alert_workers") == 3)
    & (pl.col("n_ml_workers") == 3)
    & (pl.col("n_filter_workers") == 1)
)
kowalski_reference = pl.col("n_workers") == 7

# Take the wall time of the first row matching each reference configuration
boom_reference_rows = df_boom.filter(boom_reference)
boom_wall_time = boom_reference_rows.select("wall_time_s").row(0)[0]
kowalski_reference_rows = df_kowalski.filter(kowalski_reference)
kowalski_wall_time = kowalski_reference_rows.select(
    pl.col("wall_time_s")
).row(0)[0]

# Ratio of Kowalski to BOOM wall time for the reference configurations
boom_throughput_factor = kowalski_wall_time / boom_wall_time
print(f"BOOM throughput factor: {boom_throughput_factor:.1f}")

In [None]:
# Let polars print up to 50 table rows, then display the BOOM results table
pl.Config.set_tbl_rows(50)
df_boom

In [None]:
# Display the Kowalski results table
df_kowalski

In [None]:
# TODO: Put actual results here
kafka_ingest_strate_mbps = "XXXXX"
kafka_ingest_factor = "XXXXX"

# NOTE(review): the macro names below spell "kakfa" rather than "kafka"; they
# are left unchanged because the paper sources presumably reference them under
# that spelling -- confirm before renaming
template = r"""% This file was automatically generated; edits will be overwritten!
\newcommand{\boomthroughputfactor}{BOOM_THROUGHPUT_FACTOR}
\newcommand{\kakfaingestratembps}{KAFKA_INGEST_STRATE_MBPS}
\newcommand{\kakfaingestfactor}{KAFKA_INGEST_FACTOR}
"""

# This might be better with an f-string, but this is TeX and who wants to deal
# with escaping all the braces? Instead, each placeholder token is swapped for
# its value, in the order listed here
substitutions = {
    # The throughput factor is rounded to the nearest integer for the paper
    "BOOM_THROUGHPUT_FACTOR": f"{round(boom_throughput_factor):1d}",
    "KAFKA_INGEST_STRATE_MBPS": kafka_ingest_strate_mbps,
    "KAFKA_INGEST_FACTOR": kafka_ingest_factor,
}
for placeholder, value in substitutions.items():
    template = template.replace(placeholder, value)

# Make sure the output directory exists, as done for the other written files
os.makedirs("paper", exist_ok=True)
with open("paper/results.tex", "w") as f:
    f.write(template)

In [None]:
# Plot Kowalski and BOOM scaling on the same axes
import os

import plotly.graph_objects as go

# Kowalski throughput relative to its slowest run (largest wall time)
normalized_throughput = (
    df_kowalski["wall_time_s"].max() / df_kowalski["wall_time_s"]
)

# BOOM: total worker count per configuration, and throughput relative to the
# slowest BOOM run
df_boom = df_boom.with_columns(
    n_workers=(
        pl.col("n_alert_workers")
        + pl.col("n_ml_workers")
        + pl.col("n_filter_workers")
    ),
    normalized_throughput=(
        pl.col("wall_time_s").max() / pl.col("wall_time_s")
    ),
)
# For a given worker sum, keep only the highest normalized throughput
boom_by_workers = df_boom.group_by("n_workers")
df_boom_max = boom_by_workers.agg(
    max_throughput=pl.col("normalized_throughput").max()
).sort("n_workers")

kowalski_trace = go.Scatter(
    x=df_kowalski["n_workers"],
    y=normalized_throughput,
    mode="markers+lines",
    marker=dict(size=10),
    name="Kowalski",
)
boom_trace = go.Scatter(
    x=df_boom_max["n_workers"],
    y=df_boom_max["max_throughput"],
    mode="markers+lines",
    marker=dict(size=10, symbol="triangle-up", color="orange"),
    name="BOOM",
)

fig = go.Figure()
fig.add_trace(kowalski_trace)
fig.add_trace(boom_trace)
fig.update_layout(
    xaxis_title="Number of workers",
    yaxis_title="Normalized throughput",
    title=None,
    # Shrink top margin
    margin=dict(t=65),
)

# Save the figure for the paper, then show it as the cell output
os.makedirs("paper/figures", exist_ok=True)
fig.write_image("paper/figures/scaling.png", width=450, height=350)
fig