# CartPole: Balancing a Pole on a Moving Cart with Q-Learning and a Q-Network

<div align="center">
    <img src="./images/cart_pole.gif" width="300" alt="CartPole Environment">
</div>

This notebook demonstrates how to create and interact with the CartPole environment using Gymnasium (the maintained successor to OpenAI Gym). CartPole is a classic control problem in which a pole must be kept balanced on a moving cart.

In [4]:
# Imports the necessary libraries
import gymnasium as gym                     # The CartPole environment
from gymnasium.wrappers import RecordVideo  # For recording the video of the episode
import time                                 # For adding delays in visualization

In [2]:
# Creates the CartPole environment with frame rendering enabled during the episode
env = gym.make('CartPole-v1', render_mode='rgb_array')  # rgb_array needed for video recording

"""
# Creates videos of the agent's performance in the specified folder
env = RecordVideo(env, 
                  video_folder= 'videos',           # Folder to save videos
                  name_prefix="eval",               # Prefix for video file names
                  episode_trigger=lambda x: True)   # Trigger to record every episode
"""

# Shows observation space of the environment
# The observation space consists of 4 continuous values:
# 1. Cart Position: [-4.8, 4.8] - Position of cart on track
# 2. Cart Velocity: [-Inf, Inf] - Velocity of the cart
# 3. Pole Angle: [-0.418 rad (-24°), 0.418 rad (24°)] - Angle of the pole
# 4. Pole Angular Velocity: [-Inf, Inf] - Rate of change of the angle
print("Observation space: ", env.observation_space)

# Shows action space of the environment: 2 discrete actions - 0: Move cart to left, 1: Move cart to right
print("Action space: ", env.action_space)

Observation space:  Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Action space:  Discrete(2)
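
The printed bounds are wider than the termination thresholds described later in this notebook (±12° and ±2.4 units): in Gymnasium's CartPole the observation bounds are twice the termination thresholds. The short check below is a sketch of that relationship; the variable names are illustrative and not part of the Gymnasium API.

```python
import math

# Termination thresholds quoted in this notebook
ANGLE_LIMIT_DEG = 12   # episode terminates beyond ±12 degrees
X_LIMIT = 2.4          # episode terminates beyond ±2.4 units

# The observation-space bounds are twice the termination thresholds
angle_limit_rad = math.radians(ANGLE_LIMIT_DEG)   # about 0.2095 rad
obs_angle_bound = 2 * angle_limit_rad             # about 0.4189 rad (24 degrees)
obs_x_bound = 2 * X_LIMIT                         # 4.8

print(f"Termination angle: {angle_limit_rad:.4f} rad")
print(f"Observation angle bound: {obs_angle_bound:.8f} rad")
print(f"Observation position bound: {obs_x_bound}")
```

In practice this means a non-terminal observation never reaches the outer half of the position and angle ranges.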


In [3]:
# Resets the environment to start a new episode
observation, info = env.reset(seed=42)

print("Reads the post-initialization observation of the environment:")
print(f"Cart position: {observation[0]}, Cart velocity: {observation[1]}, Pole angle: {observation[2]}, Pole angular velocity: {observation[3]}")

Reads the post-initialization observation of the environment:
Cart position: 0.02739560417830944, Cart velocity: -0.006112155970185995, Pole angle: 0.03585979342460632, Pole angular velocity: 0.019736802205443382


## Balancing the Pole with Random Actions

As a baseline, the agent first tries to balance the pole with random actions.
Each iteration of the loop performs the following steps:
1. Sample a random action (0 = move left, 1 = move right)
2. Apply the action to the environment
3. Get the new observation and reward
4. Render the environment
5. Check if episode is over

In [4]:
# Wraps the environment to record statistics for one episode
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=1)

episode_over = False    # Track if the episode is over to stop the loop
# total_reward = 0        # Accumulates rewards for the episode
# step_count = 0          # Initialize step counter

while not episode_over:

    action = env.action_space.sample()  # Samples a discrete random action: 0 (move left) or 1 (move right)
    print(f"Action taken: {action}")    # Prints the action taken
    
    # step_count += 1                     # Increments step counter

    # Takes a step in the environment with the sampled action
    # Returns five values:
    # 1. observation: Array of 4 values [cart_position, cart_velocity, pole_angle, pole_angular_velocity]
    # 2. reward: +1 for every step taken, including the terminal state
    # 3. terminated: True if |pole angle| > 12° or |cart position| > 2.4 units
    # 4. truncated: True if episode length >= 500 timesteps (default in CartPole)
    # 5. info: Additional information (empty dictionary in CartPole)
    observation, reward, terminated, truncated, info = env.step(action)

    #env.render()   # Renders the environment to visualize the agent's performance

    # Accumulates the rewards for the episode. It is +1 for each time step the pole remains upright
    # total_reward += reward

    # Checks if the episode is over
    episode_over = terminated or truncated

    # time.sleep(0.2) # Adds a delay to visualize the rendering better

print(f"Episode is completed. Total reward: {env.return_queue[0]}, Steps taken: {env.length_queue[0]}")

Action taken: 1
Action taken: 1
Action taken: 1
Action taken: 1
Action taken: 0
Action taken: 0
Action taken: 0
Action taken: 1
Action taken: 0
Action taken: 1
Action taken: 0
Action taken: 0
Action taken: 0
Action taken: 1
Action taken: 0
Action taken: 1
Action taken: 1
Action taken: 0
Action taken: 0
Action taken: 1
Action taken: 1
Action taken: 0
Action taken: 1
Action taken: 1
Action taken: 0
Action taken: 0
Action taken: 1
Action taken: 0
Action taken: 0
Action taken: 0
Episode is completed. Total reward: 30.0, Steps taken: 30


Next, an intelligent agent will be built to keep the pole balanced for longer, resulting in a larger total reward per episode.

## Cleanup
Before moving on, let's close the environment to free up resources.

In [5]:
env.close()

## The Q-Learning Agent

The agent is trained with Q-learning to develop a policy that tells it which action to take in each state to maximize long-term reward. The key components are:

### State Space Discretization
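Because CartPole has a continuous state space, each dimension is discretized using 8 evenly spaced bin edges over the ranges below. Values outside a range fall into the outermost bins.
- Cart Position: [-4.8, 4.8] → 8 bin edges
- Cart Velocity: [-4, 4] → 8 bin edges
- Pole Angle: [-0.418, 0.418] → 8 bin edges
- Pole Angular Velocity: [-4, 4] → 8 bin edges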
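
To make the binning concrete, here is a small sketch of how `np.linspace` and `np.digitize` (the two calls used by the agent below) turn a pole angle into a bin index. The sample angles and the `reachable` grid are illustrative assumptions; the ±0.2095 rad limit is the 12° termination threshold expressed in radians.

```python
import numpy as np

n_bins = 8
angle_edges = np.linspace(-0.418, 0.418, n_bins)  # 8 edges, as in the agent

# np.digitize returns i such that edges[i-1] <= x < edges[i],
# so 8 edges give 9 possible indices: 0 (below) through 8 (above)
idx_center = int(np.digitize(0.0, angle_edges))
idx_below = int(np.digitize(-1.0, angle_edges))
idx_above = int(np.digitize(1.0, angle_edges))
print(idx_center, idx_below, idx_above)

# Angles that can occur before the episode terminates (|angle| <= ~0.2095 rad)
reachable = np.linspace(-0.2095, 0.2095, 1001)
used_indices = sorted(set(np.digitize(reachable, angle_edges).tolist()))
print("Indices used by non-terminal angles:", used_indices)
```

Under these assumptions only the middle indices are visited for the pole angle before termination, so the effective resolution is coarser than the bin count suggests. Narrowing the ranges toward the termination thresholds is one option to try.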

### Q-Learning Components
1. **Q-Table**: Maps each state-action pair to its estimated cumulative discounted reward (the Q-value)
2. **ε-greedy Policy**: Balance exploration and exploitation
3. **Learning Rate (α)**: Controls how much new information overrides old
4. **Discount Factor (γ)**: Balances immediate vs future rewards
5. **Exploration Decay**: Gradually reduce random actions
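
As a worked example of how the learning rate and discount factor interact, the sketch below applies one Q-learning update with the default values used in this notebook (α = 0.1, γ = 0.95). The function name `q_update` and the sample Q-values are assumptions chosen for illustration.

```python
def q_update(current_q, reward, best_next_q, alpha=0.1, gamma=0.95, terminated=False):
    """Return the updated Q-value after one Q-learning step."""
    # No future value is bootstrapped from a terminal state
    future = 0.0 if terminated else best_next_q
    target = reward + gamma * future
    # Move the current estimate a fraction alpha toward the target
    return current_q + alpha * (target - current_q)

# Non-terminal step: target = 1 + 0.95 * 1 = 1.95
q_non_terminal = q_update(current_q=1.0, reward=1.0, best_next_q=1.0)

# Terminal step: target = reward only
q_terminal = q_update(current_q=1.0, reward=1.0, best_next_q=1.0, terminated=True)

# Upper bound on the discounted return when every step pays +1
max_return = 1.0 / (1.0 - 0.95)

print(q_non_terminal, q_terminal, max_return)
```

With a reward of +1 per step, Q-values for well-balanced states can grow toward roughly 20, which is the discounted sum 1 / (1 - γ).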

In [None]:
import numpy as np
from collections import defaultdict

class CartPoleAgent:
    """
    Q-learning agent for the CartPole environment.
    """

    def __init__(self, env, learning_rate=0.1, discount_factor=0.95, 
                 initial_epsilon=1.0, epsilon_decay=0.995, final_epsilon=0.01):
        """
        Initialize the Q-learning agent for the CartPole-v1 environment.
        
        Args:
            env: The CartPole-v1 environment
            learning_rate: How quickly the agent learns from new experiences (α)
            discount_factor: How much future rewards are valued (γ)
            initial_epsilon: Starting exploration rate
            epsilon_decay: Rate at which exploration decreases
            final_epsilon: Minimum exploration rate
        """
        self.env = env
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon
        
        # Defines state space discretization
        self.n_bins = 8  # Number of bins for each dimension
        self.state_bins = {
            'cart_position': np.linspace(-4.8, 4.8, self.n_bins),
            'cart_velocity': np.linspace(-4, 4, self.n_bins),
            'pole_angle': np.linspace(-0.418, 0.418, self.n_bins),
            'pole_angular_velocity': np.linspace(-4, 4, self.n_bins)
        }
        
        # Initialize Q-table lazily: every unseen state starts with a Q-value of 1.0 per action
        self.q_table = defaultdict(lambda: np.ones(env.action_space.n))
        
        # Track learning progress
        self.training_rewards = []
        
    def discretize_state(self, observation):
        """
        Convert continuous state values to discrete state indices.
        
        Args:
            observation: [cart_pos, cart_vel, pole_angle, pole_ang_vel]
            
        Returns:
            tuple: Discretized state representation
        """
        discretized = []
        for i, (name, bins) in enumerate(self.state_bins.items()):
            discretized.append(np.digitize(observation[i], bins))
        return tuple(discretized)
    
    def get_action(self, state):
        """
        Choose action using ε-greedy policy.
        
        Args:
            state: Current state (should be already discretized)
            
        Returns:
            int: Chosen action (0: left, 1: right)
        """
    
        if np.random.random() < self.epsilon:       # Explores by choosing random action
            return self.env.action_space.sample()
    
        return np.argmax(self.q_table[state])       # Exploits by choosing best action based on Q-values
    
    def update_q_value(self, state, action, reward, next_state, terminated):
        """
        Updates Q-value for state-action pair using the Bellman equation.
        
        Args:
            state: Current state
            action: Action taken
            reward: Reward received
            next_state: Next state
            terminated: Whether episode ended
        """

        # Get best future value (0 if terminated)
        best_next_value = 0 if terminated else np.max(self.q_table[next_state])
        
        # Calculate target Q-value using Bellman equation
        target = reward + self.discount_factor * best_next_value
        
        # Update Q-value towards target using learning rate
        current_q = self.q_table[state][action]
        self.q_table[state][action] = current_q + self.learning_rate * (target - current_q)
    
    def decay_epsilon(self):
        """Decay exploration rate by multiplication factor"""
        self.epsilon = max(self.final_epsilon, 
                          self.epsilon * self.epsilon_decay)
    
    def train_episode(self):
        """
        Train the agent for one episode.
        
        Returns:
            float: Total reward for the episode
        """
        state, _ = self.env.reset()
        state = self.discretize_state(state)
        total_reward = 0
        done = False
        
        while not done:
            # Choose and take action
            action = self.get_action(state)
            next_observation, reward, terminated, truncated, _ = self.env.step(action)
            
            # Process new state and update Q-value
            next_state = self.discretize_state(next_observation)
            self.update_q_value(state, action, reward, next_state, terminated)
            
            # Update state and accumulate reward
            state = next_state
            total_reward += reward
            done = terminated or truncated
        
        # Decay exploration rate after episode
        self.decay_epsilon()
        self.training_rewards.append(total_reward)
        
        return total_reward

### Training the Q-Learning Agent

Let's create an instance of our Q-learning agent and train it over multiple episodes.

In [None]:
# Create the environment and wrap it to record statistics
env = gym.make('CartPole-v1')
env = gym.wrappers.RecordEpisodeStatistics(env)

# Create the agent
agent = CartPoleAgent(env, 
             learning_rate=0.1,          # Learning rate
             discount_factor=0.95,       # Discount factor
             initial_epsilon=1.0,        # Initial exploration rate
             epsilon_decay=0.995,        # Exploration decay rate
             final_epsilon=0.01)         # Minimum exploration rate

# Training parameters
n_episodes = 1000
print_interval = 100

# Train the agent
for episode in range(n_episodes):
    episode_reward = agent.train_episode()
    
    # Print progress every print_interval episodes
    if (episode + 1) % print_interval == 0:
        avg_reward = np.mean(agent.training_rewards[-print_interval:])
        print(f"Episode {episode + 1}/{n_episodes}")
        print(f"Average Reward (last {print_interval} episodes): {avg_reward:.2f}")
        print(f"Current Exploration Rate (ε): {agent.epsilon:.3f}")
        print("--------------------")

Episode 100/1000
Average Reward (last 100 episodes): 22.80
Current Exploration Rate (ε): 0.606
--------------------
Episode 200/1000
Average Reward (last 100 episodes): 25.20
Current Exploration Rate (ε): 0.367
--------------------
Episode 300/1000
Average Reward (last 100 episodes): 33.99
Current Exploration Rate (ε): 0.222
--------------------
Episode 400/1000
Average Reward (last 100 episodes): 71.07
Current Exploration Rate (ε): 0.135
--------------------
Episode 500/1000
Average Reward (last 100 episodes): 85.02
Current Exploration Rate (ε): 0.082
--------------------
Episode 600/1000
Average Reward (last 100 episodes): 85.80
Current Exploration Rate (ε): 0.049
--------------------
Episode 700/1000
Average Reward (last 100 episodes): 94.92
Current Exploration Rate (ε): 0.030
--------------------
Episode 800/1000
Average Reward (last 100 episodes): 84.39
Current Exploration Rate (ε): 0.018
--------------------
Episode 900/1000
Average Reward (last 100 episodes): 92.12
Current Explo
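
The exploration rates printed in the log follow directly from the multiplicative schedule in `decay_epsilon`, which is applied once per episode. The sketch below reproduces them; the helper name `epsilon_after` is an assumption for illustration.

```python
def epsilon_after(n_decays, initial=1.0, decay=0.995, floor=0.01):
    """Exploration rate after n_decays applications of the decay rule."""
    eps = initial
    for _ in range(n_decays):
        eps = max(floor, eps * decay)
    return eps

# Exploration rate at the episodes reported in the training log
schedule = {n: round(epsilon_after(n), 3) for n in range(100, 900, 100)}
for n, eps in schedule.items():
    print(f"Episode {n}: epsilon = {eps:.3f}")

# Number of episodes needed before epsilon is clamped at the floor
episodes_to_floor = 0
eps = 1.0
while eps > 0.01:
    eps = max(0.01, eps * 0.995)
    episodes_to_floor += 1
print("Episodes until the floor is reached:", episodes_to_floor)
```

With these defaults the agent keeps exploring for most of a 1000-episode run and only reaches the minimum rate after a little more than 900 episodes.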

### Evaluating the Trained Q-Learning Agent

Now let's see how well our trained agent performs. We'll run a few evaluation episodes with rendering enabled and no exploration (ε = 0).

In [11]:
# Create environment for evaluation with rendering
eval_env = gym.make('CartPole-v1', render_mode='human')

# Run 5 evaluation episodes
n_eval_episodes = 5
eval_rewards = []
eval_episode_steps = []

for episode in range(n_eval_episodes):
    state, _ = eval_env.reset()
    state = agent.discretize_state(state)
    episode_reward = 0
    episode_steps = 0
    done = False
    
    while not done:
        # Use the learned policy (no exploration)
        action = np.argmax(agent.q_table[state])
        
        # Take action and get new state
        next_observation, reward, terminated, truncated, _ = eval_env.step(action)
        next_state = agent.discretize_state(next_observation)
        
        # Update state and reward
        state = next_state
        episode_reward += reward
        episode_steps += 1
        done = terminated or truncated
        
        # Add a small delay to better visualize the agent's behavior
        time.sleep(0.01)
    
    eval_rewards.append(episode_reward)
    eval_episode_steps.append(episode_steps)
    print(f"Evaluation Episode {episode + 1}: Reward = {episode_reward}, Steps = {episode_steps}")

print(f"\nAverage Evaluation Reward: {np.mean(eval_rewards):.2f}")

# Close the evaluation environment
eval_env.close()

Evaluation Episode 1: Reward = 98.0, Steps = 98
Evaluation Episode 2: Reward = 87.0, Steps = 87
Evaluation Episode 3: Reward = 93.0, Steps = 93
Evaluation Episode 4: Reward = 101.0, Steps = 101
Evaluation Episode 5: Reward = 97.0, Steps = 97

Average Evaluation Reward: 95.20


The trained agent's average evaluation reward (95.20) is much higher than the reward of 30 obtained with purely random actions at the beginning of this notebook.

## The Q-Network Agent

This section implements a Q-Network agent using TensorFlow. A Q-Network is better suited to environments with continuous state spaces such as CartPole because it eliminates the need for state discretization, generalizes better across similar states, and can capture complex state-action relationships.
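
The agent in the next cell keeps its experience in a bounded `deque` and trains on random minibatches drawn from it. Below is a minimal standard-library sketch of that mechanism. The capacity of 3 and the placeholder transitions are illustrative assumptions, not values from this notebook (the agent defaults to `memory_size=10000` and `batch_size=32`).

```python
from collections import deque
import random

# Bounded replay memory: once full, appending drops the oldest transition
memory = deque(maxlen=3)

for step in range(5):
    # Placeholder transition: (state, action, reward, next_state, done)
    transition = (step, 0, 1.0, step + 1, False)
    memory.append(transition)

kept_states = [t[0] for t in memory]
print("States still in memory:", kept_states)

# Uniform random minibatch without replacement, as in the agent's replay step
random.seed(0)
minibatch = random.sample(memory, 2)
print("Sampled minibatch:", minibatch)
```

Sampling at random rather than replaying the latest transitions in order breaks the correlation between consecutive steps, which tends to stabilize training.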

In [None]:
import tensorflow as tf
import numpy as np
from collections import deque
import random

class QNAgent:
    """The Q-Network Agent for CartPole-v1 environment."""
    def __init__(self, state_size, action_size, 
                 learning_rate=0.001, discount_factor=0.95,
                 initial_epsilon=1.0, epsilon_decay=0.995, final_epsilon=0.01,
                 memory_size=10000, batch_size=32):
        """
        Initializes the Q-Network Agent.
        
        Args:
            state_size: Dimension of state space
            action_size: Dimension of action space
            learning_rate: Learning rate for the neural network
            discount_factor: Discount factor for future rewards
            initial_epsilon: Initial exploration rate
            epsilon_decay: Rate at which exploration decreases
            final_epsilon: Minimum exploration rate
            memory_size: Size of experience replay buffer
            batch_size: Number of samples to train on in each update
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=memory_size)
        self.batch_size = batch_size
        self.gamma = discount_factor
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = final_epsilon
        self.learning_rate = learning_rate
        
        # Creates main and target networks
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
        
        # Training metrics
        self.training_rewards = []
    
    def _build_model(self):
        """Builds a neural network model."""
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu', input_shape=(self.state_size,)),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(self.action_size, activation='linear')
        ])
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate), loss='mse')
        
        return model
    
    def update_target_model(self):
        """Copies weights from main model to target model."""
        self.target_model.set_weights(self.model.get_weights())
    
    def remember(self, state, action, reward, next_state, done):
        """Stores experience in replay memory."""
        self.memory.append((state, action, reward, next_state, done))
    
    def get_action(self, state):
        """Chooses action using ε-greedy policy."""
        if np.random.rand() < self.epsilon:
            return random.randrange(self.action_size)
        
        q_values = self.model.predict(state, verbose=0)
        
        return np.argmax(q_values[0])
    
    def replay(self):
        """Trains the network on a batch of experiences."""
        if len(self.memory) < self.batch_size:
            return
        
        # Samples random batch from memory
        minibatch = random.sample(self.memory, self.batch_size)
        
        states = np.zeros((self.batch_size, self.state_size))
        next_states = np.zeros((self.batch_size, self.state_size))
        
        # Separates experiences into arrays
        for i, (state, action, reward, next_state, done) in enumerate(minibatch):
            states[i] = state
            next_states[i] = next_state
        
        # Predicts Q-values for current and next states
        target_q_values = self.target_model.predict(next_states, verbose=0)
        current_q_values = self.model.predict(states, verbose=0)
        
        # Updates target Q-values with Bellman equation
        for i, (state, action, reward, next_state, done) in enumerate(minibatch):
            if done:
                target = reward
            else:
                target = reward + self.gamma * np.max(target_q_values[i])
            current_q_values[i][action] = target
        
        # Trains the model
        self.model.fit(states, current_q_values, epochs=1, verbose=0)
        
        # Decays exploration rate (runs on every replay call, i.e. once per training step)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    
    def train_episode(self, env):
        """Trains for one episode."""
        state, _ = env.reset()
        state = np.reshape(state, [1, self.state_size])
        total_reward = 0
        done = False
        
        while not done:
            # Chooses and takes an action
            action = self.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            next_state = np.reshape(next_state, [1, self.state_size])
            done = terminated or truncated
            
            # Stores the experience and trains on a minibatch
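            # Stores `terminated` (not `done`) so a time-limit truncation still bootstraps from the next state
            self.remember(state[0], action, reward, next_state[0], terminated)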
            self.replay()
            
            state = next_state
            total_reward += reward
        
        self.training_rewards.append(total_reward)

        return total_reward
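
The per-sample loop in `replay` that builds the training targets can also be written in vectorized form. The sketch below is one way to do it with NumPy, using made-up Q-values in place of network predictions. The function name `bellman_targets` and the sample arrays are assumptions for illustration and are not part of the class above.

```python
import numpy as np

def bellman_targets(rewards, next_q_values, dones, gamma=0.95):
    """Compute r + gamma * max_a' Q_target(s', a') for a batch, with no bootstrap when done."""
    rewards = np.asarray(rewards, dtype=np.float64)
    dones = np.asarray(dones, dtype=bool)
    best_next = np.max(next_q_values, axis=1)   # best target-network value per sample
    return rewards + gamma * best_next * (~dones)

# Stand-ins for target_model.predict(next_states) and model.predict(states)
next_q = np.array([[2.0, 3.0], [5.0, 4.0]])
current_q = np.array([[0.5, 0.7], [0.2, 0.9]])
actions = np.array([1, 0])

targets = bellman_targets(rewards=[1.0, 1.0], next_q_values=next_q, dones=[False, True])

# Overwrite only the Q-value of the action actually taken in each sample
current_q[np.arange(len(actions)), actions] = targets
print(targets)
print(current_q)
```

Only the entry for the taken action changes, so the MSE loss is zero for the other action, which matches what the loop in `replay` does.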

### Training the Q-Network Agent

In [None]:
# Creates and sets up the environment
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]  # 4 for CartPole
action_size = env.action_space.n             # 2 for CartPole

# Creates the Q-Network agent (the class defined above is named QNAgent)
agent = QNAgent(state_size=state_size,
                action_size=action_size,
                learning_rate=0.01,     # Options to try: 0.001
                discount_factor=0.95,
                initial_epsilon=1.0,
                epsilon_decay=0.995,
                final_epsilon=0.01)

# Training parameters
n_episodes = 100
target_update_frequency = 10  # Updates target network every 10 episodes
print_interval = 1

# Trains the agent
for episode in range(n_episodes):
    episode_reward = agent.train_episode(env)
    
    # Updates target network periodically
    if episode % target_update_frequency == 0:
        agent.update_target_model()
    
    # Prints progress
    if (episode + 1) % print_interval == 0:
        avg_reward = np.mean(agent.training_rewards[-print_interval:])
        print(f"Episode {episode + 1}/{n_episodes}")
        print(f"Average Reward (last {print_interval} episodes): {avg_reward:.2f}")
        print(f"Current Exploration Rate (ε): {agent.epsilon:.3f}")
        print("--------------------")

Episode 1/100
Average Reward (last 1 episodes): 10.00
Current Exploration Rate (ε): 1.000
--------------------
Episode 2/100
Average Reward (last 1 episodes): 23.00
Current Exploration Rate (ε): 0.990
--------------------
Episode 3/100
Average Reward (last 1 episodes): 17.00
Current Exploration Rate (ε): 0.909
--------------------
Episode 4/100
Average Reward (last 1 episodes): 23.00
Current Exploration Rate (ε): 0.810
--------------------
Episode 5/100
Average Reward (last 1 episodes): 17.00
Current Exploration Rate (ε): 0.744
--------------------
E
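
The exploration rate falls much faster here than in the Q-learning run because `replay` decays ε on every training step rather than once per episode. The sketch below reproduces the rates printed for the first five episodes from their lengths. The helper name `epsilon_after_steps` is an assumption, and the episode lengths are read off the logged rewards, since each step yields a reward of +1.

```python
def epsilon_after_steps(total_steps, batch_size=32, decay=0.995, floor=0.01):
    """Exploration rate after total_steps environment steps under per-step decay."""
    # replay() returns early until the memory holds batch_size transitions
    n_decays = max(0, total_steps - (batch_size - 1))
    eps = 1.0
    for _ in range(n_decays):
        if eps > floor:
            eps *= decay
    return eps

episode_lengths = [10, 23, 17, 23, 17]   # steps in episodes 1 to 5, from the log

total_steps = 0
logged_eps = []
for length in episode_lengths:
    total_steps += length
    logged_eps.append(round(epsilon_after_steps(total_steps), 3))

print(logged_eps)
```

If a slower schedule is preferred, one option is to move the decay out of `replay` and apply it once per episode, as the Q-learning agent does.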

### Evaluating the Trained Q-Network Agent

In [29]:
# Creates environment for evaluation with rendering
eval_env = gym.make('CartPole-v1', render_mode='human')

# Runs evaluation episodes
n_eval_episodes = 5
eval_rewards = []

for episode in range(n_eval_episodes):
    state, _ = eval_env.reset()
    state = np.reshape(state, [1, state_size])
    episode_reward = 0
    done = False
    
    while not done:
        # Uses the learned policy (no exploration)
        q_values = agent.model.predict(state, verbose=0)
        action = np.argmax(q_values[0])
        
        # Takes action
        next_state, reward, terminated, truncated, _ = eval_env.step(action)
        next_state = np.reshape(next_state, [1, state_size])
        
        state = next_state
        episode_reward += reward
        done = terminated or truncated
        
        # Adds a small delay to better visualize the agent's behavior
        time.sleep(0.01)
    
    eval_rewards.append(episode_reward)
    print(f"Evaluation Episode {episode + 1}: Reward = {episode_reward}")

print(f"\nAverage Evaluation Reward: {np.mean(eval_rewards):.2f}")

# Close the evaluation environment
eval_env.close()

Evaluation Episode 1: Reward = 100.0
Evaluation Episode 2: Reward = 104.0
Evaluation Episode 3: Reward = 93.0
Evaluation Episode 4: Reward = 103.0
Evaluation Episode 5: Reward = 105.0

Average Evaluation Reward: 101.00


The results above show that the Q-Network agent performed slightly better than the Q-Learning agent during evaluation (average reward 101.00 versus 95.20). Its performance can likely be improved further by training for more episodes and tuning its hyperparameters.
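
To put the comparison in context, the sketch below recomputes both averages from the evaluation rewards printed in this notebook and adds the sample standard deviation. The variable names are illustrative, and five episodes per agent is a small sample, so the result should be read as indicative only.

```python
from statistics import mean, stdev

# Evaluation rewards copied from the outputs above
q_learning_rewards = [98.0, 87.0, 93.0, 101.0, 97.0]
q_network_rewards = [100.0, 104.0, 93.0, 103.0, 105.0]

q_learning_mean = mean(q_learning_rewards)
q_network_mean = mean(q_network_rewards)
gap = q_network_mean - q_learning_mean

print(f"Q-Learning: mean = {q_learning_mean:.2f}, stdev = {stdev(q_learning_rewards):.2f}")
print(f"Q-Network:  mean = {q_network_mean:.2f}, stdev = {stdev(q_network_rewards):.2f}")
print(f"Gap between means: {gap:.2f}")
```

The spread within each set of episodes is of a similar size to the gap between the means, so more evaluation episodes would be needed to confirm the difference.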