In [None]:
import os
import random
from datetime import timedelta
from math import ceil

import pandas as pd
from meteostat import Hourly, Stations


class OutsideTemp:
    backup_outside_temp_list = [4.3, 4.3, 4.4, 4.4, 4.3, 4.3, 4.7, 5.2, 7.0, 5.3, 8.7, 10.0, 10.7,
                                11.3, 13.6, 10.5, 9.4, 9.0, 9.3, 9.5, 9.3, 9.3, 8.3, 8.3]
    file_name = "outside_temps.csv"
    temperature_df = pd.read_csv(file_name)

    def __init__(self, time_step_size, episode_length=timedelta(days=1)):
        self.time_step_size = time_step_size
        self.interp_df = self.interpolate(time_step_size, self.temperature_df)
        self.time_list = self.interp_df.index.to_list()
        self.episode_length = episode_length
        self.start = 0

    def get_temperature(self, i):
        """
        Interpolate temperature from the outside temperature list
        :return: the interpolated temperature
        """
        return self.interp_df.iloc[self.start + i].temp

    def get_time(self, i):
        return int(self.time_list[self.start + i].strftime("%H%M"))

    def new_sample(self):
        episode_length_steps = ceil(self.episode_length / self.time_step_size)

        # see if the requested number of days fits into the data
        remaining_temp = len(self.interp_df) - episode_length_steps
        if remaining_temp < 0:
            raise ValueError(f"Attempted to run for longer ({self.episode_length}) than temperature data is available ({len(self.temperature_df)})")
        self.start = random.randint(0, remaining_temp)

    @classmethod
    def interpolate(cls, time_step_size, df):
        """
        Interpolate the temperature data to match the time step size of the simulation.
        """
        df = df.set_index("time")
        df.index = pd.to_datetime(df.index)
        freq = f"{time_step_size.total_seconds() / 60:.0f}T"
        df = df.resample(freq).interpolate()
        return df

    @classmethod
    def _fetch_outside_temperature(cls):
        """
        The function fetch_outside_temperature fetches hourly temperature data from a weather station in Bavaria,
        filters out rows with missing data, and removes rows that are more than an hour away from their adjacent rows.
        The filtered temperature data is stored in the `outside_temp_n` csv file.
        :return:
        """
        # Find and fetch a station in Bavaria
        station_df = Stations().region("DE", "BY").fetch(1)
        station = station_df.iloc[0]
        start_date = station["hourly_start"]
        end_date = station["hourly_end"]

        # Fetch the hourly values and filter out NaN
        hourly_df = Hourly(station["wmo"], start_date, end_date).fetch()
        hourly_df = hourly_df.filter(items=["temp"])
        hourly_df = hourly_df.loc[pd.notnull(hourly_df["temp"])]

        # filter rows which are more than an hour away from previous row (NaN in that hour)
        hourly_df["time_diff"] = hourly_df.index.to_series().diff().dt.seconds
        hourly_df = hourly_df[hourly_df["time_diff"] <= 3610]
        hourly_df = hourly_df.drop(columns=["time_diff"])

        # Save the fetched data to a csv, with a new name
        i = 0
        while os.path.exists(f"{cls.file_name}_{i}.csv"):
            i += 1

        hourly_df.to_csv(f"{cls.file_name}_{i}.csv", index=True)


In [None]:
from datetime import timedelta
from enum import auto, Enum
from functools import partial
from random import randint

import gym
import numpy as np
from numpy import float32
from simplesimple import Building

from outside_temp import OutsideTemp


class BuildingEnv(gym.Env):
    """
    A gym environment for the Building model.
    """

    metadata = {'render.modes': ['human']}
    COMFORT_PENALTY = -10000

    class RewardTypes(str, Enum):
        ENERGY = auto()
        COMFORT = auto()
        CHANGE = auto()

    def __init__(
            self, heat_mass_capacity,
            heat_transmission,
            maximum_cooling_power,
            maximum_heating_power,
            time_step: timedelta,
            floor_area,
            episode_length=timedelta(days=2),
            desired_temperature=22,
    ):
        """
        :param heat_mass_capacity: The heat mass capacity of the building.
        :param heat_transmission: The heat transmission rate of the building.
        :param maximum_cooling_power: The maximum cooling power available to the building.
        :param maximum_heating_power: The maximum heating power available to the building.
        :param time_step: The time step size for the simulation.
        :param floor_area: The floor area that is conditioned by heating and cooling.
        :param episode_length: The length of each episode, defaults to two days.
        :param desired_temperature: The desired temperature in the building, defaults to 21 degrees Celsius.
        """
        # Set up the outside temperature model, and get the initial temperature and time.
        self.temp_model = OutsideTemp(time_step, episode_length=episode_length)
        self.temp_model.new_sample()
        self.prev_temp = self.temp_model.get_temperature(0)
        self.random_temp = partial(randint, 12, 33)

        # Set up the building model.
        self.building = Building(heat_mass_capacity=heat_mass_capacity,
                                 heat_transmission=heat_transmission,
                                 maximum_cooling_power=maximum_cooling_power,
                                 maximum_heating_power=maximum_heating_power,
                                 initial_building_temperature=self.random_temp(),
                                 time_step_size=time_step,
                                 conditioned_floor_area=floor_area)

        # Set up basic variables for the environment.
        self.desired_temperature = desired_temperature
        self.time_step = time_step
        self.episode_length_steps = episode_length / time_step
        self.i = 0
        self.current_rewards = {}

        # Set up the action and observation spaces for the environment.
        # This is a continuous action space, with two actions: heating setpoint and cooling setpoint.
        # The observation space is a continuous space, with three observations:
        # building temperature, thermal power, and current time of day in the format "%H%M".
        self.observation_space = gym.spaces.Box(low=np.array([-30, 0, 0]), high=np.array([60, 10000, 2400]), shape=(3,))
        self.action_space = gym.spaces.Box(low=maximum_cooling_power, high=maximum_heating_power, shape=(2,))

    def render(self, mode='human'):
        if mode == 'human':
            print(f'Current temperature: {self.building.current_temperature:.2f}')
            print(f'Thermal power: {self.building.thermal_power:.2f}')
        else:
            pass

    def step(self, action):
        self.i += 1
        power = action
        outside_temperature = self.temp_model.get_temperature(self.i)
        time = self.temp_model.get_time(self.i)

        self.prev_temp = self.building.current_temperature
        self.building.step(outside_temperature, power)

        obs = np.array([self.building.current_temperature, self.building.thermal_power, time], dtype=float32)
        reward = self.calculate_reward()
        done = self.i >= self.episode_length_steps
        info = {}
        #     "Current temperature": self.building.current_temperature,
        #     "Thermal power": self.building.thermal_power,
        # }

        return obs, reward, done, info

    def calculate_reward(self):
        r_energy_use = self.get_energy_use_reward()
        r_comfort = self.get_comfort_reward()
        r_temp_change = self.get_temp_change_reward()

        self.current_rewards = {
            self.RewardTypes.ENERGY: r_energy_use,
            self.RewardTypes.COMFORT: r_comfort,
            self.RewardTypes.CHANGE: r_temp_change,
        }

        return r_energy_use + r_comfort + r_temp_change

    def get_temp_change_reward(self):
        return 0
        # if self.prev_temp is None:
        #     return 0
        # return -abs(self.prev_temp - self.building.current_temperature)

    def get_comfort_reward(self):
        if self.desired_temperature - 1 <= self.building.current_temperature <= self.desired_temperature + 1:
            return 0
        return self.COMFORT_PENALTY

    def get_energy_use_reward(self):
        energy_use = abs(self.time_step.total_seconds() * self.building.thermal_power)
        return -energy_use

    def reset(self):
        self.i = 0
        self.temp_model.new_sample()
        self.prev_temp = self.temp_model.get_temperature(0)
        time = self.temp_model.get_time(0)
        self.building.current_temperature = self.random_temp()
        self.building.thermal_power = 0
        obs = np.array([self.building.current_temperature,
                        self.building.thermal_power,
                        time], dtype=float32)
        return obs


In [None]:
from gym.wrappers import Monitor
from matplotlib import pyplot as plt
from stable_baselines3.common.callbacks import BaseCallback

from building_env import BuildingEnv


class BuildingPlotCallback(BaseCallback):
    def __init__(self, log_dir: str):
        super(BuildingPlotCallback, self).__init__()
        self.log_dir = log_dir
        self.temperatures = []
        self.thermal_powers = []
        self.rewards = {BuildingEnv.RewardTypes.ENERGY: [],
                        BuildingEnv.RewardTypes.COMFORT: [],
                        BuildingEnv.RewardTypes.CHANGE: []}

    def _on_step(self):
        env = self.model.get_env()

        current_rewards = env.get_attr('current_rewards')[0]
        for key, value in current_rewards.items():
            self.rewards[key].append(value)

        building = env.get_attr('building')[0]
        self.temperatures.append(building.current_temperature)
        self.thermal_powers.append(abs(building.thermal_power))

    def _on_training_end(self):
        self.plot_output_2(self.temperatures, self.thermal_powers, "Temperature", "Power")
        self.plot_output_2(self.rewards[BuildingEnv.RewardTypes.ENERGY], self.rewards[BuildingEnv.RewardTypes.COMFORT],
                           "Energy Reward", "Comfort Reward")

        monitor: Monitor = self.model.get_env().venv.envs[0]
        episode_lengths = monitor.get_episode_lengths()
        episode_rewards = monitor.get_episode_rewards()
        avg_rewards = [i / j for i, j in zip(episode_rewards, episode_lengths)]
        plt.plot(avg_rewards)
        plt.xlabel("Episode")
        plt.ylabel("Average Reward")
        plt.show()

        episode_temperatures = []
        episode_powers = []

        total = 0
        for length in episode_lengths:
            episode_mean_temperature = sum(self.temperatures[total:total + length]) / length
            episode_mean_power = sum(self.thermal_powers[total:total + length]) / length
            episode_temperatures.append(episode_mean_temperature)
            episode_powers.append(episode_mean_power)
            total += length

        self.plot_output_2(episode_temperatures, episode_powers, "Avg Temperature", "Avg Power")

    @classmethod
    def plot_output_2(cls, list1, list2, label1, label2):
        fig, ax1 = plt.subplots()
        color = 'tab:red'
        ax1.set_xlabel('Time')
        ax1.set_ylabel(label1, color=color)
        ax1.plot(list1, color=color)
        ax1.tick_params(axis='y', labelcolor=color)

        ax2 = ax1.twinx()
        color = 'tab:blue'
        ax2.set_ylabel(label2, color=color)
        ax2.plot(list2, color=color)
        ax2.tick_params(axis='y', labelcolor=color)

        fig.tight_layout()
        plt.show()

    @classmethod
    def plot_output(cls, temperature, thermal_power):
        # plot the data
        plt.plot(temperature, label="Temperature")

        # add labels and title
        plt.xlabel('Time (steps)')
        plt.ylabel('Temperature (Celsius)')
        plt.title('Temperature over Time')

        # display the plot
        plt.show()


        plt.plot(thermal_power)
        plt.xlabel('Time (steps)')
        plt.ylabel('Power (Watts)')
        plt.title('Power over Time')
        plt.show()



In [None]:
from datetime import timedelta

from gym.wrappers import Monitor
from matplotlib import pyplot as plt
from stable_baselines3 import A2C
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnNoModelImprovement
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecNormalize

from building_env import BuildingEnv
from building_plot_callback import BuildingPlotCallback

floor_area = 100
env = eval_env = BuildingEnv(
    heat_mass_capacity=165000 * floor_area,
    heat_transmission=200,
    maximum_cooling_power=-10000,
    maximum_heating_power=10000,
    time_step=timedelta(minutes=15),
    floor_area=floor_area
)



log_dir = "tmp/gym"
env = make_vec_env(lambda: env, n_envs=1, monitor_dir=log_dir)
eval_env = make_vec_env(lambda: eval_env, n_envs=1)
# monitor_env = Monitor(env, filename="tmp/gym/monitor.csv")
env = VecNormalize(env)
eval_env = VecNormalize(eval_env)
plot_callback = BuildingPlotCallback(log_dir)

# Stop training if there is no improvement after more than 3 evaluations
stop_train_callback = StopTrainingOnNoModelImprovement(max_no_improvement_evals=3, min_evals=1, verbose=1)
eval_callback = EvalCallback(eval_env, eval_freq=1000, callback_after_eval=stop_train_callback, verbose=1)

# %%
model = A2C("MlpPolicy", env).learn(total_timesteps=1000, callback=[plot_callback])

# %%
# monitor: Monitor = env.venv.envs[0]
# rewards = monitor.get_episode_rewards()
# plt.plot(rewards)
# plt.show()
#
# # %%
# episode_lengths = monitor.get_episode_lengths()
# episode_rewards = monitor.get_episode_rewards()
# avg_rewards = [i/j for i, j in zip(episode_rewards, episode_lengths)]
# plt.plot(avg_rewards)
# plt.yscale("log")
# plt.show()
#
# # %%
# # plot_output(plot_callback.temperatures, plot_callback.thermal_powers)
# temperatures = plot_callback.temperatures
# thermal_powers = plot_callback.thermal_powers
# BuildingPlotCallback.plot_output_2(temperatures, thermal_powers, label1="Temperature", label2="Thermal power")
# plt.show()
