<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo"  />
    </a>
</p>


# **Lab: Implementing Q-Learning in Keras**


Estimated time needed: **30** minutes


## Lab Overview
In this lab, you will implement a Q-Learning algorithm using Keras to solve a reinforcement learning problem.

## Learning objectives:
By the end of this lab, you will:  
- Implement a Q-Learning algorithm using Keras
- Define and train a neural network to approximate the Q-values
- Evaluate the performance of the trained Q-Learning agent

## Prerequisites
- Basic knowledge of Python programming
- Familiarity with Keras and neural networks
- Understanding of reinforcement learning concepts


### Step-by-Step Guide

#### Step 1: Setting Up the Environment

First, you will set up the environment using the OpenAI Gym library. You will use the 'CartPole-v1' environment, a common benchmark for reinforcement learning algorithms.


In [15]:
#%pip install gym

In [16]:
#!pip install --upgrade numpy==1.26.4
#!pip uninstall tensorflow -y
#!pip install tensorflow==2.16.2

### Set Environment Variables
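Environment variables can help work around some TensorFlow issues. The cell below disables the oneDNN optimizations (`TF_ENABLE_ONEDNN_OPTS=0`) and hides CUDA devices (`CUDA_VISIBLE_DEVICES=-1`) so that TensorFlow runs on the CPU. Set these variables before importing TensorFlow.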


In [17]:
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

### Increase Recursion Limit
You can also try increasing the recursion limit, although this is generally more of a workaround than a solution.


In [18]:
import sys
sys.setrecursionlimit(1500)

import gym
import numpy as np

# Create the environment
env = gym.make('CartPole-v1')

# Set random seed for reproducibility
np.random.seed(42)
env.action_space.seed(42)
env.observation_space.seed(42)

[42]

#### Explanation:  
- `gym` is a toolkit for developing and comparing reinforcement learning algorithms.
- `CartPole-v1` is an environment where a pole is balanced on a cart, and the goal is to prevent the pole from falling over.
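- Setting random seeds makes runs more repeatable. The cell above seeds NumPy and the environment's action and observation spaces; Python's `random` module and TensorFlow are not seeded, so results can still vary between runs.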
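
To illustrate why seeding matters, here is a minimal sketch that uses only Python's `random` module and NumPy. The helper `sample_run` is hypothetical and is not part of the lab code; it only shows that two runs with the same seed draw identical values.

```python
import random
import numpy as np

def sample_run(seed):
    """Seed both generators, then draw a few values from each (hypothetical helper)."""
    random.seed(seed)
    np.random.seed(seed)
    py_draws = [random.randrange(2) for _ in range(5)]  # e.g. random CartPole actions
    np_draws = np.random.rand(3).tolist()               # e.g. epsilon-greedy coin flips
    return py_draws, np_draws

first = sample_run(42)
second = sample_run(42)
print(first == second)  # True: same seed, same draws
```

The agent in this lab draws from both `np.random.rand()` and `random.randrange()`, so seeding only one of them leaves the other free to vary. If you also want repeatable weight initialization, TensorFlow provides `tf.random.set_seed`; whether that is enough for fully deterministic training depends on your setup.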


#### Step 2: Define the Q-Learning Model

You will define a neural network using Keras to approximate the Q-values. The network will take the state as input and output Q-values for each action.


In [19]:
# Suppress warnings for a cleaner notebook or console experience
import warnings
warnings.filterwarnings('ignore')

# Override the default warning function
def warn(*args, **kwargs):
    pass
warnings.warn = warn

# Import necessary libraries for the Q-Learning model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input  # Import Input layer
from tensorflow.keras.optimizers import Adam
import gym  # Ensure the environment library is available

# Define the model building function
def build_model(state_size, action_size):
    model = Sequential()
    model.add(Input(shape=(state_size,)))  # Use Input layer to specify the input shape
    model.add(Dense(24, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

# Create the environment and set up the model
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
model = build_model(state_size, action_size)


#### Explanation:
- `Sequential` model: a linear stack of layers in Keras.
- `Dense` layers: fully connected layers.
- `Input(shape=(state_size,))`: declares the shape of the input, which matches the state size.
- `activation='relu'`: Rectified Linear Unit activation function.
- `activation='linear'`: linear activation function for the output layer, as we are predicting continuous Q-values.
- `Adam` optimizer: a gradient-based optimizer that adapts the step size for each parameter using running estimates of the gradient's first and second moments.
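
The shapes involved can be sketched without Keras. The snippet below is an illustration only: it runs a 4 -> 24 -> 24 -> 2 fully connected network with random weights in NumPy, assuming the CartPole state size of 4 and action count of 2. The names `relu`, `forward`, and `weights` are hypothetical and do not come from the lab code.

```python
import numpy as np

def relu(x):
    """Rectified Linear Unit: element-wise max(x, 0)."""
    return np.maximum(x, 0.0)

def forward(states, weights):
    """Forward pass through two ReLU hidden layers and a linear output layer."""
    (w1, b1), (w2, b2), (w3, b3) = weights
    h1 = relu(states @ w1 + b1)
    h2 = relu(h1 @ w2 + b2)
    return h2 @ w3 + b3  # linear output: one Q-value per action

rng = np.random.default_rng(0)
sizes = [4, 24, 24, 2]  # assumed: state_size=4, action_size=2
weights = [(rng.normal(scale=0.1, size=(n_in, n_out)), np.zeros(n_out))
           for n_in, n_out in zip(sizes[:-1], sizes[1:])]

state = rng.normal(size=(1, 4))       # a batch containing one state
q_values = forward(state, weights)
print(q_values.shape)                 # (1, 2)
greedy_action = int(np.argmax(q_values[0]))
```

The output has one row per input state and one column per action, which is why the agent picks an action with `np.argmax` over the first row.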


#### Step 3: Implement the Q-Learning Algorithm

Now, you will implement the Q-Learning algorithm, which involves interacting with the environment, updating the Q-values, and training the neural network.
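
The core of the update is the target value: the reward plus the discounted best Q-value of the next state, with no future term when the episode has ended. The sketch below computes that target for a small batch in NumPy. The function name `td_targets` is hypothetical, and the discount factor of 0.95 is the one used in this lab.

```python
import numpy as np

def td_targets(rewards, q_next, dones, gamma=0.95):
    """Return r + gamma * max_a' Q(s', a'), dropping the future term for finished episodes."""
    rewards = np.asarray(rewards, dtype=float)
    dones = np.asarray(dones, dtype=bool)
    best_next = np.max(q_next, axis=1)         # best Q-value available in each next state
    return rewards + gamma * best_next * (~dones)

rewards = [1.0, -10.0]
q_next = np.array([[0.5, 2.0],                 # Q-values predicted for each next state
                   [3.0, 1.0]])
dones = [False, True]

targets = td_targets(rewards, q_next, dones)
# First transition:  1.0 + 0.95 * 2.0 = 2.9
# Second transition: episode ended, so the target is just the reward, -10.0
print(targets)
```

Only the Q-value of the action that was actually taken is overwritten with this target; the other outputs keep their predicted values, so they contribute no error during training.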

**Define the replay Function**


In [20]:
import random
import numpy as np
from collections import deque
import tensorflow as tf

# Define epsilon and epsilon_decay
epsilon = 1.0  # Starting with a high exploration rate
epsilon_min = 0.01  # Minimum exploration rate
epsilon_decay = 0.99  # Multiplicative decay applied after each training step in replay()

# Replay memory
memory = deque(maxlen=2000)

def remember(state, action, reward, next_state, done):
    """Store experience in memory."""
    memory.append((state, action, reward, next_state, done))

def replay(batch_size=64):  # Increased batch size
    """Train the model using a random sample of experiences from memory."""
    if len(memory) < batch_size:
        return  # Skip replay if there's not enough experience

    minibatch = random.sample(memory, batch_size)  # Sample a random batch from memory

    # Extract information for batch processing
    states = np.vstack([x[0] for x in minibatch])
    actions = np.array([x[1] for x in minibatch])
    rewards = np.array([x[2] for x in minibatch])
    next_states = np.vstack([x[3] for x in minibatch])
    dones = np.array([x[4] for x in minibatch])

    # Predict Q-values for the next states in batch
    q_next = model.predict(next_states, verbose=0)
    # Predict Q-values for the current states in batch
    q_target = model.predict(states, verbose=0)

    # Update the target for each sampled transition
    for i in range(batch_size):
        target = rewards[i]
        if not dones[i]:
            target += 0.95 * np.amax(q_next[i])  # Update Q value with the discounted future reward
        q_target[i][actions[i]] = target  # Update only the taken action's Q value

    # Train the model with the updated targets in batch
    model.fit(states, q_target, epochs=1, verbose=0)  # Train in batch mode

    # Reduce exploration rate (epsilon) after each training step
    global epsilon
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

def act(state):
    """Choose an action based on the current state and exploration rate."""
    if np.random.rand() <= epsilon:
        return random.randrange(action_size)  # Explore: choose a random action
    act_values = model.predict(state)  # Exploit: predict action based on the state
    return np.argmax(act_values[0])  # Return the action with the highest Q-value

# Define the number of episodes you want to train the model for
episodes = 10  # You can set this to any number you prefer
train_frequency = 5  # Train the model every 5 steps

for e in range(episodes):
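    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older versions return only the observation
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = np.reshape(state, [1, state_size])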
    for time in range(200):  # Limit to 200 time steps per episode
        action = act(state)
        step_result = env.step(action)
        if len(step_result) == 5:
          next_state, reward, terminated, truncated, _ = step_result
          done = terminated or truncated
        else:
          next_state, reward, done, _ = step_result
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done)  # Store experience
        state = next_state

        if done:
            print(f"episode: {e+1}/{episodes}, score: {time}, e: {epsilon:.2}")
            break

        # Train the model every 'train_frequency' steps
        if time % train_frequency == 0:
            replay(batch_size=64)  # Call replay with larger batch size for efficiency

env.close()


episode: 1/10, score: 16, e: 1.0
episode: 2/10, score: 18, e: 1.0
episode: 3/10, score: 9, e: 1.0
episode: 4/10, score: 25, e: 0.99
episode: 5/10, score: 19, e: 0.95

#### Step 4: Evaluate the Performance

Finally, you will evaluate the performance of the trained Q-Learning agent.


In [21]:
for e in range(10):

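    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older versions return only the observation
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = np.reshape(state, [1, state_size])  # Reshape the state correctly
    for time in range(500):
        env.render()
        action = np.argmax(model.predict(state, verbose=0)[0])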
        step_result = env.step(action)
        if len(step_result) == 5:
          next_state, reward, terminated, truncated, _ = step_result
          done = terminated or truncated
        else:
          next_state, reward, done, _ = step_result
        next_state = np.reshape(next_state, [1, state_size])
        state = next_state
        if done:
            print(f"episode: {e+1}/10, score: {time}")
            break

env.close()

#### Explanation:
- This loop runs 10 episodes to test the trained agent.
- `env.render()`: visualizes the environment.
- The agent chooses actions based on the trained model and interacts with the environment.
- The score for each episode is printed.
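
The evaluation loop depends on what `env.reset()` and `env.step()` return, and that differs between Gym versions: older releases return an observation from `reset()` and a 4-tuple from `step()`, while newer releases return `(observation, info)` and a 5-tuple. The helpers below are a sketch of one way to handle both. The names `unpack_reset` and `unpack_step` are hypothetical and are not part of Gym.

```python
def unpack_reset(result):
    """Return the observation from env.reset() under either return convention."""
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]  # newer API: (observation, info)
    return result         # older API: observation only

def unpack_step(result):
    """Return (next_state, reward, done) from env.step() under either convention."""
    if len(result) == 5:
        next_state, reward, terminated, truncated, _info = result
        return next_state, reward, terminated or truncated
    next_state, reward, done, _info = result
    return next_state, reward, done

# Stand-in values that mimic the two API styles
obs = [0.01, 0.02, 0.03, 0.04]
print(unpack_reset((obs, {})) == unpack_reset(obs))  # True
print(unpack_step((obs, 1.0, False, True, {})))      # done is True because truncated is True
```

With helpers like these, the loop body can call `state = unpack_reset(env.reset())` and `next_state, reward, done = unpack_step(env.step(action))` without checking the tuple length each time.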


# Practice exercises

## Exercise 1: Experiment with Different Network Architectures

### Objective:
Understand how changing the architecture of the neural network affects the performance of the Q-Learning agent.

### Instructions:
1. Modify the `build_model()` function to include a different number of neurons and layers. For example, increase the number of layers to 3 and the number of neurons in each layer to 64.
2. Train the model with the modified architecture and observe the performance in terms of average score achieved over 100 episodes.
3. Compare the performance with the original architecture.
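
Before training, it can help to see how much larger the modified network is. The sketch below counts the weights and biases in a stack of `Dense` layers, assuming the CartPole state size of 4 and action count of 2. The function `dense_param_count` is a hypothetical helper, not a Keras call.

```python
def dense_param_count(layer_sizes):
    """Count weights and biases in a stack of fully connected layers."""
    return sum(n_in * n_out + n_out
               for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:]))

original = dense_param_count([4, 24, 24, 2])      # two hidden layers of 24 units
modified = dense_param_count([4, 64, 64, 64, 2])  # three hidden layers of 64 units
print(original, modified)  # 770 8770
```

The modified network has more than ten times as many parameters, so each training step costs more and the network may need more experience before its Q-value estimates stabilize. In Keras, `model.summary()` reports the same counts for a built model.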


In [22]:
# Exercise 1: Experiment with Different Network Architectures

# Initialize the environment
env = gym.make('CartPole-v1')
np.random.seed(42)
env.action_space.seed(42)
env.observation_space.seed(42)

# Define state size and action size
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Modified build_model function with 3 layers and 64 neurons each
def build_model(state_size, action_size):
    model = Sequential()
    model.add(Input(shape=(state_size,)))
    model.add(Dense(64, activation='relu'))  # First hidden layer with 64 neurons
    model.add(Dense(64, activation='relu'))  # Second hidden layer with 64 neurons
    model.add(Dense(64, activation='relu'))  # Third hidden layer with 64 neurons
    model.add(Dense(action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

# Create the model with modified architecture
model = build_model(state_size, action_size)

# Epsilon-greedy parameters
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995

# Replay memory
memory = deque(maxlen=2000)

def remember(state, action, reward, next_state, done):
    """Store experience in memory."""
    memory.append((state, action, reward, next_state, done))

def replay(batch_size=64):
    """Train the model using a random sample of experiences from memory."""
    if len(memory) < batch_size:
        return

    minibatch = random.sample(memory, batch_size)
    states = np.vstack([x[0] for x in minibatch])
    actions = np.array([x[1] for x in minibatch])
    rewards = np.array([x[2] for x in minibatch])
    next_states = np.vstack([x[3] for x in minibatch])
    dones = np.array([x[4] for x in minibatch])

    q_next = model.predict(next_states, verbose=0)
    q_target = model.predict(states, verbose=0)

    for i in range(batch_size):
        target = rewards[i]
        if not dones[i]:
            target += 0.95 * np.amax(q_next[i])
        q_target[i][actions[i]] = target

    model.fit(states, q_target, epochs=1, verbose=0)

    global epsilon
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

def act(state):
    """Choose an action based on the current state and exploration rate."""
    if np.random.rand() <= epsilon:
        return random.randrange(action_size)
    act_values = model.predict(state, verbose=0)
    return np.argmax(act_values[0])

# Training parameters
episodes = 100  # Train for 100 episodes as specified
train_frequency = 5
scores = []  # Store scores for each episode

print("Training with modified architecture (3 layers, 64 neurons each)...")
print("-" * 60)

# Train the model with the modified architecture
for e in range(episodes):
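    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older versions return only the observation
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = np.reshape(state, [1, state_size])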
    total_reward = 0

    for time in range(500):  # Maximum steps per episode
        action = act(state)
        step_result = env.step(action)
        if len(step_result) == 5:
          next_state, reward, terminated, truncated, _ = step_result
          done = terminated or truncated
        else:
          next_state, reward, done, _ = step_result
        reward = reward if not done else -10
        total_reward += reward
        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done)
        state = next_state

        if done:
            scores.append(time)
            if (e + 1) % 10 == 0:  # Print every 10 episodes
                avg_score = np.mean(scores[-10:])
                print(f"Episode: {e+1}/{episodes}, Score: {time}, "
                      f"Avg Score (last 10): {avg_score:.2f}, Epsilon: {epsilon:.3f}")
            break

        if time % train_frequency == 0:
            replay(batch_size=64)

env.close()

# Calculate and display final statistics
print("-" * 60)
print("Training completed!")
print(f"Average score over all {episodes} episodes: {np.mean(scores):.2f}")
print(f"Average score over last 10 episodes: {np.mean(scores[-10:]):.2f}")
print(f"Average score over last 50 episodes: {np.mean(scores[-50:]):.2f}")
print(f"Best score: {max(scores)}")
print(f"Final epsilon: {epsilon:.3f}")

# Comparison note
print("\n" + "=" * 60)
print("COMPARISON:")
print("=" * 60)
print("Original Architecture: 2 hidden layers with 24 neurons each")
print("Modified Architecture: 3 hidden layers with 64 neurons each")
print(f"\nThe modified architecture achieved an average score of {np.mean(scores):.2f} over {episodes} episodes.")
print("Generally, deeper and wider networks can learn more complex patterns,")
print("but may require more training time and can be more prone to overfitting.")


Training with modified architecture (3 layers, 64 neurons each)...
------------------------------------------------------------
Episode: 10/100, Score: 17, Avg Score (last 10): 17.00, Epsilon: 0.878
Episode: 20/100, Score: 18, Avg Score (last 10): 17.30, Epsilon: 0.722
Episode: 30/100, Score: 35, Avg Score (last 10): 18.50, Epsilon: 0.591
Episode: 40/100, Score: 43, Avg Score (last 10): 33.50, Epsilon: 0.414
Episode: 50/100, Score: 198, Avg Score (last 10): 150.30, Epsilon: 0.090
Episode: 60/100, Score: 247, Avg Score (last 10): 236.80, Epsilon: 0.010
Episode: 70/100, Score: 192, Avg Score (last 10): 202.10, Epsilon: 0.010
Episode: 80/100, Score: 238, Avg Score (last 10): 262.80, Epsilon: 0.010
Episode: 90/100, Score: 226, Avg Score (last 10): 278.20, Epsilon: 0.010
Episode: 100/100, Score: 298, Avg Score (last 10): 280.00, Epsilon: 0.010
------------------------------------------------------------
Training completed!
Average score over all 100 episodes: 149.65
Average score over last 

<details>
    <summary>Click here for Solution</summary>

```python
# Install gym if necessary (uncomment when running in a notebook)
# %pip install gym

import gym
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Input
from keras.optimizers import Adam
from collections import deque
import random

# Initialize the environment
env = gym.make('CartPole-v1')

# Global settings
episodes = 10  # Number of episodes
batch_size = 32  # Size of the mini-batch for training
memory = deque(maxlen=2000)  # Memory buffer to store experiences

# Define state size and action size based on the environment
state_size = env.observation_space.shape[0]  # State space size from the environment
action_size = env.action_space.n  # Number of possible actions from the environment

# Define the model
def build_model(state_size, action_size):
    model = Sequential()
    model.add(Input(shape=(state_size,)))  # Explicit Input layer
    model.add(Dense(32, activation='relu'))  # 32 units per hidden layer (the original model uses 24)
    model.add(Dense(32, activation='relu'))
    model.add(Dense(action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

# Re-initialize the model with the new architecture
model = build_model(state_size, action_size)

# Placeholder for your action function (e.g., epsilon-greedy)
def act(state):
    return env.action_space.sample()  # For now, a random action is taken

# Function to remember experiences in memory
def remember(state, action, reward, next_state, done):
    memory.append((state, action, reward, next_state, done))

# Optimized function to replay experiences from memory and train the model
def replay(batch_size):
    minibatch = random.sample(memory, batch_size)
    states = np.vstack([sample[0] for sample in minibatch])
    next_states = np.vstack([sample[3] for sample in minibatch])
    targets = model.predict(states)
    target_next = model.predict(next_states)
    
    for i, (state, action, reward, next_state, done) in enumerate(minibatch):
        target = reward if done else reward + 0.95 * np.amax(target_next[i])
        targets[i][action] = target
        
    model.fit(states, targets, epochs=1, verbose=0)

# Train the model with the modified architecture
for e in range(episodes):
    state, _ = env.reset()  # Unpack the state from the tuple
    state = np.reshape(state, [1, state_size])
    for time in range(200):  # Reduced number of steps per episode
        action = act(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done)
        state = next_state
        
        if done:
            print(f"episode: {e+1}/{episodes}, score: {time}")
            break
        
        if len(memory) > batch_size and time % 10 == 0:  # Train every 10 steps
            replay(batch_size)  # Pass the batch size to replay()

env.close()
```

</details>


## Exercise 2: Implement an Adaptive Exploration Rate

### Objective:
Learn how to adapt the exploration rate (`epsilon`) based on the agent's performance to balance exploration and exploitation.

### Instructions:
1. Modify the `epsilon` decay strategy to decrease more rapidly when the agent's performance improves significantly.
2. Implement a check to reduce `epsilon` faster if the agent achieves a score greater than a certain threshold (e.g., 200) in consecutive episodes.
3. Observe the effect on the learning speed and the agent's performance.
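
One way to read instruction 2 is to switch to the faster decay only after several episodes in a row reach the threshold. The sketch below does that with a per-episode update. The function name `adaptive_epsilon`, the `streak` parameter, and the streak length of 2 are assumptions made for illustration; the decay factors 0.99 and 0.9 and the floor of 0.01 match the values used in this lab.

```python
def adaptive_epsilon(epsilon, scores, threshold=200, streak=2,
                     slow_decay=0.99, fast_decay=0.9, epsilon_min=0.01):
    """Decay epsilon once per episode, faster after `streak` consecutive scores >= threshold."""
    recent = scores[-streak:]
    on_streak = len(recent) == streak and all(s >= threshold for s in recent)
    decay = fast_decay if on_streak else slow_decay
    return max(epsilon_min, epsilon * decay)

epsilon = 1.0
scores = []
for score in [50, 210, 220, 90]:   # made-up episode scores
    scores.append(score)
    epsilon = adaptive_epsilon(epsilon, scores)

# Decay factors applied, in order: 0.99, 0.99, 0.9, 0.99
print(round(epsilon, 7))  # 0.8732691
```

Requiring a streak makes the schedule less sensitive to a single lucky episode than reacting to each score on its own.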


In [23]:
# Exercise 2: Implement an Adaptive Exploration Rate

# Reinitialize environment and model for this exercise
env = gym.make('CartPole-v1')
np.random.seed(42)
env.action_space.seed(42)
env.observation_space.seed(42)

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Build the model
def build_model(state_size, action_size):
    model = Sequential()
    model.add(Input(shape=(state_size,)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

model = build_model(state_size, action_size)

# Initialize epsilon parameters
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.99

# Replay memory
memory = deque(maxlen=2000)

def remember(state, action, reward, next_state, done):
    """Store experience in memory."""
    memory.append((state, action, reward, next_state, done))

def replay(batch_size=64):
    """Train the model using a random sample of experiences from memory."""
    if len(memory) < batch_size:
        return

    minibatch = random.sample(memory, batch_size)
    states = np.vstack([x[0] for x in minibatch])
    actions = np.array([x[1] for x in minibatch])
    rewards = np.array([x[2] for x in minibatch])
    next_states = np.vstack([x[3] for x in minibatch])
    dones = np.array([x[4] for x in minibatch])

    q_next = model.predict(next_states, verbose=0)
    q_target = model.predict(states, verbose=0)

    for i in range(batch_size):
        target = rewards[i]
        if not dones[i]:
            target += 0.95 * np.amax(q_next[i])
        q_target[i][actions[i]] = target

    model.fit(states, q_target, epochs=1, verbose=0)

def act(state):
    """Choose an action based on the current state and exploration rate."""
    if np.random.rand() <= epsilon:
        return random.randrange(action_size)
    act_values = model.predict(state, verbose=0)
    return np.argmax(act_values[0])

# Adaptive epsilon decay function
def adjust_epsilon(score, consecutive_success_threshold=200):
    """
    Adjust epsilon based on agent's performance.
    Reduces epsilon faster when performance is good.
    """
    global epsilon

    if score >= consecutive_success_threshold:
        # Reduce epsilon faster if performance is good (multiply by 0.9 instead of 0.99)
        epsilon = max(epsilon_min, epsilon * 0.9)
    else:
        # Regular epsilon decay
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

# Training parameters
episodes = 100  # Train for 100 episodes to observe the effect
train_frequency = 5
batch_size = 64
consecutive_success_threshold = 200  # Threshold for good performance
consecutive_successes = 0  # Track consecutive episodes with score >= threshold

scores = []  # Store scores for analysis

print("Training with Adaptive Exploration Rate...")
print("=" * 60)
print(f"Initial epsilon: {epsilon:.3f}")
print(f"Consecutive success threshold: {consecutive_success_threshold}")
print("=" * 60)

# Train the model with adaptive epsilon decay
for e in range(episodes):
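    reset_result = env.reset()
    # Newer Gym versions return (observation, info); older versions return only the observation
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = np.reshape(state, [1, state_size])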

    for time in range(500):  # Maximum steps per episode
        action = act(state)
        step_result = env.step(action)
        if len(step_result) == 5:
          next_state, reward, terminated, truncated, _ = step_result
          done = terminated or truncated
        else:
          next_state, reward, done, _ = step_result
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done)
        state = next_state

        if done:
            score = time
            scores.append(score)

            # Track consecutive successes
            if score >= consecutive_success_threshold:
                consecutive_successes += 1
            else:
                consecutive_successes = 0

            # Adjust epsilon based on performance
            adjust_epsilon(score, consecutive_success_threshold)

            # Print progress every 10 episodes
            if (e + 1) % 10 == 0:
                avg_score = np.mean(scores[-10:])
                print(f"Episode: {e+1}/{episodes}, Score: {score}, "
                      f"Avg Score (last 10): {avg_score:.2f}, "
                      f"Epsilon: {epsilon:.3f}, "
                      f"Consecutive Successes: {consecutive_successes}")
            break

        # Train the model periodically
        if time % train_frequency == 0:
            replay(batch_size)

env.close()

# Display final statistics
print("\n" + "=" * 60)
print("Training completed!")
print("=" * 60)
print(f"Average score over all {episodes} episodes: {np.mean(scores):.2f}")
print(f"Average score over last 10 episodes: {np.mean(scores[-10:]):.2f}")
print(f"Average score over last 50 episodes: {np.mean(scores[-50:]):.2f}")
print(f"Best score: {max(scores)}")
print(f"Final epsilon: {epsilon:.3f}")
print(f"Consecutive successes at end of training: {consecutive_successes}")
print("\n" + "=" * 60)
print("OBSERVATION:")
print("=" * 60)
print("The adaptive epsilon decay strategy reduces exploration faster when")
print("the agent performs well, allowing it to exploit learned knowledge")
print("more quickly. This can lead to faster convergence and better")
print("final performance compared to fixed epsilon decay.")


Training with Adaptive Exploration Rate...
Initial epsilon: 1.000
Consecutive success threshold: 200
Episode: 10/100, Score: 14, Avg Score (last 10): 20.60, Epsilon: 0.904, Consecutive Successes: 0
Episode: 20/100, Score: 16, Avg Score (last 10): 24.00, Epsilon: 0.818, Consecutive Successes: 0
Episode: 30/100, Score: 19, Avg Score (last 10): 21.10, Epsilon: 0.740, Consecutive Successes: 0
Episode: 40/100, Score: 10, Avg Score (last 10): 14.10, Epsilon: 0.669, Consecutive Successes: 0
Episode: 50/100, Score: 14, Avg Score (last 10): 14.50, Epsilon: 0.605, Consecutive Successes: 0
Episode: 60/100, Score: 27, Avg Score (last 10): 13.60, Epsilon: 0.547, Consecutive Successes: 0
Episode: 70/100, Score: 35, Avg Score (last 10): 36.80, Epsilon: 0.495, Consecutive Successes: 0
Episode: 80/100, Score: 63, Avg Score (last 10): 45.30, Epsilon: 0.448, Consecutive Successes: 0
Episode: 90/100, Score: 77, Avg Score (last 10): 61.60, Epsilon: 0.405, Consecutive Successes: 0
Episode: 100/100, Score: 1

<details>
    <summary>Click here for Solution</summary>

```python
# Function to adjust epsilon based on performance
def adjust_epsilon(score, consecutive_success_threshold=200):
    global epsilon

    if score >= consecutive_success_threshold:
        epsilon = max(epsilon_min, epsilon * 0.9)  # Reduce epsilon faster if performance is good
    else:
        epsilon = max(epsilon_min, epsilon * epsilon_decay)  # Regular epsilon decay

episodes = 2  # Set number of episodes for training

# Train the model with adaptive epsilon decay
for e in range(episodes):
    state = env.reset()  
    state = state[0]  # Extract the first element, which is the actual state array
    state = np.reshape(state, [1, len(state)])  # Reshape state to match the expected input shape

    total_reward = 0

    for time in range(500):  # Limit the episode to 500 time steps
        action = act(state)  # Choose action based on policy
        next_state, reward, done, truncated, _ = env.step(action)  # Unpack 5 values

        reward = reward if not done else -10  # Penalize for reaching a terminal state
        total_reward += reward  # Accumulate rewards

        next_state = np.reshape(next_state, [1, len(next_state)])  # Reshape next state (optional based on model needs)

        remember(state, action, reward, next_state, done)  # Store experience in memory
        state = next_state  # Update the current state

        if done or truncated:  # Check if the episode is done or truncated
            adjust_epsilon(total_reward)  # Adjust epsilon based on the total reward
            print(f"episode: {e+1}/{episodes}, score: {time}, e: {epsilon:.2}")  # Print the episode details
            break  # Break out of the loop if the episode is done or truncated

        if len(memory) > batch_size:  # Check if enough experiences are stored in memory
            replay(batch_size)  # Train the model with the stored experiences (pass batch_size here)
```

</details>


## Exercise 3: Implement a Custom Reward Function

### Objective:
Understand the impact of reward shaping on training the Q-Learning agent.

### Instructions:
1. Modify the reward function to provide more granular feedback to the agent. For example, give higher rewards for keeping the pole more vertical and closer to the center.
2. Implement a reward function whose value decreases as the pole angle and the cart's distance from the center grow.
3. Train the agent with the new reward function and compare the learning speed and stability to the original setting.


In [25]:
# Exercise 3: Implement a Custom Reward Function

# Reinitialize environment and model for this exercise
env = gym.make('CartPole-v1')
np.random.seed(42)
env.action_space.seed(42)
env.observation_space.seed(42)

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Build the model
def build_model(state_size, action_size):
    model = Sequential()
    model.add(Input(shape=(state_size,)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

model = build_model(state_size, action_size)

# Initialize epsilon parameters
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.99

# Replay memory
memory = deque(maxlen=2000)

def remember(state, action, reward, next_state, done):
    """Store experience in memory."""
    memory.append((state, action, reward, next_state, done))

def replay(batch_size=64):
    """Train the model using a random sample of experiences from memory."""
    if len(memory) < batch_size:
        return

    minibatch = random.sample(memory, batch_size)
    states = np.vstack([x[0] for x in minibatch])
    actions = np.array([x[1] for x in minibatch])
    rewards = np.array([x[2] for x in minibatch])
    next_states = np.vstack([x[3] for x in minibatch])
    dones = np.array([x[4] for x in minibatch])

    q_next = model.predict(next_states, verbose=0)
    q_target = model.predict(states, verbose=0)

    for i in range(batch_size):
        target = rewards[i]
        if not dones[i]:
            target += 0.95 * np.amax(q_next[i])
        q_target[i][actions[i]] = target

    model.fit(states, q_target, epochs=1, verbose=0)

    global epsilon
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

def act(state):
    """Choose an action based on the current state and exploration rate."""
    if np.random.rand() <= epsilon:
        return random.randrange(action_size)
    act_values = model.predict(state, verbose=0)
    return np.argmax(act_values[0])

# Custom reward function
def custom_reward(state):
    """
    Custom reward function that provides granular feedback based on:
    - Cart position (x): closer to center = higher reward
    - Pole angle (theta): more vertical = higher reward

    State variables:
    - x: cart position (range approximately -2.4 to 2.4)
    - x_dot: cart velocity
    - theta: pole angle in radians (range approximately -0.2095 to 0.2095, about ±12 degrees)
    - theta_dot: pole angular velocity
    """
    # Extract state variables
    # The state may arrive as a 1D array of shape (4,) or reshaped to (1, 4);
    # in the reshaped case, take the first row
    if len(state.shape) > 1:
        state_flat = state[0]  # Extract from reshaped state
    else:
        state_flat = state

    x, x_dot, theta, theta_dot = state_flat

    # Normalize and compute rewards
    # Cart position reward: maximum when x=0, decreases as |x| increases
    # Using 2.4 as the approximate limit (CartPole-v1 limit is 2.4)
    cart_reward = 1.0 - abs(x) / 2.4

    # Pole angle reward: maximum when theta=0 (vertical), decreases as |theta| increases
    # Using 0.2095 radians (approximately 12 degrees) as the threshold
    # This is close to the failure threshold in CartPole-v1
    angle_reward = 1.0 - abs(theta) / 0.2095

    # Combine rewards (weighted sum)
    # You can adjust weights to emphasize one aspect over the other
    total_reward = 0.5 * cart_reward + 0.5 * angle_reward

    # No extra scaling is needed: with weights summing to 1.0, the reward peaks at 1.0
    # (cart centered, pole vertical), matching the original reward of 1.0 per step
    return total_reward

# Training parameters
episodes = 100  # Train for 100 episodes to observe learning speed and stability
train_frequency = 5
batch_size = 64

scores = []  # Store scores for analysis
rewards_per_episode = []  # Store total rewards per episode

print("Training with Custom Reward Function...")
print("=" * 60)
print("Custom reward: 0.5 * (1 - |x|/2.4) + 0.5 * (1 - |theta|/0.2095)")
print("=" * 60)

# Train the model with custom reward function
for e in range(episodes):
    state = env.reset()
    if isinstance(state, tuple):  # newer Gym versions return (observation, info)
        state = state[0]
    state = np.reshape(state, [1, state_size])
    total_reward = 0

    for time in range(500):  # Maximum steps per episode
        action = act(state)
        step_result = env.step(action)
        if len(step_result) == 5:
            next_state, _, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            next_state, _, done, _ = step_result

        # Use custom reward function
        reward = custom_reward(next_state) if not done else -10
        total_reward += reward

        next_state = np.reshape(next_state, [1, state_size])
        remember(state, action, reward, next_state, done)
        state = next_state

        if done:
            score = time
            scores.append(score)
            rewards_per_episode.append(total_reward)

            # Print progress every 10 episodes
            if (e + 1) % 10 == 0:
                avg_score = np.mean(scores[-10:])
                avg_reward = np.mean(rewards_per_episode[-10:])
                print(f"Episode: {e+1}/{episodes}, Score: {score}, "
                      f"Avg Score (last 10): {avg_score:.2f}, "
                      f"Total Reward: {total_reward:.2f}, "
                      f"Avg Reward (last 10): {avg_reward:.2f}, "
                      f"Epsilon: {epsilon:.3f}")
            break

        # Train the model periodically
        if time % train_frequency == 0:
            replay(batch_size)

env.close()

# Display final statistics
print("\n" + "=" * 60)
print("Training completed with Custom Reward Function!")
print("=" * 60)
print(f"Average score over all {episodes} episodes: {np.mean(scores):.2f}")
print(f"Average score over last 10 episodes: {np.mean(scores[-10:]):.2f}")
print(f"Average score over last 50 episodes: {np.mean(scores[-50:]):.2f}")
print(f"Best score: {max(scores)}")
print(f"Final epsilon: {epsilon:.3f}")
print(f"\nAverage total reward per episode: {np.mean(rewards_per_episode):.2f}")
print(f"Average total reward (last 10 episodes): {np.mean(rewards_per_episode[-10:]):.2f}")

# Comparison with original reward function
print("\n" + "=" * 60)
print("COMPARISON: Custom Reward vs Original Reward")
print("=" * 60)
print("Original Reward Function:")
print("  - Reward = 1.0 for each step (until episode ends)")
print("  - Reward = -10 when episode terminates")
print("\nCustom Reward Function:")
print("  - Reward = 0.5 * (1 - |x|/2.4) + 0.5 * (1 - |theta|/0.2095)")
print("  - Provides granular feedback based on cart position and pole angle")
print("  - Reward = -10 when episode terminates")
print("\nExpected Benefits:")
print("  - Faster learning: Agent receives immediate feedback on its actions")
print("  - More stable training: Continuous rewards help guide learning")
print("  - Better convergence: Agent learns to optimize for both objectives")
print(f"\nThe custom reward function achieved an average score of {np.mean(scores):.2f}")
print(f"over {episodes} episodes, with the last 10 episodes averaging {np.mean(scores[-10:]):.2f}.")


Training with Custom Reward Function...
Custom reward: 0.5 * (1 - |x|/2.4) + 0.5 * (1 - |theta|/0.2095)
Episode: 10/100, Score: 11, Avg Score (last 10): 19.90, Total Reward: -1.43, Avg Reward (last 10): 6.71, Epsilon: 0.740
Episode: 20/100, Score: 9, Avg Score (last 10): 13.90, Total Reward: -2.53, Avg Reward (last 10): 1.42, Epsilon: 0.536
Episode: 30/100, Score: 9, Avg Score (last 10): 11.30, Total Reward: -2.74, Avg Reward (last 10): -0.92, Epsilon: 0.409
Episode: 40/100, Score: 60, Avg Score (last 10): 22.00, Total Reward: 40.80, Avg Reward (last 10): 8.18, Epsilon: 0.257
Episode: 50/100, Score: 29, Avg Score (last 10): 30.60, Total Reward: 13.37, Avg Reward (last 10): 13.99, Epsilon: 0.134
Episode: 60/100, Score: 50, Avg Score (last 10): 34.90, Total Reward: 29.64, Avg Reward (last 10): 18.36, Epsilon: 0.064
Episode: 70/100, Score: 58, Avg Score (last 10): 67.50, Total Reward: 33.62, Avg Reward (last 10): 45.58, Epsilon: 0.016
Episode: 80/100, Score: 304, Avg Score (last 10): 159.
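
The `Epsilon` column in the log above shrinks because `replay` multiplies `epsilon` by `epsilon_decay` after each training call, as long as `epsilon` is still above `epsilon_min`. The following is a minimal sketch of that schedule in isolation. The helper name `decay_steps_to_min` is hypothetical and not part of the lab code, and its defaults simply mirror the values set in the cell above.

```python
def decay_steps_to_min(epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.99):
    """Count how many multiplicative decays occur before epsilon stops shrinking."""
    steps = 0
    # Same rule as in replay(): decay only while epsilon is above the floor
    while epsilon > epsilon_min:
        epsilon *= epsilon_decay
        steps += 1
    return steps, epsilon

steps, final_epsilon = decay_steps_to_min()
print(f"Decays until the floor is reached: {steps}")
print(f"Final epsilon: {final_epsilon:.4f}")
```

Because the decay happens per training call rather than per episode, how quickly exploration fades depends on `train_frequency` and on how long the episodes last.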

<details>
    <summary>Click here for Solution</summary>

```python
# Define a custom reward function based on the cart position and pole angle
def custom_reward(state):
    # Extract state variables: x (cart position), x_dot (cart velocity), theta (pole angle), theta_dot (pole angular velocity)
    x, x_dot, theta, theta_dot = state
    
    # Custom reward function: Encourage the agent to keep the cart near the center and the pole upright
    reward = (1 - abs(x) / 2.4) + (1 - abs(theta) / 0.20948)
    
    return reward

episodes = 2  # Number of episodes to run

# Train the model with the custom reward function
for e in range(episodes):
    state = env.reset()  # Reset the environment

    # Print the state structure for debugging
    print(f"State: {state}, State Type: {type(state)}")

    # Extract the state if it's a tuple and reshape if necessary
    if isinstance(state, tuple):
        state = state[0]  # Extract the first element if it's a tuple

    state = np.reshape(state, [1, state_size])  # Reshape state to match the expected input shape

    for time in range(500):  # Limit the episode to 500 time steps
        action = act(state)  # Choose an action based on the current state
        
        # Unpack 5 values returned by env.step(action)
        next_state, reward, done, truncated, _ = env.step(action)

        # Extract the observation first if it's wrapped in a tuple
        if isinstance(next_state, tuple):
            next_state = next_state[0]

        # Compute the custom reward based on the next state
        reward = custom_reward(next_state) if not done else -10

        next_state = np.reshape(next_state, [1, state_size])  # Reshape next state to match input shape

        # Store the experience in memory
        remember(state, action, reward, next_state, done)
        state = next_state  # Update the current state

        if done or truncated:  # If the episode is done, break out of the loop
            print(f"episode: {e}/{episodes}, score: {time}, e: {epsilon:.2}")
            break

        if len(memory) > batch_size:  # If there are enough samples in memory, train the model
            replay(batch_size)  # Train the model with a batch of experiences

```

</details>
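
To see how the shaped reward behaves, it can help to evaluate it at a few hand-picked states without running the environment. The sketch below assumes equal weights of 0.5 and the approximate limits 2.4 and 0.2095 used in the exercise cell. The name `shaped_reward` and its keyword parameters are illustrative and not part of the lab's code.

```python
import numpy as np

def shaped_reward(state, x_limit=2.4, theta_limit=0.2095, w_cart=0.5, w_angle=0.5):
    """Weighted sum of a cart-position term and a pole-angle term."""
    # Accept both a flat state of shape (4,) and a reshaped one of shape (1, 4)
    x, _, theta, _ = np.asarray(state, dtype=float).ravel()
    cart_term = 1.0 - abs(x) / x_limit
    angle_term = 1.0 - abs(theta) / theta_limit
    return w_cart * cart_term + w_angle * angle_term

# Cart centered, pole vertical
centered = shaped_reward([0.0, 0.0, 0.0, 0.0])
# Cart halfway to the track limit, pole vertical
off_center = shaped_reward([1.2, 0.0, 0.0, 0.0])
# Cart and pole both at their assumed limits, passed as a (1, 4) array
at_limits = shaped_reward(np.array([[2.4, 0.0, 0.2095, 0.0]]))
print(centered, off_center, at_limits)
```

With these weights the reward for non-terminal states stays roughly between 0 and 1, so its magnitude remains comparable to the original reward of 1.0 per step. Changing `w_cart` and `w_angle` is one way to emphasize centering the cart over keeping the pole upright, or the reverse.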


#### Conclusion

Congratulations on completing this lab! You explored several strategies for improving the performance of the Q-Learning agent: experimenting with different network architectures, implementing adaptive exploration rates, and customizing the reward function. These variations show how flexible the Q-Learning algorithm is and how different hyperparameters and design choices affect the learning process.


## Authors


Skills Network


Copyright © IBM Corporation. All rights reserved.
