# Setup

In [None]:
!apt-get install ffmpeg freeglut3-dev xvfb  # For visualization
!pip install "stable_baselines3==2.5.0"

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
freeglut3-dev is already the newest version (2.8.1-6).
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.13).
0 upgraded, 0 newly installed, 0 to remove and 29 not upgraded.


In [None]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from stable_baselines3.common.env_checker import check_env


# Environment

In [None]:
def seasonal_duck_curve(hour, season, noise_scale=0.03):
    """Return a noisy electricity demand following a seasonal "duck curve".

    The deterministic part is a constant base load, plus a morning logistic
    ramp, minus a midday logistic step and a Gaussian-shaped trough, plus an
    evening logistic ramp. The result is multiplied by a uniform random
    factor drawn from [0.9, 1.1].

    Args:
        hour: Hour of the day used as the curve's time coordinate.
        season: Season key -- 1 (Summer), 2 (Winter) or 3 (Spring/Autumn).
        noise_scale: Accepted but not used by the computation; the noise band
            is fixed at +/-10%.

    Returns:
        The demand for the given hour and season, including the random factor.

    Raises:
        KeyError: If ``season`` is not one of 1, 2, 3.
    """
    base_load = 400

    # Per-season columns: morning ramp (height, centre), midday step
    # (height, centre), Gaussian trough (depth, centre), evening ramp
    # (height, centre), followed by two values the formula does not use.
    curve_table = {
        1: (100, 7, 80, 12, 120, 14, 250, 18, 30, 4),  # Summer
        2: (180, 6, 40, 12, 80, 14, 220, 17, 60, 4),  # Winter
        3: (130, 7, 90, 12, 100, 14, 180, 18, 40, 4)  # Spring/Autumn
    }
    (ramp_am, centre_am,
     step_height, centre_step,
     trough_depth, centre_trough,
     ramp_pm, centre_pm,
     _unused_height, _unused_centre) = curve_table[season]

    def logistic_denominator(centre):
        # 1 + e^{-(hour - centre)}: denominator of a unit-slope logistic step.
        return 1 + np.exp(-(hour - centre))

    morning_rise = ramp_am / logistic_denominator(centre_am)
    midday_step = step_height / logistic_denominator(centre_step)
    gaussian_trough = trough_depth * np.exp(-((hour - centre_trough) ** 2) / 4)
    evening_rise = ramp_pm / logistic_denominator(centre_pm)

    # Same left-to-right accumulation order as a single-expression formula.
    demand = base_load + morning_rise - midday_step - gaussian_trough + evening_rise

    noise_factor = random.uniform(0.9, 1.1)
    return demand * noise_factor

def electricity_price_function(hour, season, demand, noise_scale=0.03):
    """Return a noisy, non-negative electricity price for an hour and season.

    The deterministic price is a seasonal offset plus two cosine terms in the
    hour (periods of 24 and 12 hours). It is multiplied by a uniform random
    factor drawn from [0.9, 1.1] and floored at zero.

    Args:
        hour: Hour of the day.
        season: Season key -- 1 (Summer), 2 (Winter) or 3 (Spring/Autumn).
        demand: Accepted but not used by the computation.
        noise_scale: Accepted but not used by the computation; the noise band
            is fixed at +/-10%.

    Returns:
        The price including the random factor, never below 0.

    Raises:
        KeyError: If ``season`` is not one of 1, 2, 3.
    """
    # Per-season (offset, 24h-cosine amplitude, 12h-cosine amplitude).
    price_table = {
        1: (30, 15, 10),  # Summer
        2: (28, 14, 9),   # Winter
        3: (25, 12, 8)    # Spring/Autumn
    }
    offset, daily_amplitude, half_daily_amplitude = price_table[season]

    daily_term = daily_amplitude * np.cos(2 * np.pi * hour / 24)
    half_daily_term = half_daily_amplitude * np.cos(4 * np.pi * hour / 24)
    deterministic_price = offset + daily_term + half_daily_term

    noisy_price = deterministic_price * random.uniform(0.9, 1.1)
    return max(noisy_price, 0)


In [None]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import random

class ElectricityMarketEnv(gym.Env):
    """
    Custom Gym environment for an electricity market with battery storage.
    The agent decides when to charge/discharge to maximize profit.

    Observation (float32, 6 entries):
        [battery SoC, battery capacity, electricity price, demand, hour, season]
    Action (float32, 1 entry):
        Amount of energy to charge (> 0) or discharge (<= 0); it is clipped to
        +/- the current battery capacity inside ``step``.

    NOTE(review): the same ``timestep`` counter is used modulo 24 as the hour
    of the day and integer-divided by 30 as the month, i.e. it is treated as
    hours in one place and as days in another -- confirm the intended unit.
    """

    def __init__(self, max_timesteps=365, degradation_rate=0.99):
        """
        Args:
            max_timesteps: Number of steps after which an episode ends.
            degradation_rate: Factor the battery capacity is multiplied by on
                every step (capacity never drops below ``min_battery_capacity``).
        """
        super().__init__()
        self.timestep = 0
        self.max_timesteps = max_timesteps
        self.season = self.get_season_from_timestep(0)  # Set initial season based on real months
        self.episode_count = 0  # Track training episodes
        self.degradation_rate = degradation_rate  # Battery degradation factor

        # Battery parameters
        self.initial_battery_capacity = 100  # Initial max storage capacity
        self.min_battery_capacity = 50  # Floor that degradation cannot go below
        self.initial_soc = 50  # State of charge (SoC) at the start of an episode
        self.battery_capacity = self.initial_battery_capacity  # Maximum storage capacity
        self.battery_soc = self.initial_soc
        self.efficiency = 0.95  # Charging/discharging efficiency

        # Electricity market parameters (sampled from the noisy helper functions)
        self.base_min_price, self.base_max_price = self._calculate_price_bounds()
        self.base_min_demand, self.base_max_demand = self._calculate_demand_bounds()

        # The price/demand helpers scale every value by a uniform factor in
        # [0.9, 1.1]. The sampled extremes above already contain one such
        # factor, so a later observation can be lower than the sampled minimum
        # by up to 0.9 / 1.1 and higher than the sampled maximum by up to
        # 1.1 / 0.9. Widen by those ratios so observations stay inside the Box.
        noise_low, noise_high = 0.9, 1.1
        widen_down = noise_low / noise_high
        widen_up = noise_high / noise_low
        self.min_price, self.max_price = self.base_min_price * widen_down, self.base_max_price * widen_up
        self.min_demand, self.max_demand = self.base_min_demand * widen_down, self.base_max_demand * widen_up

        # Action space: Charge (+) or discharge (-) within battery capacity
        self.action_space = spaces.Box(
            low=-self.battery_capacity, high=self.battery_capacity, shape=(1,), dtype=np.float32
        )

        # Observation space: [Battery SoC, Battery Capacity, Electricity Price, Demand, Hour, Season]
        self.observation_space = spaces.Box(
            low=np.array(
                [0, self.min_battery_capacity, self.min_price, self.min_demand, 0, 1],
                dtype=np.float32,
            ),
            high=np.array(
                [self.initial_battery_capacity, self.initial_battery_capacity,
                 self.max_price, self.max_demand, 23, 3],
                dtype=np.float32,
            ),
            dtype=np.float32
        )

        self.reset()

    def reset(self, seed=None, options=None):
        """
        Start a new episode: full capacity, initial SoC, timestep 0.

        Args:
            seed: Optional seed. It is forwarded to ``gym.Env.reset`` and also
                used to seed the ``random`` module, because the price/demand
                helpers draw their noise from ``random``.
            options: Unused.

        Returns:
            ``(observation, info)`` with an empty info dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            # All stochasticity of this env comes from the global `random`
            # module, so seeding it is what makes a seeded reset reproducible.
            random.seed(seed)
        self.battery_capacity = self.initial_battery_capacity  # Reset battery capacity
        self.timestep = 0
        self.battery_soc = self.initial_soc  # Reset battery SoC
        self.season = self.get_season_from_timestep(self.timestep)  # Set season dynamically
        self.episode_count += 1  # Track episode count
        return self._get_state(), {}

    def step(self, action):
        """
        Executes one step in the environment.
        Action: Charge (>0) or discharge (<0) electricity.

        Returns:
            ``(observation, reward, done, truncated, info)``. ``reward`` is the
            negative purchase cost when charging or the sale revenue when
            discharging. ``done`` is True once the time limit is reached or the
            battery is empty *after* this step's action. ``truncated`` is always
            False: the time limit is reported through ``done`` so callers that
            only inspect the first flag keep working.
        """
        self.timestep += 1
        self.season = self.get_season_from_timestep(self.timestep)

        # Degrade the capacity, but never below the configured floor.
        self.battery_capacity = max(
            self.battery_capacity * self.degradation_rate, self.min_battery_capacity
        )
        # Stored energy cannot exceed the (now smaller) capacity. Without this
        # the charging headroom below could become negative, which would turn
        # a "charge" into a paid discharge.
        self.battery_soc = min(self.battery_soc, self.battery_capacity)

        # Get new price and demand based on season
        hour = self.timestep % 24
        demand = seasonal_duck_curve(hour, self.season)
        price = electricity_price_function(hour, self.season, demand)

        # Accept 0-d or 1-d actions, then clip to (-battery_capacity, +battery_capacity)
        raw_action = np.asarray(action, dtype=np.float64).reshape(-1)[0]
        requested = float(np.clip(raw_action, -self.battery_capacity, self.battery_capacity))

        if requested > 0:  # Charging (cost money)
            headroom = self.battery_capacity - self.battery_soc
            charge_amount = min(requested, headroom)
            cost = charge_amount * price / self.efficiency
            self.battery_soc += charge_amount * self.efficiency
            reward = -cost  # Negative reward for spending money
        else:  # Discharging (sell to market)
            # Delivering x units drains x / efficiency from the battery, so at
            # most soc * efficiency can be delivered without going below zero.
            deliverable = self.battery_soc * self.efficiency
            discharge_amount = min(-requested, deliverable, demand)
            revenue = discharge_amount * price * self.efficiency
            if discharge_amount >= deliverable:
                # Fully drained; assign exactly zero to avoid float residue.
                self.battery_soc = 0.0
            else:
                self.battery_soc = max(
                    self.battery_soc - discharge_amount / self.efficiency, 0.0
                )
            reward = revenue  # Positive reward for selling

        # Evaluate the end-of-episode condition on the post-action state.
        done = self.timestep >= self.max_timesteps or self.battery_soc <= 0

        next_state = self._build_observation(price, demand, hour)
        return next_state, reward, done, False, {}

    def _build_observation(self, price, demand, hour):
        """ Packs the current battery state with the given market values into a float32 vector. """
        return np.array(
            [self.battery_soc, self.battery_capacity, price, demand, hour, self.season],
            dtype=np.float32,
        )

    def _get_state(self):
        """ Returns the current state: [SoC, Capacity, Price, Demand, Hour, Season] (fresh price/demand draw) """
        hour = self.timestep % 24
        demand = seasonal_duck_curve(hour, self.season)
        price = electricity_price_function(hour, self.season, demand)
        return self._build_observation(price, demand, hour)

    def get_season_from_timestep(self, timestep):
        """ Determines the season key (1 Summer, 2 Winter, 3 Spring/Autumn) from the timestep """
        month = (timestep // 30) % 12  # Approximate month from timestep
        if month in (11, 0, 1):
            return 2  # Winter
        if month in (5, 6, 7):
            return 1  # Summer
        return 3  # Spring/Autumn

    def get_season_name(self, season):
        """ Maps a season key to its display name ("Unknown" for any other key) """
        return {1: "Summer", 2: "Winter", 3: "Spring/Autumn"}.get(season, "Unknown")

    def evaluate_agent(self, agent, num_episodes=3):
        """
        Evaluates the agent over multiple episodes and prints performance metrics per season.

        Args:
            agent: Object exposing ``predict(observation) -> (action, state)``.
            num_episodes: Number of episodes to roll out.

        Returns:
            ``(avg_reward, avg_seasonal_rewards)`` where the second item maps
            each season key to the mean per-episode reward collected while the
            environment was in that season.
        """
        total_rewards = []
        season_rewards = {1: [], 2: [], 3: []}

        for ep in range(num_episodes):
            state, _ = self.reset()
            done = False
            episode_reward = 0
            seasonal_reward = {1: 0, 2: 0, 3: 0}

            while not done:
                action, _ = agent.predict(state)
                state, reward, terminated, truncated, _ = self.step(action)
                # Honour both end-of-episode flags of the step API.
                done = terminated or truncated
                episode_reward += reward
                seasonal_reward[self.season] += reward

            total_rewards.append(episode_reward)
            for season in seasonal_reward:
                season_rewards[season].append(seasonal_reward[season])

            print(f"Episode {ep + 1}: Total Reward = {episode_reward:.2f}")

        # Guard against num_episodes == 0 (np.mean of an empty list is NaN).
        avg_reward = np.mean(total_rewards) if total_rewards else 0.0
        avg_seasonal_rewards = {
            season: np.mean(rewards) if rewards else 0
            for season, rewards in season_rewards.items()
        }

        print(f"\nEvaluation Results:")
        print(f"Average Reward over {num_episodes} episodes: {avg_reward:.2f}")
        print(f"Seasonal Performance:")
        for season, avg in avg_seasonal_rewards.items():
            print(f"  {self.get_season_name(season)}: {avg:.2f}")

        return avg_reward, avg_seasonal_rewards

    def _calculate_price_bounds(self):
        """ Samples electricity_price_function once per (season, hour) and returns (min, max) """
        prices = [
            electricity_price_function(hour, season, seasonal_duck_curve(hour, season))
            for season in (1, 2, 3)
            for hour in range(24)
        ]
        return min(prices), max(prices)

    def _calculate_demand_bounds(self):
        """ Samples seasonal_duck_curve once per (season, hour) and returns (min, max) """
        demands = [
            seasonal_duck_curve(hour, season)
            for season in (1, 2, 3)
            for hour in range(24)
        ]
        return min(demands), max(demands)



In [None]:
# Build a single (non-vectorized) environment instance and draw its first
# observation; the info dict returned by reset() is discarded.
env = ElectricityMarketEnv()
obs, _ = env.reset()



# Evaluation Functions


In [None]:
from stable_baselines3.common.evaluation import evaluate_policy


# Let's start with training

In [None]:
# Random seeds; the snapshot-ensemble cell trains and evaluates once per seed.
SEEDS = [22, 68, 34, 90, 45]
# NOTE(review): presumably the training step budget; it is not referenced by
# any code visible in this part of the notebook -- confirm it is still needed.
steps = 1000000

In [None]:
from stable_baselines3 import PPO, A2C, SAC, TD3, DDPG

In [None]:
from stable_baselines3.common.callbacks import CheckpointCallback, CallbackList

# DDPG

In [None]:
!pip install --upgrade numpy cloudpickle




In [None]:
!pip install torch==2.6.0




In [None]:

# Label for the algorithm handled in this cell.
model_type = "DDPG"
# Fresh, untrained DDPG instance; below it is only used as the source of the
# observation/action spaces bound to the current `env`.
ddpg = DDPG("MlpPolicy", env, verbose=0)
# Load a saved checkpoint, replacing the spaces stored in it with the ones of
# the current environment via `custom_objects`.
# NOTE(review): despite its name, `ppo_model` holds a DDPG model.
ppo_model = DDPG.load("./DDPG_22_1000000_800000_steps", env=env, custom_objects={'observation_space': ddpg.observation_space, 'action_space': ddpg.action_space})
# Roll out the loaded agent; requires `env` to be the raw ElectricityMarketEnv
# (evaluate_agent is a method of that class).
env.evaluate_agent(ppo_model, num_episodes=365)

Episode 1: Total Reward = 2473.09
Episode 2: Total Reward = 2109.35
Episode 3: Total Reward = 2439.66
Episode 4: Total Reward = 2083.98
Episode 5: Total Reward = 2259.57
Episode 6: Total Reward = 2359.34
Episode 7: Total Reward = 2082.73
Episode 8: Total Reward = 2235.52
Episode 9: Total Reward = 2267.57
Episode 10: Total Reward = 2233.16
Episode 11: Total Reward = 2431.97
Episode 12: Total Reward = 2080.42
Episode 13: Total Reward = 2007.47
Episode 14: Total Reward = 2013.83
Episode 15: Total Reward = 2228.43
Episode 16: Total Reward = 2340.74
Episode 17: Total Reward = 2438.43
Episode 18: Total Reward = 2326.15
Episode 19: Total Reward = 2057.54
Episode 20: Total Reward = 2048.88
Episode 21: Total Reward = 2390.29
Episode 22: Total Reward = 2220.73
Episode 23: Total Reward = 2114.55
Episode 24: Total Reward = 2299.41
Episode 25: Total Reward = 2021.80
Episode 26: Total Reward = 2114.10
Episode 27: Total Reward = 1997.88
Episode 28: Total Reward = 2247.31
Episode 29: Total Reward = 20

(np.float64(2221.2560383576215),
 {1: np.float64(0.0), 2: np.float64(2221.2560383576215), 3: np.float64(0.0)})

# PPO Snapshot Ensemble

Let's create plots based on eta

In [None]:
import gymnasium as gym
import numpy as np
import torch
from torch.optim.lr_scheduler import CosineAnnealingLR
from stable_baselines3 import A2C
from stable_baselines3.common.vec_env import DummyVecEnv
import matplotlib.pyplot as plt

# ---- Hyperparameters ---- #
# Training budget (environment steps) per seed.
TOTAL_TIMESTEPS = 10000
# Number of equal-length training cycles; one policy snapshot is taken per cycle.
SNAPSHOT_CYCLES = 5
# Steps trained between two consecutive snapshots.
CYCLE_LENGTH = TOTAL_TIMESTEPS // SNAPSHOT_CYCLES
# NOTE(review): not referenced by any code visible in this cell -- presumably
# intended as the eta_min of a cosine learning-rate schedule; confirm.
ETA_MIN_VALUES = [1e-3]

# Vectorized single-environment wrapper used for training.
# NOTE: this rebinds the global `env`, which earlier cells use as a raw
# ElectricityMarketEnv (e.g. for `env.evaluate_agent`).
env = DummyVecEnv([lambda: ElectricityMarketEnv()])

def train_and_evaluate(seed, n_eval_episodes=50):
    """Train PPO in cycles, snapshot the policy after each cycle, and evaluate
    the ensemble of snapshots.

    Training runs on the module-level ``env`` for ``TOTAL_TIMESTEPS`` steps in
    chunks of ``CYCLE_LENGTH``. After every chunk a copy of the policy weights
    is stored. For evaluation, every stored snapshot is loaded in turn, asked
    for an action, and the actions are averaged.

    NOTE(review): no learning-rate schedule is applied between snapshots here,
    so the snapshots are plain checkpoints of one continuous training run.

    Args:
        seed: Seed passed to the PPO constructor.
        n_eval_episodes: Number of evaluation episodes run on a fresh
            ``ElectricityMarketEnv``.

    Returns:
        Mean total episode reward of the snapshot ensemble over the
        evaluation episodes.
    """
    model = PPO('MlpPolicy', env, verbose=0, seed=seed)

    snapshots = []
    timesteps = 0
    while timesteps < TOTAL_TIMESTEPS:
        model.learn(total_timesteps=CYCLE_LENGTH, reset_num_timesteps=False)
        timesteps += CYCLE_LENGTH

        # state_dict() returns tensors that share storage with the live
        # parameters; later optimizer steps would silently overwrite every
        # stored "snapshot". Clone so each snapshot keeps its own weights.
        snapshot = {
            name: tensor.detach().clone()
            for name, tensor in model.policy.state_dict().items()
        }
        snapshots.append(snapshot)
        print(f"Snapshot {len(snapshots)} saved at timestep {timesteps}")

    def ensemble_predict(observation):
        """Average the actions proposed by all snapshots for one observation batch."""
        with torch.no_grad():
            # Keep the input on the same device as the policy weights.
            obs_tensor = torch.as_tensor(
                observation, dtype=torch.float32, device=model.device
            )

            actions = []
            for snapshot in snapshots:
                model.policy.load_state_dict(snapshot)
                action, _value, _log_prob = model.policy.forward(obs_tensor)
                # .cpu() is required before .numpy() when the policy runs on a GPU.
                actions.append(action.cpu().numpy())

            return np.mean(actions, axis=0).flatten()

    eval_env = ElectricityMarketEnv()
    total_rewards = []

    for _episode in range(n_eval_episodes):
        obs, _ = eval_env.reset()
        episode_reward = 0
        done = False

        while not done:
            action = ensemble_predict(obs.reshape(1, -1))
            obs, reward, terminated, truncated, _ = eval_env.step(action)
            episode_reward += reward
            done = terminated or truncated

        total_rewards.append(episode_reward)

    avg_reward = np.mean(total_rewards)
    return avg_reward

# Run the full train-and-evaluate pipeline once per seed and collect the
# resulting mean evaluation rewards, in SEEDS order.
avg_rewards = []
for seed in SEEDS:
      avg_reward = train_and_evaluate(seed)

      avg_rewards.append(avg_reward)
      # Progress marker: the seed that has just finished.
      print(seed)

# Display the collected per-seed averages as the cell output.
avg_rewards

Snapshot 1 saved at timestep 2000
Snapshot 2 saved at timestep 4000
Snapshot 3 saved at timestep 6000


KeyboardInterrupt: 