In [1]:
import pandas as pd

carbon_policy_labels = {
    "carbon_waiting": "Lowest-\nWindow",
    "carbon_oracle":  "Lowest-\nWindow*",
    "carbon_lowest": "Lowest-\nSlot",
    "suspend-resume_oracle": "Wait AWhile",
    "suspend-resume-threshold_oracle": "Ecovisor",
    "carbon_cst_oracle": "Carbon-\nTime*",
    "carbon_cst_average": "Carbon-\nTime",
    "cost_oracle": "AllWait-\nThreshold",
    "carbon-cost_cst_average": "RES-First-\nCarbon-Time",
    "carbon-cost_waiting": "RES-First-\nLowest-\nWindow",
    "carbon-spot_cst_average": "Spot-First-\nCarbon-Time",
    "suspend-resume-spot_oracle": "Spot-First-\nWaitAwhile",
    "suspend-resume-spot-threshold_oracle": "Spot-First-\nEcovisor",
    "carbon-cost-spot_cst_average": "SPOT-RES-\nCarbon-Time"
}

# steal / copy this one from the existing GAIA notebooks
def load_task_details(cluster_type, task_trace, scheduling_policy, carbon_start_index, carbon_policy, carbon_trace, reserved, waiting_times_str):
    if cluster_type =="slurm":
        file_name = f"../results/{cluster_type}/{task_trace}/slurm-details-{scheduling_policy}-{carbon_start_index}-{carbon_policy}-{carbon_trace}-{reserved}-{waiting_times_str}.csv"             
    else:
        file_name = f"../results/{cluster_type}/{task_trace}/details-{scheduling_policy}-{carbon_start_index}-{carbon_policy}-{carbon_trace}-{reserved}-{waiting_times_str}.csv"             
    df = pd.read_csv(file_name)
    df["carbon_policy"] = carbon_policy_labels[scheduling_policy+"_"+carbon_policy]
    df["scheduling_policy"] = scheduling_policy
    df["start_index"] = carbon_start_index
    df["task_trace"] = task_trace    
    df = df[df['ID'] != -1]
    return df

In [2]:
import plotly.express as px
from datetime import datetime
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pytz

import importlib.util
import sys


spec = importlib.util.spec_from_file_location("carbon", "../src/carbon.py")
foo = importlib.util.module_from_spec(spec)
sys.modules["carbon"] = foo
spec.loader.exec_module(foo)

import carbon


traces = ['phased'] # , "pai_200"

waiting_times = ["48", "6x24", "4"]

scheduling_policies = [ 
   #  ("carbon", "lowest"),
    ("carbon", "oracle"),
   #  ("carbon", "cst_average"),
    # ("suspend-resume-threshold", "oracle"),
    # ("suspend-resume-threshold", "oracle"),
    # ("suspend-resume", "oracle"),
]


carbon_trace = carbon.get_carbon_model("AU-SA", 7000, extra_columns=True)
start_date_in_carbon_trace_as_timestamp = carbon_trace.df.iloc[0]["timestamp"]

fig_carbon = px.scatter(carbon_trace.df, x='datetime', y="carbon_intensity_avg", color="carbon_intensity_avg", color_continuous_scale=px.colors.sequential.speed)

def time_to_dates(seconds_since_simulation_start) -> str:
    adjusted_timestamp = seconds_since_simulation_start + start_date_in_carbon_trace_as_timestamp
    date = datetime.fromtimestamp(adjusted_timestamp, pytz.timezone('UTC'))
    return date 

for trace in traces:
    
    for policy in scheduling_policies:
        for waiting_time in waiting_times:

            scheduling_policy = policy[0]
            carbon_policy = policy[1]
            df = load_task_details("simulation", trace, policy[0], 7000, policy[1], "AU-SA", 0, waiting_time).sort_values(by=["start_time", "length"])

            df["start_time_date"] = df["start_time"].apply(time_to_dates)
            df["submission_date"] = df["arrival_time"].apply(time_to_dates)
            # df["deadline"] = (df["arrival_time"] + (int(waiting_time) * 3600)).apply(time_to_dates)
            df["exit_time_date"] = df["exit_time"].apply(time_to_dates)

            min_date_in_trace = time_to_dates(df["start_time"].min())
            max_date_in_trace = time_to_dates(df["exit_time"].max())

            fig_gantt = px.timeline(df, x_start="start_time_date", x_end="exit_time_date", y="ID", hover_data=["start_time", "arrival_time"])

            submission_markers = []

            for row in df.itertuples(index=False):
                submission_markers.append({'type': 'line', 'x0': row.submission_date, 'x1': row.start_time_date, 'y0': row.ID, 'y1': row.ID, 'xref': 'x1', 'yref':'y1', 'line': dict(color="MediumPurple", width=2, dash="dot")})

            fig = make_subplots(rows=2, cols=1, shared_xaxes=True)

            fig.add_trace(fig_gantt.data[0], row=1, col=1)
            fig.add_trace(fig_carbon.data[0], row=2, col=1)

            title_key = f"{scheduling_policy}_{carbon_policy}"
            title = f"{scheduling_policy}_{carbon_policy} ({carbon_policy_labels.get(title_key, '')})"

            fig.update_layout(
                title_text = f"{trace}'s scheduling via {title}, {waiting_time}",
                xaxis=dict(
                    type='date',
                ),
                xaxis2=dict(
                    type='date'
                ), 
                shapes=submission_markers
            )
            fig.update_xaxes(title_text="Date", range=[min_date_in_trace, max_date_in_trace])

            fig.update_yaxes(title_text="Job ID", fixedrange=True, row=1, col=1)
            fig.update_yaxes(title_text="Carbon intensity in gCO₂eq/kWh", fixedrange=True, row=2, col=1)

            yaxis2 = fig.layout.yaxis2

            fig.update_layout({'yaxis': {'range': [-0.5,df['ID'].max() + 1], 'tickmode': 'linear'}})
            fig.update_layout({'yaxis2': {'range': [0,0.5]}})

            fig.show()

In [3]:
import pulp
import math
from typing import Dict
sys.path.append('../src/')

import power_consumption_profiles as pcp
from task import Task, set_waiting_times
# Plan: have an LP Problem where we determine startup and work phases

import plotly.graph_objects as go
from functools import reduce

mockPowerFunction = pcp.FooPowerFunction(pcp.foo_phases_spec)
set_waiting_times("24")

# mockTask = Task(0, 0, task_length=int(mockPowerFunction.get_length()), CPUs=1, total_execution_time=0, power_consumption_function=mockPowerFunction)


# Define the problem
prob = pulp.LpProblem("StopResumeCarbonAwareScheduling", pulp.LpMinimize)

DEADLINE: int = 800

# The carbon trace is given in hours
seconds_carbon_trace = carbon_trace.extend(1)
seconds_carbon_trace.df = seconds_carbon_trace.df.head(DEADLINE)

# Example data
# TODO: seconds from the task-length to whatever this is now
WORK_LENGTH = int(mockPowerFunction.duration_work)  # Processing time for the job
print(f'WORK_LENGTH is {WORK_LENGTH}')

STARTUP_LENGTH = int(mockPowerFunction.duration_startup) #int(mockPowerFunction.duration_startup)  # Startup time for the job
print(f'STARTUP_LENGTH is {STARTUP_LENGTH}')

M = DEADLINE * 2

carbon_cost_at_time = seconds_carbon_trace.df['carbon_intensity_avg'].to_dict()

starting = pulp.LpVariable.dicts("starting", (t for t in range(DEADLINE)), cat="Binary")
startup_finished = pulp.LpVariable.dicts("start", (t for t in range(DEADLINE)), cat="Binary")
work = pulp.LpVariable.dicts("work", (t for t in range(DEADLINE)), cat="Binary")

# prob += carbon_cost

# This one will count up the seconds since each start, so we can calculate how which phase we are in
work_time_progressed = pulp.LpVariable.dict("work_time_progressed", (t for t in range(DEADLINE)), lowBound=0, upBound=WORK_LENGTH, cat=pulp.LpInteger)

# This one will count up the seconds since each start, so we can calculate how which phase we are in
startup_time_progressed = pulp.LpVariable.dict("startup_time_progressed", (t for t in range(DEADLINE)), lowBound=0, upBound=STARTUP_LENGTH, cat=pulp.LpInteger)

# set time_progressed to 0, whenever we start
for t in range(DEADLINE-1):
    #https://download.aimms.com/aimms/download/manuals/AIMMS3OM_IntegerProgrammingTricks.pdf 
    if (t>0):
        # be bigger than the previous value IF starting
        prob += startup_time_progressed[t] >= startup_time_progressed[t-1] + 1 - (1 - starting[t]) * M 
        prob += startup_time_progressed[t] <= startup_time_progressed[t-1] + 1 + (1 - starting[t]) * M 

    # IF not starting, be 0
    prob += startup_time_progressed[t] <= starting[t] * M 

    prob += work_time_progressed[0] == 0
    if (t > 0):
        prob += work_time_progressed[t] == work_time_progressed[t-1] + work[t]

# we need to linearize our phases.
# we do that by creating a boolean dict which will be true for each time the phase is active
phases = mockPowerFunction.phases

lin_function_dicts: Dict[str, Dict[str, Dict[str, pulp.LpVariable | float]]] = { }

running_index = 0

for phase_key, phases_of_key in phases.items():
    duration = 0
    if len(phases_of_key) == 0:
        continue

    lin_function_dicts[phase_key] = { }

    progress_variable = startup_time_progressed if phase_key == 'startup' else work_time_progressed
    state_variable = starting if phase_key == 'startup' else work

    for phase in phases_of_key:

        phase_name = phase['name'] + str(running_index)
        running_index += 1
        phase_variable_lower = pulp.LpVariable.dict(phase_name + "_lower", (t for t in range(DEADLINE)), cat="Binary")
        phase_variable_upper = pulp.LpVariable.dict(phase_name + "_upper", (t for t in range(DEADLINE)), cat="Binary")
        phase_variable = pulp.LpVariable.dict(phase_name, (t for t in range(DEADLINE)), cat="Binary")
        lin_function_dicts[phase_key][phase_name] = { }
        lin_function_dicts[phase_key][phase_name]['variable'] = phase_variable
        lin_function_dicts[phase_key][phase_name]['upper'] = phase_variable_upper
        lin_function_dicts[phase_key][phase_name]['lower'] = phase_variable_lower
        lin_function_dicts[phase_key][phase_name]['power'] = phase['power']

        # bounds are [lower, upper) for each phase
        lower_bound = max(duration, 0) 
        upper_bound = duration + 1 + int(phase["duration"])

        print(f'{phase_name} must be between {lower_bound} and {upper_bound}')

        for t in range(DEADLINE):

            #https://math.stackexchange.com/a/3260529 this is basically magic
            # this activates the phase_variable within (lower, upper)

            prob += progress_variable[t] - lower_bound <= M*phase_variable_lower[t]
            prob += lower_bound - progress_variable[t] <= M*(1-phase_variable_lower[t])

            prob += upper_bound - progress_variable[t] <= M*phase_variable_upper[t]
            prob += progress_variable[t] - upper_bound <= M*(1-phase_variable_upper[t])

            prob += phase_variable[t] >= phase_variable_lower[t] + phase_variable_upper[t] + state_variable[t] - 2
            prob += phase_variable[t] <= phase_variable_lower[t]
            prob += phase_variable[t] <= phase_variable_upper[t]
            prob += phase_variable[t] <= state_variable[t]
        
        duration += int(phase["duration"])

# our carbon cost is equal to each phase being active * its power * the amount of carbon per timeslot
all_phase_variables_with_power = []

for overarching_phase in lin_function_dicts.values():
    for phase_entry in overarching_phase.values():
        all_phase_variables_with_power.append((phase_entry['variable'], phase_entry['power']))

def carbon_cost_at_timeslot(t: int):
    return reduce(lambda problem, phase_tuple: problem + phase_tuple[0][t] * phase_tuple[1] * carbon_cost_at_time[t], all_phase_variables_with_power, pulp.LpAffineExpression())

prob += pulp.lpSum([carbon_cost_at_timeslot(t) for t in range(DEADLINE)]) 
# prob += pulp.lpSum([starting[t] * carbon_cost_at_time[t] + work[t] * carbon_cost_at_time[t] for t in range(DEADLINE)]) 


# spend enough time processing
prob += pulp.lpSum(work[t] for t in range(STARTUP_LENGTH, DEADLINE)) == WORK_LENGTH
prob += pulp.lpSum(work[t] for t in range(STARTUP_LENGTH)) == 0

for t in range(DEADLINE - 1):
    # Ensure the job undergoes the startup phase whenever it resumes
    # if [0 , 1], this will be 1
    # t1   t2
    # 0     0 => 0
    # 0     1 => 1 
    # 1     0 => -1 / 0
    prob += startup_finished[t] >= work[t + 1] - work[t]

    # we can not be in startup and work at the same time
    prob += startup_finished[t] + work[t] <= 1
    prob += starting[t] + work[t] <= 1


for i in range(STARTUP_LENGTH - 1, DEADLINE):
    prob += pulp.lpSum([starting[i - j] for j in range(STARTUP_LENGTH)]) >= STARTUP_LENGTH * startup_finished[i], f"Contiguity_{i}"


# The solution so far seems to take a really long time, let's also add a maximum amount of startups to hopefully reduce the search space
prob += pulp.lpSum([startup_finished[j] for j in range(DEADLINE)]) <= 5, f"Max_starts"

solver = pulp.GUROBI_CMD(timeLimit=60 * 20)

prob.solve(solver)

print(f"Status: {pulp.LpStatus[prob.status]}")

print(f"Job schedule:")
for t in range(DEADLINE):
    if pulp.value(starting[t]) is not None and pulp.value(starting[t])  > 0:
        print(f"  Time {t}: Starting")

    if pulp.value(startup_finished[t]) is not None and pulp.value(startup_finished[t])  > 0:
        print(f"  Time {t}: Startup finished")

    if pulp.value(work_time_progressed[t]) is not None and pulp.value(work_time_progressed[t])  > 0:
        print(f"  Time {t}: Progress: {pulp.value(work_time_progressed[t])}")

    if pulp.value(work[t]) is not None and pulp.value(work[t])  > 0:
        print(f"  Time {t}: Processing")


WORK_LENGTH is 300
STARTUP_LENGTH is 40
Start Python0 must be between 0 and 21
Download Data1 must be between 20 and 41
High2 must be between 0 and 51
Low3 must be between 50 and 101
High4 must be between 100 and 151
Low5 must be between 150 and 201
High6 must be between 200 and 251
Low7 must be between 250 and 301
Set parameter Username
Set parameter TimeLimit to value 1200
Set parameter LogFile to value "gurobi.log"
Using license file /opt/gurobi1103/gurobi.lic
Academic license - for non-commercial use only - expires 2025-08-12

Gurobi Optimizer version 11.0.3 build v11.0.3rc0 (linux64 - "Ubuntu 23.10")
Copyright (c) 2024, Gurobi Optimization, LLC

Read LP format model from file /tmp/bb079019366d4e708e8da109806ff5fc-pulp.lp
Reading time = 0.16 seconds
OBJ: 58353 rows, 23200 columns, 163173 nonzeros

CPU model: Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz, instruction set [SSE2|AVX|AVX2]
Thread count: 2 physical cores, 4 logical processors, using up to 4 threads

Optimize a model with 583

In [6]:

from typing import List


prototype_fig = make_subplots(rows=4, cols=1, shared_xaxes=True)

graph_times = list(carbon_cost_at_time.keys())
graph_carbon_costs = list(carbon_cost_at_time.values())

fig_carbon = px.scatter(x=graph_times, y=graph_carbon_costs)
prototype_fig.add_trace(fig_carbon.data[0], row=4, col=1)

# Okay, nice. Let's visualize our prototype scheduling approach
states = [(starting, 'starting'), (startup_finished, 'startup_finished'), (work, 'work')]
integers = [(startup_time_progressed, 'start_progress'), (work_time_progressed, 'work_progress')]

def make_trace_from_variables(targets: List[tuple]):
    df = pd.DataFrame(data=[], columns=['time', 'value', 'name'])

    for variable, name in targets:
        as_dict = {t: pulp.value(variable[t]) if pulp.value(variable[t]) is not None else 0 for t in range(DEADLINE)}

        foo = pd.DataFrame(list(as_dict.items()), columns=['time', 'value'])
        foo['name'] = name

        foo = foo[foo['value'] != 0]

        df = pd.concat([df, foo], ignore_index=True)

    plot = px.scatter(df, x='time', y='value', color='name')
    return plot
    
phases_variables_with_name = [
    *[(value['variable'], key) for (key, value) in lin_function_dicts['startup'].items()],
    *[(value['variable'], key) for (key, value) in lin_function_dicts['work'].items()]
]

boolean_plot = make_trace_from_variables(states)
integers_plot = make_trace_from_variables(integers)
phase_states_plot = make_trace_from_variables(phases_variables_with_name)

prototype_fig.add_traces(boolean_plot.data, rows=1, cols=1)
prototype_fig.add_traces(integers_plot.data, rows=2, cols=1)
prototype_fig.add_traces(phase_states_plot.data, rows=3, cols=1)

#prototype_fig.update_xaxes(title_text="Timestamp", range=[df['time'].min()-10, df['time'].max()+10])

prototype_fig.update_yaxes(title_text="Execution States", fixedrange=True, row=1, col=1)
prototype_fig.update_yaxes(title_text="Progress during State", fixedrange=True, row=2, col=1)
prototype_fig.update_yaxes(title_text="Carbon intensity in gCO₂eq/Wh", fixedrange=True, row=4, col=1)


# prototype_fig.update_layout({'yaxis': {'range': [-0.5, 1.5], 'tickmode': 'linear'}})
#prototype_fig.update_layout({'yaxis4': {'range': [0,0.1]}})

prototype_fig.show()


The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.


The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.


The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.



In [5]:
bar = []



print(bar)

[]
