# Reinforcement Learning: Q-Learning Algorithm

## 1. Introduction

**Q-Learning** is a model-free, off-policy reinforcement learning algorithm that learns the optimal action-value function $Q^*(s, a)$ directly from experience, without requiring a model of the environment dynamics.

### 1.1 The Reinforcement Learning Framework

In reinforcement learning, an agent interacts with an environment over discrete time steps. At each time step $t$:
- The agent observes state $s_t \in \mathcal{S}$
- Selects an action $a_t \in \mathcal{A}$
- Receives a reward $r_{t+1} \in \mathbb{R}$
- Transitions to a new state $s_{t+1}$

The goal is to find a policy $\pi: \mathcal{S} \rightarrow \mathcal{A}$ that maximizes the expected return $\mathbb{E}[G_t]$, where the return $G_t$ is the cumulative discounted reward:

$$G_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1}$$

where $\gamma \in [0, 1)$ is the discount factor.
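
As a small illustration of the return, the sketch below evaluates $G_0$ for a finite reward sequence using the recursion $G_t = r_{t+1} + \gamma G_{t+1}$. The function name `discounted_return`, the reward sequence, and $\gamma = 0.9$ are assumptions chosen for the example.

```python
def discounted_return(rewards, gamma):
    """Return G_t = sum_k gamma^k * r_{t+k+1} for a finite reward sequence."""
    g = 0.0
    # Accumulate backwards: G_t = r_{t+1} + gamma * G_{t+1}
    for r in reversed(rewards):
        g = r + gamma * g
    return g

# Hypothetical episode: three step penalties, then a goal reward
example_rewards = [-1, -1, -1, 100]
g0 = discounted_return(example_rewards, gamma=0.9)
print(f"G_0 = {g0:.2f}")  # G_0 = 70.19
```

Because the goal reward arrives three steps late, it contributes only $0.9^3 \cdot 100 = 72.9$ to $G_0$.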

### 1.2 The Action-Value Function

The **action-value function** (or Q-function) under policy $\pi$ is defined as:

$$Q^\pi(s, a) = \mathbb{E}_\pi\left[G_t \mid s_t = s, a_t = a\right] = \mathbb{E}_\pi\left[\sum_{k=0}^{\infty} \gamma^k r_{t+k+1} \mid s_t = s, a_t = a\right]$$

The **optimal action-value function** is:

$$Q^*(s, a) = \max_\pi Q^\pi(s, a)$$

### 1.3 The Bellman Optimality Equation

The optimal Q-function satisfies the **Bellman optimality equation**:

$$Q^*(s, a) = \mathbb{E}\left[r + \gamma \max_{a'} Q^*(s', a') \mid s, a\right]$$

This recursive relationship is the foundation of Q-learning.
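
To see the equation acting as a fixed-point condition, the sketch below repeatedly applies its right-hand side to a made-up three-state chain until the table stops changing. Unlike Q-learning, this uses a known transition function, so it only illustrates the fixed point rather than the learning algorithm; the chain, its rewards, and `transition` are hypothetical.

```python
import numpy as np

# Hypothetical deterministic chain: states 0 -> 1 -> 2 (terminal).
# Action 0 stays in place, action 1 moves one state to the right.
n_states, n_actions, gamma = 3, 2, 0.9
TERMINAL = 2

def transition(s, a):
    """Return (next_state, reward) for the toy chain."""
    s_next = min(s + a, TERMINAL)
    reward = 10.0 if s_next == TERMINAL else -1.0
    return s_next, reward

Q = np.zeros((n_states, n_actions))
for _ in range(100):
    Q_new = np.zeros_like(Q)
    for s in range(TERMINAL):  # the terminal state keeps Q = 0
        for a in range(n_actions):
            s_next, r = transition(s, a)
            # Right-hand side of the Bellman optimality equation
            Q_new[s, a] = r + gamma * np.max(Q[s_next])
    Q = Q_new

print(Q)
```

At the fixed point, moving right from state 1 is worth 10, and every other entry follows from one-step lookahead (for example, $Q(0, \text{right}) = -1 + 0.9 \cdot 10 = 8$).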

### 1.4 The Q-Learning Update Rule

Q-learning uses **temporal difference (TD) learning** to iteratively update the Q-values:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)\right]$$

where:
- $\alpha \in (0, 1]$ is the learning rate
- $r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a')$ is the **TD target**
- $r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)$ is the **TD error** $\delta_t$
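
A single update can be worked through by hand. The sketch below applies the rule to one transition in a made-up two-state table; the Q-values and the transition are assumptions, while $\alpha = 0.1$ and $\gamma = 0.99$ match the agent defaults in Section 3.

```python
import numpy as np

alpha, gamma = 0.1, 0.99

# Hypothetical 2-state, 2-action table with made-up entries
Q = np.array([[0.0, 5.0],
              [2.0, 8.0]])

s, a, r, s_next = 0, 1, -1.0, 1  # one observed transition (assumed)

td_target = r + gamma * np.max(Q[s_next])  # -1 + 0.99 * 8 = 6.92
td_error = td_target - Q[s, a]             # 6.92 - 5 = 1.92
Q[s, a] += alpha * td_error                # 5 + 0.1 * 1.92 = 5.192

print(td_target, td_error, Q[s, a])
```

Only the entry for the visited pair $(s, a)$ changes; it moves a fraction $\alpha$ of the way toward the TD target.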

### 1.5 Exploration vs. Exploitation

Q-learning requires balancing exploration (trying new actions) and exploitation (using known good actions). The **$\epsilon$-greedy** policy addresses this:

$$\pi(a|s) = \begin{cases} 1 - \epsilon + \frac{\epsilon}{|\mathcal{A}|} & \text{if } a = \arg\max_{a'} Q(s, a') \\ \frac{\epsilon}{|\mathcal{A}|} & \text{otherwise} \end{cases}$$
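
The probabilities in this definition can be computed directly. The helper below is a minimal sketch that assumes a unique greedy action (ties fall to the lowest index via `np.argmax`); the name `epsilon_greedy_probs` and the sample Q-values are illustrative.

```python
import numpy as np

def epsilon_greedy_probs(q_values, epsilon):
    """Action probabilities of the epsilon-greedy policy for one state."""
    n_actions = len(q_values)
    # Every action receives the exploration share epsilon / |A| ...
    probs = np.full(n_actions, epsilon / n_actions)
    # ... and the greedy action receives the remaining 1 - epsilon on top.
    probs[np.argmax(q_values)] += 1.0 - epsilon
    return probs

probs = epsilon_greedy_probs(np.array([1.0, 3.0, 2.0, 0.0]), epsilon=0.2)
print(probs)  # approximately [0.05, 0.85, 0.05, 0.05]
```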

### 1.6 Convergence Properties

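Q-learning converges to $Q^*$ with probability 1, provided that every state-action pair is visited infinitely often and the learning rate sequence $\alpha_t$ satisfies the Robbins-Monro conditions:

$$\sum_{t=1}^{\infty} \alpha_t = \infty \quad \text{and} \quad \sum_{t=1}^{\infty} \alpha_t^2 < \infty$$

A constant learning rate, such as the $\alpha = 0.1$ used in Section 3, violates the second condition, so this guarantee does not formally cover it; constant rates are nonetheless common in practice.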
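
A numerical check makes the two step-size conditions concrete. The sketch below compares partial sums for the schedule $\alpha_t = 1/t$, one common choice that satisfies both conditions, against a constant step size of 0.1; the horizon of one million steps is arbitrary.

```python
import numpy as np

t = np.arange(1, 1_000_001)
alpha_decay = 1.0 / t                # candidate schedule alpha_t = 1/t
alpha_const = np.full(t.shape, 0.1)  # constant step size, for contrast

print("sum 1/t   :", alpha_decay.sum())         # keeps growing, roughly like ln(T)
print("sum 1/t^2 :", (alpha_decay ** 2).sum())  # bounded above by pi^2 / 6
print("sum 0.1^2 :", (alpha_const ** 2).sum())  # grows linearly with T
```

The first sum diverges slowly while the second stays bounded, which is what the conditions ask for; the squared constant step sizes grow without bound.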

## 2. Implementation: Gridworld Environment

We will implement Q-learning on a simple gridworld environment where an agent must navigate from a start position to a goal while avoiding obstacles.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import warnings
warnings.filterwarnings('ignore')

# Set random seed for reproducibility
np.random.seed(42)

In [None]:
class GridWorld:
    """
    A simple gridworld environment for Q-learning.
    
    The agent starts at position (0, 0) and must reach the goal.
    Walls are impassable, and stepping on traps gives negative reward.
    """
    
    def __init__(self, size=5):
        self.size = size
        self.n_states = size * size
        self.n_actions = 4  # up, down, left, right
        
        # Action mappings
        self.actions = {
            0: (-1, 0),  # up
            1: (1, 0),   # down
            2: (0, -1),  # left
            3: (0, 1)    # right
        }
        self.action_names = ['↑', '↓', '←', '→']
        
        # Define environment layout
        self.start = (0, 0)
        self.goal = (size - 1, size - 1)
        
        # Define walls (impassable cells)
        self.walls = {(1, 1), (2, 1), (3, 1), (1, 3), (2, 3)}
        
        # Define traps (negative reward cells)
        self.traps = {(0, 4), (3, 3), (4, 1)}
        
        # Rewards
        self.goal_reward = 100
        self.trap_reward = -50
        self.step_reward = -1
        
        self.reset()
    
    def reset(self):
        """Reset the agent to the starting position."""
        self.agent_pos = self.start
        return self._pos_to_state(self.agent_pos)
    
    def _pos_to_state(self, pos):
        """Convert (row, col) position to state index."""
        return pos[0] * self.size + pos[1]
    
    def _state_to_pos(self, state):
        """Convert state index to (row, col) position."""
        return (state // self.size, state % self.size)
    
    def step(self, action):
        """Take an action and return (next_state, reward, done)."""
        # Calculate new position
        delta = self.actions[action]
        new_pos = (self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1])
        
        # Check boundaries and walls
        if (0 <= new_pos[0] < self.size and 
            0 <= new_pos[1] < self.size and 
            new_pos not in self.walls):
            self.agent_pos = new_pos
        
        # Calculate reward
        if self.agent_pos == self.goal:
            reward = self.goal_reward
            done = True
        elif self.agent_pos in self.traps:
            reward = self.trap_reward
            done = False
        else:
            reward = self.step_reward
            done = False
        
        return self._pos_to_state(self.agent_pos), reward, done
    
    def get_valid_actions(self, state):
        """Get list of valid actions from a state."""
        pos = self._state_to_pos(state)
        valid = []
        for action, delta in self.actions.items():
            new_pos = (pos[0] + delta[0], pos[1] + delta[1])
            if (0 <= new_pos[0] < self.size and 
                0 <= new_pos[1] < self.size and 
                new_pos not in self.walls):
                valid.append(action)
        return valid if valid else list(range(self.n_actions))

## 3. Q-Learning Agent Implementation

In [None]:
class QLearningAgent:
    """
    Q-Learning agent with epsilon-greedy exploration.
    """
    
    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.99, 
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        """
        Initialize the Q-learning agent.
        
        Parameters:
        -----------
        n_states : int
            Number of states in the environment
        n_actions : int
            Number of possible actions
        alpha : float
            Learning rate
        gamma : float
            Discount factor
        epsilon : float
            Initial exploration rate
        epsilon_decay : float
            Decay rate for epsilon after each episode
        epsilon_min : float
            Minimum epsilon value
        """
        self.n_states = n_states
        self.n_actions = n_actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        # Initialize Q-table with zeros
        self.Q = np.zeros((n_states, n_actions))
        
        # For tracking learning progress
        self.td_errors = []
    
    def choose_action(self, state):
        """Choose action using epsilon-greedy policy."""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        else:
            return np.argmax(self.Q[state])
    
    def learn(self, state, action, reward, next_state, done):
        """
        Update Q-value using the Q-learning update rule.
        
        Q(s,a) ← Q(s,a) + α[r + γ max_a' Q(s',a') - Q(s,a)]
        """
        # Calculate TD target
        if done:
            td_target = reward
        else:
            td_target = reward + self.gamma * np.max(self.Q[next_state])
        
        # Calculate TD error
        td_error = td_target - self.Q[state, action]
        self.td_errors.append(abs(td_error))
        
        # Update Q-value
        self.Q[state, action] += self.alpha * td_error
    
    def decay_epsilon(self):
        """Decay epsilon after each episode."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
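
It can help to know how long the multiplicative schedule in `decay_epsilon` takes to reach its floor. The sketch below computes that episode count for the default parameters in two ways, in closed form and by replaying the same update; the variable names are illustrative.

```python
import math

# Defaults taken from QLearningAgent.__init__
epsilon, epsilon_decay, epsilon_min = 1.0, 0.995, 0.01

# Closed form: smallest n with epsilon * epsilon_decay**n <= epsilon_min
n_closed = math.ceil(math.log(epsilon_min / epsilon) / math.log(epsilon_decay))

# Same count by replaying the decay rule once per episode
n_sim, eps = 0, epsilon
while eps > epsilon_min:
    eps = max(epsilon_min, eps * epsilon_decay)
    n_sim += 1

print(n_closed, n_sim)  # 919 919
```

With these defaults the agent keeps some extra exploration for roughly the first 919 episodes, which is most of a 1000-episode training run.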

## 4. Training the Agent

In [None]:
def train_agent(env, agent, n_episodes=1000, max_steps=100):
    """
    Train the Q-learning agent.
    
    Returns:
    --------
    rewards_history : list
        Total reward per episode
    steps_history : list
        Number of steps per episode
    epsilon_history : list
        Epsilon value per episode
    """
    rewards_history = []
    steps_history = []
    epsilon_history = []
    
    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0
        
        for step in range(max_steps):
            # Choose and take action
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)
            
            # Learn from experience
            agent.learn(state, action, reward, next_state, done)
            
            total_reward += reward
            state = next_state
            
            if done:
                break
        
        rewards_history.append(total_reward)
        steps_history.append(step + 1)
        epsilon_history.append(agent.epsilon)
        
        # Decay exploration rate
        agent.decay_epsilon()
        
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards_history[-100:])
            avg_steps = np.mean(steps_history[-100:])
            print(f"Episode {episode + 1}/{n_episodes} | "
                  f"Avg Reward: {avg_reward:.2f} | "
                  f"Avg Steps: {avg_steps:.2f} | "
                  f"Epsilon: {agent.epsilon:.3f}")
    
    return rewards_history, steps_history, epsilon_history

# Create environment and agent
env = GridWorld(size=5)
agent = QLearningAgent(
    n_states=env.n_states,
    n_actions=env.n_actions,
    alpha=0.1,
    gamma=0.99,
    epsilon=1.0,
    epsilon_decay=0.995,
    epsilon_min=0.01
)

# Train the agent
print("Training Q-Learning Agent...\n")
rewards, steps, epsilons = train_agent(env, agent, n_episodes=1000, max_steps=100)

## 5. Visualization of Results

In [None]:
def plot_results(rewards, steps, epsilons, agent, env):
    """
    Create comprehensive visualization of Q-learning results.
    """
    fig = plt.figure(figsize=(16, 12))
    
    # Plot 1: Learning Curve (Rewards)
    ax1 = fig.add_subplot(2, 3, 1)
    window = 50
    smoothed_rewards = np.convolve(rewards, np.ones(window)/window, mode='valid')
    ax1.plot(rewards, alpha=0.3, color='blue', label='Episode Reward')
    ax1.plot(range(window-1, len(rewards)), smoothed_rewards, 
             color='red', linewidth=2, label=f'{window}-Episode Moving Avg')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Total Reward')
    ax1.set_title('Learning Curve: Rewards per Episode')
    ax1.legend(loc='lower right')
    ax1.grid(True, alpha=0.3)
    
    # Plot 2: Steps per Episode
    ax2 = fig.add_subplot(2, 3, 2)
    smoothed_steps = np.convolve(steps, np.ones(window)/window, mode='valid')
    ax2.plot(steps, alpha=0.3, color='green', label='Episode Steps')
    ax2.plot(range(window-1, len(steps)), smoothed_steps, 
             color='darkgreen', linewidth=2, label=f'{window}-Episode Moving Avg')
    ax2.set_xlabel('Episode')
    ax2.set_ylabel('Steps')
    ax2.set_title('Steps per Episode')
    ax2.legend(loc='upper right')
    ax2.grid(True, alpha=0.3)
    
    # Plot 3: Epsilon Decay
    ax3 = fig.add_subplot(2, 3, 3)
    ax3.plot(epsilons, color='purple', linewidth=2)
    ax3.set_xlabel('Episode')
    ax3.set_ylabel('Epsilon (ε)')
    ax3.set_title('Exploration Rate Decay')
    ax3.grid(True, alpha=0.3)
    ax3.set_ylim(0, 1.05)
    
    # Plot 4: Q-Value Heatmap (max Q per state)
    ax4 = fig.add_subplot(2, 3, 4)
    max_Q = np.max(agent.Q, axis=1).reshape(env.size, env.size)
    im = ax4.imshow(max_Q, cmap='viridis', interpolation='nearest')
    plt.colorbar(im, ax=ax4, label='Max Q-Value')
    
    # Mark special cells
    for wall in env.walls:
        ax4.add_patch(Rectangle((wall[1]-0.5, wall[0]-0.5), 1, 1, 
                                fill=True, color='black', alpha=0.7))
    for trap in env.traps:
        ax4.add_patch(Rectangle((trap[1]-0.5, trap[0]-0.5), 1, 1, 
                                fill=True, color='red', alpha=0.3))
    
    ax4.plot(env.start[1], env.start[0], 'go', markersize=15, label='Start')
    ax4.plot(env.goal[1], env.goal[0], 'y*', markersize=20, label='Goal')
    ax4.set_title('Learned State Values (max Q)')
    ax4.set_xlabel('Column')
    ax4.set_ylabel('Row')
    ax4.legend(loc='upper left', fontsize=8)
    
    # Plot 5: Optimal Policy Visualization
    ax5 = fig.add_subplot(2, 3, 5)
    
    # Create grid
    for i in range(env.size + 1):
        ax5.axhline(y=i, color='black', linewidth=0.5)
        ax5.axvline(x=i, color='black', linewidth=0.5)
    
    # Draw cells and policy arrows
    arrow_params = {
        0: (0, 0.3),   # up
        1: (0, -0.3),  # down
        2: (-0.3, 0),  # left
        3: (0.3, 0)    # right
    }
    
    for state in range(env.n_states):
        row, col = env._state_to_pos(state)
        pos = (col + 0.5, env.size - row - 0.5)
        
        # Color cells
        if (row, col) in env.walls:
            ax5.add_patch(Rectangle((col, env.size - row - 1), 1, 1, 
                                   fill=True, color='black'))
        elif (row, col) in env.traps:
            ax5.add_patch(Rectangle((col, env.size - row - 1), 1, 1, 
                                   fill=True, color='red', alpha=0.3))
        elif (row, col) == env.goal:
            ax5.add_patch(Rectangle((col, env.size - row - 1), 1, 1, 
                                   fill=True, color='gold', alpha=0.5))
        elif (row, col) == env.start:
            ax5.add_patch(Rectangle((col, env.size - row - 1), 1, 1, 
                                   fill=True, color='green', alpha=0.3))
        
        # Draw policy arrow (skip walls and goal)
        if (row, col) not in env.walls and (row, col) != env.goal:
            best_action = np.argmax(agent.Q[state])
            dx, dy = arrow_params[best_action]
            ax5.arrow(pos[0], pos[1], dx, dy, head_width=0.15, 
                     head_length=0.1, fc='blue', ec='blue')
    
    ax5.set_xlim(0, env.size)
    ax5.set_ylim(0, env.size)
    ax5.set_aspect('equal')
    ax5.set_title('Learned Optimal Policy')
    ax5.set_xlabel('Column')
    ax5.set_ylabel('Row')
    
    # Plot 6: TD Error Convergence
    ax6 = fig.add_subplot(2, 3, 6)
    td_window = 100
    td_errors = agent.td_errors
    if len(td_errors) > td_window:
        smoothed_td = np.convolve(td_errors, np.ones(td_window)/td_window, mode='valid')
        ax6.plot(smoothed_td, color='orange', linewidth=1)
    else:
        ax6.plot(td_errors, color='orange', linewidth=1)
    ax6.set_xlabel('Update Step')
    ax6.set_ylabel('|TD Error|')
    ax6.set_title('TD Error Convergence')
    ax6.grid(True, alpha=0.3)
    ax6.set_yscale('log')
    
    plt.tight_layout()
    plt.savefig('plot.png', dpi=150, bbox_inches='tight')
    plt.show()
    print("\nFigure saved to 'plot.png'")

# Generate visualization
plot_results(rewards, steps, epsilons, agent, env)

## 6. Evaluate the Learned Policy

In [None]:
def evaluate_policy(env, agent, n_episodes=100, max_steps=100):
    """
    Evaluate the learned policy without exploration (epsilon=0).
    """
    original_epsilon = agent.epsilon
    agent.epsilon = 0  # Pure exploitation
    
    total_rewards = []
    total_steps = []
    successes = 0
    
    for _ in range(n_episodes):
        state = env.reset()
        episode_reward = 0
        
        for step in range(max_steps):
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)
            episode_reward += reward
            state = next_state
            
            if done:
                successes += 1
                break
        
        total_rewards.append(episode_reward)
        total_steps.append(step + 1)
    
    agent.epsilon = original_epsilon  # Restore epsilon
    
    # Report the success rate as a percentage of the episodes actually run
    success_rate = 100 * successes / n_episodes
    print(f"Policy Evaluation Results ({n_episodes} episodes):")
    print(f"  Success Rate: {success_rate:.1f}%")
    print(f"  Average Reward: {np.mean(total_rewards):.2f} ± {np.std(total_rewards):.2f}")
    print(f"  Average Steps: {np.mean(total_steps):.2f} ± {np.std(total_steps):.2f}")
    print(f"  Best Reward: {max(total_rewards):.2f}")
    print(f"  Worst Reward: {min(total_rewards):.2f}")

# Evaluate the trained policy
evaluate_policy(env, agent)

## 7. Display the Learned Q-Table

In [None]:
def display_q_table(agent, env):
    """
    Display the Q-table in a readable format.
    """
    print("Learned Q-Table (Q-values for each state-action pair):")
    print("=" * 70)
    print(f"{'State':^10} {'Position':^10} {'↑':^10} {'↓':^10} {'←':^10} {'→':^10}")
    print("-" * 70)
    
    for state in range(env.n_states):
        pos = env._state_to_pos(state)
        if pos in env.walls:
            continue
        
        q_values = agent.Q[state]
        best_action = np.argmax(q_values)
        
        row = f"{state:^10} {str(pos):^10}"
        for i, q in enumerate(q_values):
            if i == best_action:
                row += f" {q:>8.2f}*"
            else:
                row += f" {q:>9.2f}"
        print(row)
    
    print("=" * 70)
    print("* indicates the best action for each state")

display_q_table(agent, env)

## 8. Trace Optimal Path

In [None]:
def trace_optimal_path(env, agent):
    """
    Trace the optimal path from start to goal using the learned policy.
    """
    state = env.reset()
    path = [env._state_to_pos(state)]
    actions_taken = []
    total_reward = 0
    
    print("\nOptimal Path from Start to Goal:")
    print("=" * 40)
    
    for step in range(20):  # Max steps to prevent infinite loops
        action = np.argmax(agent.Q[state])
        next_state, reward, done = env.step(action)
        
        pos = env._state_to_pos(state)
        next_pos = env._state_to_pos(next_state)
        
        print(f"Step {step + 1}: {pos} → {env.action_names[action]} → {next_pos} (reward: {reward})")
        
        actions_taken.append(env.action_names[action])
        total_reward += reward
        
        path.append(next_pos)
        state = next_state
        
        if done:
            break
    
    print("=" * 40)
    if not done:
        # The greedy policy can loop if the Q-table has not converged
        print("Goal not reached within the step limit.")
    print(f"Total steps: {len(actions_taken)}")
    print(f"Total reward: {total_reward}")
    print(f"Path: {' → '.join([str(p) for p in path])}")
    
    return path, actions_taken

path, actions = trace_optimal_path(env, agent)

## 9. Summary and Key Takeaways

### Algorithm Properties

Q-learning is:
- **Model-free**: Does not require knowledge of transition probabilities $P(s'|s,a)$
- **Off-policy**: Learns about the greedy policy while following an exploratory policy
- **Guaranteed to converge**: Under appropriate conditions (sufficient exploration, decaying learning rate)

### Key Observations

1. **Exploration-Exploitation Trade-off**: The $\epsilon$-greedy policy balances trying new actions vs. exploiting known good actions

2. **Temporal Credit Assignment**: The discount factor $\gamma$ determines how far into the future rewards influence current decisions

3. **Convergence**: TD error decreases over time as the Q-values approach their optimal values

4. **Policy Extraction**: The optimal policy is obtained by taking $\arg\max_a Q^*(s, a)$ for each state
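
Policy extraction, the last observation above, reduces to a row-wise `argmax` over the Q-table. The sketch below uses a made-up table with three states and the same four-action ordering as the gridworld; the values are assumptions for illustration.

```python
import numpy as np

# Hypothetical Q-table: 3 states x 4 actions (up, down, left, right)
Q = np.array([
    [0.0, 1.5, -0.2, 2.0],
    [0.3, 4.0,  0.1, 1.0],
    [5.0, 0.0,  0.0, 0.0],
])
action_names = ['↑', '↓', '←', '→']

greedy_actions = np.argmax(Q, axis=1)  # best action index per state
state_values = np.max(Q, axis=1)       # V(s) = max_a Q(s, a)

for s, a in enumerate(greedy_actions):
    print(f"state {s}: {action_names[a]} (value {state_values[s]:.1f})")
```

The result is the optimal policy only if the table equals $Q^*$; for a learned table it is the greedy policy with respect to the current estimates.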

### Limitations

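- **Tabular representation**: The Q-table needs one entry per state-action pair, so it does not scale to large or continuous state spaces
- **Sample inefficiency**: Requires many episodes to converge
- **No generalization across states**: Each entry is learned independently; handling larger state spaces requires function approximation, as in Deep Q-Networks (DQN)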

### Extensions

Variants that build on Q-learning and DQN include:
- **Double Q-Learning**: Reduces overestimation bias
- **Prioritized Experience Replay**: Focuses on important transitions
- **Dueling DQN**: Separates state-value and advantage estimation
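
As an illustration of the first variant, here is a minimal tabular sketch of a Double Q-learning step. It keeps two tables and uses one to select the next action and the other to evaluate it, which is the mechanism that counters overestimation. The function name `double_q_update`, its signature, and the sample values are assumptions, not part of the notebook's code.

```python
import numpy as np

def double_q_update(Q_a, Q_b, s, a, r, s_next, done, alpha, gamma, rng):
    """One tabular Double Q-learning step; updates one of the two tables in place."""
    # Randomly pick which table to update; the other one evaluates the action.
    if rng.random() < 0.5:
        Q_sel, Q_eval = Q_a, Q_b
    else:
        Q_sel, Q_eval = Q_b, Q_a
    if done:
        target = r
    else:
        best = np.argmax(Q_sel[s_next])            # select with one table
        target = r + gamma * Q_eval[s_next, best]  # evaluate with the other
    Q_sel[s, a] += alpha * (target - Q_sel[s, a])

rng = np.random.default_rng(0)
Q_a = np.zeros((2, 2))
Q_b = np.zeros((2, 2))
Q_a[1] = [1.0, 3.0]  # made-up values for the next state
Q_b[1] = [2.0, 0.5]

double_q_update(Q_a, Q_b, s=0, a=0, r=-1.0, s_next=1, done=False,
                alpha=0.1, gamma=0.99, rng=rng)
print(Q_a[0, 0], Q_b[0, 0])
```

When acting, the two tables are typically combined, for example by taking the greedy action with respect to their sum.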