In [1]:
import numpy as np

from ax.plot.contour import plot_contour
from ax.plot.trace import optimization_trace_single_method
from ax.service.managed_loop import optimize
from ax.metrics.branin import branin
from ax.utils.measurement.synthetic_functions import hartmann6
from ax.utils.notebook.plotting import render, init_notebook_plotting

# Initialize Ax's notebook plotting support (presumably needed before `render(...)` can display figures here).
init_notebook_plotting()

In [2]:
import yaml
import json

# Start from a configuration base for the platform we experiment on.
# runSimulationWithParam later mutates `yaml_config` in place for every trial; re-run this cell
# to get back the pristine base configuration.
CONFIGURATION_BASE = "./exp_configurations/theta_config.yml"

with open(CONFIGURATION_BASE, "r", encoding="utf-8") as cfg_base:
    yaml_config = yaml.load(cfg_base, Loader=yaml.FullLoader)

# Remove keys that are only used inside the file with YAML anchors/aliases.
# pop(..., None) tolerates a base configuration that does not define one of them.
for template_key in ("disk_templates", "node_templates"):
    yaml_config["storage"].pop(template_key, None)

# default=str keeps the dump working if the YAML holds values JSON cannot encode (e.g. dates).
print(json.dumps(yaml_config, indent=4, default=str))

In [3]:
# Define the parameters that will be given to Ax for the optimization loop.
# Bounds / value lists are not final.
# Float-valued ranges always use float bounds (1.0, not 1) so every entry is declared consistently.

AX_PARAMS = [
    {
        "name": "backbone_bw",
        "type": "range",
        "bounds": [220, 240],
        "value_type": "int",
    },
    {
        "name": "permanent_storage_read_bw",
        "type": "range",
        "bounds": [5, 90],
        "value_type": "int",
    },
    {
        "name": "permanent_storage_write_bw",
        "type": "range",
        "bounds": [5, 90],
        "value_type": "int",
    },
    {
        "name": "preload_percent",
        "type": "choice",
        "is_ordered": True,
        "values": [0.1, 0.2, 0.3],
        "value_type": "float",
    },
    {
        "name": "amdahl",
        "type": "range",
        "bounds": [0.1, 1.0],
        "value_type": "float",
    },
    {
        "name": "walltime_extension",
        "type": "range",
        "bounds": [1.0, 1.3],
        "value_type": "float",
    },
    {
        "name": "disk_rb",
        "type": "range",
        "bounds": [600, 6000],
        "value_type": "int",
    },
    {
        "name": "disk_wb",
        "type": "range",
        "bounds": [300, 3000],
        "value_type": "int",
    },
    {
        "name": "stripe_size",
        "type": "choice",
        "values": [2097152, 4096000, 131072000, 524288000, 1048576000],
        "is_ordered": True,
        "value_type": "int",
    },
    {
        "name": "stripe_count",
        "type": "range",
        "bounds": [1, 40],  # NOTE: no allocation so far has used all OSTs
        "value_type": "int",
    },
    {
        "name": "non_linear_coef_read",
        "type": "range",
        "bounds": [0.2, 1.0],
        "value_type": "float",
    },
    {
        "name": "non_linear_coef_write",
        "type": "range",
        "bounds": [0.2, 1.0],
        "value_type": "float",
    },
    {
        "name": "read_variability",
        "type": "range",
        "bounds": [0.2, 1.0],
        "value_type": "float",
    },
    {
        "name": "write_variability",
        "type": "range",
        "bounds": [0.2, 1.0],
        "value_type": "float",
    },
]

In [4]:
import numpy as np
from scipy.stats import pearsonr

def cohend(d1, d2):
    """Cohen's d effect size of sample `d1` relative to sample `d2`.

    d = (mean(d1) - mean(d2)) / s_pooled, with
    s_pooled = sqrt(((n1 - 1) * var1 + (n2 - 1) * var2) / (n1 + n2 - 2))
    where var1 / var2 are the unbiased (ddof=1) sample variances.

    The sign tells which sample has the larger mean; 0 means equal means.
    If the pooled standard deviation is 0 (e.g. two constant samples), the result
    is not finite (inf or NaN) rather than an exception.
    """
    size_a, size_b = len(d1), len(d2)
    weighted_var_sum = (size_a - 1) * np.var(d1, ddof=1) + (size_b - 1) * np.var(d2, ddof=1)
    pooled_std = np.sqrt(weighted_var_sum / (size_a + size_b - 2))
    mean_gap = np.mean(d1) - np.mean(d2)
    return mean_gap / pooled_std

In [5]:
import random
import threading
import subprocess
import pathlib

# Workload trace (../../raw_data_processing/theta/<DATASET>.yaml) replayed by every simulation run.
DATASET = "theta2022_week4"

# Counter giving each dumped configuration file a unique numeric prefix.
# Increments happen under sequence_nb_lock so two callers never get the same number.
experiment_config_seq_nb = 0
sequence_nb_lock = threading.Lock()

def _apply_parameters_to_config(config, parametrization):
    """Copy one Ax parametrization into the simulator configuration `config` (mutated in place)."""
    general = config["general"]
    # Bandwidth parameters are written as strings carrying a "GBps" unit suffix.
    for key in ("backbone_bw", "permanent_storage_read_bw", "permanent_storage_write_bw"):
        general[key] = f"{parametrization.get(key)}GBps"
    # The other "general" parameters are written as plain values.
    for key in (
        "preload_percent",
        "amdahl",
        "walltime_extension",
        "non_linear_coef_read",
        "non_linear_coef_write",
        "read_variability",
        "write_variability",
    ):
        general[key] = parametrization.get(key)

    # WARNING: HERE WE SET THE SAME READ/WRITE BANDWIDTH FOR ALL DISKS - THIS WILL NOT ALWAYS BE THE CASE.
    for storage_node in config["storage"]["nodes"]:
        for disk in storage_node["template"]["disks"]:
            disk["template"]["read_bw"] = parametrization.get("disk_rb")
            disk["template"]["write_bw"] = parametrization.get("disk_wb")

    config["lustre"]["stripe_size"] = parametrization.get("stripe_size")
    config["lustre"]["stripe_count"] = parametrization.get("stripe_count")


def _next_config_tag():
    """Reserve a unique sequence number and draw a random 4-hex-digit tag for one simulation run."""
    global experiment_config_seq_nb
    random_part = "".join(random.choices("ABCDEF0123456789", k=4))
    with sequence_nb_lock:  # released even if the body raises
        seq_nb = experiment_config_seq_nb
        experiment_config_seq_nb += 1
    return seq_nb, random_part


def _run_simulator(config_path, random_part):
    """Run the storalloc_wrench simulator on `config_path`; raise RuntimeError if it fails."""
    completed = subprocess.run(
        [
            "../build/storalloc_wrench",
            config_path,
            f"../../raw_data_processing/theta/{DATASET}.yaml",
            random_part,
        ],
        capture_output=True,
    )
    print(f"Simulation with tag {random_part} has completed with status : {completed.returncode}")
    if completed.returncode != 0:
        # Keep the tail of stderr so a failed trial can be diagnosed from the notebook output.
        stderr_tail = completed.stderr.decode("utf-8", errors="replace")[-2000:].strip()
        raise RuntimeError(f"Simulation did not complete (tag {random_part}): {stderr_tail}")


def _move_result_file(result_filename, results_dir="./exp_results"):
    """Move the simulator's result file from the working directory into `results_dir`; return its new path."""
    import shutil  # local import: only needed to move the result file

    result_file = pathlib.Path(result_filename)
    if not result_file.is_file():
        raise RuntimeError(f"Result file {result_filename} was not found")
    print(result_file.resolve())

    destination = pathlib.Path(results_dir) / result_filename
    destination.parent.mkdir(parents=True, exist_ok=True)
    try:
        shutil.move(str(result_file), str(destination))
    except OSError as err:
        raise RuntimeError("Result file was not moved correctly") from err
    return destination


def _collect_times(results):
    """Extract per-job (simulated, real) lists of wait time, runtime and I/O time from the job results."""
    sim_wait, real_wait = [], []
    sim_run, real_run = [], []
    sim_io, real_io = [], []
    for job in results:
        sim_wait.append(job["job_waiting_time_s"])
        real_wait.append(job["real_waiting_time_s"])
        sim_run.append(job["job_runtime_s"])
        real_run.append(job["real_runtime_s"])

        # Real I/O time: cumulative read + write + metadata time divided by the number of cores used.
        real_io.append(
            (job["real_cReadTime_s"] + job["real_cWriteTime_s"] + job["real_cMetaTime_s"])
            / job["real_cores_used"]
        )
        # Simulated I/O time: summed duration of COMPLETED actions that are neither COMPUTE nor SLEEP.
        # NOTE(review): real I/O time is normalized per core but simulated I/O time is not —
        # confirm both are on the same scale.
        sim_io.append(
            sum(
                action["act_duration"]
                for action in job["actions"]
                if action["act_type"] not in ("COMPUTE", "SLEEP")
                and action["act_status"] == "COMPLETED"
            )
        )
    return (sim_wait, real_wait), (sim_run, real_run), (sim_io, real_io)


def runSimulationWithParam(parametrization):
    """Run one storalloc_wrench simulation for an Ax parametrization and score it against real data.

    Steps:
      1. Write the parameters into the global `yaml_config`.
      2. Dump it to a uniquely named file under ./exp_configurations.
      3. Run ../build/storalloc_wrench on it with the DATASET trace.
      4. Move the result file into ./exp_results.
      5. Compare simulated vs. real wait time, runtime and I/O time per job.

    Args:
        parametrization: mapping from Ax parameter name (see AX_PARAMS) to its value.

    Returns:
        {"optimization_metric": m}, where m is the sum, over runtime, wait time and I/O time,
        of |1 - Pearson r| and |Cohen's d| between simulated and real values (0 = perfect match).

    Raises:
        RuntimeError: if the simulator fails, its result file is missing or cannot be moved,
            the results contain fewer than 2 jobs, or the metric is not finite (NaN/inf).
            The optimization loop treats all of these as a failed trial.
    """
    # NOTE: mutates the global `yaml_config` in place, so this function must not run concurrently
    # even though the sequence counter itself is lock-protected.
    _apply_parameters_to_config(yaml_config, parametrization)

    # Save config as a file with a unique name for each parameter set.
    seq_nb, random_part = _next_config_tag()
    output_configuration = f"./exp_configurations/exp_config_{seq_nb}_{random_part}"
    with open(output_configuration, "w", encoding="utf-8") as exp_config:
        print("Dumping configuration to " + output_configuration)
        yaml.dump(yaml_config, exp_config)

    # Now run the simulation with the current configuration file.
    _run_simulator(output_configuration, random_part)

    general = yaml_config["general"]
    result_filename = (
        f"simulatedJobs_{DATASET}__{general['config_name']}_{general['config_version']}_{random_part}.yml"
    )
    print(f"Now looking for result file : {result_filename}")
    result_path = _move_result_file(result_filename)

    # Now exploit the results. CLoader needs PyYAML built with libyaml; fall back to the pure-Python loader.
    loader = getattr(yaml, "CLoader", yaml.Loader)
    with open(result_path, "r", encoding="utf-8") as job_results:
        results = yaml.load(job_results, Loader=loader)
    if results is None or len(results) < 2:
        raise RuntimeError(f"Result file {result_filename} holds fewer than 2 jobs; cannot compute correlations")

    (sim_wait, real_wait), (sim_run, real_run), (sim_io, real_io) = _collect_times(results)

    runtime_corr, _ = pearsonr(sim_run, real_run)
    wait_time_corr, _ = pearsonr(sim_wait, real_wait)
    io_time_corr, _ = pearsonr(sim_io, real_io)
    # Each term is 0 for a perfect match: correlation of 1 and equal means (Cohen's d = 0).
    # A multi-objective setup would report these six components as separate objectives.
    components = {
        "runtime_corr": float(abs(1 - runtime_corr)),
        "wait_time_corr": float(abs(1 - wait_time_corr)),
        "io_time_corr": float(abs(1 - io_time_corr)),
        "runtime_cohen": float(abs(cohend(sim_run, real_run))),
        "wait_time_cohen": float(abs(cohend(sim_wait, real_wait))),
        "io_time_cohen": float(abs(cohend(sim_io, real_io))),
    }
    print(f"Metric components for tag {random_part} : {components}")

    # Unweighted sum of all components, added in the order listed above.
    optimization_metric = sum(components.values())
    if not np.isfinite(optimization_metric):
        # pearsonr returns NaN for constant input, and cohend is not finite when the pooled std is 0.
        # Report this as a failed trial rather than handing NaN/inf to Ax.
        raise RuntimeError(f"Non-finite optimization metric for tag {random_part} : {components}")
    return {"optimization_metric": optimization_metric}

In [6]:
from ax.service.ax_client import AxClient, ObjectiveProperties
from ax.utils.measurement.synthetic_functions import hartmann6
from ax.utils.notebook.plotting import render, init_notebook_plotting

ax_client = AxClient()

# Linear parameter constraints passed to Ax; every generated candidate must satisfy them.
AX_PARAMETER_CONSTRAINTS = [
    "walltime_extension + amdahl >= 1.4",
    "disk_rb >= disk_wb",
    "permanent_storage_read_bw >= permanent_storage_write_bw",
]

# Single objective: minimize the scalar "optimization_metric" returned by runSimulationWithParam.
# NOTE(review): ObjectiveProperties.threshold is intended for multi-objective optimization; with a
# single objective AxClient most likely ignores it — confirm against the installed Ax version.
#
# Multi-objective version, kept for reference (one objective per metric component):
#   "runtime_corr": ObjectiveProperties(minimize=True, threshold=0.05), "runtime_cohen": ObjectiveProperties(minimize=True, threshold=0.05),
#   "wait_time_corr": ObjectiveProperties(minimize=True, threshold=0.05), "wait_time_cohen": ObjectiveProperties(minimize=True, threshold=0.05),
#   "io_time_corr": ObjectiveProperties(minimize=True, threshold=0.05), "io_time_cohen": ObjectiveProperties(minimize=True, threshold=0.05)
ax_client.create_experiment(
    name="StorallocWrench_ThetaExperiment",
    parameters=AX_PARAMS,
    objectives={"optimization_metric": ObjectiveProperties(minimize=True, threshold=0.2)},
    parameter_constraints=AX_PARAMETER_CONSTRAINTS,
    outcome_constraints=[],
)

In [7]:
# Number of Ax trials (= simulator runs) to evaluate.
N_TRIALS = 45

for _ in range(N_TRIALS):
    parameters, trial_index = ax_client.get_next_trial()
    try:
        data = runSimulationWithParam(parameters)
    except RuntimeError as err:
        # runSimulationWithParam raises RuntimeError for failed runs: record the failure in Ax
        # and continue with the next trial.
        print(err)
        ax_client.log_trial_failure(trial_index=trial_index)
        continue
    except BaseException:
        # Anything else (including a kernel interrupt) is unexpected: mark the trial as failed so
        # the experiment is not left with a trial that never completes, then stop the loop.
        ax_client.log_trial_failure(trial_index=trial_index)
        raise
    ax_client.complete_trial(trial_index=trial_index, raw_data=data)

In [8]:
# Best parametrization found so far, plus Ax's prediction for it (`values`).
# get_best_parameters() returns None when no trial has completed; fail with a clear message
# instead of an opaque unpacking TypeError.
best_result = ax_client.get_best_parameters()
if best_result is None:
    raise RuntimeError("Ax returned no best parameters: no trial has completed successfully")
best_parameters, values = best_result
best_parameters

In [9]:
# `values` (from get_best_parameters) unpacks into the predicted means and covariances for the best arm.
# Presumably both are dicts keyed by metric name (here "optimization_metric") — confirm against the Ax
# version in use; `values` may also be None if Ax has no model prediction (TODO confirm).
means, covariances = values
means