## Experiment

In [None]:
from yardstick_benchmark.provisioning import Das
from yardstick_benchmark.monitoring import Telegraf
from yardstick_benchmark.games.minecraft.server import PaperMC
from yardstick_benchmark.games.minecraft.workload.diskload import DiskLoad
from yardstick_benchmark.games.minecraft.workload.complex_walkaround import ComplexWalkAround
from datetime import timedelta

import yardstick_benchmark
from time import sleep
from datetime import timedelta
from pathlib import Path
import os
import shutil

# output folder
dest = Path(f"/var/scratch/{os.getlogin()}/yardstick/output")
if dest.exists():
    shutil.rmtree(dest)

# ansible config (unchanged) 
ansible_config_path = "ansible.cfg"
content = """\
[defaults]
host_key_checking = False

[ssh_connection]
pipelining = True
ssh_args = -o ControlMaster=auto -o ControlPersist=60s
"""
with open(ansible_config_path, 'w') as f:
    f.write(content)
os.environ["ANSIBLE_CONFIG"] = str(Path.cwd() / ansible_config_path)

# provision 
das = Das()
nodes = das.provision(num=2)

try:
    yardstick_benchmark.clean(nodes)

    user = os.getlogin()
    pmc_wd = f"/local/{user}/papermc"
    yardstick_base = f"/local/{user}/yardstick"

    # metrics (Telegraf)
    telegraf = Telegraf(nodes)
    telegraf.add_input_jolokia_agent(nodes[0])
    telegraf.add_input_execd_minecraft_ticks(nodes[0])

    # CHANGED: pass wd + config + versions to ALL Telegraf actions
    for act in (telegraf.deploy_action, telegraf.start_action, telegraf.stop_action, telegraf.cleanup_action):
        act.extravars["wd_base"] = yardstick_base                              
        act.extravars["telegraf_version"] = "1.30.3"
    # server-only procstat needs the PaperMC pid path:
    telegraf.deploy_action.extravars["papermc_wd"] = pmc_wd

    # server (PaperMC)
    papermc = PaperMC(nodes[:1], version="1.19.4", build="550")

    # CHANGED: pass wd + jolokia_version to ALL PaperMC actions
    for act in (papermc.deploy_action, papermc.start_action, papermc.stop_action, papermc.cleanup_action):
        act.extravars["wd"] = pmc_wd
        act.extravars["jolokia_version"] = "2.0.3"

    # deploy/start metrics & server 
    telegraf.deploy()
    telegraf.start()

    papermc.deploy()
    papermc.start()
    
    # workload (DiskLoad, RCON-only worker script on workers)
    server  = nodes[0]
    workers = nodes[1:]

    # workload selector 
    WORKLOAD = os.environ.get("WORKLOAD", "complex")  # "complex" or "diskload"

    if WORKLOAD == "diskload":
        # DiskLoad (RCON-driven stress) 
        wl = DiskLoad(
            workers,               # workers only
            server.host,           # mc_host
            teleport_interval=6,
            radius=2000,
            bots_per_node=50,
        )
        for act in (wl.deploy_action, wl.start_action, wl.stop_action, wl.cleanup_action):
            act.extravars.update({
                "wd_base":            yardstick_base,   # per-host roots: /local/$USER/yardstick/<host>
                "builders_per_node":  50,
                "fills_per_interval": 2,
                "fill_size":          128,
                "fill_height":        8,
                "flush_every":        10,
                "forceload":          True,
                "rcon_port":          25575,
                "rcon_password":      "password",
            })

    elif WORKLOAD == "complex":
        # ComplexWalkAround (more realistic gameplay)
        wl = ComplexWalkAround(
            [server, *workers],          
            server_host=server.host,
            duration=timedelta(minutes=10),
            bots_per_node=50,
        )
        for act in (wl.deploy_action, wl.start_action, wl.stop_action, wl.cleanup_action):
            act.extravars.update({
                "wd_base":       yardstick_base,
                "wd":            f"{yardstick_base}/{{{{ inventory_hostname }}}}",  # per-host workdir
                "radius":        800,
                "goal_interval": 20,
                "build_interval":7,
                "mine_interval": 11,
                "flush_every":   60,
                "rcon_port":     25575,
                "rcon_password": "password",
            })
    else:
        raise ValueError(f"Unknown WORKLOAD '{WORKLOAD}' (use 'complex' or 'diskload')")

    # common run
    wl.deploy()
    wl.start()

    # run duration 
    sleep_time = 500
    print(f"sleeping for {sleep_time} seconds")
    sleep(sleep_time)

    # teardown
    wl.stop()
    wl.cleanup()

    sleep(5)
    papermc.stop()

    yardstick_benchmark.fetch(dest, nodes)

    papermc.cleanup()

    telegraf.stop()
    telegraf.cleanup()

    

finally:
    yardstick_benchmark.clean(nodes)
    das.release(nodes)


PLAY [Clean data from nodes] ***************************************************

TASK [Gathering Facts] *********************************************************
[0;32mok: [node027][0m
[0;32mok: [node028][0m

TASK [Remove data from nodes] **************************************************
[0;32mok: [node027][0m
[0;32mok: [node028][0m

PLAY RECAP *********************************************************************
[0;32mnode027[0m                    : [0;32mok=2   [0m changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0   
[0;32mnode028[0m                    : [0;32mok=2   [0m changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0   

PLAY [Deploy Telegraf] *********************************************************

TASK [Gathering Facts] *********************************************************
[0;32mok: [node027][0m
[0;32mok: [node028][0m

TASK [Create Telegraf directory] ********************************************

## Data Pre-Processing

In [26]:
import glob
import pandas as pd

raw_data_files = glob.glob(f"{dest}/**/metrics-*.csv", recursive=True)
print(raw_data_files)

for raw_data_file in raw_data_files:
    metrics_file = Path(raw_data_file)
    keys = {}
    with open(metrics_file) as fin:
        for line in fin:
            first_delim = line.find(",")
            second_delim = line.find(",", first_delim+1)
            key = line[first_delim+1:second_delim]
            if key not in keys:
                keys[key] = open(metrics_file.parent / f"{key}.csv", "w+")
            keys[key].write(line)
    for key, fd in keys.items():
        fd.close()

['/var/scratch/eko229/yardstick/output/node027/metrics-node027.csv', '/var/scratch/eko229/yardstick/output/node028/metrics-node028.csv']


## Post-Processing


In [27]:
import getpass, glob, numpy as np, pandas as pd, warnings
from pathlib import Path

DEBUG = False  # set True for full per-device tables
warnings.filterwarnings("ignore", category=DeprecationWarning)

# paths 
user = getpass.getuser()
dest = Path(f"/var/scratch/{user}/yardstick/output")

# CPU UTILIZATION 
cpu_dfs = []
for fn in glob.glob(str(dest / "**" / "cpu.csv"), recursive=True):
    df = pd.read_csv(
        fn, header=None, on_bad_lines="skip",
        names=["timestamp","measurement","core_id","cpu","host","physical_id",
               "time_active","time_guest","time_guest_nice","time_idle","time_iowait",
               "time_irq","time_nice","time_softirq","time_steal","time_system","time_user"]
    )
    df = df[df.cpu == "cpu-total"]
    df["time_active"] = pd.to_numeric(df["time_active"], errors="coerce")
    df["time_idle"]   = pd.to_numeric(df["time_idle"],   errors="coerce")
    df["util"]        = 100 * df["time_active"] / (df["time_active"] + df["time_idle"])
    cpu_dfs.append(df)

cpu_df       = pd.concat(cpu_dfs, ignore_index=True) if cpu_dfs else pd.DataFrame()
avg_cpu_util = cpu_df["util"].mean() if not cpu_df.empty else float("nan")

# MEMORY USAGE
mem_dfs = []
for fn in glob.glob(str(dest / "**" / "mem.csv"), recursive=True):
    df = pd.read_csv(fn, header=None, on_bad_lines="skip")
    df["avail_pct"] = pd.to_numeric(df.iloc[:,5], errors="coerce")
    df["used_pct"]  = 100 - df["avail_pct"]
    mem_dfs.append(df)

mem_df       = pd.concat(mem_dfs, ignore_index=True) if mem_dfs else pd.DataFrame()
avg_mem_used = mem_df["used_pct"].mean() if not mem_df.empty else float("nan")

# TICK DURATION & TPS 
tick_paths = glob.glob(str(dest / "**" / "minecraft_tick_times.csv"), recursive=True)
if not tick_paths:
    tick_paths = glob.glob(str(dest / "**" / "execd.csv"), recursive=True)

if not tick_paths:
    tick_df = pd.DataFrame(columns=["timestamp","measurement","host","endpoint","tick_raw"])
else:
    tick_df = pd.read_csv(
        tick_paths[0], header=None, on_bad_lines="skip",
        names=["timestamp","measurement","host","endpoint","tick_raw"]
    )

tick_df["tick_raw"] = pd.to_numeric(tick_df["tick_raw"], errors="coerce")
tick_df = tick_df.dropna(subset=["tick_raw"]).copy()

unit = "N/A"
if not tick_df.empty:
    med = tick_df["tick_raw"].median()
    if med >= 1e6:           # ns
        tick_df["tick_ms"], unit = tick_df["tick_raw"] / 1e6, "ns→ms"
    elif med >= 1e3:         # µs
        tick_df["tick_ms"], unit = tick_df["tick_raw"] / 1e3, "µs→ms"
    elif med < 0.5:          # seconds (only if tiny, e.g., 0.003 s)
        tick_df["tick_ms"], unit = tick_df["tick_raw"] * 1e3, "s→ms"
    else:                    # assume milliseconds
        tick_df["tick_ms"], unit = tick_df["tick_raw"], "ms"
else:
    tick_df["tick_ms"] = []

tick_df = tick_df[(tick_df["tick_ms"] > 0) & (tick_df["tick_ms"] < 10000)]
tick_df["tps"] = (1000.0 / tick_df["tick_ms"]).clip(upper=20)

avg_tick_ms = tick_df["tick_ms"].mean() if not tick_df.empty else float("nan")
med_tick_ms = tick_df["tick_ms"].median() if not tick_df.empty else float("nan")
p95_tick_ms = tick_df["tick_ms"].quantile(0.95) if not tick_df.empty else float("nan")

avg_tps = tick_df["tps"].mean() if not tick_df.empty else float("nan")
p05_tps = tick_df["tps"].quantile(0.05) if not tick_df.empty else float("nan")

print(f"[tick] source={Path(tick_paths[0]).name if tick_paths else 'N/A'} "
      f"unit={unit} median={med_tick_ms:.2f}ms p95={p95_tick_ms:.2f}ms "
      f"avg_tps={avg_tps:.2f} p05_tps={p05_tps:.2f}")

# DISK THROUGHPUT (diskio.csv) 
READ_BYTES_COL, WRITE_BYTES_COL = 8, 12
disk_points_w, disk_points_r, totals_per_host_w, totals_per_host_r = [], [], {}, {}

for fn in glob.glob(str(dest / "**" / "diskio.csv"), recursive=True):
    raw = pd.read_csv(fn, header=None, on_bad_lines="skip")
    if raw.empty or raw.shape[1] <= max(READ_BYTES_COL, WRITE_BYTES_COL):
        continue
    raw = raw.rename(columns={0:"ts",1:"measurement",2:"host",3:"name",
                              READ_BYTES_COL:"read_bytes",WRITE_BYTES_COL:"write_bytes"})
    d = raw[["ts","host","name","read_bytes","write_bytes"]].copy()
    d["ts"]          = pd.to_numeric(d["ts"], errors="coerce")
    d["read_bytes"]  = pd.to_numeric(d["read_bytes"],  errors="coerce")
    d["write_bytes"] = pd.to_numeric(d["write_bytes"], errors="coerce")
    d = d.dropna().sort_values(["host","name","ts"])
    if d.empty: continue

    d = d.groupby("host", group_keys=False, sort=False).apply(
        lambda g: g[g["name"].str.startswith("md")]
                  if g["name"].str.startswith("md").any()
                  else g[~g["name"].str.startswith(("md","sr"))]
    ).reset_index(drop=True)

    if d.empty: continue
    for col, bag, totals in (("write_bytes", disk_points_w, totals_per_host_w),
                              ("read_bytes",  disk_points_r, totals_per_host_r)):
        g = d[["ts","host","name",col]].copy().sort_values(["host","name","ts"])
        g["dt"]     = g.groupby(["host","name"])["ts"].diff()
        g["dbytes"] = g.groupby(["host","name"])[col].diff()
        g = g[(g["dt"] > 0) & (g["dbytes"] >= 0)]
        g[f"{col}_MBps_dev"] = (g["dbytes"] / g["dt"]) / (1024**2)
        bag.append(g[["host","name","ts",f"{col}_MBps_dev"]])

# collapse into summary
disk_w = pd.concat(disk_points_w, ignore_index=True) if disk_points_w else pd.DataFrame()
disk_r = pd.concat(disk_points_r, ignore_index=True) if disk_points_r else pd.DataFrame()
def aggregate_host_MBps(df, col):
    if df.empty: return 0.0, 0.0
    host_series = df.groupby(["host","ts"], as_index=False)[col].sum()
    return float(host_series[col].mean()), float(host_series[col].max())

avg_disk_write_MBps, peak_disk_write_MBps = aggregate_host_MBps(disk_w,"write_bytes_MBps_dev")
avg_disk_read_MBps,  peak_disk_read_MBps  = aggregate_host_MBps(disk_r,"read_bytes_MBps_dev")
avg_disk_MBps  = avg_disk_write_MBps + avg_disk_read_MBps
peak_disk_MBps = peak_disk_write_MBps + peak_disk_read_MBps

# PROCSTAT
proc_parts = []
for fn in glob.glob(str(dest / "**" / "procstat.csv"), recursive=True):
    d = pd.read_csv(fn, header=None, on_bad_lines="skip",
                    names=["ts","measurement","host","pattern","process",
                           "read_bytes","write_bytes","read_count","write_count"])
    if d.empty: continue
    d = d[d["process"] == "java"].copy()
    if d.empty: continue
    d["ts"]          = pd.to_numeric(d["ts"], errors="coerce")
    d["write_bytes"] = pd.to_numeric(d["write_bytes"], errors="coerce")
    d = d.dropna().sort_values(["host","ts"])
    d["dt"]  = d.groupby("host")["ts"].diff()
    d["dwb"] = d.groupby("host")["write_bytes"].diff()
    d = d[(d["dt"] > 0) & (d["dwb"] >= 0)]
    d["MBps"] = (d["dwb"] / d["dt"]) / (1024**2)
    proc_parts.append(d[["host","ts","MBps"]])

proc_df = pd.concat(proc_parts, ignore_index=True) if proc_parts else pd.DataFrame()
avg_proc_MBps  = float(proc_df["MBps"].mean()) if not proc_df.empty else 0.0
peak_proc_MBps = float(proc_df["MBps"].max())  if not proc_df.empty else 0.0
print(f"(procstat) avg Paper write {avg_proc_MBps:.3f} MB/s (peak {peak_proc_MBps:.3f})")

# NETWORK
net_rates = []
for fn in glob.glob(str(dest / "**" / "net.csv"), recursive=True):
    df = pd.read_csv(fn, header=None, on_bad_lines="skip")
    df["timestamp"]  = pd.to_numeric(df.iloc[:,0], errors="coerce")
    df["bytes_sent"] = pd.to_numeric(df.iloc[:,5], errors="coerce")
    df["iface"]      = df.iloc[:,3].astype(str)
    df = df.dropna(subset=["timestamp","bytes_sent"])
    for name, grp in df.groupby("iface"):
        g = grp.sort_values("timestamp").copy()
        g["dt"]     = g["timestamp"].diff()
        g["dbytes"] = g["bytes_sent"].diff()
        g = g[(g["dt"] > 0) & (g["dbytes"] >= 0)]
        g["MBps"]   = (g["dbytes"] / g["dt"]) / (1024**2)
        net_rates.append(g)

avg_net_MBps = float(pd.concat(net_rates)["MBps"].mean()) if net_rates else 0.0

# SUMMARY
summary = pd.DataFrame([{
    "avg_tick_ms":         avg_tick_ms,
    "avg_tps":             avg_tps,
    "avg_cpu_%":           avg_cpu_util,
    "avg_mem_used_%":      avg_mem_used,
    "avg_disk_MBps":       avg_disk_MBps,
    "avg_disk_write_MBps": avg_disk_write_MBps,
    "avg_disk_read_MBps":  avg_disk_read_MBps,
    "avg_proc_MBps":       avg_proc_MBps,
    "avg_net_MBps":        avg_net_MBps,
}])

print("\n=== Experiment Summary ===")
print(summary.round(3).to_string(index=False))

[tick] source=minecraft_tick_times.csv unit=ms median=1.16ms p95=3.05ms avg_tps=20.00 p05_tps=20.00
(procstat) avg Paper write 0.000 MB/s (peak 0.023)

=== Experiment Summary ===
 avg_tick_ms  avg_tps  avg_cpu_%  avg_mem_used_%  avg_disk_MBps  avg_disk_write_MBps  avg_disk_read_MBps  avg_proc_MBps  avg_net_MBps
       1.522     20.0      0.887          10.586          0.023                0.023                 0.0            0.0         0.017
