In [20]:
# ============================================
# CELL 1: INSTALLATION AND CORE CONCEPTS
# ============================================
!pip install gymnasium -q

import gymnasium as gym
import numpy as np

print("Gymnasium installed successfully!")
print(f"Version: {gym.__version__}")

Gymnasium installed successfully!
Version: 1.2.2


In [21]:
# ============================================
# CELL 2: CREATING A CUSTOM GYMNASIUM ENVIRONMENT
# ============================================

class ClassroomEnv(gym.Env):
    """
    A simple grid-world environment where an agent learns to navigate to a goal.

    Grid Layout (5x5):
        G = Goal
        A = Agent
        X = Obstacle
        . = Empty space

    Current Layout:
        . . . X G
        . . . . .
        . X . . .
        . . . X .
        A . . . .
    """

    def __init__(self):
        super(ClassroomEnv, self).__init__()

        # ====================
        # PARAMETERS
        # ====================
        self.grid_size = 5
        self.max_steps = 50

        # Starting position (bottom-left)
        self.start_pos = np.array([0, 0])

        # Goal position (top-right)
        self.goal_pos = np.array([4, 4])

        # Fixed obstacle positions (change these if you want!)
        self.obstacles = [
            np.array([3, 4]),  # Top row
            np.array([1, 2]),  # Middle
            np.array([3, 1])   # Bottom area
        ]
        # ====================

        # Action space: 0=Up, 1=Down, 2=Left, 3=Right
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: agent's (x, y) position
        self.observation_space = gym.spaces.Box(
            low=0,
            high=self.grid_size - 1,
            shape=(2,),
            dtype=np.int32
        )

        # Internal state (will be set in reset())
        self.agent_pos = None
        self.current_step = 0

    def reset(self, seed=None, options=None):
        """Reset environment to starting state"""
        pass  # We'll implement this in Cell 3

    def step(self, action):
        """Execute one action in the environment"""
        pass  # We'll implement this in Cell 4

    def render(self):
        """Display the current state"""
        pass  # We'll implement this in Cell 5

print("Good !")
env = ClassroomEnv()
print(f"Grid size: {env.grid_size}x{env.grid_size}")
print(f"Start: {env.start_pos}, Goal: {env.goal_pos}")
print(f"Obstacles: {len(env.obstacles)} fixed positions")
print(f"Action space: {env.action_space}")

Good !
Grid size: 5x5
Start: [0 0], Goal: [4 4]
Obstacles: 3 fixed positions
Action space: Discrete(4)


In [22]:
# ============================================
# CELL 3: IMPLEMENTING THE RESET METHOD
# ============================================

class ClassroomEnv(gym.Env):
    """
    A simple grid-world environment where an agent learns to navigate to a goal.

    Grid Layout (5x5):
        G = Goal
        A = Agent
        X = Obstacle
        . = Empty space

    Current Layout:
        . . . X G
        . . . . .
        . X . . .
        . . . X .
        A . . . .
    """

    def __init__(self):
        super(ClassroomEnv, self).__init__()

        # ====================
        # PARAMETERS
        # ====================
        self.grid_size = 5
        self.max_steps = 50

        # Starting position (bottom-left)
        self.start_pos = np.array([0, 0])

        # Goal position (top-right)
        self.goal_pos = np.array([4, 4])

        # Fixed obstacle positions (change these if you want!)
        self.obstacles = [
            np.array([3, 4]),  # Top row
            np.array([1, 2]),  # Middle
            np.array([3, 1])   # Bottom area
        ]
        # ====================

        # Action space: 0=Up, 1=Down, 2=Left, 3=Right
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: agent's (x, y) position
        self.observation_space = gym.spaces.Box(
            low=0,
            high=self.grid_size - 1,
            shape=(2,),
            dtype=np.int32
        )

        # Internal state (will be set in reset())
        self.agent_pos = None
        self.current_step = 0

    def reset(self, seed=None, options=None):
        """
        Reset the environment to its initial state.

        This is called:
        - At the start of each episode
        - When the agent reaches the goal
        - When max_steps is reached

        Returns:
            observation: The agent's starting position [x, y]
            info: Empty dictionary (required by Gymnasium)
        """
        # Set random seed for reproducibility (if provided)
        super().reset(seed=seed)

        # Reset agent to starting position
        self.agent_pos = self.start_pos.copy()

        # Reset step counter
        self.current_step = 0

        # Return observation and empty info dict
        return self.agent_pos.copy(), {}

    def step(self, action):
        """Execute one action in the environment"""
        pass  # We'll implement this in Cell 4

    def render(self):
        """Display the current state"""
        pass  # We'll implement this in Cell 5

print("Good !")
env = ClassroomEnv()
observation, info = env.reset()

print(f"Agent position after reset: {observation}")
print(f"Expected starting position: {env.start_pos}")
print(f"Current step counter: {env.current_step}")
print(f"Info dictionary: {info}")

Good !
Agent position after reset: [0 0]
Expected starting position: [0 0]
Current step counter: 0
Info dictionary: {}


In [23]:
# ============================================
# CELL 4: IMPLEMENTING THE STEP METHOD
# ============================================

class ClassroomEnv(gym.Env):
    """
    A simple grid-world environment where an agent learns to navigate to a goal.

    Grid Layout (5x5):
        G = Goal
        A = Agent
        X = Obstacle
        . = Empty space

    Current Layout:
        . . . X G
        . . . . .
        . X . . .
        . . . X .
        A . . . .
    """

    def __init__(self):
        super(ClassroomEnv, self).__init__()

        # ====================
        # PARAMETERS
        # ====================
        self.grid_size = 5
        self.max_steps = 50

        # Starting position (bottom-left)
        self.start_pos = np.array([0, 0])

        # Goal position (top-right)
        self.goal_pos = np.array([4, 4])

        # Fixed obstacle positions (change these if you want!)
        self.obstacles = [
            np.array([3, 4]),  # Top row
            np.array([1, 2]),  # Middle
            np.array([3, 1])   # Bottom area
        ]

        # Reward structure
        self.STEP_REWARD = -0.01     # Small penalty per step
        self.OBSTACLE_REWARD = -1.0  # Large penalty for obstacles
        self.GOAL_REWARD = +1.0      # Big reward for goal

        # ====================

        # Action space: 0=Up, 1=Down, 2=Left, 3=Right
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: agent's (x, y) position
        self.observation_space = gym.spaces.Box(
            low=0,
            high=self.grid_size - 1,
            shape=(2,),
            dtype=np.int32
        )

        # Internal state (will be set in reset())
        self.agent_pos = None
        self.current_step = 0

    def reset(self, seed=None, options=None):
        """
        Reset the environment to its initial state.

        This is called:
        - At the start of each episode
        - When the agent reaches the goal
        - When max_steps is reached

        Returns:
            observation: The agent's starting position [x, y]
            info: Empty dictionary (required by Gymnasium)
        """
        # Set random seed for reproducibility (if provided)
        super().reset(seed=seed)

        # Reset agent to starting position
        self.agent_pos = self.start_pos.copy()

        # Reset step counter
        self.current_step = 0

        # Return observation and empty info dict
        return self.agent_pos.copy(), {}

    def step(self, action):
        """
        Execute one action and return the result.

        Args:
            action: Integer (0=Up, 1=Down, 2=Left, 3=Right)

        Returns:
            observation: New agent position [x, y]
            reward: Float reward for this step
            terminated: Boolean, True if episode should end (reached goal)
            truncated: Boolean, True if max steps reached
            info: Dictionary with extra information
        """
        # Increment step counter
        self.current_step += 1

        # Calculate new position based on action
        new_pos = self.agent_pos.copy()

        if action == 0:    # Up
            new_pos[1] += 1
        elif action == 1:  # Down
            new_pos[1] -= 1
        elif action == 2:  # Left
            new_pos[0] -= 1
        elif action == 3:  # Right
            new_pos[0] += 1

        # Check if new position is within grid boundaries
        if (new_pos[0] < 0 or new_pos[0] >= self.grid_size or
            new_pos[1] < 0 or new_pos[1] >= self.grid_size):
            # Hit wall - stay in place
            new_pos = self.agent_pos.copy()

        # Update agent position
        self.agent_pos = new_pos

        # Initialize reward with step penalty
        reward = self.STEP_REWARD
        terminated = False
        truncated = False

        # Check if agent reached the goal
        if np.array_equal(self.agent_pos, self.goal_pos):
            reward = self.GOAL_REWARD
            terminated = True

        # Check if agent hit an obstacle
        hit_obstacle = any(np.array_equal(self.agent_pos, obs)
                          for obs in self.obstacles)
        if hit_obstacle:
            reward += self.OBSTACLE_REWARD  # Add obstacle penalty

        # Check if max steps reached
        if self.current_step >= self.max_steps:
            truncated = True

        # Extra information
        info = {
            'step': self.current_step,
            'hit_obstacle': hit_obstacle
        }

        return self.agent_pos.copy(), reward, terminated, truncated, info

    def render(self):
        """Display the current state"""
        pass  # We'll implement this in Cell 5

print("Good !")
env = ClassroomEnv()
observation, info = env.reset()

print(f"Starting position: {observation}")
print(f"Goal position: {env.goal_pos}")
print(f"Obstacles: {[obs.tolist() for obs in env.obstacles]}")
print("\n" + "="*50)

print("\nTest 1: Move RIGHT from [0,0]")
obs, reward, terminated, truncated, info = env.step(3)
print(f"New position: {obs}")
print(f"Reward: {reward}")

print("\nTest 2: Move LEFT")
obs, reward, terminated, truncated, info = env.step(2)
print(f"Position: {obs}")
print(f"Reward: {reward}")

print("\nTest 3: Moving to obstacle at [2, 1]")
env.agent_pos = np.array([2, 1])
obs, reward, terminated, truncated, info = env.step(3)
print(f"Hit obstacle: {info['hit_obstacle']}")
print(f"Reward: {reward}")

print("\nTest 4: Moving to goal")
env.agent_pos = np.array([4, 3])
obs, reward, terminated, truncated, info = env.step(0)
print(f"Reward: {reward}")

Good !
Starting position: [0 0]
Goal position: [4 4]
Obstacles: [[3, 4], [1, 2], [3, 1]]


Test 1: Move RIGHT from [0,0]
New position: [1 0]
Reward: -0.01

Test 2: Move LEFT
Position: [0 0]
Reward: -0.01

Test 3: Moving to obstacle at [2, 1]
Hit obstacle: True
Reward: -1.01

Test 4: Moving to goal
Reward: 1.0


In [24]:
# ============================================
# CELL 5: IMPLEMENTING THE RENDER METHOD
# ============================================

class ClassroomEnv(gym.Env):
    """
    A simple grid-world environment where an agent learns to navigate to a goal.

    Grid Layout (5x5):
        G = Goal (top-right corner)
        A = Agent (starts bottom-left)
        X = Obstacle (fixed positions)
        . = Empty space

    Current Layout:
        . . . X G
        . . . . .
        . X . . .
        . . . X .
        A . . . .
    """

    def __init__(self):
        super(ClassroomEnv, self).__init__()

        # ====================
        # PARAMETERS
        # ====================
        self.grid_size = 5
        self.max_steps = 50

        # Starting position (bottom-left)
        self.start_pos = np.array([0, 0])

        # Goal position (top-right)
        self.goal_pos = np.array([4, 4])

        # Fixed obstacle positions (change these if you want!)
        self.obstacles = [
            np.array([3, 4]),  # Top row
            np.array([1, 2]),  # Middle
            np.array([3, 1])   # Bottom area
        ]

        # Reward structure
        self.STEP_REWARD = -0.01     # Small penalty per step
        self.OBSTACLE_REWARD = -1.0  # Large penalty for obstacles
        self.GOAL_REWARD = +1.0      # Big reward for goal
        # ====================

        # Action space: 0=Up, 1=Down, 2=Left, 3=Right
        self.action_space = gym.spaces.Discrete(4)

        # Observation space: agent's (x, y) position
        self.observation_space = gym.spaces.Box(
            low=0,
            high=self.grid_size - 1,
            shape=(2,),
            dtype=np.int32
        )

        # Internal state (will be set in reset())
        self.agent_pos = None
        self.current_step = 0

    def reset(self, seed=None, options=None):
        """
        Reset the environment to its initial state.

        Returns:
            observation: The agent's starting position [x, y]
            info: Empty dictionary (required by Gymnasium)
        """
        super().reset(seed=seed)
        self.agent_pos = self.start_pos.copy()
        self.current_step = 0
        return self.agent_pos.copy(), {}

    def step(self, action):
        """
        Execute one action and return the result.

        Args:
            action: Integer (0=Up, 1=Down, 2=Left, 3=Right)

        Returns:
            observation: New agent position [x, y]
            reward: Float reward for this step
            terminated: Boolean, True if episode should end (reached goal)
            truncated: Boolean, True if max steps reached
            info: Dictionary with extra information
        """
        # Increment step counter
        self.current_step += 1

        # Calculate new position based on action
        new_pos = self.agent_pos.copy()

        if action == 0:    # Up
            new_pos[1] += 1
        elif action == 1:  # Down
            new_pos[1] -= 1
        elif action == 2:  # Left
            new_pos[0] -= 1
        elif action == 3:  # Right
            new_pos[0] += 1

        # Check if new position is within grid boundaries
        if (new_pos[0] < 0 or new_pos[0] >= self.grid_size or
            new_pos[1] < 0 or new_pos[1] >= self.grid_size):
            # Hit wall - stay in place
            new_pos = self.agent_pos.copy()

        # Update agent position
        self.agent_pos = new_pos

        # Initialize reward with step penalty
        reward = self.STEP_REWARD
        terminated = False
        truncated = False

        # Check if agent reached the goal
        if np.array_equal(self.agent_pos, self.goal_pos):
            reward = self.GOAL_REWARD
            terminated = True

        # Check if agent hit an obstacle
        hit_obstacle = any(np.array_equal(self.agent_pos, obs)
                          for obs in self.obstacles)
        if hit_obstacle:
            reward += self.OBSTACLE_REWARD  # Add obstacle penalty

        # Check if max steps reached
        if self.current_step >= self.max_steps:
            truncated = True

        # Extra information
        info = {
            'step': self.current_step,
            'hit_obstacle': hit_obstacle
        }

        return self.agent_pos.copy(), reward, terminated, truncated, info

    def render(self):
        """
        Display the current state of the environment.

        Symbols:
            A = Agent (current position)
            G = Goal (target)
            X = Obstacle (to avoid)
            . = Empty space (walkable)
        """
        # Print header with step information
        print(f"\nStep {self.current_step}/{self.max_steps}")
        print("=" * (self.grid_size * 2 + 1))

        # Print grid from top to bottom (y goes from high to low for display)
        for y in range(self.grid_size - 1, -1, -1):
            row = ""
            for x in range(self.grid_size):
                current_pos = np.array([x, y])

                # Check what's at this position and display appropriate symbol
                if np.array_equal(current_pos, self.agent_pos):
                    row += "A "
                elif np.array_equal(current_pos, self.goal_pos):
                    row += "G "
                elif any(np.array_equal(current_pos, obs) for obs in self.obstacles):
                    row += "X "
                else:
                    row += ". "

            print(row)

        # Print footer with position information
        print("=" * (self.grid_size * 2 + 1))
        print(f"Agent: {self.agent_pos.tolist()} | Goal: {self.goal_pos.tolist()}")

print("Complete ClassroomEnv created!")

Complete ClassroomEnv created!


In [25]:
# ============================================
# TEST THE COMPLETE ENVIRONMENT
# ============================================

print("\n" + "TESTING COMPLETE ENVIRONMENT" + "\n" + "="*50)

env = ClassroomEnv()
observation, info = env.reset()

print("\n 1. Initial State:")
env.render()

print("\n 2. Move RIGHT:")
obs, reward, terminated, truncated, info = env.step(3)
env.render()
print(f"Reward: {reward}")

print("\n 3. Move UP:")
obs, reward, terminated, truncated, info = env.step(0)
env.render()
print(f"Reward: {reward}")

print("\n 4. Move UP (will hit obstacle at [1,2]):")
obs, reward, terminated, truncated, info = env.step(0)
env.render()
print(f"Reward: {reward} (includes obstacle penalty!)")
print(f"Hit obstacle: {info['hit_obstacle']}")

print("\nEnvironment is fully functional!")


TESTING COMPLETE ENVIRONMENT

 1. Initial State:

Step 0/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
A . . . . 
Agent: [0, 0] | Goal: [4, 4]

 2. Move RIGHT:

Step 1/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
. A . . . 
Agent: [1, 0] | Goal: [4, 4]
Reward: -0.01

 3. Move UP:

Step 2/50
. . . X G 
. . . . . 
. X . . . 
. A . X . 
. . . . . 
Agent: [1, 1] | Goal: [4, 4]
Reward: -0.01

 4. Move UP (will hit obstacle at [1,2]):

Step 3/50
. . . X G 
. . . . . 
. A . . . 
. . . X . 
. . . . . 
Agent: [1, 2] | Goal: [4, 4]
Reward: -1.01 (includes obstacle penalty!)
Hit obstacle: True

Environment is fully functional!


In [26]:
# ============================================
# CELL 6: TESTING WITH A RANDOM AGENT
# ============================================

def test_random_agent(env, num_episodes=3):
    """
    Test the environment with an agent that takes random actions.

    Args:
        env: The ClassroomEnv instance
        num_episodes: Number of episodes to run
    """
    action_names = ["Up", "Down", "Left", "Right"]

    for episode in range(num_episodes):
        print(f"\nEpisode {episode + 1}/{num_episodes}")
        print("-" * 40)

        observation, info = env.reset()
        env.render()

        total_reward = 0
        done = False

        while not done:
            # Choose random action
            action = env.action_space.sample()

            # Execute action
            observation, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            done = terminated or truncated

            print(f"\nAction: {action_names[action]}")
            env.render()
            print(f"Reward: {reward:.2f} | Total: {total_reward:.2f}")

            if terminated:
                print("Goal reached!")
            if truncated:
                print("Max steps reached.")

        print(f"\nEpisode summary: {info['step']} steps, Total reward: {total_reward:.2f}")

# Create environment and test
env = ClassroomEnv()
test_random_agent(env, num_episodes=1)


Episode 1/1
----------------------------------------

Step 0/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
A . . . . 
Agent: [0, 0] | Goal: [4, 4]

Action: Right

Step 1/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
. A . . . 
Agent: [1, 0] | Goal: [4, 4]
Reward: -0.01 | Total: -0.01

Action: Down

Step 2/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
. A . . . 
Agent: [1, 0] | Goal: [4, 4]
Reward: -0.01 | Total: -0.02

Action: Up

Step 3/50
. . . X G 
. . . . . 
. X . . . 
. A . X . 
. . . . . 
Agent: [1, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.03

Action: Down

Step 4/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
. A . . . 
Agent: [1, 0] | Goal: [4, 4]
Reward: -0.01 | Total: -0.04

Action: Down

Step 5/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
. A . . . 
Agent: [1, 0] | Goal: [4, 4]
Reward: -0.01 | Total: -0.05

Action: Right

Step 6/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
. . A . . 
Agent: [2, 0] | Goal: [4, 4]
Reward: -0.01 | Total: -0.06

Action: Up

Step 7/50


In [27]:
# ============================================
# CELL 7: Q-LEARNING AGENT
# ============================================

class QLearningAgent:
    """
    Q-Learning agent for discrete state spaces.
    Uses a dictionary (Q-table) to store state-action values.
    """

    def __init__(self, n_actions, learning_rate=0.1, discount_factor=0.95, epsilon=0.1):
        """
        Initialize Q-Learning agent.

        Args:
            n_actions: Number of possible actions
            learning_rate: How much to update Q-values (0 to 1)
            discount_factor: Importance of future rewards (0 to 1)
            epsilon: Exploration rate (0 = exploit only, 1 = explore only)
        """
        # ====================
        # PARAMETERS
        # ====================
        self.n_actions = n_actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        # ====================

        # Q-table: stores Q-values for each state-action pair
        self.q_table = {}

    def get_q_values(self, state):
        """Get Q-values for a state (initialize if new state)."""
        state_key = tuple(state)

        if state_key not in self.q_table:
            self.q_table[state_key] = np.zeros(self.n_actions)

        return self.q_table[state_key]

    def choose_action(self, state):
        """
        Choose action using epsilon-greedy policy.
        - With probability epsilon: random action (exploration)
        - With probability 1-epsilon: best action (exploitation)
        """
        if np.random.random() < self.epsilon:
            return np.random.randint(0, self.n_actions)
        else:
            q_values = self.get_q_values(state)
            return np.argmax(q_values)

    def update(self, state, action, reward, next_state, done):
        """
        Update Q-value using Bellman equation:
        Q(s,a) = Q(s,a) + α * [r + γ * max(Q(s',a')) - Q(s,a)]
        """
        state_key = tuple(state)

        # Get current Q-value
        current_q = self.get_q_values(state)[action]

        # Calculate target Q-value
        if done:
            target_q = reward
        else:
            next_q_values = self.get_q_values(next_state)
            target_q = reward + self.discount_factor * np.max(next_q_values)

        # Update Q-value
        self.q_table[state_key][action] = current_q + self.learning_rate * (target_q - current_q)

# Create and test agent
agent = QLearningAgent(n_actions=4, learning_rate=0.5, discount_factor=0.95, epsilon=0.1)

print("Q-Learning agent created.")
print(f"Learning rate: {agent.learning_rate}")
print(f"Discount factor: {agent.discount_factor}")
print(f"Epsilon: {agent.epsilon}")

Q-Learning agent created.
Learning rate: 0.5
Discount factor: 0.95
Epsilon: 0.1


In [28]:
# ============================================
# CELL 8: TRAINING Q-LEARNING AGENT
# ============================================

def train_qlearning(env, agent, num_episodes=1000, epsilon_decay=True):
    """
    Train Q-Learning agent.

    Args:
        env: The ClassroomEnv instance
        agent: The QLearningAgent instance
        num_episodes: Number of training episodes
        epsilon_decay: Gradually reduce exploration over time

    Returns:
        rewards: List of total rewards per episode
    """
    rewards = []
    initial_epsilon = agent.epsilon

    for episode in range(num_episodes):
        state, info = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            agent.update(state, action, reward, next_state, done)

            state = next_state
            total_reward += reward

        if epsilon_decay:
            agent.epsilon = initial_epsilon * (0.995 ** episode)
            agent.epsilon = max(0.01, agent.epsilon)

        rewards.append(total_reward)

        if (episode + 1) % 200 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes} | Avg Reward: {avg_reward:.2f} | Epsilon: {agent.epsilon:.3f}")

    return rewards

# Create environment and agent
env = ClassroomEnv()
agent = QLearningAgent(
    n_actions=4,
    learning_rate=0.5,
    discount_factor=0.99,
    epsilon=0.9
)

print("Training Q-Learning agent...")
print("=" * 50)
rewards = train_qlearning(env, agent, num_episodes=1000, epsilon_decay=True)

print("\nTraining complete.")
print(f"Final average reward (last 100 episodes): {np.mean(rewards[-100:]):.2f}")
print(f"Q-table size (states discovered): {len(agent.q_table)}")
print(f"Final epsilon: {agent.epsilon:.3f}")

Training Q-Learning agent...
Episode 200/1000 | Avg Reward: 0.18 | Epsilon: 0.332
Episode 400/1000 | Avg Reward: 0.65 | Epsilon: 0.122
Episode 600/1000 | Avg Reward: 0.89 | Epsilon: 0.045
Episode 800/1000 | Avg Reward: 0.90 | Epsilon: 0.016
Episode 1000/1000 | Avg Reward: 0.93 | Epsilon: 0.010

Training complete.
Final average reward (last 100 episodes): 0.93
Q-table size (states discovered): 24
Final epsilon: 0.010


In [29]:
# ============================================
# CELL 9: TESTING TRAINED Q-LEARNING AGENT
# ============================================

def test_trained_agent(env, agent, num_episodes=3):
    """
    Test the trained Q-Learning agent.
    Sets epsilon=0 to use only learned policy (no exploration).

    Args:
        env: The ClassroomEnv instance
        agent: The trained QLearningAgent instance
        num_episodes: Number of test episodes
    """
    action_names = ["Up", "Down", "Left", "Right"]

    original_epsilon = agent.epsilon
    agent.epsilon = 0.0

    successes = 0

    for episode in range(num_episodes):
        print(f"\nTest Episode {episode + 1}/{num_episodes}")
        print("-" * 40)

        state, info = env.reset(seed=42 + episode)
        env.render()

        total_reward = 0
        done = False

        while not done:
            action = agent.choose_action(state)
            state, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            done = terminated or truncated

            print(f"\nAction: {action_names[action]}")
            env.render()
            print(f"Reward: {reward:.2f} | Total: {total_reward:.2f}")

            if terminated:
                print("Goal reached!")
                successes += 1
                break
            if truncated:
                print("Max steps reached.")
                break

        print(f"\nEpisode summary: {info['step']} steps, Total reward: {total_reward:.2f}")

    agent.epsilon = original_epsilon

    print(f"\nSuccess rate: {successes}/{num_episodes}")

# Test the trained agent
print("Testing trained Q-Learning agent...")
print("=" * 50)
test_trained_agent(env, agent, num_episodes=3)

Testing trained Q-Learning agent...

Test Episode 1/3
----------------------------------------

Step 0/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
A . . . . 
Agent: [0, 0] | Goal: [4, 4]

Action: Up

Step 1/50
. . . X G 
. . . . . 
. X . . . 
A . . X . 
. . . . . 
Agent: [0, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.01

Action: Right

Step 2/50
. . . X G 
. . . . . 
. X . . . 
. A . X . 
. . . . . 
Agent: [1, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.02

Action: Right

Step 3/50
. . . X G 
. . . . . 
. X . . . 
. . A X . 
. . . . . 
Agent: [2, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.03

Action: Up

Step 4/50
. . . X G 
. . . . . 
. X A . . 
. . . X . 
. . . . . 
Agent: [2, 2] | Goal: [4, 4]
Reward: -0.01 | Total: -0.04

Action: Up

Step 5/50
. . . X G 
. . A . . 
. X . . . 
. . . X . 
. . . . . 
Agent: [2, 3] | Goal: [4, 4]
Reward: -0.01 | Total: -0.05

Action: Right

Step 6/50
. . . X G 
. . . A . 
. X . . . 
. . . X . 
. . . . . 
Agent: [3, 3] | Goal: [4, 4]
Reward: -0.01 

In [30]:
# ============================================
# CELL 10: ENVIRONMENT SETUP FOR DQN
# ============================================

# Same environment as Q-Learning section
# DQN uses the numpy array state directly as neural network input
env = ClassroomEnv()

In [31]:
# ============================================
# CELL 11: DQN NETWORK AND REPLAY BUFFER
# ============================================

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import deque
import random

class DQNNetwork(nn.Module):
    """
    Neural network for approximating Q-values.
    """
    def __init__(self, state_dim, action_dim, hidden_size=128):
        super(DQNNetwork, self).__init__()

        # ====================
        # PARAMETERS
        # ====================
        # hidden_size: Number of neurons in hidden layers
        # ====================

        self.fc1 = nn.Linear(state_dim, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class ReplayBuffer:
    """
    Stores past experiences for training.
    """
    def __init__(self, capacity=10000):
        # ====================
        # PARAMETERS
        # ====================
        # capacity: Maximum number of experiences to store
        # ====================

        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (np.array(states), np.array(actions), np.array(rewards),
                np.array(next_states), np.array(dones))

    def __len__(self):
        return len(self.buffer)


print("DQN Network and Replay Buffer created.")

DQN Network and Replay Buffer created.


In [32]:
# ============================================
# CELL 12: DQN AGENT
# ============================================

class DQNAgent:
    """
    Deep Q-Network agent with experience replay and target network.
    """

    def __init__(self, state_dim, action_dim, learning_rate=0.001, discount_factor=0.99):
        """
        Initialize DQN agent.

        Args:
            state_dim: Dimension of state space
            action_dim: Number of possible actions
            learning_rate: Neural network learning rate
            discount_factor: Importance of future rewards
        """
        # ====================
        # PARAMETERS
        # ====================
        self.action_dim = action_dim
        self.discount_factor = discount_factor
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.batch_size = 64
        # ====================

        # Q-network and target network
        self.q_network = DQNNetwork(state_dim, action_dim)
        self.target_network = DQNNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())

        # Optimizer and replay buffer
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.memory = ReplayBuffer()

    def choose_action(self, state):
        """
        Choose action using epsilon-greedy policy.
        """
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def train_step(self):
        """
        Train the network on a batch of experiences.
        """
        if len(self.memory) < self.batch_size:
            return None

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        # Current Q-values
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Target Q-values
        next_q = self.target_network(next_states).max(1)[0].detach()
        target_q = rewards + (1 - dones) * self.discount_factor * next_q

        # Calculate loss and update
        loss = F.mse_loss(current_q.squeeze(), target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def update_target_network(self):
        """Copy weights from Q-network to target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def decay_epsilon(self):
        """Reduce exploration rate."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)


# Create DQN agent
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

agent = DQNAgent(state_dim=state_dim, action_dim=action_dim, learning_rate=0.001, discount_factor=0.99)

print("DQN agent created.")
print(f"State dimension: {state_dim}")
print(f"Action dimension: {action_dim}")
print(f"Network architecture: {state_dim} → 128 → 128 → {action_dim}")

DQN agent created.
State dimension: 2
Action dimension: 4
Network architecture: 2 → 128 → 128 → 4


In [33]:
# ============================================
# CELL 13: TRAINING DQN AGENT
# ============================================

def train_dqn(env, agent, num_episodes=500, target_update_freq=10):
    """
    Train DQN agent.

    Args:
        env: The ClassroomEnv instance
        agent: The DQNAgent instance
        num_episodes: Number of training episodes
        target_update_freq: How often to update target network

    Returns:
        rewards: List of total rewards per episode
    """
    rewards = []

    for episode in range(num_episodes):
        state, info = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            agent.memory.push(state, action, reward, next_state, done)
            agent.train_step()

            state = next_state
            total_reward += reward

        agent.decay_epsilon()

        if (episode + 1) % target_update_freq == 0:
            agent.update_target_network()

        rewards.append(total_reward)

        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes} | Avg Reward: {avg_reward:.2f} | Epsilon: {agent.epsilon:.3f}")

    return rewards


# Create fresh environment and agent
env = ClassroomEnv()
agent = DQNAgent(state_dim=2, action_dim=4, learning_rate=0.001, discount_factor=0.99)

print("Training DQN agent...")
print("=" * 50)
rewards = train_dqn(env, agent, num_episodes=1000, target_update_freq=10)

print("\nTraining complete.")
print(f"Final average reward (last 100 episodes): {np.mean(rewards[-100:]):.2f}")
print(f"Replay buffer size: {len(agent.memory)}")
print(f"Final epsilon: {agent.epsilon:.3f}")

Training DQN agent...
Episode 100/1000 | Avg Reward: -3.08 | Epsilon: 0.606
Episode 200/1000 | Avg Reward: -2.26 | Epsilon: 0.367
Episode 300/1000 | Avg Reward: -1.27 | Epsilon: 0.222
Episode 400/1000 | Avg Reward: -0.97 | Epsilon: 0.135
Episode 500/1000 | Avg Reward: -0.83 | Epsilon: 0.082
Episode 600/1000 | Avg Reward: -0.82 | Epsilon: 0.049
Episode 700/1000 | Avg Reward: -0.44 | Epsilon: 0.030
Episode 800/1000 | Avg Reward: -0.46 | Epsilon: 0.018
Episode 900/1000 | Avg Reward: -0.10 | Epsilon: 0.011
Episode 1000/1000 | Avg Reward: 0.86 | Epsilon: 0.010

Training complete.
Final average reward (last 100 episodes): 0.86
Replay buffer size: 10000
Final epsilon: 0.010


In [34]:
# ============================================
# CELL 14: TESTING TRAINED DQN AGENT
# ============================================

def test_trained_dqn(env, agent, num_episodes=3):
    """
    Test the trained DQN agent.
    Sets epsilon=0 to use only learned policy.

    Args:
        env: The ClassroomEnv instance
        agent: The trained DQNAgent instance
        num_episodes: Number of test episodes
    """
    action_names = ["Up", "Down", "Left", "Right"]

    original_epsilon = agent.epsilon
    agent.epsilon = 0.0

    successes = 0

    for episode in range(num_episodes):
        print(f"\nTest Episode {episode + 1}/{num_episodes}")
        print("-" * 40)

        state, info = env.reset(seed=42 + episode)
        env.render()

        total_reward = 0
        done = False

        while not done:
            action = agent.choose_action(state)
            state, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            done = terminated or truncated

            print(f"\nAction: {action_names[action]}")
            env.render()
            print(f"Reward: {reward:.2f} | Total: {total_reward:.2f}")

            if terminated:
                print("Goal reached!")
                successes += 1
                break
            if truncated:
                print("Max steps reached.")
                break

        print(f"\nEpisode summary: {info['step']} steps, Total reward: {total_reward:.2f}")

    agent.epsilon = original_epsilon

    print(f"\nSuccess rate: {successes}/{num_episodes}")
    print("=" * 50)


# Test the trained DQN agent
print("Testing trained DQN agent...")
print("=" * 50)
test_trained_dqn(env, agent, num_episodes=3)

Testing trained DQN agent...

Test Episode 1/3
----------------------------------------

Step 0/50
. . . X G 
. . . . . 
. X . . . 
. . . X . 
A . . . . 
Agent: [0, 0] | Goal: [4, 4]

Action: Up

Step 1/50
. . . X G 
. . . . . 
. X . . . 
A . . X . 
. . . . . 
Agent: [0, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.01

Action: Right

Step 2/50
. . . X G 
. . . . . 
. X . . . 
. A . X . 
. . . . . 
Agent: [1, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.02

Action: Right

Step 3/50
. . . X G 
. . . . . 
. X . . . 
. . A X . 
. . . . . 
Agent: [2, 1] | Goal: [4, 4]
Reward: -0.01 | Total: -0.03

Action: Up

Step 4/50
. . . X G 
. . . . . 
. X A . . 
. . . X . 
. . . . . 
Agent: [2, 2] | Goal: [4, 4]
Reward: -0.01 | Total: -0.04

Action: Right

Step 5/50
. . . X G 
. . . . . 
. X . A . 
. . . X . 
. . . . . 
Agent: [3, 2] | Goal: [4, 4]
Reward: -0.01 | Total: -0.05

Action: Right

Step 6/50
. . . X G 
. . . . . 
. X . . A 
. . . X . 
. . . . . 
Agent: [4, 2] | Goal: [4, 4]
Reward: -0.01 | To

In [35]:
# ============================================
# CELL 16: Q-LEARNING ON BUILT-IN ENVIRONMENTS (FIXED)
# ============================================

def train_qlearning_builtin(env_name, num_episodes=1000):
    """
    Train Q-Learning agent on a Gymnasium built-in environment.

    Args:
        env_name: Name of Gymnasium environment (e.g., "Taxi-v3")
        num_episodes: Number of training episodes
    """
    # Create environment
    env = gym.make(env_name)

    # Create agent
    agent = QLearningAgent(
        n_actions=env.action_space.n,
        learning_rate=0.9,
        discount_factor=0.95,
        epsilon=0.9
    )

    print(f"\nTraining Q-Learning on {env_name}")
    print("=" * 50)

    rewards = []

    for episode in range(num_episodes):
        state, info = env.reset()

        # Convert state to array immediately (handle discrete envs)
        state = np.array([state]) if isinstance(state, (int, np.integer)) else np.array(state)

        total_reward = 0
        done = False

        while not done:
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # Convert next_state to array
            next_state = np.array([next_state]) if isinstance(next_state, (int, np.integer)) else np.array(next_state)

            agent.update(state, action, reward, next_state, done)

            state = next_state
            total_reward += reward

        agent.epsilon = max(0.01, agent.epsilon * 0.995)
        rewards.append(total_reward)

        if (episode + 1) % 200 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes} | Avg Reward: {avg_reward:.2f} | Epsilon: {agent.epsilon:.3f}")

    env.close()

    print(f"\nTraining complete!")
    print(f"Final avg reward: {np.mean(rewards[-100:]):.2f}")
    print(f"States discovered: {len(agent.q_table)}")

    return agent, rewards


# ====================
# PARAMETERS
# ====================
# Try: "Taxi-v3" or "FrozenLake-v1"

print("Available environments for Q-Learning:")
print("  • Taxi-v3 (500 states, easier)")
print("  • FrozenLake-v1 (16 states, harder due to slippery surface)")

ENV_NAME = "FrozenLake-v1"

agent, rewards = train_qlearning_builtin(ENV_NAME, num_episodes=1000)

Available environments for Q-Learning:
  • Taxi-v3 (500 states, easier)
  • FrozenLake-v1 (16 states, harder due to slippery surface)

Training Q-Learning on FrozenLake-v1
Episode 200/1000 | Avg Reward: 0.00 | Epsilon: 0.330
Episode 400/1000 | Avg Reward: 0.00 | Epsilon: 0.121
Episode 600/1000 | Avg Reward: 0.00 | Epsilon: 0.044
Episode 800/1000 | Avg Reward: 0.00 | Epsilon: 0.016
Episode 1000/1000 | Avg Reward: 0.00 | Epsilon: 0.010

Training complete!
Final avg reward: 0.00
States discovered: 11


In [36]:
# ============================================
# CELL 17: TEST Q-LEARNING ON BUILT-IN ENVIRONMENT
# ============================================

def test_qlearning_builtin(env_name, agent, num_episodes=3):
    """
    Test trained Q-Learning agent on built-in environment.
    """
    env = gym.make(env_name, render_mode='ansi')

    original_epsilon = agent.epsilon
    agent.epsilon = 0.0

    successes = 0

    for episode in range(num_episodes):
        print(f"\nTest Episode {episode + 1}/{num_episodes}")
        print("-" * 40)

        state, info = env.reset()
        state = np.array([state]) if isinstance(state, (int, np.integer)) else np.array(state)

        total_reward = 0
        done = False
        steps = 0

        while not done:
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            next_state = np.array([next_state]) if isinstance(next_state, (int, np.integer)) else np.array(next_state)

            done = terminated or truncated
            total_reward += reward
            state = next_state
            steps += 1

        if reward > 0:
            successes += 1
            print(f"Success! Steps: {steps}, Reward: {total_reward:.2f}")
        else:
            print(f"Failed. Steps: {steps}, Reward: {total_reward:.2f}")

    env.close()
    agent.epsilon = original_epsilon

    print(f"\nSuccess rate: {successes}/{num_episodes}")

# Test the trained agent
test_qlearning_builtin(ENV_NAME, agent, num_episodes=3)


Test Episode 1/3
----------------------------------------
Failed. Steps: 15, Reward: 0.00

Test Episode 2/3
----------------------------------------
Failed. Steps: 16, Reward: 0.00

Test Episode 3/3
----------------------------------------
Failed. Steps: 5, Reward: 0.00

Success rate: 0/3


In [37]:
# ============================================
# CELL 18: DQN ON BUILT-IN ENVIRONMENTS
# ============================================

def train_dqn_builtin(env_name, num_episodes=500):
    """
    Train DQN agent on a Gymnasium built-in environment.

    Args:
        env_name: Name of Gymnasium environment (e.g., "CartPole-v1")
        num_episodes: Number of training episodes
    """
    # Create environment
    env = gym.make(env_name)

    # Get state and action dimensions
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # Create agent
    agent = DQNAgent(
        state_dim=state_dim,
        action_dim=action_dim,
        learning_rate=0.001,
        discount_factor=0.99
    )

    print(f"\nTraining DQN on {env_name}")
    print(f"State dimension: {state_dim}, Action dimension: {action_dim}")
    print("=" * 50)

    rewards = []

    for episode in range(num_episodes):
        state, info = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            agent.memory.push(state, action, reward, next_state, done)
            agent.train_step()

            state = next_state
            total_reward += reward

        agent.decay_epsilon()

        if (episode + 1) % 10 == 0:
            agent.update_target_network()

        rewards.append(total_reward)

        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes} | Avg Reward: {avg_reward:.2f} | Epsilon: {agent.epsilon:.3f}")

    env.close()

    print(f"\nTraining complete!")
    print(f"Final avg reward: {np.mean(rewards[-100:]):.2f}")

    return agent, rewards


# ====================
# PARAMETERS
# ====================
# Try: "CartPole-v1" or "MountainCar-v0"

print("Available environments for DQN:")
print("  • CartPole-v1 (balance pole, easier)")
print("  • MountainCar-v0 (drive up hill, harder)")

ENV_NAME = "CartPole-v1"

agent, rewards = train_dqn_builtin(ENV_NAME, num_episodes=500)

Available environments for DQN:
  • CartPole-v1 (balance pole, easier)
  • MountainCar-v0 (drive up hill, harder)

Training DQN on CartPole-v1
State dimension: 4, Action dimension: 2
Episode 100/500 | Avg Reward: 34.03 | Epsilon: 0.606
Episode 200/500 | Avg Reward: 71.99 | Epsilon: 0.367
Episode 300/500 | Avg Reward: 115.04 | Epsilon: 0.222
Episode 400/500 | Avg Reward: 153.23 | Epsilon: 0.135
Episode 500/500 | Avg Reward: 108.38 | Epsilon: 0.082

Training complete!
Final avg reward: 108.38


In [38]:
# ============================================
# CELL 19: TEST DQN ON BUILT-IN ENVIRONMENT
# ============================================

def test_dqn_builtin(env_name, agent, num_episodes=3):
    """
    Test trained DQN agent on built-in environment.
    """
    env = gym.make(env_name)

    original_epsilon = agent.epsilon
    agent.epsilon = 0.0

    successes = 0

    for episode in range(num_episodes):
        print(f"\nTest Episode {episode + 1}/{num_episodes}")
        print("-" * 40)

        state, info = env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done:
            action = agent.choose_action(state)
            state, reward, terminated, truncated, info = env.step(action)

            done = terminated or truncated
            total_reward += reward
            steps += 1

        if total_reward > 195:  # CartPole success threshold
            successes += 1
            print(f"Success! Steps: {steps}, Reward: {total_reward:.2f}")
        else:
            print(f"Steps: {steps}, Reward: {total_reward:.2f}")

    env.close()
    agent.epsilon = original_epsilon

    print(f"\nAverage reward: {np.mean([total_reward]):.2f}")

# Test the trained agent
test_dqn_builtin(ENV_NAME, agent, num_episodes=3)


Test Episode 1/3
----------------------------------------
Steps: 94, Reward: 94.00

Test Episode 2/3
----------------------------------------
Steps: 108, Reward: 108.00

Test Episode 3/3
----------------------------------------
Steps: 99, Reward: 99.00

Average reward: 99.00
