In [1]:
import numpy as np  # Numerical computations
import gym  # RL environment library
from gym import spaces  # Used to define state and action spaces
import matplotlib.pyplot as plt  # Used for visualization and plotting
import pandas as pd  # Used for handling datasets (e.g., weather data)
import random
from collections import deque

In [2]:
class ReplayBuffer:
    """
    Experience replay buffer to store and sample transitions.
    """
    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)
        self.capacity = capacity
    
    def add(self, state, action, reward, next_state, done):
        """Add experience to buffer"""
        experience = (state, action, reward, next_state, done)
        self.buffer.append(experience)
    
    def sample(self, batch_size):
        """Randomly sample batch of experiences"""
        # Make sure we don't sample more than buffer size
        batch_size = min(batch_size, len(self.buffer))
        
        # Sample random indices
        indices = random.sample(range(len(self.buffer)), batch_size)
        
        # Get samples
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in indices:
            s, a, r, s_, d = self.buffer[i]
            states.append(s)
            actions.append(a)
            rewards.append(r)
            next_states.append(s_)
            dones.append(d)
        
        return (
            np.array(states), 
            np.array(actions), 
            np.array(rewards).reshape(-1, 1), 
            np.array(next_states),
            np.array(dones).reshape(-1, 1)
        )
    
    def __len__(self):
        """Return current buffer size"""
        return len(self.buffer)
    
    def is_ready(self, batch_size):
        """Check if buffer has enough experiences"""
        return len(self) >= batch_size



In [3]:
class HybridEnergyEnv(gym.Env):
    def __init__(self):
        super(HybridEnergyEnv, self).__init__()
        
        # Define state and action sizes
        self.state_size = 4  # [P_solar, P_wind, Energy demand, Grid price]
        self.action_size = 3  # [N_pv, N_wt, P_grid]
        
        # Action space bounds
        self.action_low = np.array([0, 0, 0])  # Min panels, min turbines, min grid power
        self.action_high = np.array([300, 50, 1000])  # Max panels, max turbines, max grid power
        
        # Define action and observation spaces
        self.action_space = spaces.Box(
            low=self.action_low,
            high=self.action_high,
            dtype=np.float32
        )
        
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0]),  # Min values for state
            high=np.array([1000, 1000, 1000, 1]),  # Max values for state
            dtype=np.float32
        )
        
        # Emission factors (gCO2/kWh)
        self.EF_PV = 50  # Emission factor per solar panel
        self.EF_WT = 10  # Emission factor per wind turbine
        self.EF_grid = 300  # Emission factor for grid power
        
        # Initialize state
        self.current_state = None
        self.reset()

    def reset(self):
        """Reset the environment to an initial state."""
        self.current_state = np.array([0, 0, 1000, 0.1])  # Example initial state
        return self.current_state

    def estimate_solar_panel_cost(self, wattage, sunlight_hours_per_day, lifetime_years):
        """Estimate the cost per kWh of a solar panel over its lifetime."""
        if wattage==0: return 0
        else:
            cost_per_watt = 2.75  # Cost per watt ($/W)
            maintenance_cost_per_year = 25  # Maintenance cost ($/year)
            capital_cost = wattage * cost_per_watt  # Initial investment
            total_maintenance_cost = maintenance_cost_per_year * lifetime_years
            total_cost = capital_cost + total_maintenance_cost
            lifetime_energy_production_kwh = (wattage * sunlight_hours_per_day * 365 * lifetime_years) / 1000
            return total_cost / lifetime_energy_production_kwh  # Cost per kWh

    def estimate_wind_turbine_cost(self, rated_power_kw, capacity_factor, lifetime_years):
        """Estimate the cost per kWh of a wind turbine over its lifetime."""
        if rated_power_kw==0: return 0
        else:
            cost_per_kw = 1500  # Cost per kW ($/kW)
            maintenance_cost_per_year = 50  # Maintaenance cost ($/year)
            capital_cost = rated_power_kw * cost_per_kw  # Initial investment
            total_maintenance_cost = maintenance_cost_per_year * lifetime_years
            total_cost = capital_cost + total_maintenance_cost
            lifetime_energy_production_kwh = rated_power_kw * capacity_factor * 24 * 365 * lifetime_years
            return total_cost / lifetime_energy_production_kwh  # Cost per kWh

    def step(self, action):
        """Execute one time step in the environment."""
        # Extract action components
        N_pv = int(action[0])  # Number of PV panels (integer)
        N_wt = int(action[1])  # Number of wind turbines (integer)
        P_grid_action = action[2]  # Grid power (continuous)
        
        # Extract current state
        P_solar, P_wind, energy_demand, grid_price = self.current_state
        
        # Compute energy generated (with efficiency)
        P_pv = N_pv * P_solar * 0.2  # 20% efficiency for solar
        P_wt = N_wt * P_wind * 0.5  # 50% efficiency for wind
        total_renewable_energy = P_pv + P_wt
        
        # Compute energy deficit and grid usage
        energy_deficit = max(0, energy_demand - total_renewable_energy)
        grid_power_used = min(P_grid_action, energy_deficit)
        
        # Compute costs
        solar_cost = self.estimate_solar_panel_cost(N_pv * 300, 5, 25)  # 300W panels, 5h sunlight/day
        wind_cost = self.estimate_wind_turbine_cost(N_wt * 10000, 0.35, 25)  # 10kW turbines, 35% capacity
        grid_cost = grid_power_used * grid_price
        total_cost = solar_cost + wind_cost + grid_cost
        
        # Compute carbon footprint
        carbon_footprint = (
            (self.EF_PV * N_pv) + 
            (self.EF_WT * N_wt) + 
            (self.EF_grid * grid_power_used)
        )
        
        # Compute reward (negative cost and emissions)
        reward = - (total_cost + carbon_footprint)
        
        # Update state with random fluctuations
        next_P_solar = np.clip(P_solar + np.random.uniform(-50, 50), 0, 1200)
        next_P_wind = np.clip(P_wind + np.random.uniform(-2, 2), 0, 25)
        next_state = np.array([
            next_P_solar,
            next_P_wind,
            energy_demand,  # Demand remains fixed
            grid_price  # Grid price remains fixed
        ])
        
        self.current_state = next_state
        done = False
        
        return next_state, reward, done, {}



In [4]:
import numpy as np
import random
import matplotlib.pyplot as plt

class QAgent:
    def __init__(self, env, alpha=0.1, gamma=0.95, epsilon=1.0, epsilon_decay=0.99, min_epsilon=0.01):
        self.env = env
        
        # Define discretized actions
        self.n_pv_values = np.arange(0, 301)  # 0 to 100 panels 
        self.n_wt_values = np.arange(0, 51)   # 0 to 50 turbines 
        self.p_grid_values = np.arange(0, 1001)  # 0 to 1000 grid power 

        # Convert to tuples for indexing
        self.action_space = [(pv, wt, grid) for pv in self.n_pv_values for wt in self.n_wt_values for grid in self.p_grid_values]

        # Q-table: map (state -> action values)
        self.q_table = {}

        # Hyperparameters
        self.alpha = alpha      # Learning rate
        self.gamma = gamma      # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon

    def discretize_state(self, state):
        """Convert continuous state to a discretized key."""
        return tuple(np.round(state, decimals=1))  # Round to 1 decimal for a manageable table size

    def choose_action(self, state):
        """Epsilon-greedy policy for action selection."""
        state = self.discretize_state(state)
        
        # Initialize Q-values if state is new
        if state not in self.q_table:
            self.q_table[state] = np.zeros(len(self.action_space))
        
        if np.random.rand() < self.epsilon:
            return random.choice(self.action_space)  # Explore
        else:
            best_action_idx = np.argmax(self.q_table[state])  # Exploit
            return self.action_space[best_action_idx]

    def update_q_table(self, state, action, reward, next_state):
        """Update Q-table using the Q-learning formula."""
        state = self.discretize_state(state)
        next_state = self.discretize_state(next_state)
        action_idx = self.action_space.index(action)

        # Initialize Q-values if states are new
        if state not in self.q_table:
            self.q_table[state] = np.zeros(len(self.action_space))
        if next_state not in self.q_table:
            self.q_table[next_state] = np.zeros(len(self.action_space))

        # Q-learning update rule
        best_next_q = np.max(self.q_table[next_state])  # Best Q-value for next state
        self.q_table[state][action_idx] += self.alpha * (reward + self.gamma * best_next_q - self.q_table[state][action_idx])

    def train(self, episodes=1000):
        rewards = []
        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False
            i = 0
            while i<100:
                action = self.choose_action(state)
                next_state, reward, done, _ = self.env.step(action)
                self.update_q_table(state, action, reward, next_state)
                state = next_state
                total_reward += reward
                i+=1

            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)  # Decay epsilon
            rewards.append(total_reward)
            if episode % 10 == 0:
                print(f"Episode {episode}, Reward: {total_reward}")

        return rewards

def test_agent(agent, episodes=20):
    """Test the trained agent and return average reward."""
    total_rewards = []
    for _ in range(episodes):
        state = agent.env.reset()
        total_reward = 0
        done = False
        i=0
        while i<100:
            action = agent.choose_action(state)  # Best known action
            state, reward, done, _ = agent.env.step(action)
            total_reward += reward
            i+=1
        total_rewards.append(total_reward)
    return np.mean(total_rewards)

def naive_strategy(env, episodes=100):
    """Baseline strategy: always use a fixed number of panels/turbines/grid power."""
    total_rewards = []
    for _ in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False
        i=0
        while i<100:
            action = (50, 20, 500)  # Fixed naive strategy
            state, reward, done, _ = env.step(action)
            total_reward += reward
            i+=1
        total_rewards.append(total_reward)
    return np.mean(total_rewards)

# Initialize Environment and Agent
env = HybridEnergyEnv()
agent = QAgent(env)

# Train Agent
q_learning_rewards = agent.train(episodes=100)

# Test Agent vs. Naive Strategy
agent_reward = test_agent(agent, episodes=100)
naive_reward = naive_strategy(env, episodes=100)

print(f"Q-learning Agent Average Reward: {agent_reward}")
print(f"Naive Strategy Average Reward: {naive_reward}")

# Plot Training Rewards
plt.plot(q_learning_rewards)
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.title("Q-learning Training Progress")
plt.show()


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Episode 0, Reward: -8399696.777274515


MemoryError: Unable to allocate 117. MiB for an array with shape (15366351,) and data type float64

In [None]:
def get_best_actions(agent, states):
    """Get the best actions for multiple states using the trained Q-table."""
    best_actions = {}
    
    for state in states:
        state_key = agent.discretize_state(state)  # Convert to discrete form
        if state_key in agent.q_table:
            best_action = max(agent.q_table[state_key], key=agent.q_table[state_key].get)
        else:
            best_action = random.choice(agent.action_space)  # Fallback to random action
        
        best_actions[tuple(state)] = best_action  # Store result
    
    return best_actions

# Example states
test_states = [
    [500, 10, 800, 0.2],   # Moderate solar & wind
    [700, 5, 1200, 0.4],   # High demand, low wind
    [300, 15, 600, 0.1],   # Low solar, high wind
    [1000, 20, 500, 0.3],  # High solar & wind
    [200, 2, 1000, 0.5]    # Low energy generation, high grid price
]

# Get best actions
best_actions = get_best_actions(agent, test_states)

# Print results
for state, action in best_actions.items():
    print(f"State: {state} -> Best Action: {action}")
