# Import dependencies

In [2]:
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, Box

import numpy as np
import random
import os

from stable_baselines3 import DQN,A2C,PPO

from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

# Objectives:

The objective of the project is to design, train and evaluate a framework for an agent operating in a low-energy environment. The agent seeks to efficiently manage the heating and airflow in a room to reduce energy consumption and improve occupant comfort.

Leveraging reinforcement learning, the goal is to train agents to dynamically adjust controls based on factors such as occupancy, solar radiation, energy availability, and so on. 

Ultimately, the project aims to help achieve optimal energy use without compromising comfort, in support of sustainable building design.

# The Environment

In [None]:
import numpy as np
import random
import gym
import gym.spaces as spaces

class EnergySavingEnv(gym.Env):
    """Single-room energy-management environment (classic Gym API).

    Actions
        ``Discrete(28)``. ``action // 4`` selects one of seven temperature
        adjustments (-1, 0, +1, -2, +2, -3, +3 degrees) and ``action % 4``
        selects one of four airflow levels.

    Observations
        A float32 vector ``[(temperature - 16) / 17, energy_level / energy_capacity]``.
        Both components lie in ``[0, 1]``, which is inside the declared
        ``[-1, 1]`` box.

    Reward
        ``-0.5 * (temperature - ideal_temperature) ** 2`` plus ``+1`` when the
        energy level is at or above ``energy_threshold`` and ``-1`` otherwise.

    Episodes
        An episode lasts exactly ``time_steps`` calls to :meth:`step`.

    The classic Gym signatures are kept: :meth:`reset` returns the observation
    only and :meth:`step` returns ``(observation, reward, done, info)``.
    """

    # Temperature change (in degrees) indexed by ``action // 4``.
    _TEMPERATURE_DELTAS = (-1, 0, 1, -2, 2, -3, 3)
    # Number of airflow levels encoded in ``action % 4``.
    _NUM_AIRFLOW_LEVELS = 4
    # Bounds the room temperature is clipped to; also used for normalization.
    _MIN_TEMPERATURE = 16
    _MAX_TEMPERATURE = 33

    def __init__(self):
        super().__init__()

        # 7 temperature adjustments * 4 airflow adjustments = 28 actions
        self.action_space = spaces.Discrete(
            len(self._TEMPERATURE_DELTAS) * self._NUM_AIRFLOW_LEVELS
        )

        # Observation: normalized temperature and normalized energy level
        self.observation_space = spaces.Box(
            low=np.array([-1, -1], dtype=np.float32),
            high=np.array([1, 1], dtype=np.float32),
            dtype=np.float32,
        )

        # Initial temperature (random between 24 and 26 degrees Celsius);
        # re-sampled on every reset()
        self.initial_temperature = np.random.uniform(24, 26)
        self.temperature = self.initial_temperature

        # Energy level every episode starts from (before home cleaning)
        self.initial_energy_level = 60
        self.energy_level = self.initial_energy_level

        # Temperature the reward pulls towards
        self.ideal_temperature = 22

        # Resource manager properties
        self.energy_capacity = 100
        self.energy_threshold = 30  # Energy threshold for energy-saving mode
        self.energy_usage_factor = 2  # Energy used per degree of temperature adjustment
        self.base_airflow = 0  # Base airflow without energy consumption
        self.max_airflow = 3  # Maximum airflow that can be achieved with energy consumption
        self.airflow_energy_consumption = 5  # Airflow energy consumed per person in the room

        # Episode length and current position within the episode
        self.time_steps = 100
        self.current_step = 0

        self.cleaning_energy_consumption = 5  # Energy consumed during home cleaning

        # Frequency of home cleaning (once a day/episode)
        self.cleaning_frequency = 1

        # Solar energy parameters
        self.solar_panel_efficiency = 0.18  # Efficiency of solar panels
        self.solar_panel_area = 25  # Area covered by solar panels in square meters

        # Outside electricity cost (not used by the dynamics in this class)
        self.base_electricity_cost = 1  # Base cost per unit of electricity

        # Solar radiation parameters
        self.solar_radiation_variation = 100  # Maximum variation in solar radiation
        self.solar_radiation_mean = 700  # Mean solar radiation during the day
        self.randomness_factor = 0.1  # Probability of replacing the time-of-day value

        # Occupancy
        self.max_people = 5
        self.num_people = 0

        # Occupancy changes by at most one person every 25 time steps
        self.add_remove_frequency = 25

        # Exploration parameters. They are kept for backward compatibility
        # only: step() no longer touches them, because resampling the action
        # after it had already been applied had no effect on the environment.
        self.epsilon = 0.1  # Exploration rate
        self.min_epsilon = 0.01  # Minimum exploration rate
        self.epsilon_decay = 0.99  # Exploration decay rate

    def get_solar_radiation(self, time_of_day):
        """Return a simulated solar radiation value for ``time_of_day``.

        Args:
            time_of_day: Hour of the day as a number; step() passes values
                in ``[0, 24)``.

        Returns:
            A radiation value drawn from a range that depends on the hour.
            With probability ``randomness_factor`` that value is replaced by
            a uniform draw from ``solar_radiation_mean`` plus or minus
            ``solar_radiation_variation``.
        """
        if time_of_day < 6:  # Night
            solar_radiation = np.random.uniform(0, 100)
        elif time_of_day < 9:  # Early morning
            solar_radiation = np.random.uniform(500, 700)
        elif time_of_day < 15:  # Midday
            solar_radiation = np.random.uniform(700, 1000)
        elif time_of_day < 19:  # Evening
            solar_radiation = np.random.uniform(500, 700)
        else:  # Night
            solar_radiation = 0

        # Occasionally ignore the time of day entirely
        if random.random() < self.randomness_factor:
            solar_radiation = np.random.uniform(
                self.solar_radiation_mean - self.solar_radiation_variation,
                self.solar_radiation_mean + self.solar_radiation_variation,
            )

        return solar_radiation

    def _get_observation(self):
        """Return the normalized ``[temperature, energy]`` observation as float32."""
        temperature_span = self._MAX_TEMPERATURE - self._MIN_TEMPERATURE
        return np.array(
            [
                (self.temperature - self._MIN_TEMPERATURE) / temperature_span,
                self.energy_level / self.energy_capacity,
            ],
            dtype=np.float32,
        )

    def step(self, action):
        """Apply ``action`` and advance the simulation by one time step.

        Args:
            action: Integer in ``[0, action_space.n)``; a size-1 NumPy array
                (as returned by ``model.predict``) is accepted as well.

        Returns:
            ``(observation, reward, done, info)``; ``info`` is an empty dict.

        Raises:
            ValueError: If ``action`` is outside the action space.
        """
        # .item() accepts Python ints as well as 0-d and size-1 arrays
        action = int(np.asarray(action).item())
        if not 0 <= action < self.action_space.n:
            raise ValueError(
                f"Invalid action {action}; expected 0 <= action < {self.action_space.n}"
            )

        temp_change = self._TEMPERATURE_DELTAS[action // self._NUM_AIRFLOW_LEVELS]
        # NOTE(review): the airflow level (action % 4) does not influence the
        # dynamics below; airflow energy use depends only on occupancy.
        # Confirm whether the airflow level was meant to scale that cost.

        # Apply the temperature change and clip to the valid range
        self.temperature = max(
            self._MIN_TEMPERATURE,
            min(self._MAX_TEMPERATURE, self.temperature + temp_change),
        )

        # Energy produced by the solar panels; the step index is mapped to an hour of day
        time_of_day = self.current_step / self.time_steps * 24
        solar_radiation = self.get_solar_radiation(time_of_day)
        solar_energy = self.solar_panel_area * self.solar_panel_efficiency * solar_radiation

        # Energy consumed by the temperature adjustment and by airflow for occupants
        energy_usage = abs(temp_change) * self.energy_usage_factor
        airflow_energy_usage = self.airflow_energy_consumption * self.num_people

        # Charge first (capped at capacity), then consume, then clip to [0, capacity]
        self.energy_level = min(self.energy_level + solar_energy, self.energy_capacity)
        self.energy_level -= energy_usage + airflow_energy_usage
        self.energy_level = max(0, min(self.energy_capacity, self.energy_level))

        # Quadratic comfort penalty plus a +/-1 bonus for staying above the threshold
        temperature_reward = -0.5 * (abs(self.temperature - self.ideal_temperature) ** 2)
        energy_reward = 1 if self.energy_level >= self.energy_threshold else -1
        reward = float(temperature_reward + energy_reward)

        # Advance time BEFORE testing for termination so that an episode
        # lasts exactly ``time_steps`` steps
        self.current_step += 1
        done = self.current_step >= self.time_steps

        # Every ``add_remove_frequency`` steps, add or remove one person (50/50)
        if self.current_step % self.add_remove_frequency == 0:
            if random.random() < 0.5:
                if self.num_people < self.max_people:
                    self.num_people += 1
            elif self.num_people > 0:
                self.num_people -= 1

        info = {}
        return self._get_observation(), reward, done, info

    def reset(self):
        """Start a new episode and return its first observation.

        The temperature is re-sampled between 24 and 26 degrees and the energy
        level is restored to ``initial_energy_level``, so no state leaks from
        the previous episode. Occupancy is drawn uniformly from
        ``0..max_people`` and the per-episode home-cleaning energy cost is
        then deducted.

        Returns:
            The normalized float32 observation of the initial state.
        """
        self.current_step = 0

        # Restore the physical state of the room
        self.initial_temperature = np.random.uniform(24, 26)
        self.temperature = self.initial_temperature
        self.energy_level = self.initial_energy_level

        # Randomly generate the number of people in the room
        self.num_people = random.randint(0, self.max_people)

        # Home cleaning: current_step is 0 here, so this fires on every reset
        if self.current_step % self.cleaning_frequency == 0:
            self.energy_level -= self.cleaning_energy_consumption
            self.energy_level = max(0, min(self.energy_capacity, self.energy_level))

        return self._get_observation()

# Test the Environment

In [None]:
# Instantiate the environment and draw one random point from its observation
# space as a quick sanity check of the space definition.
env_energy_saving = EnergySavingEnv()
env_energy_saving.observation_space.sample()

In [None]:
# Start an episode; the return value is the initial normalized observation.
env_energy_saving.reset()

In [None]:
# Smoke-test the environment: play a couple of episodes with uniformly random
# actions and print the accumulated reward of each one.
episodes = 2
for episode in range(1, episodes + 1):
    obs = env_energy_saving.reset()
    score = 0
    done = False

    while True:
        action = env_energy_saving.action_space.sample()
        obs, reward, done, info = env_energy_saving.step(action)
        score += reward
        if done:
            break

    print('Episode: {} Score {}'.format(episode, score))

# Train model

We are going to train the model with different agents/models and compare the results obtained.

In [None]:
# Directory passed to each agent below as its TensorBoard log location.
log_path = os.path.join('Training', 'Logs')

### DQN

In [None]:
# Build a DQN agent with an MLP policy on the energy-saving environment.
energy_saving_model = DQN('MlpPolicy', env_energy_saving, verbose=1, tensorboard_log=log_path)

In [None]:
# Train the agent for 250,000 environment steps.
energy_saving_model.learn(total_timesteps=250000)

Saving the model:

In [None]:
# Persist the trained DQN agent, then drop the in-memory copy so that the next
# cell has to reload it from disk.
path = os.path.join('Training', 'Saved Models', 'DQN_250k_env_room')
energy_saving_model.save(path)
del energy_saving_model

In [None]:
# Reload the saved DQN agent and attach it to the environment.
energy_model = DQN.load(path, env_energy_saving)

### Evaluation:

In [None]:
# Evaluate the reloaded agent over 10 episodes. Rendering stays disabled
# because EnergySavingEnv does not implement render(), so render=True would
# fail inside the base environment class.
mean_reward, std_reward = evaluate_policy(energy_model, env_energy_saving, n_eval_episodes=10)

print(f"Mean reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [None]:
# Roll out the loaded agent for a few episodes and print each episode's
# accumulated reward.
episodes = 5

for episode in range(1, episodes + 1):
    obs = env_energy_saving.reset()
    score = 0
    done = False

    while True:
        action, _ = energy_model.predict(obs)
        obs, reward, done, info = env_energy_saving.step(action)
        score += reward
        if done:
            break

    print('Episode: {} Score {}'.format(episode, score))

### Actor-Critic Model (A2C)

In [None]:
# Build an A2C agent with an MLP policy on the energy-saving environment.
energy_saving_model = A2C('MlpPolicy', env_energy_saving, verbose=1, tensorboard_log=log_path)

In [None]:
# Train the agent for 250,000 environment steps.
energy_saving_model.learn(total_timesteps=250000)

In [None]:
# Persist the trained A2C agent, then drop the in-memory copy so that the next
# cell has to reload it from disk.
path = os.path.join('Training', 'Saved Models', 'A2C_250k_env_room')
energy_saving_model.save(path)
del energy_saving_model

In [None]:
# Reload the saved A2C agent and attach it to the environment.
energy_model = A2C.load(path, env_energy_saving)

#### Evaluation

In [None]:
# Evaluate the reloaded agent over 10 episodes. Rendering stays disabled
# because EnergySavingEnv does not implement render(), so render=True would
# fail inside the base environment class.
mean_reward, std_reward = evaluate_policy(energy_model, env_energy_saving, n_eval_episodes=10)

print(f"Mean reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [None]:
# Roll out the loaded agent for a few episodes and print each episode's
# accumulated reward.
episodes = 5

for episode in range(1, episodes + 1):
    obs = env_energy_saving.reset()
    score = 0
    done = False

    while True:
        action, _ = energy_model.predict(obs)
        obs, reward, done, info = env_energy_saving.step(action)
        score += reward
        if done:
            break

    print('Episode: {} Score {}'.format(episode, score))

### PPO

In [None]:
# Build a PPO agent with an MLP policy on the energy-saving environment.
energy_saving_model = PPO('MlpPolicy', env_energy_saving, verbose=1, tensorboard_log=log_path)

In [None]:
# Train the agent for 250,000 environment steps.
energy_saving_model.learn(total_timesteps=250000)

In [None]:
# Persist the trained PPO agent, then drop the in-memory copy so that the next
# cell has to reload it from disk.
path = os.path.join('Training', 'Saved Models', 'PPO_250k_env_room')
energy_saving_model.save(path)
del energy_saving_model

In [None]:
# Reload the saved PPO agent and attach it to the environment.
energy_model = PPO.load(path, env_energy_saving)

#### Evaluation:

In [None]:
# Evaluate the reloaded agent over 10 episodes. Rendering stays disabled
# because EnergySavingEnv does not implement render(), so render=True would
# fail inside the base environment class.
mean_reward, std_reward = evaluate_policy(energy_model, env_energy_saving, n_eval_episodes=10)

print(f"Mean reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [None]:
# Roll out the loaded agent for a few episodes and print each episode's
# accumulated reward.
episodes = 5

for episode in range(1, episodes + 1):
    obs = env_energy_saving.reset()
    score = 0
    done = False

    while True:
        action, _ = energy_model.predict(obs)
        obs, reward, done, info = env_energy_saving.step(action)
        score += reward
        if done:
            break

    print('Episode: {} Score {}'.format(episode, score))