In [1]:
import gymnasium
import solar_plant_gym_env
import pandas as pd
env = gymnasium.make('solar_plant_gym_env/SolarPlant')


  logger.warn(


In [2]:
env.action_space

Discrete(3)

In [3]:
env.observation_space

Box(0.0, 1.0, (3,), float32)

In [4]:
import matplotlib.pyplot as plt

def plot_power_vs_time(df):
    """
    Plots the current power, battery charge, and total output power against the current time.
    
    Args:
        df: A pandas DataFrame containing columns: 'current_time', 'current_power', 'battery_charge', 'total_output_power'
    """
    # Ensure that the 'current_time' is in the correct format
    df['current_time'] = pd.to_datetime(df['current_time'], unit='s')

    # Create the plot
    plt.figure(figsize=(10, 6))

    # Plot each of the columns
    plt.plot(df['current_time'], df['current_power'], label='Current Power', color='b', linestyle='-', marker='o')
    plt.plot(df['current_time'], df['battery_charge'], label='Battery Charge', color='g', linestyle='-', marker='x')
    plt.plot(df['current_time'], df['total_output_power'], label='Total Output Power', color='r', linestyle='-', marker='s')

    # Label the axes
    plt.xlabel('Current Time', fontsize=12)
    plt.ylabel('Power (W)', fontsize=12)

    # Add a title
    plt.title('Power and Battery Charge vs Time', fontsize=14)

    # Display the legend
    plt.legend()

    # Show the plot
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [5]:
from collections import defaultdict
import gymnasium as gym
import numpy as np


class SolarBatteryAgent:
    def __init__(
        self,
        env: gym.Env,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            env: The training environment
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The decay for epsilon
            final_epsilon: The final epsilon value
            discount_factor: The discount factor for computing the Q-value
        """
        self.env = env
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []

    def get_action(self, obs: tuple[float, float, float]) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        obs = tuple(obs)
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        # with probability (1 - epsilon) act greedily (exploit)
        else:
            return int(np.argmax(self.q_values[obs]))

    def update(
        self,
        obs: tuple[float, float, float],
        action: int,
        reward: float,
        terminated: bool,
        next_obs: tuple[float, float, float],
    ):
        """Updates the Q-value of an action."""
        # Convert numpy arrays to tuples for dictionary keys
        obs = tuple(obs)
        next_obs = tuple(next_obs)
        # print (obs)
        # print (next_obs)
        
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[obs][action]
        )

        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_difference
        )
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [6]:
import gymnasium as gym
import solar_plant_gym_env

# hyperparameters
learning_rate = 0.01
n_episodes = 100
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1

env = gym.make('solar_plant_gym_env/SolarPlant',render_mode="human")

env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=n_episodes)

agent = SolarBatteryAgent(
    env=env,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

  logger.warn(


In [7]:
from tqdm import tqdm

test_df= pd.DataFrame()

for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False
    # test_df = info['info_df']
    # plot_power_vs_time(info['info_df'])
    # play one episode
    while not done:
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)
        rendered = env.render()
        print(rendered)

        # update the agent
        agent.update(obs, action, reward, terminated, next_obs)

        # update if the environment is done and the current obs
        done = terminated or truncated
        obs = next_obs

    agent.decay_epsilon()

  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")
  0%|          | 0/100 [00:00<?, ?it/s]


AttributeError: 'SolarPlant' object has no attribute '_render_frame'

In [None]:
test_df

In [None]:
# Test the plot function
plot_power_vs_time(test_df)


In [None]:
from matplotlib import pyplot as plt
# visualize the episode rewards, episode length and training error in one figure
fig, axs = plt.subplots(1, 3, figsize=(20, 8))

# np.convolve will compute the rolling mean for 100 episodes

axs[0].plot(np.convolve(env.return_queue, np.ones(100)))
axs[0].set_title("Episode Rewards")
axs[0].set_xlabel("Episode")
axs[0].set_ylabel("Reward")

axs[1].plot(np.convolve(env.length_queue, np.ones(100)))
axs[1].set_title("Episode Lengths")
axs[1].set_xlabel("Episode")
axs[1].set_ylabel("Length")

axs[2].plot(np.convolve(agent.training_error, np.ones(100)))
axs[2].set_title("Training Error")
axs[2].set_xlabel("Episode")
axs[2].set_ylabel("Temporal Difference")

plt.tight_layout()
plt.show()