In [59]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from scipy import stats


def generate_demand_profile(num_steps, base_demand):
    demand_profile = np.ones(num_steps) * base_demand
    return demand_profile


def generate_solar_forecast_profile(num_steps, base_solar, solar_std, daylight_hours):
    solar_forecast_profile = np.zeros(num_steps)
    # Generate solar forecast during daylight hours
    solar_forecast_profile[daylight_hours[0] : daylight_hours[1]] = np.random.normal(
        loc=base_solar, scale=solar_std, size=daylight_hours[1] - daylight_hours[0]
    )
    return solar_forecast_profile


class ElectricityDispatchEnv(gym.Env):
    def __init__(self):
        self.action_space = gym.spaces.Box(
            low=-1, high=1, shape=(4,)
        )  # Grid, battery, generator, solar actions
        self.observation_space = gym.spaces.Box(
            low=0, high=np.inf, shape=(4,)
        )  # Demand, solar forecast, battery state, cost

        self.max_steps = 96  # 15-minute intervals for 24 hours
        self.step_count = 0

        self.demand = None
        self.solar_forecast = None
        self.battery_state = 0
        self.cost = 0
        self.num_steps = 96
        self.base_demand = 150
        self.demand_std = 30
        self.base_solar = 80
        self.solar_std = 20
        self.daylight_hours = [24, 72]
        self.grid_usage = []
        self.battery_usage = []
        self.generator_usage = []
        self.peak_hours = [36, 48, 60]
        self.grid_price_low = 0.05
        self.grid_price_high = 0.15
        self.generator_price_low = 0.08
        self.generator_price_high = 0.12
        self.battery_price_low = 0.005
        self.battery_price_high = 0.015
        self.battery_capacity = 200
        self.generator_min_output = 20
        self.generator_max_output = 100

    def reset(self):
        self.step_count = 0
        self.battery_state = 0
        self.cost = 0

        self.demand = self._generate_demand()
        self.solar_forecast = self._generate_solar_forecast()
        self.battery_state = self.battery_capacity
        self.grid_usage = []
        self.battery_usage = []
        self.generator_usage = []
        return np.array(
            [self.demand[0], self.solar_forecast[0], self.battery_state, self.cost]
        )

    def step(self, action):
        grid_action = np.clip(action[0], 0, 1)
        battery_action = np.clip(action[1], -1, 1)
        generator_action = np.clip(action[2], 0, 1)
        solar_action = np.clip(action[3], 0, 1)

        solar_supply = solar_action * self.solar_forecast[self.step_count]
        grid_supply = grid_action * (self.demand[self.step_count] - solar_supply)

        if self.battery_state < self.battery_capacity:
            battery_supply = battery_action * (
                self.battery_capacity - self.battery_state
            )
        else:
            battery_supply = 0

        generator_output = self.generator_min_output + generator_action * (
            self.generator_max_output - self.generator_min_output
        )
        generator_supply = generator_output * (
            self.demand[self.step_count] - solar_supply - grid_supply - battery_supply
        )

        # Ensure the total supply meets the demand
        total_supply = solar_supply + grid_supply + battery_supply + generator_supply
        if total_supply < self.demand[self.step_count]:
            # If total supply is less than demand, prioritize sources
            remaining_demand = self.demand[self.step_count] - total_supply
            if remaining_demand > 0:
                grid_supply += min(
                    remaining_demand,
                    self.demand[self.step_count]
                    - solar_supply
                    - battery_supply
                    - generator_supply,
                )
                remaining_demand = (
                    self.demand[self.step_count]
                    - solar_supply
                    - grid_supply
                    - battery_supply
                    - generator_supply
                )
            if remaining_demand > 0:
                generator_supply += min(
                    remaining_demand,
                    self.demand[self.step_count]
                    - solar_supply
                    - grid_supply
                    - battery_supply,
                )

        self.grid_usage.append(grid_supply)
        self.battery_usage.append(battery_supply)
        self.generator_usage.append(generator_supply)

        # Update prices based on fluctuations
        grid_price = (
            self.grid_price_high
            if self.step_count in self.peak_hours
            else self.grid_price_low
        )
        generator_price = np.random.uniform(
            self.generator_price_low, self.generator_price_high
        )
        battery_price = np.random.uniform(
            self.battery_price_low, self.battery_price_high
        )

        self.cost += (
            grid_supply * grid_price
            + generator_supply * generator_price
            + battery_supply * battery_price
        )

        reward = -self.cost  # Negative cost as reward

        # Encourage the use of renewable sources
        reward += solar_supply * 0.05
        reward += battery_supply * 0.02

        # Penalize excessive grid usage
        if grid_supply > self.demand[self.step_count] * 0.6:
            reward -= (grid_supply - self.demand[self.step_count] * 0.6) * 0.1

        # Reward for meeting the demand
        if total_supply >= self.demand[self.step_count]:
            reward += 20
        else:
            reward -= 20 * (self.demand[self.step_count] - total_supply)

        # Update battery state considering charging and discharging efficiency
        self.battery_state = np.clip(
            self.battery_state - battery_supply * (1 if battery_supply > 0 else 0.9),
            0,
            self.battery_capacity,
        )

        self.step_count += 1
        done = self.step_count >= self.max_steps

        next_state = np.array(
            [
                self.demand[self.step_count] if not done else 0,
                self.solar_forecast[self.step_count] if not done else 0,
                self.battery_state,
                self.cost,
            ]
        )

        return next_state, reward, done, {}

    def _generate_demand(self):
        demand = generate_demand_profile(self.num_steps, self.base_demand)
        demand += np.random.normal(0, self.demand_std, self.num_steps)
        return demand

    def _generate_solar_forecast(self):
        solar_forecast = generate_solar_forecast_profile(
            self.num_steps, self.base_solar, self.solar_std, self.daylight_hours
        )
        solar_forecast += np.random.normal(0, self.solar_std, self.num_steps)
        return solar_forecast


class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        q_values = self.fc3(x)
        return q_values


class DQNAgent:
    def __init__(self, state_dim, action_dim, lr, gamma, epsilon):
        self.model = DQN(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon

    def act(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.uniform(-1, 1, size=4)
        state_tensor = torch.FloatTensor(state)
        q_values = self.model(state_tensor)
        action = q_values.detach().numpy()
        return action

    def train(self, state, action, reward, next_state, done):
        state_tensor = torch.FloatTensor(state)
        action_tensor = torch.FloatTensor(action)
        reward_tensor = torch.FloatTensor(reward).unsqueeze(1)
        next_state_tensor = torch.FloatTensor(next_state)
        done_tensor = torch.FloatTensor(done).unsqueeze(1)

        q_values = self.model(state_tensor)
        next_q_values = self.model(next_state_tensor)

        q_value = q_values.gather(1, action_tensor.argmax(1).unsqueeze(1))
        next_q_value, _ = next_q_values.max(1, keepdim=True)

        target = reward_tensor + (1 - done_tensor) * self.gamma * next_q_value

        loss = nn.MSELoss()(q_value, target.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


# Create the environment
env = ElectricityDispatchEnv()

# Define the DQN agent
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
lr = 0.001
gamma = 0.99
epsilon = 0.1
epsilon_decay = 0.995
dqn_agent = DQNAgent(state_dim, action_dim, lr, gamma, epsilon)

# Run the simulation with DQN agent
num_episodes = 2000
batch_size = 64

total_costs = []

for episode in range(num_episodes):
    state = env.reset()
    done = False
    total_reward = 0

    replay_buffer = []
    grid_prices = []
    solar_prices = []
    battery_prices = []
    generator_prices = []

    while not done:
        action = dqn_agent.act(state)
        next_state, reward, done, _ = env.step(action)
        replay_buffer.append((state, action, reward, next_state, done))

        if len(replay_buffer) > batch_size:
            batch = random.sample(replay_buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)
            dqn_agent.train(states, actions, rewards, next_states, dones)

        state = next_state
        total_reward += reward

        grid_prices.extend([env.grid_price_low] * int(env.grid_usage[-1]))
        solar_prices.extend([0] * int(env.solar_forecast[-1]))
        battery_prices.extend(
            [
                (
                    env.battery_price_low
                    if env.battery_usage[-1] < 0
                    else env.battery_price_high
                )
            ]
            * int(abs(env.battery_usage[-1]))
        )
        generator_prices.extend(
            [
                env.generator_price_low
                + (env.generator_price_high - env.generator_price_low)
                * env.generator_usage[-1]
                / env.generator_max_output
            ]
            * int(env.generator_usage[-1])
        )

    epsilon *= epsilon_decay

    # Calculate total energy usage
    total_energy_usage = (
        sum(env.grid_usage)
        + sum(env.solar_forecast)
        + sum(env.battery_usage)
        + sum(env.generator_usage)
    )

    # Calculate percentage of each source
    grid_percentage = sum(env.grid_usage) / total_energy_usage * 100
    solar_percentage = sum(env.solar_forecast) / total_energy_usage * 100
    battery_percentage = sum(env.battery_usage) / total_energy_usage * 100
    generator_percentage = sum(env.generator_usage) / total_energy_usage * 100

    # Calculate energy used for charging the battery
    energy_used_for_charging = sum(usage for usage in env.battery_usage if usage < 0)

    # Calculate the mode of prices for each source
    # Calculate the mode of prices for each source
    if len(grid_prices) > 0:
        grid_price_mode = stats.mode(grid_prices, keepdims=True).mode[0]
    else:
        grid_price_mode = 0.0

    if len(solar_prices) > 0:
        solar_price_mode = stats.mode(solar_prices, keepdims=True).mode[0]
    else:
        solar_price_mode = 0.0

    if len(battery_prices) > 0:
        battery_price_mode = stats.mode(battery_prices, keepdims=True).mode[0]
    else:
        battery_price_mode = 0.0

    if len(generator_prices) > 0:
        generator_price_mode = stats.mode(generator_prices, keepdims=True).mode[0]
    else:
        generator_price_mode = 0.0

    total_cost = env.cost
    total_costs.append(total_cost)
    print(
        f"Episode {episode+1}: Total Reward = {total_reward:.2f}, Total Cost = ${total_cost:.2f}"
    )
    print(
        f"Grid Usage: {np.mean(env.grid_usage):.2f} MW ({grid_percentage:.2f}%), Mode Price = ${grid_price_mode:.2f}"
    )
    print(
        f"Solar Usage: {np.mean(env.solar_forecast):.2f} MW ({solar_percentage:.2f}%), Mode Price = ${solar_price_mode:.2f}"
    )
    print(
        f"Battery Usage: {np.mean(env.battery_usage):.2f} MW ({battery_percentage:.2f}%), Mode Price = ${battery_price_mode:.2f}"
    )
    print(
        f"Generator Usage: {np.mean(env.generator_usage):.2f} MW ({generator_percentage:.2f}%), Mode Price = ${generator_price_mode:.2f}"
    )
    print(f"Battery Capacity: {env.battery_capacity:.2f} MWh")
    print(f"Energy Used for Charging: {abs(energy_used_for_charging):.2f} MWh")
    print(f"Epsilon: {epsilon:.2f}")
    print("---")

print(f"Average Total Cost: ${np.mean(total_costs):.2f}")

Episode 1: Total Reward = -1314420.12, Total Cost = $27878.49
Grid Usage: 2.75 MW (0.09%), Mode Price = $0.05
Solar Usage: 43.84 MW (1.48%), Mode Price = $0.00
Battery Usage: 0.00 MW (0.00%), Mode Price = $0.00
Generator Usage: 2906.66 MW (98.42%), Mode Price = $2.33
Battery Capacity: 200.00 MWh
Energy Used for Charging: 0.00 MWh
Epsilon: 0.10
---
Episode 2: Total Reward = -1454209.94, Total Cost = $29841.21
Grid Usage: 5.21 MW (0.16%), Mode Price = $0.05
Solar Usage: 41.57 MW (1.31%), Mode Price = $0.00
Battery Usage: 0.00 MW (0.00%), Mode Price = $0.00
Generator Usage: 3117.12 MW (98.52%), Mode Price = $4.00
Battery Capacity: 200.00 MWh
Energy Used for Charging: 0.00 MWh
Epsilon: 0.10
---
Episode 3: Total Reward = -1564264.77, Total Cost = $32094.65
Grid Usage: 3.27 MW (0.10%), Mode Price = $0.05
Solar Usage: 35.91 MW (1.08%), Mode Price = $0.00
Battery Usage: 0.00 MW (0.00%), Mode Price = $0.00
Generator Usage: 3288.76 MW (98.82%), Mode Price = $4.80
Battery Capacity: 200.00 MWh
Ene