In [None]:
import time
from collections import deque, namedtuple

import gymnasium as gym
import numpy as np
import PIL.Image
import tensorflow as tf
import utils

from pyvirtualdisplay import Display

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam

In [None]:
# Start a virtual display (visible=0) so the CartPole environment can be rendered.
virtual_display = Display(visible=0, size=(840, 480))
virtual_display.start()

# Seed TensorFlow's global random generator with the project-wide seed.
tf.random.set_seed(utils.SEED)

In [None]:
MEMORY_SIZE = 10 ** 5     # capacity of the replay memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 0.001             # learning rate
NUM_STEPS_FOR_UPDATE = 4  # run a learning update every C time steps

In [None]:
# CartPole environment rendered to RGB arrays so frames can be shown inline.
env = gym.make(
    "CartPole-v0",
    render_mode="rgb_array",
    sutton_barto_reward=False,
)

In [None]:
# Reset the environment and show the rendered frame as an image.
env.reset()
initial_frame = env.render()
PIL.Image.fromarray(initial_frame)


In [None]:
# Overwrite the underlying environment's state with a hand-picked one
# (first component 0.25, remaining components 0) and render the result.
fixed_state = np.array([0.25, 0.0, 0.0, 0.0])
env.unwrapped.state = fixed_state
fixed_frame = env.render()
PIL.Image.fromarray(fixed_frame)

In [None]:
# Observation shape and number of discrete actions, both read from the environment.
state_size, num_actions = env.observation_space.shape, env.action_space.n

print('State Shape:', state_size)
print('Number of actions:', num_actions)

In [None]:
# Reset the environment and get the initial state.
# reset() returns an (observation, info) pair (the training loops below unpack
# it the same way), so unpack it here instead of storing the whole tuple.
current_state, reset_info = env.reset()
print(current_state)

In [None]:
# Choose the action to apply (index 0 of the action space).
action = 0

# Advance the environment by one time step using that action.
next_state, reward, terminated_vals, truncated, info = env.step(action)

print (next_state[0], reward, terminated_vals)



In [None]:
def _build_q_network(input_shape, n_outputs):
    """Return a new two-hidden-layer network with one linear output per action."""
    return Sequential([
        Input(shape=input_shape),
        Dense(units=32, activation='relu'),
        Dense(units=32, activation='relu'),
        Dense(units=n_outputs, activation='linear'),
    ])


# Q-Network: the network whose weights are updated by the optimizer.
q_network = _build_q_network(state_size, num_actions)

# Target Q^-Network: same architecture, built as a separate model.
target_q_network = _build_q_network(state_size, num_actions)

# Adam optimizer configured with the learning rate ALPHA.
optimizer = Adam(learning_rate=ALPHA)


### To Train DQN and DDQN



In [None]:
# Named tuple type holding one transition stored in the memory buffer.
experience = namedtuple(
    "Experience",
    "state action reward next_state terminated_vals",
)

In [None]:
def compute_loss(experiences, gamma, q_network, target_q_network, double_dqn=False):
    """
    Calculates the mean-squared TD loss for a mini-batch of experiences.

    Args:
      experiences: (tuple) mini-batch unpacked as
          (states, actions, rewards, next_states, terminated_vals).
          Assumed to be batched tensors with a leading batch dimension and
          numeric terminated_vals -- TODO confirm against utils.get_experiences.
      gamma: (float) The discount factor.
      q_network: (tf.keras.Sequential) Keras model for predicting the q_values
      target_q_network: (tf.keras.Sequential) Keras model for predicting the targets
      double_dqn: (bool) If False (default), the target uses the maximum
          target-network Q-value of the next state (DQN). If True, the next
          action is chosen greedily by q_network and evaluated with
          target_q_network (Double DQN).

    Returns:
      loss: (TensorFlow Tensor) the Mean-Squared Error between the y targets
            and the Q(s,a) values of the actions that were taken.
    """

    # Unpack the mini-batch of experience tuples
    states, actions, rewards, next_states, terminated_vals = experiences

    # Target-network Q-values for the next states, shape [batch_size, num_actions]
    q_values_target = target_q_network(next_states)

    if double_dqn:
        # Double DQN: q_network picks the greedy next action ...
        next_actions = tf.argmax(q_network(next_states), axis=1, output_type=tf.int32)
        next_batch_indices = tf.range(tf.shape(next_actions)[0], dtype=tf.int32)
        next_indices = tf.stack([next_batch_indices, next_actions], axis=1)
        # ... and the target network supplies the value of that action.
        q_next = tf.gather_nd(q_values_target, next_indices)
    else:
        # DQN: max over actions of the target-network Q-values.
        q_next = tf.reduce_max(q_values_target, axis=-1)

    # Set y = R if episode terminates, otherwise set y = R + γ Q^(s',a').
    y_targets = rewards + (1 - terminated_vals) * (gamma * q_next)

    # Select Q(s,a) for the action taken in each sample so it matches y_targets.
    # tf.shape is used (rather than the static .shape) so this also works when
    # the batch dimension is not known at trace time.
    q_values = q_network(states)
    batch_indices = tf.range(tf.shape(q_values)[0], dtype=tf.int32)
    taken_indices = tf.stack([batch_indices, tf.cast(actions, tf.int32)], axis=1)
    q_values = tf.gather_nd(q_values, taken_indices)

    # Compute the loss
    loss = MSE(y_targets, q_values)

    return loss

In [None]:
# Wrapped in @tf.function for performance; without it training takes about twice as long
@tf.function
def agent_learn(experiences, gamma):
    """
    Performs one learning step on the Q networks.

    Computes the loss for the mini-batch, applies its gradients to q_network
    through the module-level optimizer, then calls
    utils.update_target_network on the two networks.

    Args:
      experiences: (tuple) mini-batch of ["state", "action", "reward", "next_state", "terminated_vals"] values
      gamma: (float) The discount factor.

    """

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as grad_tape:
        td_loss = compute_loss(experiences, gamma, q_network, target_q_network)

    # Differentiate the loss with respect to the Q-network weights and apply the step.
    trainable_vars = q_network.trainable_variables
    grads = grad_tape.gradient(td_loss, trainable_vars)
    optimizer.apply_gradients(zip(grads, trainable_vars))

    # Update the target network from the freshly updated Q-network.
    utils.update_target_network(q_network, target_q_network)

In [None]:
# Create the CSV logger for the DQN run.
dqn_log_path = "logs/dqn.csv"
utils.init_logger(dqn_log_path)

In [None]:
# Train and evaluate DQN
start = time.time()

num_episodes = 5000
max_num_timesteps = 200
td_loss_total = 0  # NOTE(review): never updated in this cell, so 0 is logged every episode
total_point_history = []

num_p_av = 100    # number of total points to use for averaging
epsilon = 1.0     # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N
memory_buffer = deque(maxlen=MEMORY_SIZE)

# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())

for i in range(num_episodes):

    # Reset the environment to the initial state and get the initial state
    state, _ = env.reset()
    total_points = 0

    for t in range(max_num_timesteps):

        # From the current state S choose an action A using an ε-greedy policy
        state_qn = np.expand_dims(state, axis=0)  # state needs to be the right shape for the q_network
        q_values = q_network(state_qn)
        action = utils.get_action(q_values, epsilon)

        # Largest Q-value of the current state, kept for logging/plotting
        max_q = max(q_values.numpy()[0])

        # Take action A and receive the next state S' plus both episode-end flags
        next_state, reward, terminated_vals, truncated, _ = env.step(action)

        # Replace the environment reward with a shaped one that penalizes
        # distance from the center: 1 - |x|, or 0 once |x| exceeds 0.5
        reward = 1.0 - abs(next_state[0])
        if abs(next_state[0]) > 0.5:
            reward = 0

        # Store experience tuple (S,A,R,S') in the memory buffer.
        # Only the terminated flag is stored: compute_loss uses it to drop the
        # bootstrap term, which should not happen on a time-limit truncation.
        memory_buffer.append(experience(state, action, reward, next_state, terminated_vals))

        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = utils.check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)

        if update:
            # Sample random mini-batch of experience tuples (S,A,R,S') from D
            experiences = utils.get_experiences(memory_buffer)

            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            agent_learn(experiences, GAMMA)

        state = next_state.copy()
        total_points += reward

        # The episode is over on termination or truncation; stepping past
        # either without a reset is not valid.
        if terminated_vals or truncated:
            break

    total_point_history.append(total_points)
    av_latest_points = np.mean(total_point_history[-num_p_av:])

    # Average episode return over the last num_p_av episodes, for plotting
    avg_reward = av_latest_points

    # Update the ε value
    epsilon = utils.get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    # Log to the CSV file for later analysis. The episode return is logged
    # (not just the reward of the final step, which is what `reward` holds here).
    # NOTE(review): this call passes td_loss_total while the other training
    # cells call log_metrics without it -- confirm against utils.log_metrics.
    utils.log_metrics("logs/dqn.csv", i, total_points, avg_reward, td_loss_total, max_q)

    # We will consider that the environment is solved if we get an
    # average of 190 points in the last 100 episodes.
    if av_latest_points >= 190.0:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('models/cart_pole_DQN_model.h5')
        break

tot_time = time.time() - start

print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")

In [None]:
# Create the CSV logger for the DDQN run.
ddqn_log_path = "logs/ddqn.csv"
utils.init_logger(ddqn_log_path)

In [None]:
# Train and evaluate DDQN
# NOTE(review): this loop calls the same agent_learn/compute_loss as the DQN
# cell; make sure the Double DQN target is the one active before running it.
# NOTE(review): q_network, target_q_network and optimizer are not re-created
# here, so this run starts from whatever weights the previous training cell
# left behind -- confirm that is intended before comparing against DQN.
start = time.time()

num_episodes = 5000
max_num_timesteps = 200

total_point_history = []

num_p_av = 100    # number of total points to use for averaging
epsilon = 1.0     # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N
memory_buffer = deque(maxlen=MEMORY_SIZE)
td_loss_total = 0
# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())

for i in range(num_episodes):

    # Reset the environment to the initial state and get the initial state
    state, _ = env.reset()
    total_points = 0

    for t in range(max_num_timesteps):

        # From the current state S choose an action A using an ε-greedy policy
        state_qn = np.expand_dims(state, axis=0)  # state needs to be the right shape for the q_network
        q_values = q_network(state_qn)
        action = utils.get_action(q_values, epsilon)

        # Largest Q-value of the current state, kept for logging/plotting
        max_q = max(q_values.numpy()[0])

        # Take action A and receive the next state S' plus both episode-end flags
        next_state, reward, terminated_vals, truncated, _ = env.step(action)

        # Replace the environment reward with a shaped one that penalizes
        # distance from the center: 1 - |x|, or 0 once |x| exceeds 0.5
        reward = 1.0 - abs(next_state[0])
        if abs(next_state[0]) > 0.5:
            reward = 0

        # Store experience tuple (S,A,R,S') in the memory buffer.
        # Only the terminated flag is stored: compute_loss uses it to drop the
        # bootstrap term, which should not happen on a time-limit truncation.
        memory_buffer.append(experience(state, action, reward, next_state, terminated_vals))

        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = utils.check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)

        if update:
            # Sample random mini-batch of experience tuples (S,A,R,S') from D
            experiences = utils.get_experiences(memory_buffer)

            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            agent_learn(experiences, GAMMA)

        state = next_state.copy()
        total_points += reward

        # The episode is over on termination or truncation; stepping past
        # either without a reset is not valid.
        if terminated_vals or truncated:
            break

    total_point_history.append(total_points)
    av_latest_points = np.mean(total_point_history[-num_p_av:])

    # Average episode return over the last num_p_av episodes, for plotting
    avg_reward = av_latest_points

    # Update the ε value
    epsilon = utils.get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    # Log to the CSV file for later analysis. The episode return is logged
    # (not just the reward of the final step, which is what `reward` holds here).
    # NOTE(review): the DQN cell passes an extra td_loss_total argument to
    # log_metrics -- confirm which arity utils.log_metrics expects.
    utils.log_metrics("logs/ddqn.csv", i, total_points, avg_reward, max_q)

    # We will consider that the environment is solved if we get an
    # average of 190 points in the last 100 episodes.
    if av_latest_points >= 190.0:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('models/cart_pole_DDQN_model.h5')
        break

tot_time = time.time() - start

print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")



### To Train DDQN + PER


In [None]:
# Create the CSV logger for the DDQN + PER run.
ddqn_per_log_path = "logs/ddqn_per.csv"
utils.init_logger(ddqn_per_log_path)


In [None]:
# Named tuple type holding one transition plus its sampling priority.
experience = namedtuple(
    "Experience",
    "state action reward next_state terminated_vals priority",
)


In [None]:
def compute_loss(experiences, gamma, q_network, target_q_network, weights):
    """
    Calculates the weighted Double DQN loss and the per-sample TD errors.

    Args:
      experiences: (tuple) mini-batch unpacked as
          (states, actions, rewards, next_states, terminated_vals).
          Assumed to be batched tensors with a leading batch dimension and
          numeric terminated_vals -- TODO confirm against utils.get_experiences_PER.
      gamma: (float) The discount factor.
      q_network: (tf.keras.Sequential) Keras model for predicting the q_values
      target_q_network: (tf.keras.Sequential) Keras model for predicting the targets
      weights: per-sample weights multiplied into the squared TD errors
          (presumably the PER importance-sampling weights -- confirm with the
          sampler). Converted to a float32 tensor here.

    Returns:
      loss: (TensorFlow Tensor) mean over the batch of weights * td_errors**2.
      td_errors: (TensorFlow Tensor) y targets minus Q(s,a), one per sample.
    """

    # Unpack the mini-batch of experience tuples
    states, actions, rewards, next_states, terminated_vals = experiences

    # Convert weights to tf.float32
    weights = tf.convert_to_tensor(weights, dtype=tf.float32)

    ### Double DQN target ###
    # Q-values from target network for next_states, shape [batch_size, num_actions]
    q_values_target = target_q_network(next_states)
    # Greedy next actions chosen by q_network, shape [batch_size]
    next_actions = tf.argmax(q_network(next_states), axis=1, output_type=tf.int32)
    # Evaluate each chosen action with the target network
    next_batch_indices = tf.range(tf.shape(next_actions)[0], dtype=tf.int32)
    next_indices = tf.stack([next_batch_indices, next_actions], axis=1)
    q_next_target = tf.gather_nd(q_values_target, next_indices)  # shape [batch_size]

    # Set y = R if episode terminates, otherwise set y = R + γ Q^(s',a').
    y_targets = rewards + (1 - terminated_vals) * (gamma * q_next_target)

    # Select Q(s,a) for the action taken in each sample so it matches y_targets.
    # tf.shape is used (as for the next-state indices above) so this does not
    # depend on a statically known batch size.
    q_values = q_network(states)
    batch_indices = tf.range(tf.shape(q_values)[0], dtype=tf.int32)
    taken_indices = tf.stack([batch_indices, tf.cast(actions, tf.int32)], axis=1)
    q_values = tf.gather_nd(q_values, taken_indices)

    # Per-sample TD errors
    td_errors = y_targets - q_values

    # Weighted mean of the squared TD errors
    loss = tf.reduce_mean(tf.square(td_errors) * weights)

    return loss, td_errors

In [None]:
# Wrapped in @tf.function for performance; without it training takes about twice as long
@tf.function
def agent_learn(experiences, gamma, weights):
    """
    Performs one weighted learning step on the Q networks.

    Computes the weighted loss for the mini-batch, applies its gradients to
    q_network through the module-level optimizer, then calls
    utils.update_target_network on the two networks.

    Args:
      experiences: (tuple) mini-batch of ["state", "action", "reward", "next_state", "terminated_vals"] values
      gamma: (float) The discount factor.
      weights: per-sample weights forwarded unchanged to compute_loss.

    Returns:
      The per-sample TD errors produced by compute_loss, which the caller uses
      to update the priorities.
    """

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as grad_tape:
        weighted_loss, sample_td_errors = compute_loss(
            experiences, gamma, q_network, target_q_network, weights
        )

    # Differentiate the loss with respect to the Q-network weights and apply the step.
    trainable_vars = q_network.trainable_variables
    grads = grad_tape.gradient(weighted_loss, trainable_vars)
    optimizer.apply_gradients(zip(grads, trainable_vars))

    # Update the target network from the freshly updated Q-network.
    utils.update_target_network(q_network, target_q_network)

    return sample_td_errors
    

In [None]:
# Train and evaluate DDQN + PER
# NOTE(review): q_network, target_q_network and optimizer are not re-created
# here, so this run starts from whatever weights the previous training cell
# left behind -- confirm that is intended before comparing the runs.
start = time.time()

num_episodes = 5000
max_num_timesteps = 200
steps = 0         # number of learning updates so far, passed to the PER sampler
total_point_history = []

num_p_av = 100    # number of total points to use for averaging
epsilon = 1.0     # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N. Priorities live inside each stored
# experience, so no separate priorities array is kept.
memory_buffer = deque(maxlen=MEMORY_SIZE)
td_loss_total = 0
# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())

for i in range(num_episodes):

    # Reset the environment to the initial state and get the initial state
    state, _ = env.reset()
    total_points = 0

    for t in range(max_num_timesteps):

        # From the current state S choose an action A using an ε-greedy policy
        state_qn = np.expand_dims(state, axis=0)  # state needs to be the right shape for the q_network
        q_values = q_network(state_qn)
        action = utils.get_action(q_values, epsilon)

        # Largest Q-value of the current state, kept for logging/plotting
        max_q = max(q_values.numpy()[0])

        # Take action A and receive the next state S' plus both episode-end flags
        next_state, reward, terminated_vals, truncated, _ = env.step(action)

        # Replace the environment reward with a shaped one that penalizes
        # distance from the center: 1 - |x|, or 0 once |x| exceeds 0.5
        reward = 1.0 - abs(next_state[0])
        if abs(next_state[0]) > 0.5:
            reward = 0

        # Store experience tuple (S,A,R,S') in the memory buffer with an
        # initial priority of 1.0. Only the terminated flag is stored:
        # compute_loss uses it to drop the bootstrap term, which should not
        # happen on a time-limit truncation.
        memory_buffer.append(experience(state, action, reward, next_state, terminated_vals, priority=1.0))

        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = utils.check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)

        if update:
            # Sample experiences from the PER buffer
            steps += 1
            experiences, indices, weights = utils.get_experiences_PER(memory_buffer, step=steps)

            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            td_errors = agent_learn(experiences, GAMMA, weights)

            # New priority = |TD error| + small constant so it is never zero
            new_priorities = np.abs(td_errors.numpy()) + 1e-6

            # Write each new priority back into the sampled experience.
            # Namedtuples are immutable, so the buffer slot is replaced.
            for idx, p in zip(indices, new_priorities):
                memory_buffer[idx] = memory_buffer[idx]._replace(priority=float(p))

        state = next_state.copy()
        total_points += reward

        # The episode is over on termination or truncation; stepping past
        # either without a reset is not valid.
        if terminated_vals or truncated:
            break

    total_point_history.append(total_points)
    av_latest_points = np.mean(total_point_history[-num_p_av:])

    # Average episode return over the last num_p_av episodes, for plotting
    avg_reward = av_latest_points

    # Update the ε value
    epsilon = utils.get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    # Log to the CSV file for later analysis. The episode return is logged
    # (not just the reward of the final step, which is what `reward` holds here).
    # NOTE(review): the DQN cell passes an extra td_loss_total argument to
    # log_metrics -- confirm which arity utils.log_metrics expects.
    utils.log_metrics("logs/ddqn_per.csv", i, total_points, avg_reward, max_q)

    # We will consider that the environment is solved if we get an
    # average of 190 points in the last 100 episodes.
    if av_latest_points >= 190.0:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('models/cart_pole_DDQN_PER_model.h5')
        break

tot_time = time.time() - start

print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")


# Exporting Video of Untrained Model

In [None]:
from tensorflow.keras.models import load_model
import gymnasium as gym
import numpy as np
import time

In [None]:
# Record an untrained (random-action) agent for a fixed wall-clock duration.
RECORD_SECONDS = 10  # how long to keep stepping the environment

# Create environment
# env = gym.make("CartPole-v0",render_mode="human")  # or the env you trained on
env = gym.make("CartPole-v0",render_mode="rgb_array")  # or the env you trained on

# Record every episode to videos/untrained_cart_pole-*.mp4
env = gym.wrappers.RecordVideo(env, video_folder="videos", episode_trigger=lambda e: True,name_prefix = "untrained_cart_pole")

try:
    state, _ = env.reset()
    start_time = time.time()
    while (time.time() - start_time) <= RECORD_SECONDS:
        # Random action: shows how the agent behaves without any training
        action = env.action_space.sample()

        # Step in environment
        next_state, reward, terminated, truncated, _ = env.step(action)

        if terminated or truncated:
            # The episode is over either way; start a new one and continue
            # from the state that reset() returns.
            state, _ = env.reset()
        else:
            state = next_state
finally:
    # Always close the environment, even if stepping raised.
    env.close()

In [None]:
import os
import re
from moviepy import concatenate_videoclips, VideoFileClip

video_dir = "videos"
prefix = "untrained_cart_pole"
combined_name = f"{prefix}_combined.mp4"
output_path = os.path.join(video_dir, combined_name)


def _episode_order(filename):
    """Sort key: last integer in the file name, so e.g. 10 sorts after 2."""
    numbers = re.findall(r"\d+", filename)
    return (int(numbers[-1]) if numbers else -1, filename)


# Collect files matching prefix. The combined output also starts with the
# prefix, so it is excluded: otherwise a re-run would read it as an input
# while overwriting it and then delete it together with the originals.
video_files = sorted(
    (
        f for f in os.listdir(video_dir)
        if f.startswith(prefix) and f.endswith(".mp4") and f != combined_name
    ),
    key=_episode_order,
)

if not video_files:
    # Nothing to concatenate
    print(f"No '{prefix}' episode videos found in {video_dir}; nothing to combine.")
else:
    # Load clips
    clips = [VideoFileClip(os.path.join(video_dir, f)) for f in video_files]
    try:
        # Concatenate into one video
        final = concatenate_videoclips(clips)
        try:
            final.write_videofile(output_path)
        finally:
            final.close()
    finally:
        # Close clips so the source files are released before deletion
        for clip in clips:
            clip.close()

    # Delete originals (only reached when the combined video was written)
    for f in video_files:
        os.remove(os.path.join(video_dir, f))

    print(f"✅ Combined video saved at {output_path} and original episode videos deleted.")


# Exporting Video of Trained Model

In [None]:
# Upper bound on recorded steps; also used as the recorder's video_length.
MAX_VIDEO_STEPS = 1500

# Load your trained model
model = load_model('models/cart_pole_DDQN_PER_model.h5')
# Create environment
# env = gym.make("CartPole-v0",render_mode="human")  # or the env you trained on
env = gym.make("CartPole-v0",render_mode="rgb_array")  # or the env you trained on
env = gym.wrappers.RecordVideo(env, video_folder="videos", episode_trigger=lambda e: True,name_prefix = "DDQN_PER_trained_cart_pole", video_length= MAX_VIDEO_STEPS)

total_reward = 0
try:
    state, _ = env.reset()
    for step_index in range(MAX_VIDEO_STEPS):
        # Prepare state for network (add a leading batch axis)
        state_input = np.expand_dims(state, axis=0)

        # Get Q-values and pick greedy action
        q_values = model(state_input)
        action = np.argmax(q_values.numpy()[0])

        # Step in environment
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Count the reward of every step, including the last one
        total_reward += reward

        # Stop on termination or truncation: stepping a finished episode
        # without a reset is not valid.
        if terminated or truncated:
            break
        state = next_state
finally:
    # Always close the environment so the recording is finalized.
    env.close()

print("Episode finished. Total reward:", total_reward)

# To Plot

In [None]:
import utils



In [None]:
# Path handed to utils.plot_comparison (presumably where the comparison
# figure is written -- confirm in utils).
comparison_figure_path = "figures/comparison.jpg"
utils.plot_comparison(comparison_figure_path)