<a href="https://colab.research.google.com/github/Isaiah-Essien/isaiah_essien_rl_summative/blob/main/seizure_rl_simulator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Epilepsy Detection RL Simulation using DQN and PPO

This project simulates a reinforcement learning approach to my mission-based project: an epilepsy detection and care system.

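It simulates an agent (a camera, drawn in yellow) and a patient (a green dot that turns red during a seizure) on a 10-by-10 grid. The patient moves randomly, the agent chooses an action at every step, and each step yields a reward or a penalty depending on the cell the camera ends up on.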



In [6]:
!pip install stable-baselines3

In [34]:
#===================Imports=================
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pygame
from pygame.locals import *
from OpenGL.GL import *
from stable_baselines3 import DQN
from OpenGL.GLU import *
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

In [25]:
#==============Static rendering===========

# Number of cells along each side of the square grid; also used as the
# extent of the orthographic projection in visualize_environment().
grid_size = 10
# Side length of one grid cell in drawing units.
cell_size = 1

def draw_grid():
    """Draw the light-grey grid lines of the board with OpenGL line primitives.

    One vertical and one horizontal line is emitted for each of the
    ``grid_size + 1`` cell boundaries, using the module-level ``grid_size``
    and ``cell_size``.
    """
    glColor3f(0.8, 0.8, 0.8)
    glBegin(GL_LINES)
    for boundary in range(grid_size + 1):
        # Vertical line at x = boundary * cell_size.
        glVertex3f(boundary * cell_size, 0, 0)
        glVertex3f(boundary * cell_size, grid_size * cell_size, 0)
        # Horizontal line at y = boundary * cell_size.
        glVertex3f(0, boundary * cell_size, 0)
        glVertex3f(grid_size * cell_size, boundary * cell_size, 0)
    glEnd()

def draw_patient(patient_pos, seizure):
    """Draw the patient as a 10-pixel point centred in its grid cell.

    Args:
        patient_pos: Indexable pair ``(x, y)`` of the patient's cell.
        seizure: Truthy to draw the point in red, falsy to draw it in green.
    """
    # Red while a seizure is active, green otherwise.
    rgb = (1.0, 0.0, 0.0) if seizure else (0.0, 1.0, 0.0)
    glColor3f(*rgb)
    glPointSize(10)
    glBegin(GL_POINTS)
    # The +0.5 offset moves the point from the cell corner to the cell centre.
    glVertex3f(patient_pos[0] + 0.5, patient_pos[1] + 0.5, 0)
    glEnd()

def draw_camera(camera_pos):
    """Draw the camera as a 15-pixel blue point centred in its grid cell.

    Args:
        camera_pos: Indexable pair ``(x, y)`` of the camera's cell.
    """
    camera_rgb = (0.0, 0.0, 1.0)  # Blue for camera
    glColor3f(*camera_rgb)
    glPointSize(15)
    glBegin(GL_POINTS)
    # The +0.5 offset moves the point from the cell corner to the cell centre.
    glVertex3f(camera_pos[0] + 0.5, camera_pos[1] + 0.5, 0)
    glEnd()

def visualize_environment(patient_pos, camera_pos, seizure):
    """Show one static frame of the grid and block until the window is closed.

    Opens a 600x600 OpenGL window, draws the grid, the patient and the
    camera once, then waits for the user to close the window.

    Args:
        patient_pos: Indexable pair ``(x, y)`` of the patient's cell.
        camera_pos: Indexable pair ``(x, y)`` of the camera's cell.
        seizure: Truthy to draw the patient in red, falsy for green.
    """
    pygame.init()
    try:
        pygame.display.set_mode((600, 600), DOUBLEBUF | OPENGL)
        # Map world coordinates [0, grid_size] on both axes onto the window.
        gluOrtho2D(0, grid_size, 0, grid_size)

        glClearColor(1, 1, 1, 1)
        glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT)

        draw_grid()
        draw_patient(patient_pos, seizure)
        draw_camera(camera_pos)

        pygame.display.flip()

        # Keep window open for static visualization. event.wait() blocks until
        # an event arrives, so the loop does not spin a CPU core while idle.
        while pygame.event.wait().type != pygame.QUIT:
            pass
    finally:
        # Always release pygame, even if a GL or draw call above raised.
        pygame.quit()

# Demo: render a single static frame with fixed positions when this cell is
# executed as a script.
if __name__ == "__main__":
    patient_pos = np.array([4, 6])
    camera_pos = np.array([5, 5])
    seizure = True  # or False to visualize both cases
    visualize_environment(patient_pos, camera_pos, seizure)


In [42]:
# Colour tuples used by EpilepsyDetectionEnv.render() for the pygame frame.
colors = {
    'bg': (25, 25, 112),  # surface fill
    'grid': (70, 130, 180),  # cell outlines
    'patient_normal': (60, 179, 113),  # patient circle when no seizure
    'patient_seizure': (220, 20, 60),  # patient circle during a seizure
    'drone': (255, 215, 0),  # camera body and its four rotor circles
}

class EpilepsyDetectionEnv(gym.Env):
    """Grid world where a camera is rewarded for being on the patient during a seizure.

    State:
        The patient takes a random step of -1, 0 or +1 on each axis every
        step, and a seizure is drawn independently with probability 0.05 per
        step. The camera has a grid position and a zoom level in [1, 5].

    Observation (``Box(5,)``, float32):
        ``[camera_x, camera_y, camera_zoom, patient_x, patient_y]``.

    Actions (``Discrete(9)``):
        0: camera x - 1, 1: camera x + 1, 2: zoom + 1 (max 5),
        3: zoom - 1 (min 1), 4: camera y - 1, 5: camera y + 1.
        Actions 6-8 leave the camera unchanged.

    Reward per step:
        -0.1 base cost, +10 if a seizure occurs while the camera is on the
        patient's cell, -10 if a seizure occurs while it is not, and -5 if
        the camera is on the patient's cell without a seizure.

    All randomness is drawn from ``self.np_random`` so that
    ``reset(seed=...)`` makes episodes reproducible.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode=None):
        """Create the environment.

        Args:
            render_mode: ``"rgb_array"`` to render frames on an off-screen
                pygame surface, ``"human"`` to print the state as text, or
                ``None`` for no rendering.
        """
        super().__init__()

        self.grid_size = 10
        self.max_steps = 200  # episode length cap used by step()
        self.camera_pos = np.array([5, 5])
        self.patient_pos = self._random_cell()
        self.camera_zoom = 1
        self.seizure = False
        self.time_step = 0

        self.action_space = spaces.Discrete(9)
        self.observation_space = spaces.Box(low=0, high=10, shape=(5,), dtype=np.float32)

        self.render_mode = render_mode
        self.window_size = 700
        self.cell_size = self.window_size // self.grid_size

        # The drawing surface is only needed (and only created) for frame output.
        if render_mode == "rgb_array":
            pygame.init()
            self.window = pygame.Surface((self.window_size, self.window_size))

    def _random_cell(self):
        """Return a uniformly random ``(x, y)`` cell as an integer array."""
        return self.np_random.integers(0, self.grid_size, size=2)

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset`` to reseed
                ``self.np_random``.
            options: Unused.
        """
        super().reset(seed=seed)

        self.camera_pos = np.array([5, 5])
        self.patient_pos = self._random_cell()
        self.camera_zoom = 1
        self.seizure = False
        self.time_step = 0

        return self._get_obs(), {}

    def _get_obs(self):
        """Return ``[camera_x, camera_y, camera_zoom, patient_x, patient_y]`` as float32."""
        return np.array([
            self.camera_pos[0], self.camera_pos[1],
            self.camera_zoom,
            self.patient_pos[0], self.patient_pos[1]
        ], dtype=np.float32)

    def step(self, action):
        """Advance the simulation by one step.

        Args:
            action: Integer action in ``[0, 8]`` (see the class docstring).

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        reward = -0.1
        self.time_step += 1

        # Patient random walk, kept inside the grid.
        patient_move = self.np_random.integers(-1, 2, size=2)
        self.patient_pos = np.clip(self.patient_pos + patient_move, 0, self.grid_size - 1)
        self.seizure = bool(self.np_random.random() < 0.05)

        if action == 0: self.camera_pos[0] -= 1
        elif action == 1: self.camera_pos[0] += 1
        elif action == 2: self.camera_zoom = min(self.camera_zoom + 1, 5)
        elif action == 3: self.camera_zoom = max(self.camera_zoom - 1, 1)
        elif action == 4: self.camera_pos[1] -= 1
        elif action == 5: self.camera_pos[1] += 1
        self.camera_pos = np.clip(self.camera_pos, 0, self.grid_size - 1)

        on_patient = np.array_equal(self.camera_pos, self.patient_pos)
        if self.seizure:
            reward += 10 if on_patient else -10
        elif on_patient:
            reward -= 5

        # NOTE(review): the step cap is reported as `terminated`; Gymnasium's
        # convention for time limits is `truncated`. Kept as-is because
        # callers in this notebook only read the third return value.
        done = self.time_step >= self.max_steps
        return self._get_obs(), reward, done, False, {}

    def render(self):
        """Render the current state.

        Returns:
            In ``"rgb_array"`` mode, the frame as an array transposed from
            pygame's ``array3d`` layout with axes ``(1, 0, 2)``. In
            ``"human"`` mode the state is printed and ``None`` is returned.
        """
        if self.render_mode == "rgb_array":
            self.window.fill(colors['bg'])

            for x in range(self.grid_size):
                for y in range(self.grid_size):
                    rect = pygame.Rect(x*self.cell_size, y*self.cell_size, self.cell_size, self.cell_size)
                    pygame.draw.rect(self.window, colors['grid'], rect, 2, border_radius=8)

            patient_color = colors['patient_seizure'] if self.seizure else colors['patient_normal']
            # Pixel coordinates of the cell centres.
            patient_pos_pix = ((self.patient_pos + 0.5) * self.cell_size).astype(int)
            pygame.draw.circle(self.window, patient_color, patient_pos_pix, self.cell_size//3)

            drone_pos_pix = ((self.camera_pos + 0.5) * self.cell_size).astype(int)
            pygame.draw.rect(self.window, colors['drone'], (*drone_pos_pix - self.cell_size//4, self.cell_size//2, self.cell_size//2), border_radius=10)

            # Four rotor circles placed diagonally around the camera body.
            for angle in [45, 135, 225, 315]:
                offset = np.array([np.cos(np.radians(angle)), np.sin(np.radians(angle))]) * self.cell_size//2.5
                rotor_pos = drone_pos_pix + offset.astype(int)
                pygame.draw.circle(self.window, colors['drone'], rotor_pos, self.cell_size//10)

            return np.transpose(pygame.surfarray.array3d(self.window), axes=(1, 0, 2))

        elif self.render_mode == "human":
            print(f"Time:{self.time_step}, Camera:{self.camera_pos}, Patient:{self.patient_pos}, Seizure:{self.seizure}")

    def close(self):
        """Shut down pygame."""
        pygame.quit()


In [17]:
#============Logger==================

from stable_baselines3.common.callbacks import BaseCallback
import numpy as np

class EpilepsyLoggerCallback(BaseCallback):
    """Collect per-episode reward and length during training and print a summary.

    Episode statistics are read from the ``'episode'`` entry of each info
    dict found under ``self.locals['infos']``.
    """

    def __init__(self, verbose=0):
        """Initialise empty histories.

        Args:
            verbose: When truthy, print every finished episode as it is seen.
        """
        super().__init__(verbose)
        self.episode_rewards = []
        self.episode_lengths = []

    def _on_step(self) -> bool:
        """Record any episode that finished on this step; always continue training."""
        for step_info in self.locals.get('infos', []):
            # Only infos of finished episodes carry the 'episode' entry.
            if 'episode' not in step_info:
                continue

            ep_reward = step_info['episode']['r']
            ep_length = step_info['episode']['l']
            self.episode_rewards.append(ep_reward)
            self.episode_lengths.append(ep_length)

            if self.verbose:
                print(f"Episode {len(self.episode_rewards)} ended:")
                print(f"  Reward: {ep_reward}")
                print(f"  Length: {ep_length}")

        return True

    def _on_training_end(self) -> None:
        """Print the episode count, mean reward, mean length and last reward."""
        n_episodes = len(self.episode_rewards)
        if n_episodes == 0:
            print("No episodes completed. Consider increasing total_timesteps.")
            return

        mean_reward = np.mean(self.episode_rewards)
        mean_length = np.mean(self.episode_lengths)
        last_reward = self.episode_rewards[-1]

        print("\n==== Training Summary ====")
        print(f"Total Episodes: {n_episodes}")
        print(f"Average Reward per Episode: {mean_reward:.2f}")
        print(f"Average Episode Length: {mean_length:.2f}")
        print(f"Final Episode Reward: {last_reward:.2f}")
        print("==========================\n")


In [26]:
#===============training DQN==============
# Single environment wrapped in Monitor and DummyVecEnv; the logger callback
# reads the per-episode 'episode' info entries during training.
env = DummyVecEnv([lambda: Monitor(EpilepsyDetectionEnv())])
logger_callback = EpilepsyLoggerCallback(verbose=1)

# DQN agent with an MLP policy and the hyperparameters used for this run.
model = DQN("MlpPolicy", env, verbose=1,
            learning_rate=0.0001,
            buffer_size=500000,
            learning_starts=10000,
            batch_size=32,
            gamma=0.98,
            exploration_fraction=0.3,
            exploration_final_eps=0.05)

# Train for 700k steps, then save the model under the name that the
# evaluation cell loads as "epilepsy_dqn_model.zip".
model.learn(total_timesteps=700000,callback=logger_callback)
model.save("epilepsy_dqn_model")

env.close()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Episode 2762 ended:
  Reward: -170.0
  Length: 200
Episode 2763 ended:
  Reward: -140.0
  Length: 200
Episode 2764 ended:
  Reward: -120.0
  Length: 200
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 200      |
|    ep_rew_mean      | -122     |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 2764     |
|    fps              | 880      |
|    time_elapsed     | 627      |
|    total_timesteps  | 552800   |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.298    |
|    n_updates        | 135699   |
----------------------------------
Episode 2765 ended:
  Reward: -80.0
  Length: 200
Episode 2766 ended:
  Reward: -70.0
  Length: 200
Episode 2767 ended:
  Reward: -90.0
  Length: 200
Episode 2768 ended:
  Reward: -150.0
  Length: 200
----------------------------------
| rollout/           

In [40]:
#=========training PPO========
# Fresh single environment wrapped in Monitor and DummyVecEnv; a new logger
# callback keeps PPO's episode history separate from the DQN run.
env = DummyVecEnv([lambda: Monitor(EpilepsyDetectionEnv())])
logger_callback = EpilepsyLoggerCallback(verbose=1)

# PPO agent with an MLP policy and the hyperparameters used for this run.
model = PPO("MlpPolicy", env, verbose=1,
            learning_rate=0.0001,
            gamma=0.98,
            batch_size=16,
            n_steps=4096)

# Train for 700k steps, then save the model under the name that the
# evaluation cell loads as "epilepsy_ppo_model.zip".
model.learn(total_timesteps=700000,callback=logger_callback)
model.save("epilepsy_ppo_model")

env.close()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Episode 2266 ended:
  Reward: -100.0
  Length: 200
Episode 2267 ended:
  Reward: -180.0
  Length: 200
Episode 2268 ended:
  Reward: -150.0
  Length: 200
Episode 2269 ended:
  Reward: -120.0
  Length: 200
Episode 2270 ended:
  Reward: -80.0
  Length: 200
Episode 2271 ended:
  Reward: -120.0
  Length: 200
Episode 2272 ended:
  Reward: -115.0
  Length: 200
Episode 2273 ended:
  Reward: -140.0
  Length: 200
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 200          |
|    ep_rew_mean          | -124         |
| time/                   |              |
|    fps                  | 239          |
|    iterations           | 111          |
|    time_elapsed         | 1901         |
|    total_timesteps      | 454656       |
| train/                  |              |
|    approx_kl            | 0.0046553286 |
|    clip_fraction        | 0.0393       |
|    clip_ra

In [43]:
#====================Evaluating and recording video======

from stable_baselines3 import DQN, PPO
from gymnasium.wrappers import RecordVideo
# from environment.custom_env import EpilepsyDetectionEnv

# Load the two models saved by the training cells.
model_dqn = DQN.load("epilepsy_dqn_model.zip")
model_ppo = PPO.load("epilepsy_ppo_model.zip")

# One recording environment per model; the trigger returns True for every
# episode index, so every episode is selected for recording.
env_dqn = RecordVideo(
    EpilepsyDetectionEnv(render_mode='rgb_array'),
    video_folder='rl_agent_videos_dqn',
    episode_trigger=lambda x: True
)

env_ppo = RecordVideo(
    EpilepsyDetectionEnv(render_mode='rgb_array'),
    video_folder='rl_agent_videos_ppo',
    episode_trigger=lambda x: True
)

# Evaluation function
def evaluate_and_record(model, env, episodes=20, model_name='model', max_steps=200):
    """Run a trained model for several episodes and print each episode's return.

    Args:
        model: Object exposing ``predict(obs, deterministic=True)`` that
            returns ``(action, state)``.
        env: Environment exposing ``reset()``, ``step(action)`` (returning the
            five-tuple ``obs, reward, terminated, truncated, info``) and
            ``close()``.
        episodes: Number of episodes to run.
        model_name: Label used as the prefix of each printed line.
        max_steps: Upper bound on the number of steps taken per episode.

    The environment is closed when evaluation finishes, including when an
    episode raises.
    """
    try:
        for episode in range(episodes):
            obs, _ = env.reset()
            total_reward = 0
            for _ in range(max_steps):
                action, _ = model.predict(obs, deterministic=True)
                obs, reward, terminated, truncated, _ = env.step(action)
                total_reward += reward
                # An episode is over on either flag; honouring `truncated`
                # avoids stepping an environment that has already ended.
                if terminated or truncated:
                    break
            print(f"{model_name} Episode {episode+1}: Total Reward = {total_reward:.2f}")
    finally:
        env.close()

# Evaluate each model in its own recording environment (default 20 episodes),
# printing the total reward of every episode.
evaluate_and_record(model_dqn, env_dqn, model_name='DQN')
evaluate_and_record(model_ppo, env_ppo, model_name='PPO')



DQN Episode 1: Total Reward = -110.00
DQN Episode 2: Total Reward = -110.00
DQN Episode 3: Total Reward = -90.00
DQN Episode 4: Total Reward = -160.00
DQN Episode 5: Total Reward = -120.00
DQN Episode 6: Total Reward = -130.00
DQN Episode 7: Total Reward = -80.00
DQN Episode 8: Total Reward = -120.00
DQN Episode 9: Total Reward = -135.00
DQN Episode 10: Total Reward = -160.00
DQN Episode 11: Total Reward = -125.00
DQN Episode 12: Total Reward = -170.00
DQN Episode 13: Total Reward = -100.00
DQN Episode 14: Total Reward = -100.00
DQN Episode 15: Total Reward = -160.00
DQN Episode 16: Total Reward = -150.00
DQN Episode 17: Total Reward = -140.00
DQN Episode 18: Total Reward = -105.00
DQN Episode 19: Total Reward = -170.00
DQN Episode 20: Total Reward = -90.00
PPO Episode 1: Total Reward = -90.00
PPO Episode 2: Total Reward = -120.00
PPO Episode 3: Total Reward = -105.00
PPO Episode 4: Total Reward = -110.00
PPO Episode 5: Total Reward = -150.00
PPO Episode 6: Total Reward = -105.00
PPO E