# Internal control experiments

This notebook runs some experiments with NRM's internal bandit control.

When the NRM model is applied to resource management for a single
computational job, the global resource optimization problem is the following:

$$
\begin{array}{l}
    \min \quad e_{\text{total}} \\
    \text{s.t.} \quad t < \frac{t_{\text{ref}}}{\tau}
\end{array}
$$

Here $e_{\text{total}}$ denotes the total energy spent by the system during
the lifetime of the job, whose duration is denoted by $t$. We denote by
$t_{\text{ref}}$ a reference measurement of the runtime of the job on an
unmanaged system. The parameter $\tau < 1$ controls the amount of runtime
degradation allowed for the job.

The value of this global objective can easily be measured a posteriori for a
computational job using power instrumentation techniques. Assuming both
workload and platform behavior to be deterministic, this objective is measured
using two runs of the system: a first run without resource management to
acquire $t_{\text{ref}}$, and a second run with NRM enabled. For NRM's
round-based control strategy to address this problem, however, we need an
online loss value. This loss is obtained using the following loose assumptions:

- The passive power consumption of the node is fixed and known. [1]

- The total power consumption in a given time period can be estimated as
  the sum of the static node consumption over that period and the RAPL power
  measurement over that period. [2]

- The impact of a choice of power-cap on the job's runtime can be
  interpolated linearly from its impact on CPU counters. [3]


Denoting, as in the previous section, the round counter by $0 < r \le T$, the known
passive static power consumption by $p_{\text{static}}$, the starting time of
the job by $t^0$ and the end time of round $r$ by $t^r$, we can write the total
energy expenditure of the job based on RAPL power measurements $p^r$ using
assumptions 1 and 2 as:

$$
	e_{\text{total}} = \sum_{r=1}^{T} (p^r + p_{\text{static}}) (t^{r} - t^{r-1})
$$
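
As a minimal sketch of this sum, the following assumes that the round end times $t^1, \dots, t^T$ and the per-round RAPL measurements $p^r$ are available as plain Python lists. The function name `total_energy`, its parameters, and the sample values are hypothetical and not part of NRM. Each round contributes its total power (RAPL plus static) multiplied by the duration of the round.

```python
def total_energy(round_end_times, rapl_power, p_static, t0=0.0):
    """Accumulate (p^r + p_static) * (duration of round r) over all rounds.

    Hypothetical helper: times are in seconds and powers in watts,
    so the result is in joules.
    """
    if len(round_end_times) != len(rapl_power):
        raise ValueError("need exactly one power measurement per round")
    energy = 0.0
    previous_end = t0  # the job starts at t^0
    for t_end, p in zip(round_end_times, rapl_power):
        # Duration of the round: end of this round minus end of the previous one
        energy += (p + p_static) * (t_end - previous_end)
        previous_end = t_end
    return energy


# Illustrative values only: three rounds ending at 1 s, 2 s and 4 s
example_energy = total_energy([1.0, 2.0, 4.0], [100.0, 80.0, 60.0], p_static=200.0)
print(example_energy)
```

With these sample values the three rounds contribute 300 J, 280 J and 520 J respectively.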

Using assumption 3 means that we can reasonably estimate the
change in job runtime incurred by the choice of power-cap in round $r$ by
evaluating $\frac{s^r_{\text{ref}}}{s^r}$. We use this as part of our proxy
cost in two ways. First, this quantity is used to evaluate breaching of the
constraint on $t$, and second, it is used to adjust for an expected increase in
the number of rounds due to the impact on job runtime. This gives rise to the
following value for the loss at round $r$:

$$
	\ell^r = \mathbb{\huge 1}_{\left( \frac{s^r}{s^r_{\text{ref}}}>\tau \right)}
   \left( \frac{s^r_{\text{ref}}}{s^r} \left( p^r + p_{\text{static}} \right) \right)
$$
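
The per-round loss can be sketched in Python as follows. This is an illustrative transcription of the formula above, not the NRM implementation: the function name `round_loss` and its arguments are hypothetical, `s` and `s_ref` stand for $s^r$ and $s^r_{\text{ref}}$, and powers are taken in watts for readability.

```python
def round_loss(s, s_ref, p, p_static, tau):
    """Loss for one round, transcribed term by term from the formula above.

    Hypothetical helper: s and s_ref play the roles of s^r and s^r_ref,
    p is the RAPL power measurement and p_static the static node power.
    """
    if s <= 0 or s_ref <= 0:
        raise ValueError("s and s_ref must be positive")
    # Indicator term: the loss is non-zero only when s / s_ref exceeds tau
    if s / s_ref <= tau:
        return 0.0
    # Total power scaled by s_ref / s, the estimated change in job runtime
    return (s_ref / s) * (p + p_static)


# Illustrative values only
loss_at_reference = round_loss(s=100.0, s_ref=100.0, p=100.0, p_static=200.0, tau=0.9)
loss_below_threshold = round_loss(s=80.0, s_ref=100.0, p=60.0, p_static=200.0, tau=0.9)
print(loss_at_reference, loss_below_threshold)
```

With `tau = 0.9`, a round running at the reference value yields the total power as its loss, while a round at 80% of the reference yields zero because the indicator vanishes.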


In [1]:
cd ../../..

/home/fre/workspace/hnrm


In [2]:
%%capture
%%bash
./shake.sh build # for the daemon 
./shake.sh client # for the upstream client
./shake.sh pyclient # for the shared client library

In [3]:
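experimentSamplingSize = 2  # number of repetitions of each configuration
powerCapRanges = [60, 75, 60, 100, 110, 120, 150, 180, 210]  # power caps in W (60 is listed twice)
staticPower = 200000000  # static node power in uW (200 W)
referenceMeasurementRoundInterval = 10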

In [4]:
%load_ext nb_black
import json

daemonCfgs = {}

for i in range(0, experimentSamplingSize):
    for cap in powerCapRanges:
        daemonCfgs[(i, "pcap" + str(cap))] = {
            "controlCfg": {"fixedPower": {"fromuW": cap * 1000000}}
        }
    daemonCfgs[(i, "controlOn")] = {
        "controlCfg": {
            "staticPower": {"fromuW": staticPower},
            "referenceMeasurementRoundInterval": referenceMeasurementRoundInterval,
            "learnCfg": {"lagrangeConstraint": 1},
            "speedThreshold": 0.9,
            "minimumControlInterval": {"fromuS": 1000000},
        },
        "raplCfg": {
            "raplActions": [{"fromuW": 1000000 * p} for p in powerCapRanges],
            "raplFrequency": {"fromHz": 1},
            "raplPath": "/sys/devices/virtual/powercap/intel-rapl",
        },
    }


def perfwrapped(cmd, args):
    return [
        {
            "cmd": cmd,
            "args": args,
            "sliceID": "toto",
            "manifest": {
                "app": {
                    "slice": {"cpus": 1, "mems": 1},
                    "perfwrapper": {
                        "perfLimit": {"fromOps": 100000},
                        "perfFreq": {"fromHz": 1},
                    },
                },
                "name": "perfwrap",
            },
        }
    ]


stream = perfwrapped("stream_c", [])

# NOTE: despite its name, this workload launches `amg` through mpiexec.
lammps = perfwrapped(
    "mpiexec",
    ["-n", "24", "amg", "-problem", "2", "-n", "90", "90", "90", "-P", "2", "12", "1"],
)


### Helpers

For performing experiments:

In [5]:
import time
from collections import defaultdict


def do_workload(host, daemonCfg, workload):
    host.start_daemon(daemonCfg)
    print("Starting the workload")
    host.run_workload(workload)
    history = defaultdict(list)
    # print(host.get_state())
    getCPD = True
    try:
        while host.check_daemon() and not host.workload_finished():
            measurement_message = host.workload_recv()
            msg = json.loads(measurement_message)
            if "pubMeasurements" in msg:
                if getCPD:
                    getCPD = False
                    time.sleep(3)
                    cpd = host.get_cpd()
                    print(cpd)
                    cpd = dict(cpd)
                    print("Sensor identifier list:")
                    for sensorID in [sensor[0] for sensor in cpd["sensors"]]:
                        print("- %s" % sensorID)
                    print("Actuator identifier list:")
                    for actuatorID in [actuator[0] for actuator in cpd["actuators"]]:
                        print("- %s" % actuatorID)
                content = msg["pubMeasurements"][1][0]
                t = content["time"]
                sensorID = content["sensorID"]
                x = content["sensorValue"]
                print(
                    ".",
                    end=""
                    # "Measurement: originating at time %s for sensor %s of value %s"
                    #% (content["time"], content["sensorID"], content["sensorValue"])
                )
                history["sensor-" + sensorID].append((t, x))
            if "pubCPD" in msg:
                print("R")
            if "pubAction" in msg:
                # print(host.get_state())
                # print(msg)
                t, contents, meta, controller = msg["pubAction"]
                if "bandit" in controller.keys():
                    for key in meta.keys():
                        history["actionType"].append((t, key))
                    if "referenceMeasurementDecision" in meta.keys():
                        print("(ref)", end="")
                    elif "initialDecision" in meta.keys():
                        print("(init)", end="")
                    elif "innerDecision" in meta.keys():
                        print("(inner)", end="")
                        for counter, value in enumerate(
                            meta["innerDecision"]["constraints"]
                        ):
                            history["constraint-" + str(counter)].append(
                                (t, value["fromConstraintValue"])
                            )
                        for counter, value in enumerate(
                            meta["innerDecision"]["objectives"]
                        ):
                            history["objective-" + str(counter)].append(
                                (t, value["fromObjectiveValue"])
                            )
                        history["loss"].append((t, meta["innerDecision"]["loss"]))
                for (arm, (visits, stat)) in controller["armstats"]:
                    history["armstat-" + str(arm)].append((t, stat))
                    history["visits-" + str(arm)].append((t, visits))
                for content in contents:
                    actuatorID = content["actuatorID"] + "(action)"
                    x = content["actuatorValue"]
                    history[actuatorID].append((t, x))
                    for arm in controller["bandit"]["lagrange"]["lagrangeConstraint"][
                        "weights"
                    ]:
                        value = arm["action"][0]["actuatorValue"]
                        history[str(value / 1000000) + "-probability"].append(
                            (t, arm["probability"]["getProbability"])
                        )
                        history[str(value / 1000000) + "-cumulativeLoss"].append(
                            (t, arm["cumulativeLoss"]["getCumulativeLoss"])
                        )
                # print(
                # "Action: originating at time %s for actuator %s of value %s"
                #% (t,actuatorID,x)
                # )
            host.check_daemon()
        print("")
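    except (Exception, KeyboardInterrupt) as e:
        # Keep the partial history, but report why the loop stopped early.
        print("\nStopped early: %r" % (e,))
        return history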
    host.stop_daemon()
    return history
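
To make the measurement branch of `do_workload` easier to follow, here is a small self-contained sketch of how one `pubMeasurements` message ends up in `history`. The message literal is hypothetical: its layout is only inferred from the indexing used in `do_workload` (`msg["pubMeasurements"][1][0]`), and the leading `"client-id"` element and the field values are placeholders rather than real NRM output.

```python
import json
from collections import defaultdict


def record_measurement(history, raw_message):
    """Append one (time, value) pair to history if the message is a measurement.

    Hypothetical helper mirroring the "pubMeasurements" branch of do_workload.
    Returns True when a measurement was recorded, False otherwise.
    """
    msg = json.loads(raw_message)
    if "pubMeasurements" not in msg:
        return False
    # Same indexing as in do_workload: second element, first measurement
    content = msg["pubMeasurements"][1][0]
    key = "sensor-" + content["sensorID"]
    history[key].append((content["time"], content["sensorValue"]))
    return True


# Placeholder message; the real wire format may differ.
raw = json.dumps(
    {
        "pubMeasurements": [
            "client-id",
            [{"time": 1.5, "sensorID": "RaplKey (PackageID 0)", "sensorValue": 87.0}],
        ]
    }
)
history = defaultdict(list)
recorded = record_measurement(history, raw)
ignored = record_measurement(history, json.dumps({"pubCPD": []}))
print(dict(history))
```

Because `history` is a `defaultdict(list)`, a key only appears once a sensor has reported at least one value.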


In [6]:
import nrm.tooling as nrm

host = nrm.Local()


In [7]:
results = {}
for key, cfg in daemonCfgs.items():
    results[key] = do_workload(host, cfg, stream)

connecting
connected to tcp://localhost:2345
Starting the workload
Problem
    { sensors = Map
        [
            ( SensorID { sensorID = "DownstreamCmdKey (DownstreamCmdID 5efe873b-fde9-4955-ac99-9e8c6c8118a1)" }
            , Sensor
                { range = 0.0 ... 9.071503388e9
                , maxFrequency = 1.0
                }
            )
        ,
            ( SensorID { sensorID = "RaplKey (PackageID 0)" }
            , Sensor
                { range = 0.0 ... 300.0
                , maxFrequency = 3.0
                }
            )
        ]
    , actuators = Map
        [
            ( ActuatorID { actuatorID = "RaplKey

..................................
connecting
connected to tcp://localhost:2345
Starting the workload
Problem
    { sensors = Map
        [
            ( SensorID { sensorID = "DownstreamCmdKey (DownstreamCmdID ed08812b-af63-4edd-988e-80c98a4f11dc)" }
            , Sensor
                { range = 0.0 ... 9.691557968e9
                , maxFrequency = 1.0
                }
            )
        ,
            ( SensorID { sensorID = "RaplKey (PackageID 0)" }
            , Sensor
                { range = 0.0 ... 300.0
                , maxFrequency = 3.0
                }
            )
        ]
    , actuators = Map
        [
            ( ActuatorID { actuator

.....................................
connecting
connected to tcp://localhost:2345
Starting the workload
Problem
    { sensors = Map
        [
            ( SensorID { sensorID = "DownstreamCmdKey (DownstreamCmdID 069ac6a9-d440-4ae3-80bb-3f6bf2c0e17c)" }
            , Sensor
                { range = 0.0 ... 9.315827066e9
                , maxFrequency = 1.0
                }
            )
        ,
            ( SensorID { sensorID = "RaplKey (PackageID 0)" }
            , Sensor
                { range = 0.0 ... 300.0
                , maxFrequency = 3.0
                }
            )
        ]
    , actuators = Map
        [
            ( ActuatorID { actua

.................................
connecting
connected to tcp://localhost:2345
Starting the workload
Problem
    { sensors = Map
        [
            ( SensorID { sensorID = "DownstreamCmdKey (DownstreamCmdID cc115ca9-9a51-4f3c-9044-0cc90ff6b5b4)" }
            , Sensor
                { range = 0.0 ... 9.469366442e9
                , maxFrequency = 1.0
                }
            )
        ,
            ( SensorID { sensorID = "RaplKey (PackageID 0)" }
            , Sensor
                { range = 0.0 ... 300.0
                , maxFrequency = 3.0
                }
            )
        ]
    , actuators = Map
        [
            ( ActuatorID { actuatorI


In [10]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import scipy.integrate as integrate

from functools import reduce


def history_to_dataframe(key, history):
    """Convert one run's history into a long-format dataframe."""
    iteration, name = key

    def mkdf(columnName, measurements):
        # Downstream sensor IDs embed a per-run UUID; use a common column name
        # so that values from different runs end up under the same variable.
        if "Downstream" in columnName:
            cname = "sensor-Downstream"
        else:
            cname = columnName
        return pd.DataFrame(measurements, columns=["time", cname])

    data_frames = [
        mkdf(columnName, measurements).melt(id_vars=["time"])
        for (columnName, measurements) in history.items()
    ]

    df = pd.concat(data_frames)
    # Express time relative to the earliest recorded event of the run.
    df["time"] = df.time - df.time.min()
    return df.assign(name=name).assign(iteration=iteration)


result_df = pd.concat(
    [history_to_dataframe(key, history) for key, history in results.items()]
)

result_df.to_csv("dev/hnrm-experiments/bandits/internal-control-experiments.csv")
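
The long format written to the CSV file can be illustrated without pandas. The sketch below is a simplified stand-in for `history_to_dataframe`, not a replacement: the helper name `history_to_rows` and the sample history are hypothetical, and it only mirrors the steps of the function above (one row per measurement, a common name for downstream sensors, time shifted to start at zero, run name and iteration attached).

```python
def history_to_rows(key, history):
    """Flatten a history dict into long-format rows (hypothetical helper)."""
    iteration, name = key
    rows = []
    for column_name, measurements in history.items():
        # Downstream sensor IDs differ between runs; use a common variable name
        if "Downstream" in column_name:
            variable = "sensor-Downstream"
        else:
            variable = column_name
        for t, value in measurements:
            rows.append({"time": t, "variable": variable, "value": value})
    if not rows:
        return rows
    # Shift time so that the earliest event of the run is at zero
    t_min = min(row["time"] for row in rows)
    for row in rows:
        row["time"] -= t_min
        row["name"] = name
        row["iteration"] = iteration
    return rows


# Illustrative history only; the sensor IDs and values are placeholders.
sample_history = {
    "sensor-RaplKey (PackageID 0)": [(10.0, 80.0), (11.0, 82.0)],
    "sensor-DownstreamCmdKey (DownstreamCmdID abc)": [(10.5, 1.0e9)],
}
rows = history_to_rows((0, "pcap60"), sample_history)
for row in rows:
    print(row)
```

Each row carries the same five fields as the melted dataframe (`time`, `variable`, `value`, `name`, `iteration`), which makes it straightforward to group by `name` and `variable` when plotting.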

