In [None]:
#!/usr/bin/env python3

import json
import re
import requests
import copy
import random
import pprint
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime, timezone, timedelta
from matplotlib_helper import *
from dataclasses import dataclass

In [None]:
# Metadata

# Axis label and title for each per-region series in the API response.
metadata = dict(
    emission_rates=dict(ylabel='gCO2/s', title='Instantaneous emission rates'),
    emission_integral=dict(ylabel='gCO2', title='Emission integral over its duration'),
)

# Job phases that can be marked on the plot: the (start, end) keys looked up in a
# region's `timings` entry, plus a human-readable label.
d_events = {
    'input_transfer': {'interval_keys': ("input_transfer_start", "input_transfer_end"), 'label': 'Input transfer'},
    'compute': {'interval_keys': ("compute_start", "compute_end"), 'label': 'Compute'},
    'output_transfer': {'interval_keys': ("output_transfer_start", "output_transfer_end"), 'label': 'Output transfer'},
}

# Text drawn next to each phase's start marker, keyed by the phase's start timing key.
# Derived from `d_events` so the two tables cannot drift apart.
d_timing_labels = {event['interval_keys'][0]: event['label'] for event in d_events.values()}

In [None]:
def get_max_value(data_details: dict, series_name: str):
    """Return the peak value of ``series_name`` over every region.

    Both the "compute" and "transfer" sub-series of each region are scanned.
    The result never drops below 0, so it is 0 when all series are empty.

    Args:
        data_details: Mapping of region -> per-region details (the API response's
            ``details`` section).
        series_name: Which series to scan, e.g. "emission_rates".
    """
    peak = 0
    for region_details in data_details.values():
        series = region_details[series_name]
        for part in ("compute", "transfer"):
            peak = max(peak, max(series[part].values(), default=0))
    return peak

def resample_timeseries(df: pd.DataFrame, interval: str):
    """Resample a ``Timestamp``-keyed frame onto a regular grid, forward-filling gaps.

    The input frame is left untouched. The old version overwrote its ``Timestamp``
    column and re-indexed it in place.

    Args:
        df: Frame with a ``Timestamp`` column (strings or datetimes) and value columns.
        interval: pandas offset alias for the output grid, e.g. ``"30s"``.

    Returns:
        A new frame with ``Timestamp`` as a regular column, one row per grid point
        from the earliest to the latest timestamp.
    """
    indexed = (
        df.assign(Timestamp=pd.to_datetime(df["Timestamp"]))
        .set_index("Timestamp")
        # resample().ffill() reindexes with method="ffill", which requires a monotonic index.
        .sort_index()
    )
    return indexed.resample(interval).ffill().reset_index()

def create_dataframe_for_plotting(timeseries: dict[str, float], min_start: datetime, max_end: datetime, interval: str = "30s") -> pd.DataFrame:
    """Convert a timeseries dict into a regularly resampled frame clipped to ``[min_start, max_end]``.

    Args:
        timeseries: Mapping of ISO-8601 timestamp strings to values.
        min_start: Earliest timestamp to keep (inclusive).
        max_end: Latest timestamp to keep (inclusive).
        interval: Resampling step as a pandas offset alias (default ``"30s"``).

    Returns:
        A frame with ``Timestamp`` and ``Value`` columns. An empty ``timeseries``
        yields an empty frame with those columns, returned without resampling.
    """
    # A dict (rather than a list) collapses strings that parse to the same instant.
    parsed = {datetime.fromisoformat(stamp): value for stamp, value in timeseries.items()}
    df = pd.DataFrame(list(parsed.items()), columns=["Timestamp", "Value"])
    if df.empty:
        return df
    resampled = resample_timeseries(df, interval)
    # Resampling spans the whole input; keep only the job's time window.
    in_window = resampled["Timestamp"].between(pd.to_datetime(min_start), pd.to_datetime(max_end))
    return resampled[in_window]

def add_timing(ax, name: str, time: pd.Timestamp, max_value: float, color: str):
    """Mark the start of a job phase on ``ax`` with a vertical line and a text label.

    Only timing names containing 'start' are drawn; any other name (such as the
    '*_end' keys) is silently ignored.

    Args:
        ax: Matplotlib axes to draw on.
        name: Timing key; must be a key of ``d_timing_labels`` when it contains 'start'.
        time: x position of the marker.
        max_value: Height of the marker line; the label sits at this y value.
        color: Label color. The marker line itself is always gray.
    """
    if 'start' not in name:
        return

    line_style = "solid" if 'compute' in name else "dashed"
    ax.vlines(time, ymin=0, ymax=max_value, color='gray', alpha=0.5, linestyles=line_style)

    # Tilt the input/output labels in opposite directions, presumably so they
    # do not collide with the centered compute label.
    if 'input' in name:
        alignment, angle = 'right', -30
    elif 'output' in name:
        alignment, angle = 'left', 30
    else:
        alignment, angle = 'center', 0
    ax.text(time, max_value, d_timing_labels[name], color=color, alpha=0.95, ha=alignment, va="bottom", rotation=angle)

In [None]:
# Plotting

def _series_max(*frames: pd.DataFrame) -> float:
    """Return the largest non-NaN ``Value`` across ``frames``, or 0 if there is none.

    ``Series.max()`` of an empty frame is NaN, and the built-in ``max`` is
    order-dependent when NaN is involved, so empty/NaN peaks are skipped.
    """
    peaks = (frame["Value"].max() for frame in frames if not frame.empty)
    return max((peak for peak in peaks if pd.notna(peak)), default=0)


def plot_carbon_api_response(data: dict, show_events=True, show_transfer_breakdown = True, show_compute=True, show_transfer=True):
    """Plot per-region instantaneous emission rates from a carbon-aware-scheduler response.

    Each region gets its own color. Compute emissions are drawn as a solid step
    line and transfer emissions as a dashed one.

    Args:
        data: Parsed API response; reads ``details`` and ``raw-scores`` for each region.
        show_events: Draw start markers and shade the input-transfer, compute and
            output-transfer intervals. This also switches the transfer legend label
            from route hops to emitted gCO2 and transfer duration.
        show_transfer_breakdown: Also draw the ``transfer.network`` (dotted) and
            ``transfer.endpoint`` (dash-dot) series.
        show_compute: Draw the compute series.
        show_transfer: Draw the transfer series; skipped for regions where it is empty.
    """
    # Only instantaneous rates are drawn; `metadata` also describes "emission_integral",
    # which this figure does not show.
    series_name = "emission_rates"
    _, ax = plt.subplots(1, 1, figsize=(12, 6))

    d_region_colors = {region: get_next_color() for region in data['details']}

    for region, region_details in data['details'].items():
        print(f"Plotting {region} - {series_name}")
        series = region_details[series_name]
        timings = region_details['timings'][0]  # Assume single occurrence per job
        min_start = datetime.fromisoformat(timings['min_start'])
        max_end = datetime.fromisoformat(timings['max_end'])
        region_scores = data['raw-scores'][region]

        # Convert timestamp-keyed series into resampled frames clipped to the job window
        compute_df = create_dataframe_for_plotting(series["compute"], min_start, max_end)
        transfer_df = create_dataframe_for_plotting(series["transfer"], min_start, max_end)
        transfer_network_df = create_dataframe_for_plotting(series["transfer.network"], min_start, max_end)
        transfer_endpoint_df = create_dataframe_for_plotting(series["transfer.endpoint"], min_start, max_end)

        # Plot timeseries data as step functions
        color = d_region_colors[region]
        if show_compute:
            carbon_emissions_compute = region_scores['carbon-emission-from-compute']
            compute_time = pd.to_timedelta(timings['compute_duration']).floor('s').to_pytimedelta()
            label_compute = f"{region} - Compute ({carbon_emissions_compute:.3f} gCO2 over {compute_time})"
            ax.step(compute_df["Timestamp"], compute_df["Value"], label=label_compute, color=color, linestyle="solid")
        if show_transfer and not transfer_df.empty:
            if show_events:
                carbon_emissions_transfer = region_scores['carbon-emission-from-migration']
                total_transfer_time = pd.to_timedelta(timings['total_transfer_time']).floor('s').to_pytimedelta()
                label_transfer = f"{region} - Transfer ({carbon_emissions_transfer:.3f} gCO2 over {total_transfer_time})"
            else:
                hop_count = region_details["route.hop_count"]
                hop_regions = (parse_carbon_route(route, "region") for route in region_details['route'])
                router_hop_isos = '|'.join(hop for hop in hop_regions if hop is not None)
                label_transfer = f"{region} - Transfer ({hop_count} hops: {router_hop_isos})"
            ax.step(transfer_df["Timestamp"], transfer_df["Value"], label=label_transfer, color=color, linestyle="dashed")
            if show_transfer_breakdown:
                ax.step(transfer_network_df["Timestamp"], transfer_network_df["Value"], label=f"{region} - Transfer (network)", color=color, linestyle="dotted")
                ax.step(transfer_endpoint_df["Timestamp"], transfer_endpoint_df["Value"], label=f"{region} - Transfer (endpoint)", color=color, linestyle="dashdot")

        # Add events based on the timings: start markers plus shaded intervals under the curve
        if show_events:
            # NaN-safe: an empty compute or transfer frame must not make the marker height NaN.
            max_y_value = _series_max(compute_df, transfer_df)
            for event, event_info in d_events.items():
                df = compute_df if event == 'compute' else transfer_df
                if df.empty:
                    continue
                start_key, end_key = event_info['interval_keys']
                for name in (start_key, end_key):
                    add_timing(ax, name, pd.to_datetime(timings[name]), max_y_value, color=color)
                mask = df['Timestamp'].between(pd.to_datetime(timings[start_key]), pd.to_datetime(timings[end_key]))
                alpha = 0.5 if 'compute' in event else 0.25
                ax.fill_between(x=df['Timestamp'], y1=df['Value'], where=mask, color=color, alpha=alpha)

    ax.set_title(metadata[series_name]['title'])
    ax.set_xlabel("Time")
    ax.set_ylabel(metadata[series_name]['ylabel'])
    ax.grid(True)

    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    plt.show()


In [None]:
# Example route entry: "Router at (39.0127, -77.5342) (emap:US-MIDA-PJM)"
_CARBON_ROUTE_PATTERN = re.compile(
    r"Router at \((?P<lat>-?\d+(?:\.\d+)?), (?P<lon>-?\d+(?:\.\d+)?)\) \(emap:(?P<region>.+)\)"
)
_CARBON_ROUTE_OUTPUTS = ("coordinates", "region", None)


def parse_carbon_route(route: str, output="coordinates"):
    """Parse one "Router at (lat, lon) (emap:REGION)" route entry.

    Args:
        route: A single route string from the API response.
        output: What to return for a matching route:
            "coordinates" -> ``(lat, lon)`` as floats,
            "region" -> the emap region string,
            None -> the raw ``{'lat', 'lon', 'region'}`` match dict (string values).

    Returns:
        The requested value, or None when ``route`` is not a router entry.

    Raises:
        ValueError: If ``output`` is not one of the accepted values. This is now
            checked for every call, not only for routes that happen to match.
    """
    if output not in _CARBON_ROUTE_OUTPUTS:
        raise ValueError(f"Invalid output type: {output}")
    match = _CARBON_ROUTE_PATTERN.match(route)
    if match is None:
        return None
    if output == "coordinates":
        return (float(match.group("lat")), float(match.group("lon")))
    if output == "region":
        return match.group("region")
    return match.groupdict()

In [None]:
runtime_s = timedelta(hours=1).total_seconds()

# Request fields mirror class `Workload` at
# https://github.com/c3lab-net/energy-data/blob/master/api/models/workload.py
payload = dict(
    runtime=runtime_s,
    schedule=dict(
        type="onetime",
        start_time="2022-01-02T00:00:00-00:00",
        max_delay=24*3600 - runtime_s,  # a 24h window minus the job's runtime
    ),
    dataset=dict(input_size_gb=150, output_size_gb=150),
    # Provide EITHER candidate_providers OR candidate_locations, e.g.
    # candidate_locations=[{"id": "AWS:us-east-1"}, {"id": "AWS:us-west-1"}, {"id": "AWS:eu-central-1"}],
    candidate_providers=["AWS"],
    use_prediction=False,
    carbon_data_source="emap",  # emap has the most coverage worldwide
    watts_per_core=5,
    core_count=20,
    original_location="AWS:us-east-1",
    # Turn off optimize_carbon if the timing information is not needed (e.g. only raw carbon timeseries).
    optimize_carbon=True,
    # use_new_optimization=True,  # default value, can be omitted
    carbon_accounting_mode="compute-and-network",  # the usual choice: account for both compute and network
    # Accepted values: enum `InterRegionRouteSource` at
    # https://github.com/c3lab-net/energy-data/blob/master/api/models/cloud_location.py
    inter_region_route_source="itdk",
)

In [None]:
CARBON_API_URL='http://yak-03.sysnet.ucsd.edu/carbon-aware-scheduler/'
# (connect, read) timeout in seconds. requests has no default timeout and would otherwise
# block forever on an unresponsive server; raise the read timeout if large queries need longer.
API_TIMEOUT_S = (10, 600)

# Make the API call
response = requests.get(CARBON_API_URL, json=payload, timeout=API_TIMEOUT_S)

# Check that the API call succeeded (`Response.ok` is True for any status code below 400)
assert response.ok, f"Error: API call failed with status code {response.status_code}: {response.text}"
# Round parsed floats to 6 significant digits (presumably to keep the dump below readable)
data = response.json(parse_float=lambda s: float('%.6g' % float(s)))
print(json.dumps(data, indent=4))

In [None]:
# Transfer emissions only: one dashed line per region labelled with its route hops,
# without compute lines, event markers, or the network/endpoint breakdown.
# For everything at once call `plot_carbon_api_response(data)` (best with few regions).
plot_carbon_api_response(data, show_compute=False, show_events=False, show_transfer_breakdown=False)

In [None]:
# Profile individual workloads

# Request template for the profiled-workload runs. Runtime, max_delay, dataset sizes,
# core_count and use_new_optimization are set again for each workload.
payload = dict(
    runtime=3600,
    schedule=dict(
        type="onetime",
        start_time="2022-01-02T00:00:00-07:00",
        max_delay=24 * 3600 - 3600,
    ),
    dataset=dict(input_size_gb=20, output_size_gb=12),
    candidate_locations=[{"id": location_id} for location_id in ("AWS:us-east-1", "AWS:us-west-1")],
    use_prediction=False,
    carbon_data_source="emap",
    watts_per_core=125/20,  # 6.25 W per core (presumably a 125 W, 20-core CPU)
    core_count=1,
    original_location="AWS:us-east-1",
    carbon_accounting_mode="compute-and-network",
    optimize_carbon=True,
    use_new_optimization=True,
)

@dataclass
class Workload:
    """A profiled workload: its CPU footprint, runtime and data volume."""
    name: str  # human-readable label used in printed output
    core_count: int
    runtime_s: float  # runtime in seconds
    input_gb: float
    output_gb: float


# Profiled measurements as (name, core_count, runtime_s, input_gb, output_gb).
# Several names contain commas, so this table is not machine-readable CSV.
_PROFILED_WORKLOAD_ROWS = [
    ("compression (gzip)", 13, 2.1, 1.2, 0.19),
    ("compression (bzip)", 31, 3.2, 1.2, 0.142),
    ("code compilation (linux v5.16)", 40, 391, 1.2, 0.195),
    ("video resize (4k->1080p)", 4, 261, 4.6, 0.251),
    ("video effect (4k, h.264)", 8, 307, 1.2, 0.122),
    ("video effect (4k, h.265)", 8, 307, 0.15, 0.117),
    ("ML inference (resnet50, 1k images)", 1, 246, 0.154, 0),
]

profiled_workloads = [Workload(*row) for row in _PROFILED_WORKLOAD_ROWS]

CARBON_API_URL = 'http://yak-03.sysnet.ucsd.edu/carbon-aware-scheduler/'
# (connect, read) timeout in seconds for each API call; requests has no default timeout.
API_TIMEOUT_S = (10, 600)
# Each profiled workload's runtime and input/output sizes are multiplied by this factor.
WORKLOAD_SCALE = 10

for profiled in profiled_workloads:
    # Scale a copy instead of mutating `profiled_workloads`, so the profiled numbers stay
    # intact and re-running this loop does not compound the scaling.
    workload = Workload(
        name=profiled.name,
        core_count=profiled.core_count,
        runtime_s=profiled.runtime_s * WORKLOAD_SCALE,
        input_gb=profiled.input_gb * WORKLOAD_SCALE,
        output_gb=profiled.output_gb * WORKLOAD_SCALE,
    )
    # Fill a fresh copy so the `payload` template itself is left unchanged.
    request_payload = copy.deepcopy(payload)
    request_payload['runtime'] = workload.runtime_s
    request_payload['schedule']['max_delay'] = timedelta(days=1).total_seconds() - workload.runtime_s
    request_payload['dataset']['input_size_gb'] = workload.input_gb
    request_payload['dataset']['output_size_gb'] = workload.output_gb
    request_payload['core_count'] = workload.core_count
    request_payload['use_new_optimization'] = False
    print(f'{workload.name} - {workload.core_count}c * {workload.runtime_s}s', f'{workload.input_gb}G/{workload.output_gb}G')
    total_gb = workload.input_gb + workload.output_gb
    core_hours = workload.core_count * workload.runtime_s / timedelta(hours=1).total_seconds()
    # Guard against a workload that moves no data at all.
    print('Core-hours/GB:', core_hours / total_gb if total_gb else 'n/a (no input/output data)')

    response = requests.get(CARBON_API_URL, json=request_payload, timeout=API_TIMEOUT_S)
    assert response.ok, f"Error: API call failed with status code {response.status_code}: {response.text}"
    data = response.json(parse_float=lambda s: float('%.6g' % float(s)))
    # print(json.dumps(data, indent=4))
    plot_carbon_api_response(data)



In [None]:
# Ad-hoc: verify that the new optimization is never worse than the old one by
# comparing both on many randomly sampled requests.

# Base request for the comparison; add more {"id": ...} entries to widen the set of regions.
payload = dict(
    runtime=5400,
    schedule=dict(type="onetime", start_time="2023-05-24T22:00:00-05:00", max_delay=36000),
    dataset=dict(input_size_gb=20, output_size_gb=12),
    candidate_locations=[{"id": location_id} for location_id in ("AWS:us-east-1", "AWS:us-west-1")],
    use_prediction=False,
    carbon_data_source="emap",
    watts_per_core=5,
    core_count=40,
    original_location="AWS:us-east-1",
    carbon_accounting_mode="compute-and-network",
    optimize_carbon=True,
)


def _print_emission_scores(label: str, result: dict):
    """Print ``label:`` followed by each region's 'carbon-emission' score on one line."""
    print(f'{label}:', end='')
    for region, scores in result['raw-scores'].items():
        print('\t', region, scores['carbon-emission'], end='')
    print()


def _response_body(response):
    """Return the decoded JSON body of ``response``, or its raw text when it is not JSON."""
    try:
        return response.json()
    except ValueError:  # requests' JSON decode errors subclass ValueError
        return response.text


def compare_results(payload1: dict, payload2: dict, timeout=(10, 600)):
    """Send two requests (old vs. new optimization) and report whether the new one is worse.

    The function prints:
    - "Inconsistent response status" if only one request succeeded;
    - "Worse result for <region>" for every region where payload2's 'carbon-emission'
      exceeds payload1's;
    - "Same result" when both bodies match once their 'request' entries are dropped;
    - per-region 'old:'/'new:' emission lines whenever nothing worse was found.

    Args:
        payload1: Request body for the baseline (old optimization).
        payload2: Request body for the candidate (new optimization).
        timeout: ``requests`` (connect, read) timeout in seconds for each call.
    """
    response1 = requests.get(CARBON_API_URL, json=payload1, timeout=timeout)
    response2 = requests.get(CARBON_API_URL, json=payload2, timeout=timeout)
    if response1.ok != response2.ok:
        print("Inconsistent response status")
        print(payload1)
        print(response1)
        print(response2)
        return
    if not response1.ok:
        print("Both responses are not ok")
        print(payload1)
        print(response1.status_code)
        print(_response_body(response1))
        return
    parse_float = lambda s: float('%.9g' % float(s))
    json1 = response1.json(parse_float=parse_float)
    json2 = response2.json(parse_float=parse_float)
    # 'request' presumably echoes the (deliberately different) payloads; exclude it from the comparison.
    json1.pop('request', None)
    json2.pop('request', None)
    if json1 != json2:
        old_scores = json1['raw-scores']
        should_report = False
        for region, scores in json2['raw-scores'].items():
            if region not in old_scores:
                print(f"Region {region} missing from old result")
                should_report = True
            elif scores['carbon-emission'] > old_scores[region]['carbon-emission']:
                print(f"Worse result for {region}")
                should_report = True
        if should_report:
            print("Inconsistent response content")
            print(payload1)
            print(json1)
            print(json2)
            return
    else:
        print("Same result")
    print(json1)
    print(json2)
    _print_emission_scores('old', json1)
    _print_emission_scores('new', json2)

pp = pprint.PrettyPrinter(indent=4)

CARBON_API_URL='http://yak-03.sysnet.ucsd.edu/carbon-aware-scheduler/'

# # Fuzzing erroneous input: replay specific payloads (e.g. ones printed by a failing sweep)
# payloads = [
#     {'runtime': 14802, 'schedule': {'type': 'onetime', 'start_time': '2022-08-18T12:18:57+00:00', 'max_delay': 36000}, 'dataset': {'input_size_gb': 20, 'output_size_gb': 69}, 'candidate_locations': [{'id': 'AWS:us-east-1'}, {'id': 'AWS:us-west-1'}], 'use_prediction': False, 'carbon_data_source': 'emap', 'watts_per_core': 5, 'core_count': 5, 'original_location': 'AWS:us-east-1', 'carbon_accounting_mode': 'compute-and-network', 'optimize_carbon': True, 'use_new_optimization': False},
# ]

# for payload in payloads:
#     payload_new_optimization = copy.deepcopy(payload)
#     payload_new_optimization['use_new_optimization'] = True
#     compare_results(payload, payload_new_optimization)

# Random sweep: one job per day starting 2022-01-01, with a random start offset within the day,
# runtime, data sizes and core count. A fixed seed makes the sweep reproducible, so any
# reported regression can be re-run exactly.
SWEEP_SEED = 0
SWEEP_DAYS = 365
sweep_rng = random.Random(SWEEP_SEED)
sweep_origin = datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc)

# Make the API calls (two per day: old vs. new optimization)
for day in range(SWEEP_DAYS):
    runtime_seconds = sweep_rng.randint(1, 3600 * 24)
    start_time = sweep_origin + timedelta(days=day) + timedelta(seconds=sweep_rng.randint(0, 3600 * 24))
    data_input_size_gb = sweep_rng.randint(1, 100)
    data_output_size_gb = sweep_rng.randint(1, 100)
    core_count = sweep_rng.randint(1, 100)

    payload['schedule']['start_time'] = start_time.isoformat()
    payload['runtime'] = runtime_seconds
    payload['dataset']['input_size_gb'] = data_input_size_gb
    payload['dataset']['output_size_gb'] = data_output_size_gb
    payload['core_count'] = core_count
    payload['use_new_optimization'] = False
    print(start_time, f'{core_count}c * {runtime_seconds}s', f'{data_input_size_gb}G/{data_output_size_gb}G')

    payload_new_optimization = copy.deepcopy(payload)
    payload_new_optimization['use_new_optimization'] = True
    compare_results(payload, payload_new_optimization)
