In [None]:
from IPython.display import display, HTML
# Inject a CSS rule that forces elements with class "container" to 100% width.
# NOTE(review): presumably meant to widen the notebook page; whether the
# ".container" selector matches anything depends on the notebook front end.
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
# Notebook-wide imports and configuration.
# NOTE(review): os, sys, pdb, tqdm and plt are not referenced in the visible
# cells — confirm against the rest of the notebook before pruning.
import os
import sys
import grid2op
from grid2op.Agent import DoNothingAgent, BaseAgent
from tqdm.notebook import tqdm  # for easy progress bar
display_tqdm = False  # this is set to False for ease with the unit test, feel free to set it to True
import numpy as np
import pdb
import matplotlib.pyplot as plt

In [None]:
# Choose the backend class: LightSimBackend when lightsim2grid can be imported,
# otherwise fall back to grid2op's PandaPowerBackend.
try:
    from lightsim2grid import LightSimBackend
except ImportError as exc:
    print(f"Error: {exc} when importing faster LightSimBackend")
    from grid2op.Backend import PandaPowerBackend
    bk_cls = PandaPowerBackend
else:
    bk_cls = LightSimBackend

# Build the environment with a fresh backend instance.
env_name = "l2rpn_neurips_2020_track1_small"
env = grid2op.make(env_name, backend=bk_cls())

# NOTE(review): the attribute spelling ("availble") is kept exactly as the
# original code uses it — verify against the grid2op API before "correcting" it.
print(
    "Is this environment suitable for redispatching: {}".format(
        env.redispatching_unit_commitment_availble
    )
)

In [None]:
# Redispatchable-generator overview: the boolean mask, how many entries are
# set, the total number of generators, and the per-generator cost per MW.
print(
    env.gen_redispatchable,
    np.sum(env.gen_redispatchable),
    env.gen_redispatchable.shape[0],
    env.gen_cost_per_MW,
)

In [None]:
# Show the thermal limits reported by the environment.
print(env.get_thermal_limit())

In [None]:
# Build a table with one row per redispatchable generator and the columns
# (generator index, max ramp-up, cost per MW).

boolean_array = env.gen_redispatchable
# positions where the redispatchable mask is set
generator_redispatch_indices = np.nonzero(boolean_array)[0]

ramp_rates = env.gen_max_ramp_up[generator_redispatch_indices]
marginal_cost = env.gen_cost_per_MW[generator_redispatch_indices]

# column_stack stores all three columns in one array with a common dtype
zipped_array = np.column_stack(
    (generator_redispatch_indices, ramp_rates, marginal_cost)
)
print(zipped_array)

In [None]:
# Keep only the generators whose ramp rate (column 1) is strictly above delta.
delta = 2
filtered_rows = zipped_array[zipped_array[:, 1] > delta, :]
print(filtered_rows)

k = len(filtered_rows)  # number of generators taking part in an action
print(k)

In [None]:
# Alternative selection: keep only the rows of hand-picked generators.
# This overwrites filtered_rows and k set by the delta-based filter.

# Generator indices to keep
indices_to_filter = [3, 13, 16, 20, 21]

# Column 0 holds the generator index; keep the rows whose index is listed
filtered_rows = zipped_array[np.isin(zipped_array[:, 0], indices_to_filter), :]

print("Filtered Rows:", filtered_rows, sep="\n")

k = len(filtered_rows)  # number of generators taking part in an action
print(k)

In [None]:
# Column 0 of the selected rows holds the generator indices; cast them to int
# so they can be paired with dispatch amounts when building actions.
genIndex_float = filtered_rows[:, 0]
genIndex_int = genIndex_float.astype(int)
print(genIndex_int)

# Each variable takes one of [0, delta, -delta]

In [None]:
def find_combinations(target_sum, num_numbers, delta, current_combination, all_combinations, values=None):
    """Collect every sequence of ``num_numbers`` steps that sums to ``target_sum``.

    The search is a depth-first enumeration: each position takes every value of
    ``values`` in turn, and a completed sequence is kept only when the
    remaining target is exactly zero.

    Parameters
    ----------
    target_sum : number
        Sum still to be reached by the positions not yet filled.
    num_numbers : int
        Number of positions still to fill; must be non-negative.
    delta : number
        Step size used to build the default value set.
    current_combination : list
        Prefix chosen so far. It is mutated during the search and restored to
        its initial content before the function returns.
    all_combinations : list
        Output list; a copy of every matching sequence is appended to it.
    values : iterable, optional
        Allowed values for each position, tried in the given order. Defaults
        to ``(-delta, delta, 0)``. Passing e.g. ``(0, delta, delta / 2,
        -delta, -delta / 2)`` enables half steps without redefining the function.

    Raises
    ------
    ValueError
        If ``num_numbers`` is negative (the recursion would never terminate).
    """
    if num_numbers < 0:
        raise ValueError("num_numbers must be non-negative")
    if values is None:
        # order matters: it fixes the order of the results (all-zero comes last)
        values = (-delta, delta, 0)

    if num_numbers == 0:
        if target_sum == 0:
            # store a copy, the working list keeps being mutated by the callers
            all_combinations.append(list(current_combination))
        return

    for value in values:
        current_combination.append(value)
        find_combinations(target_sum - value, num_numbers - 1, delta, current_combination, all_combinations, values)
        current_combination.pop()

def generate_combinations(target_sum, k, delta):
    """Return all length-``k`` sequences found by ``find_combinations``.

    Starts the search with an empty prefix and returns the list of sequences
    whose elements sum to ``target_sum``.
    """
    found = []
    find_combinations(
        target_sum,
        k,
        delta,
        current_combination=[],
        all_combinations=found,
    )
    return found

# One dispatch value per selected generator: k must equal the number of
# generator indices, otherwise pairing indices with dispatch amounts fails.
k = len(genIndex_int)
delta = 4  # discretized ramp step: each generator moves by +delta, -delta or 0

# Only zero-sum combinations are kept (target sum is 0).
combinations = generate_combinations(0, k, delta)
print("Number of combinations:", len(combinations))
print("Combinations:", combinations)

In [None]:
# Build the action space: one entry per dispatch combination, each entry being
# a list of (generator index, dispatch amount) tuples.
# A comprehension is used so that no helper name leaks into the notebook
# namespace; in particular the generator table "zipped_array" is left intact.
action_space_list = [
    [tuple(pair) for pair in np.column_stack((genIndex_int, dispatch))]
    for dispatch in combinations
]

In [None]:
# Show the action space, then drop the all-zero redispatch action
# (a do-nothing action may have to be added explicitly afterwards).
print(len(action_space_list), action_space_list)
print(" ")
print("After Removal")
# Filter in place instead of pop(): re-running this cell must not remove a
# real action once the all-zero one is already gone.
action_space_list[:] = [
    action
    for action in action_space_list
    if any(amount != 0 for _, amount in action)
]
print(len(action_space_list), action_space_list)

In [None]:
# Peek at the structure: the first two actions, then the first one on its own.
print(action_space_list[0:2])
print("First Action ", action_space_list[0])

In [None]:
# Create an environment to observe the impact of each of the re-dispatch actions designed above.
# NOTE(review): this cell repeats the imports and the backend/environment
# creation already performed in the earlier setup cells, and rebinds "env" to a
# newly made environment. Confirm whether a fresh environment is really needed
# here or whether the existing one can be reused.

import os, sys, grid2op
from grid2op.Agent import DoNothingAgent, BaseAgent
from tqdm.notebook import tqdm  # for easy progress bar
display_tqdm = False  # this is set to False for ease with the unit test, feel free to set it to True
import pdb
import numpy as np
import matplotlib.pyplot as plt

# Use LightSimBackend when lightsim2grid is importable, else PandaPowerBackend.
try:
    from lightsim2grid import LightSimBackend
    bk_cls = LightSimBackend
except ImportError as exc:
    print(f"Error: {exc} when importing faster LightSimBackend")
    from grid2op.Backend import PandaPowerBackend
    bk_cls = PandaPowerBackend

env_name = "l2rpn_neurips_2020_track1_small"
env = grid2op.make(env_name, backend=bk_cls())

print("Is this environment suitable for redispatching: {}".format(env.redispatching_unit_commitment_availble))

In [None]:
# Apply every entry of "action_space_list" to the same initial state and
# collect the vector form of each action in "actions".

actions = []

relevant_indices = [3, 13, 16, 20, 21]

for redispatch in action_space_list:
    # perform a valid redispatching action
    env.set_id(0)  # make sure to use the same environment input data.
    obs_init = env.reset()  # reset the environment
    # build the action once and reuse it for printing, vectorizing and stepping
    act = env.action_space({"redispatch": redispatch})

    print(act)
    actions.append(act.to_vect())  # vectorized before the action is played

    obs, reward, done, info = env.step(act)
    print("actual dispatch at time step 0: {}".format(obs.actual_dispatch))
    print(obs.actual_dispatch[relevant_indices])
    print(redispatch)
    print("")

In [None]:
# NOTE(review): this cell is not idempotent — every re-run appends one more
# do-nothing vector to "actions".
actions.append( env.action_space( {} ).to_vect() ) # add the do-nothing action in an ad hoc fashion (an all-zero re-dispatch is not a do-nothing action in principle)

In [None]:
# Reverse in place so that the entry appended last comes first.
# NOTE(review): not idempotent — running this cell twice restores the previous order.
actions.reverse()

In [None]:
# Display the full list of action vectors (output can be long).
actions

In [None]:
# Ideally run only once: stacks the action vectors into one array and stores it
# in "track1_51_redis_actions_numpy.npz" under the key "my_array".
# NOTE(review): the "51" in the file name presumably refers to the number of
# stored actions — verify that len(actions) matches before saving.
my_array = np.array(actions)
np.savez("track1_51_redis_actions_numpy.npz", my_array=my_array)

In [None]:
# Load the stored action vectors from the .npz archive.
# np.load returns an archive object backed by an open file for .npz input; the
# context manager closes it as soon as the array has been read into memory.
with np.load("track1_51_redis_actions_numpy.npz") as loaded_data:
    # Access the array using the key it was saved under
    all_actions_stored_numpy = loaded_data["my_array"]

# Print one action vector per line
for action in all_actions_stored_numpy:
    print(action)

In [None]:
# Replay every redispatch action from a later point of the same scenario.
# The fast-forward has to happen inside the loop, after each reset: a reset
# puts the environment back at the start of the scenario, so fast-forwarding
# once before the loop would have no effect on the actions tested below.
fast_forward_steps = 5000
relevant_indices = [3, 13, 16, 20, 21]

for redispatch in action_space_list:
    # perform a valid redispatching action
    env.set_id(0)  # make sure to use the same environment input data.
    obs_init = env.reset()  # reset the environment
    env.fast_forward_chronics(fast_forward_steps)  # skip ahead in the scenario
    act = env.action_space({"redispatch": redispatch})

    print(act)  # ultimately this is stored in the .npz file when loading the action space in DDQN

    obs, reward, done, info = env.step(act)
    print("actual dispatch after fast-forwarding {} steps: {}".format(fast_forward_steps, obs.actual_dispatch))
    print(obs.actual_dispatch[relevant_indices])
    print(redispatch)
    print("")

# What if each variable takes one of [0, delta, delta/2, -delta, -delta/2]?

In [None]:
def find_combinations(target_sum, num_numbers, delta, current_combination, all_combinations):
    """Enumerate sequences of full and half steps that sum to ``target_sum``.

    Each of the ``num_numbers`` remaining positions takes, in this order, one
    of ``0, delta, delta/2, -delta, -delta/2``. Whenever all positions are
    filled and the remaining target is zero, a copy of the sequence is appended
    to ``all_combinations``. ``current_combination`` is used as the working
    prefix and is left unchanged when the function returns.
    """
    if num_numbers != 0:
        step_choices = (0, delta, delta / 2, -delta, -delta / 2)
        for step in step_choices:
            current_combination.append(step)
            find_combinations(
                target_sum - step,
                num_numbers - 1,
                delta,
                current_combination,
                all_combinations,
            )
            current_combination.pop()
        return

    # every position is filled: keep the sequence only if it hits the target
    if target_sum == 0:
        all_combinations.append([*current_combination])

def generate_combinations(target_sum, k, delta):
    """Return all length-``k`` sequences found by ``find_combinations``.

    Runs the search from an empty prefix and returns the collected sequences
    whose elements sum to ``target_sum``.
    """
    found = []
    find_combinations(
        target_sum,
        k,
        delta,
        current_combination=[],
        all_combinations=found,
    )
    return found

# Enumerate the zero-sum combinations for the five-value step set.
k = 5  # number of positions in each combination
delta = 2  # full step size; half steps are delta / 2

combinations = generate_combinations(target_sum=0, k=k, delta=delta)
print("Number of combinations:", len(combinations))
print("Combinations:", combinations)