In [5]:
import yaml
from typing import Any, TypeVar

import grid2op
import gymnasium
import numpy as np
from grid2op import Reward
from grid2op.gym_compat import GymEnv
from ray.rllib.algorithms import ppo  # import the type of agents
from mahrl.grid2op_env.utils import (
    CustomDiscreteActions,
    get_possible_topologies,
    setup_converter,
)


ENV_NAME = "rte_case5_example"
ENV_IS_TEST = True
LIB_DIR = "/Users/barberademol/Documents/GitHub/mahrl_grid2op/"
# LIB_DIR = "/home/daddabarba/VirtualEnvs/mahrl/lib/python3.10/site-packages/grid2op/data"
NB_STEP_TRAIN = 10
RHO_THRESHOLD = 0.95
CHANGEABLE_SUBSTATIONS = [0, 2, 3]

OBSTYPE = TypeVar("OBSTYPE")
ACTTYPE = TypeVar("ACTTYPE")
RENDERFRAME = TypeVar("RENDERFRAME")

class CustomizedGrid2OpEnvironment(gymnasium.Env):
    """Encapsulate Grid2Op environment and set action/observation space."""

    def __init__(self, env_config: dict[str, Any]):
        # 1. create the grid2op environment
        if "env_name" not in env_config:
            raise RuntimeError(
                "The configuration for RLLIB should provide the env name"
            )
        nm_env = env_config.pop("env_name", None)
        self.env_glop = grid2op.make(nm_env, **env_config["grid2op_kwargs"])

        # 1.a. Setting up custom action space
        possible_substation_actions = get_possible_topologies(
            self.env_glop, CHANGEABLE_SUBSTATIONS
        )

        # 2. create the gym environment
        self.env_gym = GymEnv(self.env_glop)
        self.env_gym.reset()

        # 3. customize action and observation space space to only change bus
        # create converter
        converter = setup_converter(self.env_glop, possible_substation_actions)

        # set gym action space to discrete
        self.env_gym.action_space = CustomDiscreteActions(
            converter, self.env_glop.action_space()
        )

        # customize observation space
        ob_space = self.env_gym.observation_space
        ob_space = ob_space.keep_only_attr(
            ["rho", "gen_p", "load_p", "topo_vect", "p_or", "p_ex", "timestep_overflow"]
        )

        self.env_gym.observation_space = ob_space

        # 4. specific to rllib
        self.action_space = gymnasium.spaces.Discrete(len(possible_substation_actions))
        self.observation_space = gymnasium.spaces.Dict(
            dict(self.env_gym.observation_space.spaces.items())
        )

        self.last_rho = 0  # below threshold TODO

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[OBSTYPE, dict[str, Any]]:  # type: ignore
        obs, info = self.env_gym.reset()
        self.last_rho = np.max(obs["rho"])
        return obs, info

    def step(self, action: int) -> tuple[OBSTYPE, float, bool, bool, dict[str, Any]]:
        # for the first action or whenever the lines are not near overloading, do nothing
        if self.last_rho < RHO_THRESHOLD:
            action = -1

        obs, reward, done, truncated, info = self.env_gym.step(action)
        self.last_rho = np.max(obs["rho"])
        return obs, reward, done, truncated, info

    def render(self) -> RENDERFRAME | list[RENDERFRAME] | None:
        raise NotImplementedError

# Create PPOConfig object
ppo_config = ppo.PPOConfig()
ppo_config = ppo_config.training(
    gamma=0.95,
    lr=0.003,
    vf_loss_coeff=0.5,
    entropy_coeff=0.01,
    clip_param=0.2,
    lambda_=0.95,
    sgd_minibatch_size=4,
    train_batch_size=32,
    # seed=14,
)
ppo_config = ppo_config.environment(
    env=CustomizedGrid2OpEnvironment,
    env_config={
        "env_name": ENV_NAME,
        "grid2op_kwargs": {
            "test": ENV_IS_TEST,
            "reward_class": Reward.L2RPNReward,
        },
    },
)

# Convert to YAML string
yaml_string = yaml.dump(ppo_config)

# Write YAML string to a file
with open("ppo_config.yaml", "w") as yaml_file:
    yaml_file.write(yaml_string)




In [13]:
from ray.rllib.policy.policy import PolicySpec
from ray.rllib.algorithms.algorithm_config import DEFAULT_POLICY_MAPPING_FN

class AlgorithmConfigConstructor(yaml.constructor.SafeConstructor):
    def construct_algorithm_config(self, node):
        data = self.construct_mapping(node, deep=True)
        return DEFAULT_POLICY_MAPPING_FN(**data)

# Add the constructor to the PyYAML loader
yaml.add_constructor('tag:yaml.org,2002:python/object:ray.rllib.algorithms.algorithm_config.DEFAULT_POLICY_MAPPING_FN', AlgorithmConfigConstructor.construct_algorithm_config)

class PPOConfigConstructor(yaml.constructor.SafeConstructor):
    def construct_rllib_ppo_config(self, node):
        data = self.construct_mapping(node, deep=True)
        return ppo.PPOConfig(**data)

class PolicySpecConstructor(yaml.constructor.SafeConstructor):
    def construct_rllib_policy_spec(self, node):
        data = self.construct_mapping(node, deep=True)
        return PolicySpec(**data)

# Add the constructor to the PyYAML loader
yaml.add_constructor('tag:yaml.org,2002:python/object:ray.rllib.policy.policy.PolicySpec', PolicySpecConstructor.construct_rllib_policy_spec)
yaml.add_constructor('tag:yaml.org,2002:python/object:ray.rllib.algorithms.ppo.ppo.PPOConfig', PPOConfigConstructor.construct_rllib_ppo_config)

with open("ppo_config.yaml", "r") as yaml_file:
    loaded_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

print(loaded_config)

ImportError: cannot import name 'DEFAULT_POLICY_MAPPING_FN' from 'ray.rllib.algorithms.algorithm_config' (/Users/barberademol/Documents/GitHub/mahrl_grid2op/venv_mahrl/lib/python3.10/site-packages/ray/rllib/algorithms/algorithm_config.py)