# Deep Q-Network (DQN)

<div style="text-align:justify">
Deep Q-Network is the simplest architecture combining <b>Reinforcement Learning (RL)</b> and <b>Deep Neural Networks</b>. This is an introductory tutorial on playing the CartPole game. Before starting, take a look at this <a href="https://www.youtube.com/watch?v=XiigTGKZfks">YouTube video</a> with a real-life demonstration of a CartPole problem learning process.
<br><br>
RL is a type of machine learning that allows us to create AI agents that learn from the environment by interacting with it in order to maximize cumulative reward. Just as we learn to ride a bicycle, an AI agent learns by trial and error: RL algorithms penalize agents for wrong actions and reward them for good ones.
</div>

## Neural Networks

<div style="text-align:justify">
TensorFlow makes it simple to implement a neural network. With the code below, we'll create an untrained neural network model. <b>Activation</b>, <b>loss</b> and <b>optimizer</b> are the parameters that define the characteristics of the neural network, but <b>we are not going to discuss them here</b>.
</div>

In [29]:
import random
from collections import deque

import numpy
import tensorflow
import gymnasium as gym
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.layers import Input, Dense

In [30]:
def Brain(input_shape, action_shape):
    _input = Input(shape=input_shape)
    
    _layer = Dense(512, activation='relu', kernel_initializer='he_uniform')(_input)
    
    _layer = Dense(256, activation='relu', kernel_initializer='he_uniform')(_layer)
    
    _layer = Dense(64, activation='relu', kernel_initializer='he_uniform')(_layer)
    
    _layer = Dense(action_shape, activation='linear', kernel_initializer='he_uniform')(_layer)
    
    _model = Model(inputs=_input, outputs=_layer)
    
    _optimizer = RMSprop(learning_rate=0.00025, rho=0.95, epsilon=0.01)
    # Q-value prediction is a regression task, so no accuracy metric is tracked
    _model.compile(loss='mse', optimizer=_optimizer)
    
    return _model

# create model and show summary.
Brain(input_shape=(4,), action_shape=2).summary()

<div style="text-align:justify">
Later in the training process, you will see the neural network predict the Q-value of each action from a particular state. In the code, I'll use <code>model.fit(state, target)</code>, where <code>target</code> holds the updated Q-values. After training, the model will be able to predict Q-values for states it has not seen before.
</div>

## Deep Q-Network

<div style="text-align:justify">
Q-learning was introduced by Chris Watkins in 1989 and can be derived from the Bellman Equation.
</div>

### Bellman Equation:

- <i>s</i>: State
- <i>a</i>: Action
- <i>R</i>: Reward
- <i>γ</i>: Discount factor

$
V(s) = \max_{a}(R(s,a) + \gamma V(s'))
$

<i>Where</i>:
- $\max_{a}$: We choose the action that yields the <i>maximum</i> value.
- $R(s,a)$: Reward for taking action <i>a</i> in state <i>s</i>.
- $\gamma V(s')$: Discounted value of the next state <i>s'</i>.
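To make the equation concrete, here is a minimal sketch that applies it repeatedly to a hypothetical four-state corridor (not part of the CartPole setup). The environment, the reward of 1 for reaching the last state, and the discount of 0.9 are all assumptions chosen for illustration.

```python
# Hypothetical toy problem: 4 states in a row, state 3 is the terminal goal.
GAMMA = 0.9          # discount factor (assumed value)
N_STATES = 4
ACTIONS = (-1, +1)   # move left / move right


def step(state, action):
    """Deterministic transition: returns (next_state, reward)."""
    next_state = min(max(state + action, 0), N_STATES - 1)
    reward = 1.0 if next_state == N_STATES - 1 else 0.0
    return next_state, reward


def value_iteration(sweeps=50):
    """Repeatedly apply V(s) = max_a (R(s,a) + GAMMA * V(s'))."""
    values = [0.0] * N_STATES  # the terminal state keeps V = 0
    for _ in range(sweeps):
        for s in range(N_STATES - 1):
            candidates = []
            for a in ACTIONS:
                next_state, reward = step(s, a)
                candidates.append(reward + GAMMA * values[next_state])
            values[s] = max(candidates)
    return values


values = value_iteration()
print([round(v, 2) for v in values])  # [0.81, 0.9, 1.0, 0.0]
```

States closer to the goal end up with higher values, and each step further away multiplies the value by the discount factor once more.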

### Markov Decision Process:

In general, an action can lead to more than one possible next state. The value of the next state then becomes an expectation:

$A \cdot V(s'_1) + B \cdot V(s'_2) + \dots + N \cdot V(s'_n)$

<i>A</i>, <i>B</i> and <i>N</i> are the probabilities of reaching each of the possible next states.

<i>Simplifying</i>:

$V(s) = \max_{a}(R(s,a) + \gamma\sum_{s'} P(s, a, s')V(s'))$
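As a small worked example of the probability-weighted sum, the sketch below evaluates two actions from one state. The state names, probabilities, rewards, and the discount of 0.9 are made-up numbers for illustration only.

```python
GAMMA = 0.9  # discount factor (assumed value)


def action_value(reward, transitions, values, gamma=GAMMA):
    """R(s,a) + gamma * sum over s' of P(s,a,s') * V(s').

    `transitions` maps each possible next state to its probability.
    """
    total = sum(transitions.values())
    if abs(total - 1.0) > 1e-9:
        raise ValueError("transition probabilities must sum to 1")
    expected_next = sum(p * values[s2] for s2, p in transitions.items())
    return reward + gamma * expected_next


# Hypothetical values of three possible next states
values = {"s1": 10.0, "s2": 0.0, "s3": -5.0}

# A risky action: 80% chance of the good state, 20% chance of the bad one
risky = action_value(0.0, {"s1": 0.8, "s3": 0.2}, values)
# A safe action: always lands in the neutral state, with a small reward
safe = action_value(1.0, {"s2": 1.0}, values)

# V(s) is the value of the best action
state_value = max(risky, safe)
print(risky, safe, state_value)
```

Here the risky action is worth roughly 0.9 × (0.8 × 10 + 0.2 × (−5)) = 6.3, which beats the safe action's 1.0, so it determines the value of the state.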

### Living penalty:

<div style="text-align:justify">
So far, we only reward the robot when it reaches the destination. Ideally, every action the robot takes should carry some reward, so that it can better assess the quality of its actions. The rewards need not always be the same, but having some reward for each action is much better than having none at all. A small negative reward on every step, which pushes the agent to reach the goal quickly, is known as the living penalty.
</div>
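The effect of a per-step reward can be illustrated with a short sketch. The penalty of -0.04, the goal reward of 1.0, and the two paths are hypothetical values, not taken from the CartPole environment.

```python
LIVING_PENALTY = -0.04  # hypothetical small negative reward per step
GOAL_REWARD = 1.0       # hypothetical reward for reaching the goal


def reward(next_state, goal):
    """Reward for arriving in `next_state`."""
    return GOAL_REWARD if next_state == goal else LIVING_PENALTY


def episode_return(path, goal):
    """Undiscounted sum of rewards along a path of visited states."""
    return sum(reward(s, goal) for s in path)


GOAL = 3
short_path = [1, 2, 3]              # reaches the goal in three steps
long_path = [1, 0, 1, 2, 1, 2, 3]   # wanders before reaching the goal

short_return = episode_return(short_path, GOAL)
long_return = episode_return(long_path, GOAL)
print(short_return, long_return)
```

Without the penalty both paths would score the same, so the agent would have no reason to prefer the shorter one.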

### Q-Learning:

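$V(s) = \max_{a}(R(s,a) + \gamma\sum_{s'} P(s, a, s')V(s'))$

Moving the maximization inside, we get the value of a single state-action pair:

$Q(s,a) = R(s,a) + \gamma\sum_{s'} P(s, a, s') \max_{a'} Q(s',a')$

For a single observed transition to $s'$, the sum collapses to:

$Q(s,a) = R(s,a) + \gamma \max_{a'} Q(s',a')$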
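The following sketch applies the last equation to a table of Q-values for a hypothetical four-state corridor with deterministic moves. The environment, its rewards, and the discount of 0.9 are assumptions for illustration; DQN replaces the table with a neural network.

```python
GAMMA = 0.9          # discount factor (assumed value)
N_STATES = 4         # hypothetical corridor; state 3 is terminal
ACTIONS = (-1, +1)   # move left / move right


def step(state, action):
    """Deterministic transition: returns (next_state, reward)."""
    next_state = min(max(state + action, 0), N_STATES - 1)
    reward = 1.0 if next_state == N_STATES - 1 else 0.0
    return next_state, reward


def q_iteration(sweeps=50):
    """Repeatedly apply Q(s,a) = R(s,a) + GAMMA * max_a' Q(s',a')."""
    q = {(s, a): 0.0 for s in range(N_STATES) for a in ACTIONS}
    for _ in range(sweeps):
        for s in range(N_STATES - 1):  # terminal state keeps Q = 0
            for a in ACTIONS:
                next_state, reward = step(s, a)
                best_next = max(q[(next_state, a2)] for a2 in ACTIONS)
                q[(s, a)] = reward + GAMMA * best_next
    return q


q = q_iteration()
# The greedy policy picks the action with the highest Q-value in each state
greedy = {s: max(ACTIONS, key=lambda a: q[(s, a)]) for s in range(N_STATES - 1)}
print(greedy)
```

In every non-terminal state the greedy action is to move right, towards the goal, and the best Q-value of each state equals its state value.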

### Temporal Difference:

<div style="text-align:justify">
Temporal difference learning lets an agent learn from an environment through episodes, with no prior knowledge of that environment. In other words, it is a model-free approach: the agent never needs the transition probabilities $P(s, a, s')$ and instead learns from trial and error.
</div>

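<i>Prior value</i>:<br><br>
$M = Q(s, a)$

<i>After iteration</i>:<br><br>
$K = R(s,a) + \gamma \max_{a'} Q(s',a')$

$TD(s,a) = K - M$

$Q_{t}(s,a) = Q_{t-1}(s,a) + \alpha TD_{t}(s,a)$

Here $\alpha$ is the learning rate, which controls how far each update moves the old estimate towards the new one.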
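A minimal tabular sketch of this update is shown below. The function name `td_update`, the learning rate of 0.1, and the discount of 0.9 are assumptions for illustration; the Q-table is a plain dictionary that defaults to 0 for unseen pairs.

```python
LEARNING_RATE = 0.1  # step size of the update (assumed value)
DISCOUNT = 0.9       # discount factor (assumed value)


def td_update(q, state, action, reward, next_state, done, actions):
    """Apply one temporal-difference update to the table `q` in place."""
    prior = q.get((state, action), 0.0)                        # M
    if done:
        best_next = 0.0                                        # no future after the end
    else:
        best_next = max(q.get((next_state, a), 0.0) for a in actions)
    target = reward + DISCOUNT * best_next                     # K
    td_error = target - prior                                  # TD = K - M
    q[(state, action)] = prior + LEARNING_RATE * td_error
    return td_error


q_table = {}
first_error = td_update(q_table, 0, 1, 1.0, 1, False, actions=(0, 1))
second_error = td_update(q_table, 0, 1, 1.0, 1, False, actions=(0, 1))
print(first_error, second_error, q_table[(0, 1)])
```

Seeing the same transition twice shrinks the error, because the stored estimate has already moved part of the way towards the target.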

## Remember Function

<div style="text-align:justify">
One peculiarity of DQN is that the neural network tends to forget previous experiences as it overwrites them with new ones. So, we need a memory of previous experiences and observations to re-train the model with. We will call this array of experiences memory and use a <code>remember()</code> function to append the state, action, reward, next state, and done flag to the memory.
</div>

In our example, the memory list has the form:

<code>memory = [(state, action, reward, next_state, done)...]</code>

<pre>
<font color="#07a">def</font> <font color="#DD4A68">remember</font>(self, state, action, reward, next_state, done):
    self.memory.append((state, action, reward, next_state, done))
    <font color="#07a">if</font> <font color="#690">len</font>(self.memory) > self.MIN_BATCH_TO_TRAIN:
        <font color="#07a">if</font> self.epsilon > self.EPSILON_MIN:
            self.epsilon *= self.EPSILON_DECAY
</pre>
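The memory itself can be sketched with a bounded `deque`, which discards the oldest experience once it is full. The capacity of 3 and the dummy transitions below are hypothetical and deliberately tiny so the eviction is visible.

```python
import random
from collections import deque

# Tiny capacity (hypothetical) so that eviction is easy to see
memory = deque(maxlen=3)

# Store five dummy transitions: (state, action, reward, next_state, done)
for t in range(5):
    memory.append((t, 0, 1.0, t + 1, False))

# Only the three most recent transitions remain
stored_states = [experience[0] for experience in memory]
print(stored_states)  # [2, 3, 4]

# Sample a minibatch without replacement, never asking for more than is stored
batch_size = 2
minibatch = random.sample(list(memory), min(len(memory), batch_size))
print(len(minibatch))  # 2
```

Sampling at random from this buffer, rather than training on consecutive steps, breaks up the strong correlation between neighbouring transitions.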

## Act

<div style="text-align:justify">
Epsilon is the probability with which the agent picks a random action instead of the action the model predicts to be best.
</div>

<pre>
<font color="#07a">def</font> <font color="#DD4A68">act</font>(self, state):
    <font color="#07a">if</font> numpy.random.random() <= self.epsilon:
        return random.randrange(self.action_size)
    <font color="#07a">else</font>:
        return numpy.argmax(self.model.predict(state))
</pre>
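The same selection rule can be tried without a neural network by passing the Q-values directly. This is a minimal sketch: the function name `epsilon_greedy` and the example Q-values are assumptions, and it uses a strict `<` comparison so that an epsilon of 0 is always greedy.

```python
import random


def epsilon_greedy(q_values, epsilon, rng=random):
    """Pick a random action with probability `epsilon`, else the best one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    # Index of the largest Q-value (the greedy action)
    return max(range(len(q_values)), key=lambda a: q_values[a])


q_values = [0.1, 0.7]  # hypothetical predictions for two actions

# With epsilon = 0 the agent always exploits
greedy_action = epsilon_greedy(q_values, epsilon=0.0)

# With epsilon = 1 the agent always explores, so both actions show up
rng = random.Random(0)
explored = {epsilon_greedy(q_values, epsilon=1.0, rng=rng) for _ in range(1000)}
print(greedy_action, explored)
```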

## Replay Function

<div style="text-align:justify">
To make the agent perform well in the long term, we need to consider not only the immediate reward but also the future rewards we will get. To do this, we multiply the best predicted value of the next state by a <b>discount rate</b> (named <code>ALPHA</code> in the code below, usually written γ) and add it to the current reward. This way, the agent will learn to maximize the discounted future reward based on the given state. In other words, we are updating our Q value with the cumulative discounted future rewards.
<br><br>
<code>done</code> is just a Boolean that indicates if the state is the final state (cartpole failed).
</div>

<div style="text-align:justify">
We will call the method that trains the neural network on experiences from the memory <code>replay()</code>. First, we sample some experiences from the memory and call them a minibatch: <code>minibatch = random.sample(memory, min(len(memory), batch_size))</code>. I will set the batch size to 64 for this example. If the memory holds fewer than 64 experiences, we take everything that is in it.
<br><br>
For those of you who wonder how such a function can converge, as it looks like it is trying to predict its own output (in some sense it is), don't worry: it's possible, and in our simple case, it does. However, convergence is not always that easy, and problems more complex than CartPole need more advanced techniques to stabilize training. Examples of such techniques are Double DQN and Dueling DQN, but that's a topic for another article (stay tuned).
</div>
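The target computation inside a replay step can be sketched on its own with NumPy arrays standing in for the model's predictions. This is an illustrative, vectorized alternative to a per-sample loop; the function name `build_targets`, the example numbers, and the default discount of 0.95 are assumptions.

```python
import numpy as np


def build_targets(q_current, q_next, actions, rewards, dones, gamma=0.95):
    """Return training targets for a minibatch.

    Only the Q-value of the action actually taken is replaced; the other
    entries stay equal to the current predictions, so they add no error.
    """
    targets = q_current.copy()
    # Future reward is dropped for transitions that ended the episode
    not_done = 1.0 - dones.astype(float)
    future = gamma * q_next.max(axis=1) * not_done
    targets[np.arange(len(actions)), actions] = rewards + future
    return targets


# Hypothetical predictions for a minibatch of two transitions, two actions each
q_current = np.array([[1.0, 2.0], [3.0, 4.0]])
q_next = np.array([[0.5, 1.5], [2.0, 1.0]])
actions = np.array([0, 1])
rewards = np.array([1.0, -100.0])
dones = np.array([False, True])

targets = build_targets(q_current, q_next, actions, rewards, dones)
print(targets)
```

The first row's target becomes 1.0 + 0.95 × 1.5 for action 0, while the second row, a terminal transition, simply receives its reward for action 1.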

## Agent

Hyper Parameters:

There are some parameters that have to be passed to a reinforcement learning agent. You will see similar parameters in all DQN models:

- EPISODES: Number of games we want the agent to play;
- ALPHA: Discount rate, used to calculate the discounted future reward (often written γ elsewhere);
- EPSILON: Exploration rate, the probability with which the agent picks a random action rather than the predicted one;
- EPSILON_DECAY: Factor by which the exploration rate shrinks as the agent gets better at playing;
- EPSILON_MIN: Lower bound on the exploration rate, so the agent keeps exploring at least this much;
- LEARN_RATE: Determines how much the neural net learns in each iteration (if used);
- BATCH_SIZE: Number of experiences sampled from memory for each training step;
- MEMORY_SIZE: Maximum number of experiences kept in memory;
- MIN_BATCH_TO_TRAIN: Minimum number of stored experiences before training starts.
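To get a feel for how EPSILON, EPSILON_DECAY and EPSILON_MIN interact, the sketch below counts how many decay steps it takes for the exploration rate to fall to its minimum. The helper name `decay_steps` is hypothetical; the default values are assumed to match the agent defined in this tutorial (start 1.0, decay 0.999, minimum 0.001).

```python
import math


def decay_steps(epsilon=1.0, decay=0.999, minimum=0.001):
    """Count multiplications by `decay` until epsilon is no longer above `minimum`."""
    steps = 0
    while epsilon > minimum:
        epsilon *= decay
        steps += 1
    return steps


steps_needed = decay_steps()

# Closed form of the same count: smallest n with decay**n <= minimum
closed_form = math.ceil(math.log(0.001) / math.log(0.999))
print(steps_needed, closed_form)
```

With these values the agent needs roughly 6,900 decay steps before it stops exploring almost entirely, so a slower or faster decay directly changes how long the exploration phase lasts.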

In [34]:
# Register the mse function so that HDF5 models saved with 'mse' as the loss can be loaded
@tensorflow.keras.utils.register_keras_serializable(package='custom', name='mse')
def mse(y_true, y_pred):
    return tensorflow.keras.losses.mean_squared_error(y_true, y_pred)

In [37]:
class DQNAgent:
    
    def __init__(self):
        
        self.model_name = 'models/cartpole-dqn.h5'
        
        # Hyper parameters
        self.MIN_BATCH_TO_TRAIN = 1000
        self.EPSILON_DECAY = 0.999
        self.EPSILON_MIN = 0.001
        self.MEMORY_SIZE = 2000
        self.EPISODES = 1000
        self.BATCH_SIZE = 64
        self.epsilon = 1.0
        self.ALPHA = 0.95
        
        # Environment setup
        self.env = gym.make('CartPole-v1')
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.memory = deque(maxlen=self.MEMORY_SIZE)
        
        # Model
        self.model = Brain(input_shape=(self.state_size,), action_shape=self.action_size)
    
    def remember(self, state, action, reward, next_state, done):
        
        # Save state in memory
        self.memory.append((state, action, reward, next_state, done))
        
        # Decrease epsilon value
        if len(self.memory) > self.MIN_BATCH_TO_TRAIN:
            if self.epsilon > self.EPSILON_MIN:
                self.epsilon = self.epsilon * self.EPSILON_DECAY
    
    def act(self, state):
        if numpy.random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            state = numpy.reshape(state, [1, self.state_size])
            # verbose=0 avoids printing a progress bar on every step
            return numpy.argmax(self.model.predict(state, verbose=0))
    
    def replay(self):
        
        # Skip train
        if len(self.memory) <= self.MIN_BATCH_TO_TRAIN:
            return
        
        # Randomly sample from memory
        minibatch = random.sample(self.memory, min(len(self.memory), self.BATCH_SIZE))
        
        # Get values from sample
        _state, _action, _reward, _next_state, _done = map(numpy.array, zip(*minibatch))
        
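        # Predict Q-values for the sampled states and their successors
        target = self.model.predict(_state, verbose=0)
        target_next = self.model.predict(_next_state, verbose=0)
        
        # Calculate Q-Values.
        for i in range(len(minibatch)):
            if _done[i]:
                # Terminal transition: there is no future reward to add
                target[i][_action[i]] = _reward[i]
            else:
                # Reward plus the discounted best Q-value of the next state
                target[i][_action[i]] = _reward[i] + (self.ALPHA * numpy.amax(target_next[i]))
        
        # Train model (single epoch); TensorFlow uses a GPU automatically when one is available
        self.model.fit(_state, target, batch_size=self.BATCH_SIZE, verbose=0)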
    
    def train(self):
        for i in range(self.EPISODES):
            
            # Reset the environment before each episode
            _state, _ = self.env.reset()
            _done = False
            _lifetime = 0
            
            while not _done:
                _action = self.act(_state)
                _nxt_state, _reward, terminated, truncated, _info = self.env.step(_action)
                _done = terminated or truncated
                
                # Penalize episodes that end before the time limit (the pole fell)
                if _done and _lifetime != self.env.spec.max_episode_steps - 1:
                    _reward = -100
                
                # Record experience
                self.remember(_state, _action, _reward, _nxt_state, _done)
                _state = _nxt_state
                
                _lifetime = _lifetime + 1
                
                if _done:
                    msg = 'Episode: {}/{}, score: {}, epsilon: {:.5f}'
                    print(msg.format(i, self.EPISODES, _lifetime, self.epsilon))
                    
                    # Save model and stop learning once an episode lasts the full time limit
                    if _lifetime == self.env.spec.max_episode_steps:
                        self.model.save(self.model_name)
                        return
                
                self.replay()
    
    def run(self):
        # Load the trained model
        try:
            # Pass custom_objects to load the custom mse function
            self.model = load_model(self.model_name, custom_objects={'mse': mse})
            print(f"Model loaded successfully from {self.model_name}")
        except Exception as e:
            print(f"Error loading model: {e}")
            print("Please ensure the model has been trained and saved to the correct path.")
            return

        print("Running the trained agent...")
        for i in range(10): # Run for 10 episodes
            _state, _ = self.env.reset()
            _done = False
            _lifetime = 0

            while not _done:
                # No rendering happens here because the environment was created
                # without render_mode='human'

                # Reshape state for model prediction
                _state = numpy.reshape(_state, [1, self.state_size])

                # Predict the best action using the loaded model
                _action = numpy.argmax(self.model.predict(_state, verbose=0)) # verbose=0 to reduce output

                # Take the action in the environment
                _nxt_state, _reward, terminated, truncated, _info = self.env.step(_action)
                _done = terminated or truncated

                # Update state
                _state = _nxt_state

                _lifetime = _lifetime + 1

                if _done:
                    # Show score and stop the game
                    msg = 'Episode: {}, score: {}'
                    print(msg.format(i + 1, _lifetime)) # Use i + 1 for 1-based episode numbering
                    break

### Training

In [None]:
agent = DQNAgent()
#agent.train()

### Test

In [None]:
agent.run()