In [None]:
```json
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pixel Paladin RL - Model Training Notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Description:** Train an AI agent using Reinforcement Learning (DQN) to master a custom-built Pygame maze environment. The agent learns optimal policies through trial-and-error interaction."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Setup and Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# Suppress pygame welcome message.\n",
    "# The variable has to be in the environment before pygame is imported for the\n",
    "# first time; setting it afterwards is too late because the banner is printed\n",
    "# during the import itself.\n",
    "os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = \"hide\"\n",
    "\n",
    "import pygame\n",
    "import numpy as np\n",
    "import tensorflow as tf\n",
    "from tensorflow.keras import layers, models, optimizers, losses\n",
    "import matplotlib.pyplot as plt\n",
    "import random\n",
    "from collections import deque\n",
    "import time\n",
    "import gymnasium as gym\n",
    "from gymnasium import spaces"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Custom Pygame Environment Definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class MazeEnv(gym.Env):\n",
    "    \"\"\"Grid maze environment with optional Pygame rendering.\n",
    "\n",
    "    Cell codes stored in ``self.maze``: 0 = empty, 1 = wall, 2 = start, 3 = goal.\n",
    "    Observations are the agent's ``(row, col)`` position; the four discrete\n",
    "    actions are 0 = up, 1 = down, 2 = left, 3 = right.\n",
    "\n",
    "    Rewards: -0.1 for an ordinary move, -0.5 when the move is blocked by a wall\n",
    "    or the grid edge (the agent stays put), and 10.0 on reaching the goal, which\n",
    "    also terminates the episode.\n",
    "    \"\"\"\n",
    "\n",
    "    metadata = {'render_modes': ['human', 'rgb_array'], 'render_fps': 10}\n",
    "\n",
    "    def __init__(self, maze=None, render_mode=None, size=10):\n",
    "        \"\"\"Set up spaces, the maze layout and rendering state.\n",
    "\n",
    "        Args:\n",
    "            maze: Optional grid of cell codes with shape ``(size, size)``. The\n",
    "                default layout is generated when this is ``None``.\n",
    "            render_mode: ``None``, ``'human'`` or ``'rgb_array'``.\n",
    "            size: Side length of the square grid, in cells.\n",
    "\n",
    "        Raises:\n",
    "            AssertionError: If a provided maze has the wrong shape, if the maze\n",
    "                does not contain exactly one start and one goal cell, or if\n",
    "                ``render_mode`` is not listed in ``metadata``.\n",
    "        \"\"\"\n",
    "        super().__init__()\n",
    "\n",
    "        self.size = size  # Size of the maze grid (size x size)\n",
    "        self.cell_size = 40 # Pixel size of each grid cell\n",
    "        self.window_size = self.size * self.cell_size # Pygame window dimensions\n",
    "\n",
    "        # Define action and observation space\n",
    "        # Actions: 0=Up, 1=Down, 2=Left, 3=Right\n",
    "        self.action_space = spaces.Discrete(4)\n",
    "        # Observation is the agent's (row, col) position\n",
    "        self.observation_space = spaces.Box(0, self.size - 1, shape=(2,), dtype=int)\n",
    "\n",
    "        # Default maze if none provided\n",
    "        if maze is None:\n",
    "            self.maze = self._generate_default_maze()\n",
    "        else:\n",
    "            self.maze = np.array(maze)\n",
    "            assert self.maze.shape == (self.size, self.size), \"Provided maze has incorrect dimensions\"\n",
    "\n",
    "        # Find start and goal positions\n",
    "        start_pos = np.argwhere(self.maze == 2)\n",
    "        goal_pos = np.argwhere(self.maze == 3)\n",
    "        assert len(start_pos) == 1, \"Maze must have exactly one start point (marked with 2)\"\n",
    "        assert len(goal_pos) == 1, \"Maze must have exactly one goal point (marked with 3)\"\n",
    "        self._start_pos = tuple(start_pos[0])\n",
    "        self._goal_pos = tuple(goal_pos[0])\n",
    "\n",
    "        # Set by reset(); step() and rendering expect reset() to have run first\n",
    "        self._agent_location = None\n",
    "\n",
    "        assert render_mode is None or render_mode in self.metadata[\"render_modes\"]\n",
    "        self.render_mode = render_mode\n",
    "\n",
    "        # Created lazily on the first rendered frame in 'human' mode\n",
    "        self.window = None\n",
    "        self.clock = None\n",
    "\n",
    "    def _generate_default_maze(self):\n",
    "        \"\"\"Build the default layout: border walls plus a cross of inner walls.\n",
    "\n",
    "        The cross is opened around its centre so the four quadrants are\n",
    "        connected and the goal can be reached from the start.\n",
    "\n",
    "        Returns:\n",
    "            np.ndarray: Integer grid of shape ``(size, size)`` with cell codes.\n",
    "        \"\"\"\n",
    "        # 0: Empty, 1: Wall, 2: Start, 3: Goal\n",
    "        maze = np.zeros((self.size, self.size), dtype=int)\n",
    "        # Add boundary walls\n",
    "        maze[0, :] = 1\n",
    "        maze[-1, :] = 1\n",
    "        maze[:, 0] = 1\n",
    "        maze[:, -1] = 1\n",
    "        # Add some internal walls\n",
    "        mid = self.size // 2\n",
    "        maze[1:self.size-1, mid] = 1\n",
    "        maze[mid, 1:self.size-1] = 1\n",
    "        # Clear the centre crossing together with its four neighbours. Clearing\n",
    "        # only the centre cell leaves it enclosed by walls on every side, which\n",
    "        # seals the start quadrant off from the goal. The range is clipped to\n",
    "        # the interior so the boundary walls stay intact on small grids.\n",
    "        lo = max(1, mid - 1)\n",
    "        hi = min(self.size - 2, mid + 1)\n",
    "        maze[lo:hi + 1, mid] = 0\n",
    "        maze[mid, lo:hi + 1] = 0\n",
    "        # Start and Goal\n",
    "        maze[1, 1] = 2\n",
    "        maze[self.size - 2, self.size - 2] = 3\n",
    "        return maze\n",
    "\n",
    "    def _get_obs(self):\n",
    "        \"\"\"Return the agent's current (row, col) position as an integer array.\"\"\"\n",
    "        return np.array(self._agent_location, dtype=int)\n",
    "\n",
    "    def _get_info(self):\n",
    "        \"\"\"Return auxiliary info: the Euclidean distance from the agent to the goal.\"\"\"\n",
    "        # Optional info, e.g., distance to goal (not used by agent directly)\n",
    "        return {\"distance\": np.linalg.norm(np.array(self._agent_location) - np.array(self._goal_pos))}\n",
    "\n",
    "    def reset(self, seed=None, options=None):\n",
    "        \"\"\"Put the agent back on the start cell.\n",
    "\n",
    "        Args:\n",
    "            seed: Forwarded to ``gym.Env.reset``.\n",
    "            options: Accepted for API compatibility; not used here.\n",
    "\n",
    "        Returns:\n",
    "            tuple: ``(observation, info)`` for the start position.\n",
    "        \"\"\"\n",
    "        super().reset(seed=seed)\n",
    "        self._agent_location = self._start_pos\n",
    "        observation = self._get_obs()\n",
    "        info = self._get_info()\n",
    "\n",
    "        if self.render_mode == \"human\":\n",
    "            self._render_frame()\n",
    "\n",
    "        return observation, info\n",
    "\n",
    "    def step(self, action):\n",
    "        \"\"\"Apply one action and advance the environment.\n",
    "\n",
    "        Args:\n",
    "            action: One of 0 (up), 1 (down), 2 (left), 3 (right).\n",
    "\n",
    "        Returns:\n",
    "            tuple: ``(observation, reward, terminated, truncated, info)``.\n",
    "            ``truncated`` is always ``False``; any step limit has to be\n",
    "            enforced by the caller.\n",
    "\n",
    "        Raises:\n",
    "            KeyError: If ``action`` is not one of the four known actions.\n",
    "        \"\"\"\n",
    "        # Map the action (element of {0, 1, 2, 3}) to the direction we walk in\n",
    "        direction_map = {\n",
    "            0: (-1, 0),  # Up\n",
    "            1: (1, 0),   # Down\n",
    "            2: (0, -1),  # Left\n",
    "            3: (0, 1)    # Right\n",
    "        }\n",
    "        direction = direction_map[action]\n",
    "\n",
    "        # Calculate potential new position\n",
    "        new_location = (\n",
    "            self._agent_location[0] + direction[0],\n",
    "            self._agent_location[1] + direction[1]\n",
    "        )\n",
    "\n",
    "        terminated = False\n",
    "        reward = -0.1 # Small penalty for each step to encourage efficiency\n",
    "\n",
    "        # Check if the new location is valid (within bounds and not a wall)\n",
    "        if (0 <= new_location[0] < self.size and\n",
    "            0 <= new_location[1] < self.size and\n",
    "            self.maze[new_location[0], new_location[1]] != 1):\n",
    "            self._agent_location = new_location\n",
    "        else:\n",
    "            reward = -0.5 # Penalty for hitting a wall\n",
    "\n",
    "        # Check if the agent reached the goal\n",
    "        if self._agent_location == self._goal_pos:\n",
    "            reward = 10.0 # Large reward for reaching the goal\n",
    "            terminated = True\n",
    "\n",
    "        truncated = False # We don't have a step limit here, but could add one\n",
    "        observation = self._get_obs()\n",
    "        info = self._get_info()\n",
    "\n",
    "        if self.render_mode == \"human\":\n",
    "            self._render_frame()\n",
    "\n",
    "        return observation, reward, terminated, truncated, info\n",
    "\n",
    "    def render(self):\n",
    "        \"\"\"Render the current state according to ``render_mode``.\n",
    "\n",
    "        Returns:\n",
    "            The frame as an array in 'rgb_array' mode; ``None`` otherwise.\n",
    "        \"\"\"\n",
    "        if self.render_mode == \"rgb_array\":\n",
    "            return self._render_frame()\n",
    "        elif self.render_mode == \"human\":\n",
    "             self._render_frame()\n",
    "\n",
    "    def _render_frame(self):\n",
    "        \"\"\"Draw the maze and the agent onto a fresh canvas.\n",
    "\n",
    "        In 'human' mode the canvas is shown in the Pygame window (created on\n",
    "        first use) and the call is throttled to ``render_fps``. In any other\n",
    "        mode the canvas pixels are returned as an array with the first two\n",
    "        axes of the Pygame pixel array swapped.\n",
    "        \"\"\"\n",
    "        if self.window is None and self.render_mode == \"human\":\n",
    "            pygame.init()\n",
    "            pygame.display.init()\n",
    "            self.window = pygame.display.set_mode((self.window_size, self.window_size))\n",
    "            pygame.display.set_caption(\"Pixel Paladin RL - Maze Environment\")\n",
    "        if self.clock is None and self.render_mode == \"human\":\n",
    "            self.clock = pygame.time.Clock()\n",
    "\n",
    "        canvas = pygame.Surface((self.window_size, self.window_size))\n",
    "        canvas.fill((255, 255, 255)) # White background\n",
    "\n",
    "        # Draw the maze elements\n",
    "        for r in range(self.size):\n",
    "            for c in range(self.size):\n",
    "                rect = pygame.Rect(c * self.cell_size, r * self.cell_size, self.cell_size, self.cell_size)\n",
    "                if self.maze[r, c] == 1: # Wall\n",
    "                    pygame.draw.rect(canvas, (0, 0, 0), rect) # Black\n",
    "                elif self.maze[r, c] == 3: # Goal\n",
    "                    pygame.draw.rect(canvas, (0, 255, 0), rect) # Green\n",
    "                elif (r, c) == self._start_pos: # Start (only draw if agent isn't there)\n",
    "                     if self._agent_location != self._start_pos:\n",
    "                         pygame.draw.rect(canvas, (200, 200, 200), rect) # Light Gray\n",
    "\n",
    "        # Draw the agent as a half-size square centred in its cell\n",
    "        agent_rect = pygame.Rect(\n",
    "            self._agent_location[1] * self.cell_size + self.cell_size // 4,\n",
    "            self._agent_location[0] * self.cell_size + self.cell_size // 4,\n",
    "            self.cell_size // 2,\n",
    "            self.cell_size // 2\n",
    "        )\n",
    "        pygame.draw.rect(canvas, (0, 0, 255), agent_rect) # Blue\n",
    "\n",
    "        if self.render_mode == \"human\":\n",
    "            # Update the screen\n",
    "            self.window.blit(canvas, canvas.get_rect())\n",
    "            pygame.event.pump()\n",
    "            pygame.display.update()\n",
    "            self.clock.tick(self.metadata[\"render_fps\"])\n",
    "        else:  # rgb_array\n",
    "            return np.transpose(\n",
    "                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)\n",
    "            )\n",
    "\n",
    "    def close(self):\n",
    "        \"\"\"Shut down the Pygame window, if one was opened.\"\"\"\n",
    "        if self.window is not None:\n",
    "            pygame.display.quit()\n",
    "            pygame.quit()\n",
    "            self.window = None\n",
    "            self.clock = None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Hyperparameter Definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reproducibility: seed every random number generator the notebook relies on\n",
    "# (Python's random, NumPy's global generator and TensorFlow) so that a fresh\n",
    "# Restart & Run All starts from the same random state each time.\n",
    "SEED = 42\n",
    "random.seed(SEED)\n",
    "np.random.seed(SEED)\n",
    "tf.random.set_seed(SEED)\n",
    "\n",
    "# Environment parameters\n",
    "MAZE_SIZE = 10\n",
    "\n",
    "# DQN Agent parameters\n",
    "STATE_SIZE = 2 # (row, col)\n",
    "ACTION_SIZE = 4 # (up, down, left, right)\n",
    "LEARNING_RATE = 0.001\n",
    "GAMMA = 0.99         # Discount factor for future rewards\n",
    "MEMORY_SIZE = 10000  # Max size of the replay buffer\n",
    "BATCH_SIZE = 64      # Number of experiences to sample from memory for training\n",
    "\n",
    "# Exploration parameters (Epsilon-Greedy)\n",
    "EPSILON_START = 1.0\n",
    "EPSILON_END = 0.01\n",
    "EPSILON_DECAY_STEPS = 10000 # How many steps to decay epsilon over\n",
    "\n",
    "# Training parameters\n",
    "TOTAL_EPISODES = 500\n",
    "MAX_STEPS_PER_EPISODE = 200 # Prevent infinitely running episodes\n",
    "TARGET_UPDATE_FREQ = 100   # How often (in steps) to update the target network\n",
    "TRAIN_START_STEPS = 1000   # Start training only after this many steps have been collected\n",
    "LEARNING_FREQ = 4          # Train the model every N steps"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. DQN Agent Definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class DQNAgent:\n",
    "    \"\"\"Deep Q-Network agent with experience replay and a target network.\n",
    "\n",
    "    Actions are chosen epsilon-greedily. Epsilon is annealed linearly from\n",
    "    ``epsilon_start`` to ``epsilon_end`` over ``epsilon_decay_steps`` calls to\n",
    "    ``act``.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, state_size, action_size, learning_rate, gamma, memory_size, batch_size,\n",
    "                 epsilon_start, epsilon_end, epsilon_decay_steps):\n",
    "        \"\"\"Build the online and target networks and the replay buffer.\n",
    "\n",
    "        Args:\n",
    "            state_size: Number of features in one observation.\n",
    "            action_size: Number of discrete actions.\n",
    "            learning_rate: Learning rate for the Adam optimizer.\n",
    "            gamma: Discount factor for future rewards.\n",
    "            memory_size: Maximum number of experiences kept in the replay buffer.\n",
    "            batch_size: Number of experiences sampled per training step.\n",
    "            epsilon_start: Initial exploration rate.\n",
    "            epsilon_end: Lower bound of the exploration rate.\n",
    "            epsilon_decay_steps: Number of ``act`` calls over which epsilon decays.\n",
    "        \"\"\"\n",
    "        self.state_size = state_size\n",
    "        self.action_size = action_size\n",
    "        self.memory = deque(maxlen=memory_size)\n",
    "        self.gamma = gamma\n",
    "        self.epsilon = epsilon_start\n",
    "        self.epsilon_start = epsilon_start\n",
    "        self.epsilon_end = epsilon_end\n",
    "        self.epsilon_decay_steps = epsilon_decay_steps\n",
    "        self.batch_size = batch_size\n",
    "        self.learning_rate = learning_rate\n",
    "        self.model = self._build_model()\n",
    "        self.target_model = self._build_model()\n",
    "        self.update_target_model() # Initialize target model weights\n",
    "        self.optimizer = optimizers.Adam(learning_rate=self.learning_rate)\n",
    "        self.loss_function = losses.MeanSquaredError()\n",
    "        self.step_count = 0  # Number of act() calls so far; drives epsilon decay\n",
    "\n",
    "    def _build_model(self):\n",
    "        \"\"\"Create the Q-network: two hidden layers of 64 ReLU units and a linear output per action.\"\"\"\n",
    "        # Simple Feed Forward Neural Network\n",
    "        model = models.Sequential([\n",
    "            layers.Input(shape=(self.state_size,)),\n",
    "            layers.Dense(64, activation='relu'),\n",
    "            layers.Dense(64, activation='relu'),\n",
    "            layers.Dense(self.action_size, activation='linear') # Q-values for each action\n",
    "        ])\n",
    "        # No compile here, we handle loss and optimization manually in train_step\n",
    "        return model\n",
    "\n",
    "    def update_target_model(self):\n",
    "        \"\"\"Copy the online network's weights into the target network.\"\"\"\n",
    "        self.target_model.set_weights(self.model.get_weights())\n",
    "\n",
    "    def remember(self, state, action, reward, next_state, done):\n",
    "        \"\"\"Append one ``(state, action, reward, next_state, done)`` experience to the replay buffer.\"\"\"\n",
    "        self.memory.append((state, action, reward, next_state, done))\n",
    "\n",
    "    def act(self, state):\n",
    "        \"\"\"Choose an action for ``state`` with an epsilon-greedy policy.\n",
    "\n",
    "        Every call increments the step counter and updates epsilon.\n",
    "\n",
    "        Args:\n",
    "            state: A single observation, either flat with ``state_size``\n",
    "                elements or already batched as ``(1, state_size)``.\n",
    "\n",
    "        Returns:\n",
    "            int: Index of the chosen action.\n",
    "        \"\"\"\n",
    "        self.step_count += 1\n",
    "        # Update epsilon\n",
    "        epsilon_decay = (self.epsilon_start - self.epsilon_end) / self.epsilon_decay_steps\n",
    "        self.epsilon = max(self.epsilon_end, self.epsilon_start - epsilon_decay * self.step_count)\n",
    "\n",
    "        if np.random.rand() <= self.epsilon:\n",
    "            return random.randrange(self.action_size) # Explore: random action\n",
    "\n",
    "        # Exploit: predict Q-values and choose the best action.\n",
    "        # Normalise the input to exactly one batch row. Callers may pass a state\n",
    "        # that is already shaped (1, state_size); unconditionally adding another\n",
    "        # leading axis would give the network a rank-3 tensor that does not\n",
    "        # match its declared (None, state_size) input.\n",
    "        state_batch = np.asarray(state, dtype=np.float32).reshape(1, self.state_size)\n",
    "        state_tensor = tf.convert_to_tensor(state_batch)\n",
    "        act_values = self.model(state_tensor, training=False)\n",
    "        # Cast so both branches return a plain Python int\n",
    "        return int(np.argmax(act_values[0].numpy())) # Choose action with highest Q-value\n",
    "\n",
    "    @tf.function # Decorator for potential performance improvement\n",
    "    def train_step(self, states, actions, rewards, next_states, dones):\n",
    "        \"\"\"Run one gradient update of the online network on a minibatch.\n",
    "\n",
    "        The regression target for each sample is\n",
    "        ``reward + gamma * max_a Q_target(next_state, a) * (1 - done)``.\n",
    "\n",
    "        Returns:\n",
    "            The mean-squared-error loss for the minibatch.\n",
    "        \"\"\"\n",
    "        # Predict Q-values for the next states using the target network\n",
    "        future_rewards = self.target_model(next_states, training=False)\n",
    "        # Q(s', a') = max_a' Q_target(s', a')\n",
    "        updated_q_values = rewards + self.gamma * tf.reduce_max(future_rewards, axis=1) * (1 - dones)\n",
    "\n",
    "        # Create a mask to only update the Q-value for the action taken\n",
    "        masks = tf.one_hot(actions, self.action_size)\n",
    "\n",
    "        with tf.GradientTape() as tape:\n",
    "            # Predict Q-values for the current states using the main network\n",
    "            q_values = self.model(states, training=True)\n",
    "            # Select the Q-value for the action that was actually taken\n",
    "            q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)\n",
    "            # Calculate loss between predicted Q-value and target Q-value\n",
    "            loss = self.loss_function(updated_q_values, q_action)\n",
    "\n",
    "        # Calculate gradients and update the main network weights\n",
    "        grads = tape.gradient(loss, self.model.trainable_variables)\n",
    "        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))\n",
    "        return loss\n",
    "\n",
    "    def replay(self):\n",
    "        \"\"\"Sample a minibatch from memory and train on it.\n",
    "\n",
    "        Returns:\n",
    "            The loss of the training step, or ``0`` when the buffer holds fewer\n",
    "            than ``batch_size`` experiences and no training was done.\n",
    "        \"\"\"\n",
    "        if len(self.memory) < self.batch_size:\n",
    "            return 0 # Not enough memory yet\n",
    "\n",
    "        # Sample a minibatch of experiences\n",
    "        minibatch = random.sample(self.memory, self.batch_size)\n",
    "\n",
    "        # Separate the components of the minibatch\n",
    "        states = np.array([experience[0] for experience in minibatch])\n",
    "        actions = np.array([experience[1] for experience in minibatch])\n",
    "        rewards = np.array([experience[2] for experience in minibatch])\n",
    "        next_states = np.array([experience[3] for experience in minibatch])\n",
    "        dones = np.array([experience[4] for experience in minibatch]).astype(np.float32) # Convert boolean to float for calculation\n",
    "\n",
    "        # Convert numpy arrays to TensorFlow tensors\n",
    "        states_tensor = tf.convert_to_tensor(states, dtype=tf.float32)\n",
    "        actions_tensor = tf.convert_to_tensor(actions, dtype=tf.int32)\n",
    "        rewards_tensor = tf.convert_to_tensor(rewards, dtype=tf.float32)\n",
    "        next_states_tensor = tf.convert_to_tensor(next_states, dtype=tf.float32)\n",
    "        dones_tensor = tf.convert_to_tensor(dones, dtype=tf.float32)\n",
    "\n",
    "        loss = self.train_step(states_tensor, actions_tensor, rewards_tensor, next_states_tensor, dones_tensor)\n",
    "        return loss.numpy()\n",
    "\n",
    "    def load(self, name):\n",
    "        \"\"\"Load online-network weights from ``name`` and sync the target network.\"\"\"\n",
    "        self.model.load_weights(name)\n",
    "        self.update_target_model() # Ensure target model is also updated\n",
    "\n",
    "    def save(self, name):\n",
    "        \"\"\"Save the online network's weights to ``name``.\"\"\"\n",
    "        self.model.save_weights(name)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Model Training"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initialize environment and agent\n",
    "env = MazeEnv(size=MAZE_SIZE)\n",
    "agent = DQNAgent(\n",
    "    state_size=STATE_SIZE,\n",
    "    action_size=ACTION_SIZE,\n",
    "    learning_rate=LEARNING_RATE,\n",
    "    gamma=GAMMA,\n",
    "    memory_size=MEMORY_SIZE,\n",
    "    batch_size=BATCH_SIZE,\n",
    "    epsilon_start=EPSILON_START,\n",
    "    epsilon_end=EPSILON_END,\n",
    "    epsilon_decay_steps=EPSILON_DECAY_STEPS\n",
    ")\n",
    "\n",
    "episode_rewards = []\n",
    "training_losses = []\n",
    "total_steps = 0\n",
    "start_time = time.time()\n",
    "\n",
    "print(f\"Starting training for {TOTAL_EPISODES} episodes...\")\n",
    "\n",
    "for episode in range(1, TOTAL_EPISODES + 1):\n",
    "    state, _ = env.reset()\n",
    "    state = np.reshape(state, [1, STATE_SIZE]).astype(np.float32) # Reshape and ensure float type\n",
    "    episode_reward = 0\n",
    "    episode_loss = []\n",
    "\n",
    "    for step in range(1, MAX_STEPS_PER_EPISODE + 1):\n",
    "        # Select action\n",
    "        action = agent.act(state)\n",
    "\n",
    "        # Take action in environment\n",
    "        next_state, reward, terminated, truncated, _ = env.step(action)\n",
    "        next_state = np.reshape(next_state, [1, STATE_SIZE]).astype(np.float32)\n",
    "        done = terminated or truncated\n",
    "\n",
    "        # Store experience\n",
    "        agent.remember(state[0], action, reward, next_state[0], done) # Store flattened state\n",
    "\n",
    "        # Move to next state\n",
    "        state = next_state\n",
    "        episode_reward += reward\n",
    "        total_steps += 1\n",
    "\n",
    "        # Train the agent\n",
    "        if total_steps > TRAIN_START_STEPS and total_steps % LEARNING_FREQ == 0:\n",
    "            loss = agent.replay()\n",
    "            if loss is not None:\n",
    "                 episode_loss.append(loss)\n",
    "\n",
    "        # Update target network\n",
    "        if total_steps % TARGET_UPDATE_FREQ == 0:\n",
    "            agent.update_target_model()\n",
    "\n",
    "        if