# 1. Topology

- Topologies define the available hardware of a datacenter
- Defined using JSON format

A topology based on the Surfsara Lisa cluster ([file](topologies/surfsara.json)):
```json
{
    "clusters":
    [
        {
            "name": "C01",
            "hosts" :
            [
                {
                    "name": "H01",
                    "cpu":
                    {
                        "coreCount": 16,
                        "coreSpeed": 2100
                    },
                    "memory": {
                        "memorySize": 100000
                    },
                    "count": 279
                }
            ],
            "powerSource": {
                "carbonTracePath": "carbon_traces/carbon_2022.parquet"
            }
        }
    ]
}
``` 
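A quick sanity check on a topology is to load it and total up the hardware it describes. The sketch below inlines the JSON above instead of reading the file, and it assumes that `count` is the number of identical copies of a host; the file itself does not say so.

```python
import json

# Inlined copy of the topology shown above (powerSource omitted for brevity).
topology_json = """
{
  "clusters": [
    {
      "name": "C01",
      "hosts": [
        {
          "name": "H01",
          "cpu": {"coreCount": 16, "coreSpeed": 2100},
          "memory": {"memorySize": 100000},
          "count": 279
        }
      ]
    }
  ]
}
"""

topology = json.loads(topology_json)

total_hosts = 0
total_cores = 0
for cluster in topology["clusters"]:
    for host in cluster["hosts"]:
        # Assumption: "count" is the number of identical copies of this host.
        copies = host.get("count", 1)
        total_hosts += copies
        total_cores += copies * host["cpu"]["coreCount"]

print(f"{total_hosts} hosts, {total_cores} cores")
```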

# 2. Workloads

- A workload trace is a recording of the tasks executed on a system

- Graph Greenifier requires two different traces:
    - **tasks.parquet** provides a general overview of the tasks executed during the workload.
    - **fragments.parquet** provides detailed information about each task during its runtime.

In [1]:
import pandas as pd

df_tasks = pd.read_parquet("workload_traces/2022-10-01_2022-10-31/tasks.parquet")
df_fragments = pd.read_parquet("workload_traces/2022-10-01_2022-10-31/fragments.parquet")

## Workload Traces

- Traces define which tasks should be executed and when

- Provided as a bag of tasks

- In this demo we run 1 month of Surfsara Lisa -> 68,648 tasks

In [2]:
df_tasks.head()

Unnamed: 0,id,submission_time,duration,cpu_count,cpu_capacity,mem_capacity
0,2081278,2022-09-30 22:00:00,1194000,16,33600.0,100000
1,2081285,2022-09-30 22:00:00,7176000,16,33600.0,100000
2,2081310,2022-09-30 22:00:00,17218000,16,33600.0,100000
3,2081317,2022-09-30 22:00:00,17605000,16,33600.0,100000
4,2081324,2022-09-30 22:00:00,19007000,16,33600.0,100000
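The columns are easier to read after a small conversion. The sketch below rebuilds the first three rows printed above by hand and assumes that `duration` is stored in milliseconds, which the trace itself does not state. It also shows that `cpu_capacity` divided by `cpu_count` gives 2100, the `coreSpeed` from the topology.

```python
import pandas as pd

# First rows of df_tasks as printed above, rebuilt by hand so the sketch is self-contained.
tasks = pd.DataFrame({
    "id": [2081278, 2081285, 2081310],
    "duration": [1194000, 7176000, 17218000],
    "cpu_count": [16, 16, 16],
    "cpu_capacity": [33600.0, 33600.0, 33600.0],
})

# Assumption: duration is stored in milliseconds.
tasks["duration_td"] = pd.to_timedelta(tasks["duration"], unit="ms")

# Capacity per core; compare with coreSpeed in the topology file.
tasks["capacity_per_core"] = tasks["cpu_capacity"] / tasks["cpu_count"]

print(tasks[["id", "duration_td", "capacity_per_core"]])
```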


### Fragments

<img src="./figures/fragments.jpg" width="600" alt="Alternative text" />

##### One month of Surfsara tasks has 10,219,665 fragments

In [3]:
df_fragments.head()

Unnamed: 0,id,duration,cpu_count,cpu_usage
0,2081278,30000,16,14721.0
1,2081278,30000,16,14700.0
2,2081278,30000,16,14700.0
3,2081278,30000,16,14700.0
4,2081278,30000,16,14868.0
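Fragments can be folded back into per-task figures. The sketch below rebuilds the five rows printed above and assumes that `cpu_usage` is expressed in the same unit as the task's `cpu_capacity` (33600.0 for task 2081278), so that their ratio can be read as a utilization.

```python
import pandas as pd

# The five fragments printed above, rebuilt by hand.
fragments = pd.DataFrame({
    "id": [2081278] * 5,
    "duration": [30000] * 5,
    "cpu_count": [16] * 5,
    "cpu_usage": [14721.0, 14700.0, 14700.0, 14700.0, 14868.0],
})

CPU_CAPACITY = 33600.0  # cpu_capacity of task 2081278 in tasks.parquet

# Assumption: cpu_usage and cpu_capacity share a unit, so the ratio is a utilization.
fragments["utilization"] = fragments["cpu_usage"] / CPU_CAPACITY

# Per task: total time covered by the fragments and mean utilization.
per_task = fragments.groupby("id").agg(
    covered_ms=("duration", "sum"),
    mean_utilization=("utilization", "mean"),
)
print(per_task)
```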


# 3. Carbon Trace

- Carbon traces define the carbon intensity of the available energy over time

- Gathered using ENTSO-E

- Specific to the location of the data center

- Defined as a parquet file

In [4]:
df_carbon = pd.read_parquet("carbon_traces/carbon_2022.parquet")

df_carbon.head()

Unnamed: 0,timestamp,carbon_intensity
0,2021-12-31 23:00:00,168.138693
1,2021-12-31 23:15:00,167.050014
2,2021-12-31 23:30:00,164.552936
3,2021-12-31 23:45:00,167.493769
4,2022-01-01 00:00:00,164.517793
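The rows above are 15 minutes apart, while the experiments later in this demo export once per hour. A minimal sketch of checking the sampling step and averaging the trace per hour, using only the five rows printed above:

```python
import pandas as pd

# The five rows printed above, rebuilt by hand.
carbon = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2021-12-31 23:00:00", "2021-12-31 23:15:00",
        "2021-12-31 23:30:00", "2021-12-31 23:45:00",
        "2022-01-01 00:00:00",
    ]),
    "carbon_intensity": [168.138693, 167.050014, 164.552936, 167.493769, 164.517793],
})

# Time between consecutive samples.
step = carbon["timestamp"].diff().dropna()
print(step.unique())

# Mean carbon intensity per hour.
hourly = (
    carbon.set_index("timestamp")["carbon_intensity"]
    .resample(pd.Timedelta(hours=1))
    .mean()
)
print(hourly)
```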


# 4. Experiment

- An Experiment defines what the Graph Greenifier should run, and how.

- Experiments are defined using a JSON format

Example Experiment ([file](experiments/surfsara_month.json)):
```json
{
    "name": "surfsara_month",
    "topologies": [{
        "pathToFile": "topologies/surfsara.json"
    }],
    "workloads": [{
        "pathToFile": "workload_traces/2022-10-01_2022-10-31",
        "type": "ComputeWorkload"
        }],
    "allocationPolicies": [{
        "type": "prefab",
        "policyName": "Mem"
    }],
    "exportModels": [
        {
            "exportInterval": 3600,
            "computeExportConfig": {
                "hostExportColumns": ["power_draw", "energy_usage", "cpu_usage", "cpu_utilization"],
                "taskExportColumns": ["submission_time", "schedule_time", "finish_time", "task_state"],
                "serviceExportColumns": ["tasks_total", "tasks_pending", "tasks_active", "tasks_completed", "tasks_terminated", "hosts_up"]
            }
        }
    ]
}
```

**Note:** Parameters are defined as lists, which makes it easy to run multiple experiments.
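To illustrate why lists help, the sketch below enumerates the scenarios a file with two topologies would describe. It assumes that one scenario is run per combination of list entries; that is a guess about the runner's behaviour, not something this demo confirms. Both topology paths appear elsewhere in this demo.

```python
from itertools import product

# Hypothetical experiment with two topologies instead of one.
experiment = {
    "topologies": [
        {"pathToFile": "topologies/surfsara.json"},
        {"pathToFile": "topologies/surfsara_small.json"},
    ],
    "workloads": [
        {"pathToFile": "workload_traces/2022-10-01_2022-10-31", "type": "ComputeWorkload"},
    ],
    "allocationPolicies": [
        {"type": "prefab", "policyName": "Mem"},
    ],
}

# Assumption: one scenario per combination of list entries (Cartesian product).
keys = ["topologies", "workloads", "allocationPolicies"]
scenarios = [
    dict(zip(keys, combo))
    for combo in product(*(experiment[key] for key in keys))
]

for index, scenario in enumerate(scenarios):
    print(index, scenario["topologies"]["pathToFile"], scenario["allocationPolicies"]["policyName"])
```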

# 5. Running OpenDC

- OpenDC is executed directly from the terminal

- Only an experiment file is needed

In [7]:
import subprocess

pathToScenario = "experiments/surfsara_month.json"
result = subprocess.run(["OpenDCExperimentRunner/bin/OpenDCExperimentRunner.bat", "--experiment-path", pathToScenario])
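The call above targets the Windows `.bat` launcher. A small helper can pick the launcher per platform and surface a failed run instead of silently continuing. This is a sketch: `build_command` and `run_experiment` are hypothetical names, and it assumes the distribution ships both a `.bat` file and an extensionless script, as the paths used in this demo suggest.

```python
import platform
import subprocess


def build_command(experiment_path, runner="OpenDCExperimentRunner/bin/OpenDCExperimentRunner"):
    """Return the argument list for the runner, adding .bat on Windows."""
    if platform.system() == "Windows":
        runner += ".bat"
    return [runner, "--experiment-path", experiment_path]


def run_experiment(experiment_path):
    """Run the experiment; check=True raises CalledProcessError on a non-zero exit code."""
    return subprocess.run(build_command(experiment_path), check=True)


# Only the command is printed here; call run_experiment(...) to actually start a run.
print(build_command("experiments/surfsara_month.json"))
```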

# 6. Output

In [8]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


df_host = pd.read_parquet("output/surfsara_month/raw-output/0/seed=0/host.parquet")
df_powerSource = pd.read_parquet("output/surfsara_month/raw-output/0/seed=0/powerSource.parquet")
df_task = pd.read_parquet("output/surfsara_month/raw-output/0/seed=0/task.parquet")
df_service = pd.read_parquet("output/surfsara_month/raw-output/0/seed=0/service.parquet")
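The four paths above share one pattern, so they can be built in one place and checked before reading. In the sketch below, `output_files` is a hypothetical helper, and reading the `0` and `seed=0` path components as a scenario index and a seed is an assumption based on the folder names.

```python
from pathlib import Path


def output_files(experiment_name, scenario=0, seed=0, root="output"):
    """Map each output kind to its parquet path for one run."""
    base = Path(root) / experiment_name / "raw-output" / str(scenario) / f"seed={seed}"
    kinds = ("host", "powerSource", "task", "service")
    return {kind: base / f"{kind}.parquet" for kind in kinds}


files = output_files("surfsara_month")
for kind, path in files.items():
    # Report the file size, or flag the file as missing, before trying to read it.
    status = f"{path.stat().st_size} bytes" if path.exists() else "missing"
    print(f"{kind:12s} {path.as_posix()} ({status})")
```

Each path can then be passed to `pd.read_parquet` once it is known to exist.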



### Host
- Information about the host at each timestamp. 
- Examples of metrics: 
    - cpu_utilization
    - power_draw 
    - energy_usage 

In [None]:
print(f"The host file contains the following columns:\n {np.array(df_host.columns)}\n")
print(f"The host file consists of {len(df_host)} samples\n")
df_host.head()

### Tasks
- The task file contains all information about the different tasks at each timestamp.
- Example use cases:
    - When was a task run?
    - How long did it take?
    - On which host was a task executed?

In [None]:
print(f"The task file contains the following columns:\n {np.array(df_task.columns)}\n")
print(f"The task file consists of {len(df_task)} samples\n")
df_task.head()

### PowerSource
- The powerSource file contains all information about the power sources at each timestamp.
- Example use cases:
    - What is the total energy used during the workload?

In [None]:
print(f"The powerSource file contains the following columns:\n {np.array(df_powerSource.columns)}\n")
print(f"The powerSource file consists of {len(df_powerSource)} samples\n")
df_powerSource.head()

### Service

- The service file contains general information about the experiments.
- Example uses:
    - How many tasks are running?
    - How many hosts are up?

In [None]:
print(f"The service file contains the following columns:\n {np.array(df_service.columns)}\n")
print(f"The service file consists of {len(df_service)} samples\n")
df_service.head()

# 7. Performance

- To properly compare the different experiments, we aggregate their output into meaningful values.

In [None]:
runtime = pd.to_timedelta(df_service.timestamp.max() - df_service.timestamp.min(), unit="ms")
utilization = df_host.cpu_utilization.mean()


print(f"The small data center had a total runtime of {runtime}")
print(f"On average, the utilization of each host is {utilization * 100:.2f}%")

In [None]:
energy = df_powerSource.energy_usage.sum() / 3_600_000 # convert energy to kWh
carbon = df_powerSource.carbon_emission.sum() / 1000


print(f"The small data center used {energy:.2f} kWh during the workload")
print(f"The small data center emitted {carbon:.2f} kg CO2 during the workload")
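The constants in the cell above are unit conversions: 1 kWh is 1000 W sustained for 3600 s, or 3,600,000 J, and 1 kg is 1000 g. A minimal sketch with a made-up load, assuming (as the conversion above implies) that `energy_usage` is reported in joules and `carbon_emission` in grams:

```python
JOULES_PER_KWH = 3_600_000  # 1 kWh = 1000 W * 3600 s
GRAMS_PER_KG = 1000


def joules_to_kwh(joules):
    """Convert an energy value from joules to kilowatt-hours."""
    return joules / JOULES_PER_KWH


def grams_to_kg(grams):
    """Convert a mass from grams to kilograms."""
    return grams / GRAMS_PER_KG


# Made-up example: a host drawing a constant 200 W for 24 hours.
power_w = 200
seconds = 24 * 3600
energy_j = power_w * seconds  # joules = watts * seconds

print(f"{joules_to_kwh(energy_j):.2f} kWh")
print(f"{grams_to_kg(2500):.2f} kg")
```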

In [None]:
import matplotlib.dates as mdates

timestamps = pd.to_datetime(df_powerSource[:-1].timestamp_absolute, unit="ms")


fig, ax = plt.subplots(figsize=(10,5))
ax.plot(timestamps, df_powerSource[:-1].carbon_emission/1000)

plt.title("Carbon emission during a month-long Surfsara workload", fontsize=15)
plt.xlabel("Date (yy-mm-dd)", fontsize=13)
plt.ylabel("Carbon Emission (kgCO2/h)", fontsize=13)
ax.xaxis.set_major_locator(plt.MaxNLocator(10))
myFmt = mdates.DateFormatter('%y-%m-%d')
ax.xaxis.set_major_formatter(myFmt)

plt.show()

# Accuracy was verified in the FootPrinter Paper

- One week of Surfsara trace

- Sampled every 30 seconds

- MAPE of 3.15%

<img src="./figures/validation.jpg" width="600" alt="Alternative text" />
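MAPE (mean absolute percentage error) averages the relative error between measured and simulated values. The sketch below shows the standard formula on made-up numbers; it is not the validation code or data from the paper.

```python
def mape(actual, predicted):
    """Mean absolute percentage error, in percent. Actual values must be non-zero."""
    if len(actual) != len(predicted) or len(actual) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    total = sum(abs((a - p) / a) for a, p in zip(actual, predicted))
    return 100 * total / len(actual)


# Made-up values, for illustration only.
measured = [100.0, 200.0, 400.0]
simulated = [110.0, 190.0, 400.0]

print(f"MAPE: {mape(measured, simulated):.2f}%")
```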

### FootPrinter was also used to compare the same workload in different locations

<img src="./figures/location_comparison.jpg" width="600" alt="Alternative text" />

#### The Location of a data center has a significant effect on the Carbon Emission

# Failures

- Machines can fail during operation

- OpenDC can simulate host failures 
    - Failure Traces
    - Failure Models

To use failures, we have to update the experiment file:

```json
{
    "name": "surfsara_month_failures",
    "topologies": [{
        "pathToFile": "topologies/surfsara_small.json"
    }],
    "workloads": [{
        "pathToFile": "workload_traces/2022-10-01_2022-10-31",
        "type": "ComputeWorkload"
        }],
    "allocationPolicies": [{
        "type": "prefab",
        "policyName": "Mem"
    }],
    "exportModels": [
        {
            "exportInterval": 3600,
            "computeExportConfig": {
                "hostExportColumns": ["power_draw", "energy_usage", "cpu_usage", "cpu_utilization"],
                "taskExportColumns": ["submission_time", "schedule_time", "finish_time", "task_state"],
                "serviceExportColumns": ["tasks_total", "tasks_pending", "tasks_active", "tasks_completed", "tasks_terminated", "hosts_up"]
            }
        }
    ],
    "failureModels": [
        {
        "type": "trace-based",
        "pathToFile": "failure_traces/Whatsapp_user_reported.parquet"
    }],
    "maxNumFailures": [10]
}
```

In [None]:
import subprocess

pathToScenario = "experiments/surfsara_month_failures.json"
subprocess.run(["OpenDCExperimentRunner/bin/OpenDCExperimentRunner", "--experiment-path", pathToScenario])


In [None]:
df_host_failures = pd.read_parquet("output/surfsara_month_failures/raw-output/0/seed=0/host.parquet")
df_powerSource_failures = pd.read_parquet("output/surfsara_month_failures/raw-output/0/seed=0/powerSource.parquet")
df_task_failures = pd.read_parquet("output/surfsara_month_failures/raw-output/0/seed=0/task.parquet")
df_service_failures = pd.read_parquet("output/surfsara_month_failures/raw-output/0/seed=0/service.parquet")

In [None]:
import matplotlib.dates as mdates

timestamps = pd.to_datetime(df_service[:-1].timestamp_absolute, unit="ms")
timestamps_failures = pd.to_datetime(df_service_failures[:-1].timestamp_absolute, unit="ms")


fig, ax = plt.subplots(figsize=(10,5))
ax.plot(timestamps, df_service[:-1].tasks_active, label="no failures")
ax.plot(timestamps_failures, df_service_failures[:-1].tasks_active, label="failures")

plt.title("Tasks active during a workload")
plt.xlabel("Time")
plt.ylabel("Number of active tasks")
ax.xaxis.set_major_locator(plt.MaxNLocator(3))
myFmt = mdates.DateFormatter('%y-%m-%d %H:%M:%S')
ax.xaxis.set_major_formatter(myFmt)

plt.legend()
plt.show()

In [None]:
import matplotlib.dates as mdates

timestamps = pd.to_datetime(df_powerSource[:-1].timestamp_absolute, unit="ms")
timestamps_failures = pd.to_datetime(df_powerSource_failures[:-1].timestamp_absolute, unit="ms")


fig, ax = plt.subplots(figsize=(10,5))
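# energy_usage is in joules; divide by 3_600_000 to plot kWh, matching the axis label
ax.plot(timestamps, df_powerSource[:-1].energy_usage / 3_600_000, label="no failures", linewidth=2)
ax.plot(timestamps_failures, df_powerSource_failures[:-1].energy_usage / 3_600_000, label="failures", linewidth=2)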

plt.title("Energy Usage during a workload (with and without failures)", fontsize=15)
plt.xlabel("Time", fontsize=15)
plt.ylabel("Energy Usage (kWh)", fontsize=15)
ax.xaxis.set_major_locator(plt.MaxNLocator(3))
myFmt = mdates.DateFormatter('%y-%m-%d %H:%M:%S')
ax.xaxis.set_major_formatter(myFmt)

plt.legend(fontsize=15)
plt.show()

In [None]:
runtime_normal = pd.to_timedelta(df_service.timestamp.max() - df_service.timestamp.min(), unit="ms")
runtime_failures = pd.to_timedelta(df_service_failures.timestamp.max() - df_service_failures.timestamp.min(), unit="ms")

print("RUNTIME")
print(f"The workload without failures took:   {runtime_normal}")
print(f"The workload with failures took:      {runtime_failures}")
print(f"This is an increase of                {(runtime_failures - runtime_normal) / runtime_normal * 100:.2f}%\n\n")

energy_normal = df_powerSource.energy_usage.sum()
energy_failures = df_powerSource_failures.energy_usage.sum()

print("ENERGY")
print(f"The workload without failures used    {energy_normal / 3_600_000:.0f}  kWh")
print(f"The workload with failures used       {energy_failures / 3_600_000:.0f} kWh")
print(f"This is an increase of                {(energy_failures - energy_normal) / energy_normal * 100:.2f} %\n\n")

carbon_normal = df_powerSource.carbon_emission.sum()
carbon_failures = df_powerSource_failures.carbon_emission.sum()

print("CARBON")
print(f"The workload without failures emitted {carbon_normal:.0f} grams of carbon")
print(f"The workload with failures emitted    {carbon_failures:.0f} grams of carbon")
print(f"This is an increase of                {(carbon_failures - carbon_normal) / carbon_normal * 100:.2f} %")
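The same percentage formula is repeated for runtime, energy, and carbon. A small helper keeps the three comparisons consistent; `relative_increase` is a hypothetical name and the numbers below are made up.

```python
def relative_increase(baseline, value):
    """Percentage change of value relative to baseline."""
    if baseline == 0:
        raise ValueError("baseline must be non-zero")
    return (value - baseline) / baseline * 100


# Made-up numbers, for illustration only.
baseline_kwh = 1000.0
with_failures_kwh = 1080.0

print(f"This is an increase of {relative_increase(baseline_kwh, with_failures_kwh):.2f} %")
```

The helper also works for the runtimes above, since dividing one `pd.Timedelta` by another yields a plain float.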


# Checkpointing

- Without checkpointing, a task that fails has to rerun from the start

- OpenDC can simulate Task checkpointing 
    1. Tasks take periodic snapshots of their state
    2. Tasks can be restarted at the snapshot

To use Checkpointing, we have to update the experiment file:

```json
{
    "name": "surfsara_month_checkpoint",
    "runs": 1,
    "topologies": [{
        "pathToFile": "topologies/surfsara.json"
    }],
    "workloads": [{
        "pathToFile": "workload_traces/2022-10-01_2022-10-31",
        "type": "ComputeWorkload"
        }],
    "allocationPolicies": [{
        "type": "prefab",
        "policyName": "Mem"
    }],
    "exportModels": [
        {
            "exportInterval": 3600,
            "computeExportConfig": {
                "hostExportColumns": ["power_draw", "energy_usage", "cpu_usage", "cpu_utilization"],
                "taskExportColumns": ["submission_time", "schedule_time", "finish_time", "task_state"],
                "serviceExportColumns": ["tasks_total", "tasks_pending", "tasks_active", "tasks_completed", "tasks_terminated", "hosts_up"]
            }
        }
    ],
    "failureModels": [
        {
        "type": "trace-based",
        "pathToFile": "failure_traces/Whatsapp_user_reported.parquet"
    }],
    "checkpointModels": [
        {
            "checkpointInterval": 600000,
            "checkpointDuration": 60000,
            "checkpointIntervalScaling": 1.0
        }
    ]
}    
```
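The trade-off behind these parameters can be sketched with a deliberately simplified model: every completed `checkpointInterval` costs one `checkpointDuration` of extra runtime, and a failure only loses the work done since the last snapshot. This is an illustration, not OpenDC's implementation; it assumes both values are in milliseconds and ignores `checkpointIntervalScaling`.

```python
CHECKPOINT_INTERVAL_MS = 600_000  # checkpointInterval from the experiment file
CHECKPOINT_DURATION_MS = 60_000   # checkpointDuration from the experiment file


def checkpoint_overhead_ms(task_duration_ms):
    """Extra runtime from snapshots, assuming one snapshot per completed interval."""
    return (task_duration_ms // CHECKPOINT_INTERVAL_MS) * CHECKPOINT_DURATION_MS


def lost_work_ms(progress_ms, checkpointing):
    """Work that must be redone if the task fails after progress_ms of useful work."""
    if not checkpointing:
        return progress_ms  # no snapshot: restart from the beginning
    return progress_ms % CHECKPOINT_INTERVAL_MS  # restart from the last snapshot


duration_ms = 1_194_000  # duration of the first task in tasks.parquet
print(checkpoint_overhead_ms(duration_ms))
print(lost_work_ms(1_000_000, checkpointing=False))
print(lost_work_ms(1_000_000, checkpointing=True))
```

Under this model, a shorter interval reduces the work lost per failure but adds more snapshot overhead to every task, including tasks that never fail.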

In [None]:
import subprocess

pathToScenario = "experiments/surfsara_month_checkpoint.json"
subprocess.run(["OpenDCExperimentRunner/bin/OpenDCExperimentRunner", "--experiment-path", pathToScenario])

In [None]:
df_host_checkpoint = pd.read_parquet("output/surfsara_month_checkpoint/raw-output/0/seed=0/host.parquet")
df_powerSource_checkpoint = pd.read_parquet("output/surfsara_month_checkpoint/raw-output/0/seed=0/powerSource.parquet")
df_task_checkpoint = pd.read_parquet("output/surfsara_month_checkpoint/raw-output/0/seed=0/task.parquet")
df_service_checkpoint = pd.read_parquet("output/surfsara_month_checkpoint/raw-output/0/seed=0/service.parquet")

In [None]:
import matplotlib.dates as mdates

timestamps = pd.to_datetime(df_service[:-1].timestamp_absolute, unit="ms")
timestamps_failures = pd.to_datetime(df_service_failures[:-1].timestamp_absolute, unit="ms")
timestamps_checkpoint = pd.to_datetime(df_service_checkpoint[:-1].timestamp_absolute, unit="ms")


fig, ax = plt.subplots(figsize=(10,5))
ax.plot(timestamps, df_service[:-1].tasks_active, label="no failures")
ax.plot(timestamps_failures, df_service_failures[:-1].tasks_active, label="failures")
ax.plot(timestamps_checkpoint, df_service_checkpoint[:-1].tasks_active, label="checkpoint")

plt.title("Tasks active during a workload")
plt.xlabel("Time")
plt.ylabel("Number of active tasks")
ax.xaxis.set_major_locator(plt.MaxNLocator(3))
myFmt = mdates.DateFormatter('%y-%m-%d %H:%M:%S')
ax.xaxis.set_major_formatter(myFmt)

plt.legend()
plt.show()

In [None]:
import matplotlib.dates as mdates

timestamps = pd.to_datetime(df_powerSource[:-1].timestamp_absolute, unit="ms")
timestamps_failures = pd.to_datetime(df_powerSource_failures[:-1].timestamp_absolute, unit="ms")
timestamps_checkpoint = pd.to_datetime(df_powerSource_checkpoint[:-1].timestamp_absolute, unit="ms")


fig, ax = plt.subplots(figsize=(10,5))
# energy_usage is in joules; divide by 3_600_000 to plot kWh, matching the axis label
ax.plot(timestamps, df_powerSource[:-1].energy_usage / 3_600_000, label="no failures", linewidth=2)
ax.plot(timestamps_failures, df_powerSource_failures[:-1].energy_usage / 3_600_000, label="failures", linewidth=2)
ax.plot(timestamps_checkpoint, df_powerSource_checkpoint[:-1].energy_usage / 3_600_000, label="checkpoint", linewidth=2)

plt.title("Energy Usage during a workload (no failures, failures, checkpointing)", fontsize=15)
plt.xlabel("Time", fontsize=15)
plt.ylabel("Energy Usage (kWh)", fontsize=15)
ax.xaxis.set_major_locator(plt.MaxNLocator(3))
myFmt = mdates.DateFormatter('%y-%m-%d %H:%M:%S')
ax.xaxis.set_major_formatter(myFmt)

plt.legend(fontsize=15)
plt.show()

In [None]:
runtime_normal = pd.to_timedelta(df_service.timestamp.max() - df_service.timestamp.min(), unit="ms")
runtime_failures = pd.to_timedelta(df_service_failures.timestamp.max() - df_service_failures.timestamp.min(), unit="ms")
runtime_checkpoint = pd.to_timedelta(df_service_checkpoint.timestamp.max() - df_service_checkpoint.timestamp.min(), unit="ms")

print("RUNTIME")
print(f"The workload without failures took:        {runtime_normal}")
print(f"The workload with failures took:           {runtime_failures}")
print(f"The workload with checkpointing took:      {runtime_checkpoint}")
print(f"Failures caused an increase of             {(runtime_failures - runtime_normal) / runtime_normal * 100:.2f}%")
print(f"Checkpoints caused an increase of          {(runtime_checkpoint - runtime_normal) / runtime_normal * 100:.2f}%\n\n")

energy_normal = df_powerSource.energy_usage.sum()
energy_failures = df_powerSource_failures.energy_usage.sum()
energy_checkpoint = df_powerSource_checkpoint.energy_usage.sum()

print("ENERGY")
print(f"The workload without failures used         {energy_normal / 3_600_000:.0f} kWh")
print(f"The workload with failures used            {energy_failures / 3_600_000:.0f} kWh")
print(f"The workload with checkpointing used       {energy_checkpoint / 3_600_000:.0f} kWh")
print(f"Failures caused an increase of             {(energy_failures - energy_normal) / energy_normal * 100:.2f} %")
print(f"Checkpoints caused an increase of          {(energy_checkpoint - energy_normal) / energy_normal * 100:.2f} %\n\n")

carbon_normal = df_powerSource.carbon_emission.sum()
carbon_failures = df_powerSource_failures.carbon_emission.sum()
carbon_checkpoint = df_powerSource_checkpoint.carbon_emission.sum()

print("CARBON")
print(f"The workload without failures emitted      {carbon_normal:.0f} grams of carbon")
print(f"The workload with failures emitted         {carbon_failures:.0f} grams of carbon")
print(f"The workload with checkpointing emitted    {carbon_checkpoint:.0f} grams of carbon")
print(f"Failures caused an increase of             {(carbon_failures - carbon_normal) / carbon_normal * 100:.2f} %")
print(f"Checkpoints caused an increase of          {(carbon_checkpoint - carbon_normal) / carbon_normal * 100:.2f} %\n\n")

print("TASKS TERMINATED")
print(f"With failures, the number of terminated tasks was:      {df_service_failures.tasks_terminated.sum()}")
print(f"With checkpoints, the number of terminated tasks was:   {df_service_checkpoint.tasks_terminated.sum()}")