In [5]:
from typing import Any, List, Mapping, Tuple, Union

from agents.rbc import RBCAgent1 as Agent
from citylearn.citylearn import CityLearnEnv

  __DEFAULT = ''
  __STORAGE_SUFFIX = '_without_storage'
  __PARTIAL_LOAD_SUFFIX = '_and_partial_load'
  __PV_SUFFIX = '_and_pv'


In [19]:
class RewardFunctionV1:
    r"""Base and default reward function class.

    The default reward is the electricity consumption from the grid at the current time step returned as a negative value.

    Parameters
    ----------
    env_metadata: Mapping[str, Any]:
        General static information about the environment.
    **kwargs : dict
        Other keyword arguments for custom reward calculation.
    """
    
    def __init__(self, env_metadata: Mapping[str, Any], exponent: float = None, **kwargs):
        self.env_metadata = env_metadata
        self.exponent = exponent

    @property
    def env_metadata(self) -> Mapping[str, Any]:
        """General static information about the environment."""

        return self.__env_metadata
    
    @property
    def central_agent(self) -> bool:
        """Expect 1 central agent to control all buildings."""

        return self.env_metadata['central_agent']
    
    @property
    def exponent(self) -> float:
        return self.__exponent
    
    @env_metadata.setter
    def env_metadata(self, env_metadata: Mapping[str, Any]):
        self.__env_metadata = env_metadata

    @exponent.setter
    def exponent(self, exponent: float):
        self.__exponent = 1.0 if exponent is None else exponent

    def reset(self):
        """Use to reset variables at the start of an episode."""

        pass

    def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]:
        r"""Calculates reward.

        Parameters
        ----------
        observations: List[Mapping[str, Union[int, float]]]
            List of all building observations at current :py:attr:`citylearn.citylearn.CityLearnEnv.
            time_step` that are got from calling :py:meth:`citylearn.building.Building.observations`.

        Returns
        -------
        reward: List[float]
            Reward for transition to current timestep.
        """
        print(observations)

        net_electricity_consumption = [o['net_electricity_consumption'] for o in observations]
        reward_list = [-(max(o, 0)**self.exponent) for o in net_electricity_consumption]

        if self.central_agent:
            reward = [sum(reward_list)]
        else:
            reward = reward_list

        return reward


In [20]:
def action_space_to_dict(aspace):
    """ Only for box space """
    return {"high": aspace.high,
            "low": aspace.low,
            "shape": aspace.shape,
            "dtype": str(aspace.dtype)
            }


def env_reset(env):
    observations = env.reset()
    action_space = env.action_space
    observation_space = env.observation_space
    #building_info = env.buildings()
    #building_info = list(building_info.values())
    action_space_dicts = [action_space_to_dict(asp) for asp in action_space]
    observation_space_dicts = [action_space_to_dict(osp) for osp in observation_space]
    obs_dict = {"action_space": action_space_dicts,
                "observation_space": observation_space_dicts,
                #"building_info": building_info,
                "observation": observations}
    return obs_dict

In [21]:
env= CityLearnEnv("citylearn_challenge_2022_phase_all",reward_function = RewardFunctionV1)
env.central_agent = False
agent= Agent()

In [22]:
obs_dict = env_reset(env)


In [23]:
actions=agent.register_reset(obs_dict)

In [24]:
print(actions)

[array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.]), array([-0.])]


In [25]:
while True:
    observations,reward,done,_ = env.step(actions)
    
    break
    if done :
        break
    else:
        actions= agent.compute_action(observations)

[{'month': 8, 'hour': 1, 'day_type': 1, 'daylight_savings_status': 0, 'indoor_dry_bulb_temperature': nan, 'average_unmet_cooling_setpoint_difference': nan, 'indoor_relative_humidity': nan, 'non_shiftable_load': 0.85116667, 'dhw_demand': 0.0, 'cooling_demand': 0.0, 'heating_demand': 0.0, 'solar_generation': 0.0, 'occupant_count': 0.0, 'indoor_dry_bulb_temperature_set_point': 0.0, 'power_outage': 0.0, 'indoor_dry_bulb_temperature_without_control': nan, 'cooling_demand_without_control': 0.0, 'heating_demand_without_control': 0.0, 'dhw_demand_without_control': 0.0, 'non_shiftable_load_without_control': 0.85116667, 'indoor_relative_humidity_without_control': nan, 'indoor_dry_bulb_temperature_set_point_without_control': 0.0, 'hvac_mode': 1.0, 'outdoor_dry_bulb_temperature': 20.1, 'outdoor_relative_humidity': 79.0, 'diffuse_solar_irradiance': 0.0, 'direct_solar_irradiance': 0.0, 'outdoor_dry_bulb_temperature_predicted_6h': 19.4, 'outdoor_dry_bulb_temperature_predicted_12h': 22.8, 'outdoor_dry

In [None]:
kpis_rbc = env.evaluate()
kpis_rbc = kpis_rbc.pivot(index='cost_function', columns='name', values='value').round(3)
kpis_rbc = kpis_rbc.dropna(how='all')
display(kpis_rbc)

In [None]:
from stable_baselines3.sac import SAC as Agent
from citylearn.citylearn import CityLearnEnv
from citylearn.wrappers import NormalizedObservationWrapper, StableBaselines3Wrapper


In [None]:
env = CityLearnEnv('citylearn_challenge_2023_phase_2_local_evaluation', central_agent=True)
env = NormalizedObservationWrapper(env)
env = StableBaselines3Wrapper(env)
model = Agent('MlpPolicy', env)

In [None]:
episodes =50
model.learn(total_timesteps=env.unwrapped.time_steps*episodes)


In [None]:
# test
observations= env.reset()
steps = 0
while True:
    actions, _ = model.predict(observations, deterministic=True)
    observations, _, done,_ = env.step(actions)
    steps +=1
    if done == True:
        break



In [None]:
steps

In [None]:
kpis = env.unwrapped.evaluate()
kpis = kpis.pivot(index='cost_function', columns='name', values='value').round(3)
kpis = kpis.dropna(how='all')
display(kpis)