In [1]:
import numpy as np
import random
import gym
from gym import spaces
import matplotlib.pyplot as plt
import pygame
import sys
import itertools

# Initialize pygame
pygame.init()

class ComplexPowerGridEnv(gym.Env):
    def __init__(self, render_mode=None):
        super(ComplexPowerGridEnv, self).__init__()
        
        # Grid configuration
        self.num_feeders = 3
        self.max_load = 1.0
        self.feeders_capacity = [1.0, 1.5, 1.2]
        self.fault_probability = 0.2
        self.tie_lines = [0] * self.num_feeders
        
        # Action space
        self.action_space = spaces.MultiBinary(self.num_feeders)
        
        # Observation space
        self.num_bins = 5
        self.observation_space = spaces.MultiDiscrete(
            [self.num_bins] * self.num_feeders +  # Loads
            [2] * self.num_feeders +              # Fault status
            [2] * self.num_feeders                # Tie line status
        )
        
        # Rendering
        self.render_mode = render_mode
        self.screen_size = 800
        self.screen = None
        self.clock = None
        self.font = None
        
        # Episode tracking
        self.current_step = 0
        self.max_steps = 200
        self.total_reward = 0  # Track total reward for display
        
        # Interactivity
        self.agent_control = True  # True: Agent controls tie lines, False: Manual control
        
        # Colors
        self.colors = {
            'background': (240, 240, 245),
            'feeder': (70, 130, 180),
            'fault': (220, 20, 60),
            'load': (255, 215, 0),
            'tie_line': (34, 139, 34),
            'tie_line_off': (169, 169, 169),  # Gray for inactive tie lines
            'overload': (139, 0, 0),
            'text': (0, 0, 0),
            'panel': (200, 200, 200)
        }

    def reset(self):
        self.loads = [random.uniform(0.3, self.max_load) for _ in range(self.num_feeders)]
        self.faults = [random.random() < self.fault_probability for _ in range(self.num_feeders)]
        self.tie_lines = [0] * self.num_feeders
        self.current_step = 0
        self.total_reward = 0
        self.agent_control = True
        
        if self.render_mode == "human" and self.screen is None:
            self._init_render()
        
        return self._get_state(), {}

    def step(self, action):
        if self.agent_control:
            self.tie_lines = list(action)
        
        self._redistribute_power()
        reward = self._calculate_reward()
        self.total_reward += reward
        self.current_step += 1
        done = self.current_step >= self.max_steps
        
        return self._get_state(), reward, done, {}

    def _redistribute_power(self):
        for i in range(self.num_feeders):
            if self.faults[i] and self.tie_lines[i]:
                connected = [j for j in range(self.num_feeders) 
                            if self.tie_lines[j] and j != i]
                if connected:
                    load_to_distribute = self.loads[i] / len(connected)
                    for j in connected:
                        self.loads[j] += load_to_distribute
                    self.loads[i] = 0

    def _calculate_reward(self):
        reward = 0
        for i in range(self.num_feeders):
            if self.faults[i]:
                if self.tie_lines[i]:
                    if self.loads[i] == 0:
                        reward += 10
                    else:
                        reward -= 5
                else:
                    reward -= 15
            else:
                if self.tie_lines[i]:
                    reward -= 2
        for i in range(self.num_feeders):
            if self.loads[i] > self.feeders_capacity[i]:
                reward -= 20 * (self.loads[i] - self.feeders_capacity[i])
        reward += 1
        return reward

    def _get_state(self):
        discretized_loads = [self.discretize(load, self.max_load) for load in self.loads]
        fault_status = [int(fault) for fault in self.faults]
        tie_line_status = list(self.tie_lines)
        return np.array(discretized_loads + fault_status + tie_line_status, dtype=np.int32)

    def discretize(self, value, max_value):
        return min(int(value / max_value * (self.num_bins - 1)), self.num_bins - 1)

    def handle_input(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                return False
            if event.type == pygame.KEYDOWN:
                # Toggle faults (keys 1, 2, 3)
                if event.key in [pygame.K_1, pygame.K_2, pygame.K_3]:
                    feeder_idx = [pygame.K_1, pygame.K_2, pygame.K_3].index(event.key)
                    self.faults[feeder_idx] = not self.faults[feeder_idx]
                # Toggle control mode (A for agent, M for manual)
                if event.key == pygame.K_a:
                    self.agent_control = True
                if event.key == pygame.K_m:
                    self.agent_control = False
                # Manual tie line control (keys 4, 5, 6)
                if not self.agent_control and event.key in [pygame.K_4, pygame.K_5, pygame.K_6]:
                    feeder_idx = [pygame.K_4, pygame.K_5, pygame.K_6].index(event.key)
                    self.tie_lines[feeder_idx] = 1 - self.tie_lines[feeder_idx]
        return True

    def render(self):
        if self.render_mode is None:
            return
            
        if self.screen is None:
            self._init_render()
        
        self.screen.fill(self.colors['background'])
        
        # Draw feeders
        feeder_width = 100
        spacing = (self.screen_size - self.num_feeders * feeder_width) / (self.num_feeders + 1)
        feeder_centers = []  # Store feeder centers for tie line drawing
        
        for i in range(self.num_feeders):
            x = spacing + i * (feeder_width + spacing)
            feeder_centers.append(x + feeder_width / 2)
            
            # Feeder base
            feeder_height = 200
            pygame.draw.rect(self.screen, self.colors['feeder'], 
                           (x, 300, feeder_width, feeder_height), border_radius=10)
            
            # Fault indicator
            if self.faults[i]:
                pygame.draw.circle(self.screen, self.colors['fault'], 
                                 (int(x + feeder_width/2), 280), 12, width=3)
            
            # Load indicator
            load_height = min(150, 150 * (self.loads[i] / self.feeders_capacity[i]))
            load_color = self.colors['overload'] if self.loads[i] > self.feeders_capacity[i] else self.colors['load']
            pygame.draw.rect(self.screen, load_color,
                           (x + 20, 300 + feeder_height - load_height, 
                            feeder_width - 40, load_height), border_radius=5)
            
            # Capacity line
            capacity_height = 150 * (1 / self.feeders_capacity[i])
            pygame.draw.line(self.screen, self.colors['text'],
                           (x, 300 + feeder_height - capacity_height),
                           (x + feeder_width, 300 + feeder_height - capacity_height), 2)
            
            # Tie line indicator
            tie_color = self.colors['tie_line'] if self.tie_lines[i] else self.colors['tie_line_off']
            pygame.draw.circle(self.screen, tie_color,
                              (int(x + feeder_width/2), 520), 10, width=2)
            
            # Text labels
            label = self.font.render(f"F{i+1}", True, self.colors['text'])
            self.screen.blit(label, (x + feeder_width/2 - 10, 250))
            load_text = self.font.render(f"{self.loads[i]:.2f}/{self.feeders_capacity[i]}", True, self.colors['text'])
            self.screen.blit(load_text, (x + feeder_width/2 - 30, 530))
        
        # Draw tie lines (as connections between feeders)
        for i in range(self.num_feeders):
            for j in range(i + 1, self.num_feeders):
                if self.tie_lines[i] and self.tie_lines[j]:
                    pygame.draw.line(self.screen, self.colors['tie_line'],
                                   (feeder_centers[i], 520), (feeder_centers[j], 520), 5)
        
        # Draw status panel
        pygame.draw.rect(self.screen, self.colors['panel'], (20, 20, 300, 120))
        mode_text = self.font.render(f"Mode: {'Agent' if self.agent_control else 'Manual'}", True, self.colors['text'])
        step_text = self.font.render(f"Step: {self.current_step}", True, self.colors['text'])
        reward_text = self.font.render(f"Total Reward: {self.total_reward:.1f}", True, self.colors['text'])
        self.screen.blit(mode_text, (30, 30))
        self.screen.blit(step_text, (30, 60))
        self.screen.blit(reward_text, (30, 90))
        
        # Instructions
        instr_text = self.font.render("1,2,3: Toggle faults | A: Agent | M: Manual | 4,5,6: Toggle tie lines", 
                                    True, self.colors['text'])
        self.screen.blit(instr_text, (20, self.screen_size - 40))
        
        pygame.display.flip()
        self.clock.tick(10)

    def _init_render(self):
        if self.render_mode == "human" and self.screen is None:
            self.screen = pygame.display.set_mode((self.screen_size, 600))
            pygame.display.set_caption("Interactive Power Grid Environment")
            self.clock = pygame.time.Clock()
            self.font = pygame.font.SysFont('Arial', 18)

    def close(self):
        if self.screen is not None:
            pygame.quit()
            self.screen = None

class QLearningAgent:
    def __init__(self, action_space, observation_space):
        self.action_space = action_space
        self.observation_space = observation_space
        self.possible_actions = list(itertools.product([0, 1], repeat=action_space.n))
        q_table_shape = tuple(observation_space.nvec) + (len(self.possible_actions),)
        self.q_table = np.random.uniform(low=-1, high=1, size=q_table_shape)
        self.alpha = 0.1
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.training_errors = []

    def choose_action(self, state):
        if random.random() < self.epsilon:
            return self.action_space.sample()
        action_idx = np.argmax(self.q_table[tuple(state)])
        return np.array(self.possible_actions[action_idx], dtype=np.int8)

    def update(self, state, action, reward, next_state, done):
        action_idx = self.possible_actions.index(tuple(action))
        current_q = self.q_table[tuple(state)][action_idx]
        max_next_q = np.max(self.q_table[tuple(next_state)])
        target_q = reward + self.gamma * max_next_q * (1 - done)
        self.q_table[tuple(state)][action_idx] += self.alpha * (target_q - current_q)
        self.training_errors.append(abs(target_q - current_q))
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

def train_agent(env, agent, episodes=1000):
    rewards_history = []
    episode_lengths = []
    
    for episode in range(episodes):
        state, _ = env.reset()
        total_reward = 0
        steps = 0
        done = False
        
        while not done:
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.update(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            steps += 1
        
        rewards_history.append(total_reward)
        episode_lengths.append(steps)
        if episode % 100 == 0:
            avg_reward = np.mean(rewards_history[-100:]) if rewards_history else 0
            print(f"Episode {episode}, Reward: {total_reward}, Avg Reward: {avg_reward:.2f}, Epsilon: {agent.epsilon:.3f}")
    
    return rewards_history, episode_lengths

def showcase_agent(env, agent, episodes=3):
    agent.epsilon = 0  # Disable exploration
    
    for episode in range(episodes):
        state, _ = env.reset()
        done = False
        print(f"\nShowcase Episode {episode + 1}")
        
        while not done:
            # Handle user input
            if not env.handle_input():
                env.close()
                return
            
            # Agent chooses action if in agent control mode
            action = agent.choose_action(state) if env.agent_control else np.array(env.tie_lines, dtype=np.int8)
            
            # Step the environment
            next_state, reward, done, _ = env.step(action)
            state = next_state
            
            # Render
            env.render()
            
            # Print agent's action for feedback
            if env.agent_control:
                print(f"Step {env.current_step}: Agent chose tie lines {list(action)}, Reward: {reward:.1f}")
            else:
                print(f"Step {env.current_step}: Manual tie lines {env.tie_lines}, Reward: {reward:.1f}")
        
        print(f"Episode {episode + 1} Total Reward: {env.total_reward:.1f}")

def plot_training(rewards, episode_lengths):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
    ax1.plot(rewards)
    ax1.set_title('Training Rewards')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Total Reward')
    ax2.plot(episode_lengths)
    ax2.set_title('Episode Lengths')
    ax2.set_xlabel('Episode')
    ax2.set_ylabel('Steps')
    plt.tight_layout()
    plt.show()
    plt.close()



In [2]:

print("Starting training...")
train_env = ComplexPowerGridEnv(render_mode=None)
agent = QLearningAgent(train_env.action_space, train_env.observation_space)
rewards, lengths = train_agent(train_env, agent, episodes=1500)
# plot_training(rewards, lengths)
train_env.close()



Starting training...
Episode 0, Reward: 298, Avg Reward: 298.00, Epsilon: 0.367
Episode 100, Reward: 754.0764869993818, Avg Reward: 502.81, Epsilon: 0.010
Episode 200, Reward: 186, Avg Reward: 442.66, Epsilon: 0.010
Episode 300, Reward: 180, Avg Reward: 721.25, Epsilon: 0.010
Episode 400, Reward: -737.061708462873, Avg Reward: 538.43, Epsilon: 0.010
Episode 500, Reward: 1798, Avg Reward: 574.17, Epsilon: 0.010
Episode 600, Reward: 1998, Avg Reward: 409.90, Epsilon: 0.010
Episode 700, Reward: 182, Avg Reward: 491.02, Epsilon: 0.010
Episode 800, Reward: 188, Avg Reward: 570.30, Epsilon: 0.010
Episode 900, Reward: 1695, Avg Reward: 700.97, Epsilon: 0.010
Episode 1000, Reward: -1934.9989340426487, Avg Reward: 568.99, Epsilon: 0.010
Episode 1100, Reward: 1668.3050241791982, Avg Reward: 767.65, Epsilon: 0.010
Episode 1200, Reward: 188, Avg Reward: 593.07, Epsilon: 0.010
Episode 1300, Reward: 1749.7757578600683, Avg Reward: 506.62, Epsilon: 0.010
Episode 1400, Reward: 1435.0521642389624, Avg 


Controls:
1, 2, 3: Toggle faults on feeders 1, 2, 3. For example, press 1 to make feeder 1 faulty (red circle appears) or non-faulty.

A: Switch to agent control mode. The agent will set tie lines based on its learned policy.

M: Switch to manual control mode. You can set tie lines yourself.

4, 5, 6: In manual mode, toggle tie lines for feeders 1, 2, 3 (green circle for on, gray for off).

Close Window: Click the window's close button to exit.

Observing the Agent:
In agent mode (default):
Toggle faults (e.g., press 1 to make feeder 1 faulty).

Watch the agent respond by setting tie lines (green circles and lines appear).

Check the console for the agent's actions and rewards.

For example, if feeder 1 is faulty, the agent may activate tie lines for feeders 1 and 2 to redistribute the load, earning a +10 reward if successful.

In manual mode:
Set tie lines yourself (e.g., press 4 and 5 to activate tie lines for feeders 1 and 2).

See how your actions affect the grid (loads redistribute, rewards are calculated).

Compare your actions to what the agent would have done (printed in the console).

Visual Elements:
Feeders: Blue rectangles with load bars (yellow, or red if overloaded).

Faults: Red circles above faulty feeders.

Loads: Yellow/red bars showing current load relative to capacity.

Tie Lines: Green circles for active tie lines, gray for inactive. Green lines connect feeders with active tie lines.

Status Panel: Shows mode, step, and total reward.

Instructions: Bottom text explains controls.



In [3]:
print("\nStarting interactive showcase...")
show_env = ComplexPowerGridEnv(render_mode="human")
showcase_agent(show_env, agent, episodes=3)
show_env.close()


Starting interactive showcase...

Showcase Episode 1
Step 1: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 2: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 3: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 4: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 5: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 6: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 7: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 8: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 9: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 10: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 11: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 12: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 13: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 14: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 15: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 16: Agent chose tie lines [0, 0, 1], Reward: 11.0
Step 17: Agent chose tie lines [1, 1, 1], Reward: 7.0
Step 18: Agent chose tie line