

<p><img height="80px" src="https://www.upm.es/sfs/Rectorado/Gabinete%20del%20Rector/Logos/UPM/Escudo/EscUpm.jpg" align="left" hspace="0px" vspace="0px"></p>

**Course "Artificial Neural Networks and Deep Learning" - Universidad Politécnica de Madrid (UPM)**

# **Deep Q-Learning for Cartpole**

This notebook includes an implementation of the Deep Q-learning (DQN) algorithm for the cartpole problem (see [Cartpole documentation](https://gymnasium.farama.org/environments/classic_control/cart_pole/)).

Original code by: Artificial Neural Networks and Deep Learning professors
Modifications by:
<ul>
<li>David González Fernández (david.gonzalezf@alumnos.upm.es)</li>
<li>Sergio Arroni del Riego ()</li>
<li>José Manuel Pérez Lamas ()</li>
<li>Paul Delage (paul.delage@alumnos.upm.es)</li>
</ul>

### Listado de cambios realizados:

## Libraries

In [108]:
# TODO: descomentar
#!pip install gymnasium[classic-control]

import gymnasium as gym
import numpy as np
from tensorflow import keras
import matplotlib.pyplot as plt
import time
import random
import math

## Hyperparameters

In [109]:
GAMMA = 0.99
MEMORY_SIZE = 50000
LEARNING_RATE = 0.001
BATCH_SIZE = 128
EXPLORATION_MAX = 1
EXPLORATION_MIN = 0.05
EXPLORATION_DECAY = 0.99
NUMBER_OF_EPISODES_FOR_TRAINING = 3000
NUMBER_OF_EPISODES_FOR_TESTING = 20
MAX_STEPS_PER_EPISODE = 100

## Class ReplayMemory

Memory of transitions for experience replay.

In [110]:
class ReplayMemory:
    def __init__(self,number_of_observations):
        # Create replay memory
        self.states = np.zeros((MEMORY_SIZE, number_of_observations))
        self.states_next = np.zeros((MEMORY_SIZE, number_of_observations))
        self.actions = np.zeros(MEMORY_SIZE, dtype=np.int32)
        self.rewards = np.zeros(MEMORY_SIZE)
        self.terminal_states = np.zeros(MEMORY_SIZE, dtype=bool)
        self.current_size = 0
        self.position = 0
        self.max_size = MEMORY_SIZE

    def store_transition(self, state, action, reward, state_next, terminal_state):
        # Store a transition (s,a,r,s') in the replay memory
        i = (self.position) % self.max_size
        self.position += 1
        self.states[i] = state
        self.states_next[i] = state_next
        self.actions[i] = action
        self.rewards[i] = reward
        self.terminal_states[i] = terminal_state
        self.current_size = min(self.current_size+1, self.max_size)

    def sample_memory(self, batch_size):
        # Generate a sample of transitions from the replay memory
        batch = np.random.choice(self.current_size, batch_size)
        states = self.states[batch]
        states_next = self.states_next[batch]
        rewards = self.rewards[batch]
        actions = self.actions[batch]
        terminal_states = self.terminal_states[batch]
        return states, actions, rewards, states_next, terminal_states

## Class DQN

Reinforcement learning agent with a Deep Q-Network.

In [111]:
class DQN:
    def __init__(self, number_of_observations, number_of_actions):
        # Initialize variables and create neural model
        self.exploration_rate = EXPLORATION_MAX
        self.number_of_actions = number_of_actions
        self.number_of_observations = number_of_observations
        self.scores = []

        self.memory = ReplayMemory(number_of_observations)
        self.warm_up_episodes = 25

        # Modelo
        self.model = keras.models.Sequential()
        self.model.add(keras.layers.Embedding(number_of_observations, 6))
        self.model.add(keras.layers.Dense(50, activation='relu'))
        self.model.add(keras.layers.Dense(50, activation='relu'))
        self.model.add(keras.layers.Dense(50, activation='relu'))
        self.model.add(keras.layers.Dense(number_of_actions, activation='relu'))
        self.model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE))


    def remember(self, state, action, reward, next_state, terminal_state):
        # Store a tuple (s, a, r, s') for experience replay
        state = np.reshape(state, [1, 1])
        next_state = np.reshape(next_state, [1, 1])
        self.memory.store_transition(state, action, reward, next_state, terminal_state)


    def select(self, state):
        # Generate an action for a given state using epsilon-greedy policy
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.number_of_actions)
        else:
            state = np.reshape(state, [1, 1])
            q_values = self.model.predict(state, verbose=0)
            return np.argmax(q_values[0])


    def select_greedy_policy(self, state):
        # Generate an action for a given state using greedy policy
        state = np.reshape(state, [1, 1])
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])


    # TODO: cambiar
    def learn(self):
        # Learn the value Q using a sample of examples from the replay memory
        if self.memory.current_size < BATCH_SIZE:
            return

        states, actions, rewards, next_states, terminal_states = self.memory.sample_memory(BATCH_SIZE)

        q_targets = self.model.predict(states, verbose=0)
        q_next_states = self.model.predict(next_states, verbose=0)

        for i in range(BATCH_SIZE):
             if (terminal_states[i]):
                  q_targets[i][actions[i]] = rewards[i]
             else:
                  q_targets[i][actions[i]] = rewards[i] + GAMMA * np.max(q_next_states[i])

        self.model.train_on_batch(states, q_targets)
    

    def decrease_exploration_rate(self, n_episode):
        if n_episode >= self.warm_up_episodes:
            # Decrease exploration rate
            self.exploration_rate *= EXPLORATION_DECAY
            self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)


    def add_score(self, score):
       # Add the obtained score to a list to be presented later
        self.scores.append(score)


    def delete_scores(self):
       # Delete the scores
        self.scores = []


    def display_scores_graphically(self):
        # Display the obtained scores graphically
        plt.plot(self.scores)
        plt.xlabel("Episode")
        plt.ylabel("Score")

## Environment Cartpole

<p><img height="200px" src="https://raw.githubusercontent.com/martin-molina/reinforcement_learning/main/images/cartpole_attributes.png" align="center" vspace="20px"</p>

State vector:
- state[0]: cart position
- state[1]: cart velocity
- state[2]: pole angle
- state[3]: pole angular velocity

Actions:
- 0 (push cart to the left)
- 1 (push cart to the right)

In [112]:
def create_environment():
    # Create simulated environment
    environment = gym.make("Taxi-v3")
    number_of_observations = environment.observation_space.n
    number_of_actions = environment.action_space.n
    return environment, number_of_observations, number_of_actions

## Training program




In [113]:
def borrar():
    environment, number_of_observations, number_of_actions = create_environment()
    agent = DQN(number_of_observations, number_of_actions)
    episode = 0
    start_time = time.perf_counter()

    while (episode < NUMBER_OF_EPISODES_FOR_TRAINING):
        episode += 1
        score = 0
        state, info = environment.reset()
        end_episode = False
        steps = 0
        agent.decrease_exploration_rate(episode)
        
        while not(end_episode):
            steps += 1
            # Select an action for the current state
            action = agent.select(state)

            # Execute the action on the environment
            state_next, reward, terminal_state, truncated, info = environment.step(action)
        
            # Store in memory the transition (s,a,r,s')
            agent.remember(state, action, reward, state_next, terminal_state)

            score += reward

            # Learn using a batch of experience stored in memory
            agent.learn()


            # Detect end of episode
            if terminal_state or truncated or steps>MAX_STEPS_PER_EPISODE:
                # TODO: pendiente de comprobar
                agent.add_score(score)
                print("Episode {0:>3}: ".format(episode), end = '')
                print("score {0:>3} ".format(math.trunc(score)), end = '')
                print("(exploration rate: %.2f, " % agent.exploration_rate, end = '')
                print("transitions: " + str(agent.memory.current_size) + ")")
                end_episode = True
            else:
                state = state_next


    print("Time for training:", round((time.perf_counter() - start_time)/60), "minutes")
    print("Score (max):", max(agent.scores))
    average_score = np.mean(agent.scores[max(0,(len(agent.scores)-10)):(len(agent.scores))])
    print("Score (average last 10 episodes):", average_score)

    agent.display_scores_graphically()


borrar()

Episode   1: score -407 (exploration rate: 1.00, transitions: 101)
Episode   2: score -407 (exploration rate: 1.00, transitions: 202)
Episode   3: score -354 (exploration rate: 1.00, transitions: 298)
Episode   4: score -434 (exploration rate: 1.00, transitions: 399)
Episode   5: score -425 (exploration rate: 1.00, transitions: 500)
Episode   6: score -344 (exploration rate: 1.00, transitions: 601)
Episode   7: score -362 (exploration rate: 1.00, transitions: 702)
Episode   8: score -398 (exploration rate: 1.00, transitions: 803)
Episode   9: score -434 (exploration rate: 1.00, transitions: 904)
Episode  10: score -380 (exploration rate: 1.00, transitions: 1005)
Episode  11: score -398 (exploration rate: 1.00, transitions: 1106)
Episode  12: score -371 (exploration rate: 1.00, transitions: 1207)
Episode  13: score -362 (exploration rate: 1.00, transitions: 1308)
Episode  14: score -434 (exploration rate: 1.00, transitions: 1409)
Episode  15: score -398 (exploration rate: 1.00, transiti

KeyboardInterrupt: 


## Testing program



In [None]:
if False:
    # TODO: aún nada hecho

    agent.delete_scores()
    episode = 0
    start_time = time.perf_counter()
    while (episode < NUMBER_OF_EPISODES_FOR_TESTING):
        episode += 1
        score = 0
        state, info = environment.reset()
        end_episode = False
        while not(end_episode):
            # Select an action for the current state
            action = agent.select_greedy_policy(state)

            # Execute the action in the environment
            state_next, reward, terminal_state, truncated, info = environment.step(action)

            score += reward

            # Detect end of episode and print
            if terminal_state or truncated:
                agent.add_score(score)
                print("Episode {0:>3}: ".format(episode), end = '')
                print("score {0:>3} \n".format(math.trunc(score)), end = '')
                end_episode = True
            else:
                state = state_next

    print("Time for testing:", round((time.perf_counter() - start_time)/60), "minutes")
    print("Score (average):", np.mean(agent.scores))
    print("Score (max):", max(agent.scores))