# Frozen-Lake with a DQN (Deep Q-Network) agent

## Neural Network predictions

While we in the previous step used a q-table as backend, the problem can also be solved by function approximation with a neural network.

### Building the model

We can start by building the network model:

In [1]:
# Imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.keras import Sequential
from tensorflow.python.keras.layers import Dense
from tensorflow.python.keras.optimizer_v2.adam import Adam

def build_dqn_model(alpha: float = 0.001) -> Sequential:
    """Builds a deep neural net which predicts the Q values for all possible
    actions given a state.

    The input should have the shape of the state, and the output should have the same shape as
    the action space since we want 1 Q value per possible action.

    Args:
		alpha: learning-rate

	Returns:
		q-net model
    """
    x_data = np.linspace(0, 15, 16)
    normalizer = keras.layers.Normalization(input_shape=[1, ], axis=None)
    normalizer.adapt(np.array(x_data))

    q_net = Sequential()
    # We start with the normalizer, input shape is of size 1 (state)
    q_net.add(normalizer)
    # First hidden layer has 32 neurons
    q_net.add(Dense(64, activation='relu', kernel_initializer='he_uniform'))
    # The second hidden layer also have 32 neurons
    q_net.add(Dense(64, activation='relu', kernel_initializer='he_uniform'))
    # Since we have 4 possible actions, the output layer should be of size 4
    q_net.add(Dense(4, activation='linear', kernel_initializer='he_uniform'))
    q_net.compile(optimizer=Adam(learning_rate=alpha), loss='mse')
    return q_net

In [2]:
# We can then create a dqn-model (it will be initialized with random weights)
q_net_model = build_dqn_model()

# And then we can "predict" the q-value outputs from a state s (in this case 1)
state_input = tf.convert_to_tensor([1], dtype=tf.float32)
pred = q_net_model.predict(state_input)
print(f"Q-values for state 1: {pred}")

# To get the q value for a specific action a (in this case action 1);
q = pred[0][1]
print(f"Q-value of state 1, action 1: {q}")

NameError: name 'np' is not defined

### Modifying q-learning functions

We can reuse policy functions and playing functions from the q-learning agent (with a q-table backend), but we will need to modify them to take in the neural network instead:

In [None]:
def dqn_optimal_policy(env: gym.Env, q_net: Sequential, s: int) -> int:
    """RL-policy for optimal play.

    Args:
        env: Frozen-lake Environment
        q_net: q-network
        s: state

    Returns:
        optimal action for given state and q-table.
    """
    s_tensor = tf.convert_to_tensor([s], dtype=tf.float32)
    q_values = q_net.predict(s_tensor)[0]
    # print(f"Q-values: {q_values}")
    return int(np.argmax(q_values))  # Return the argument (element number) with the highest q-value

def dqn_epsilon_greedy_policy(env: gym.Env, q_net: Sequential, s: int, eps: float = 0.15) -> int:
    """RL-policy for exploration/exploitation play.

    Args:
        env: Frozen-lake Environment
        q_net: q-network
        s: state
        eps: exploration chance

    Returns:
        either random action, or optimal action for given state and q-table.
    """
    if np.random.rand() < eps:  # If a random number n is lower than eps:
        return env.action_space.sample()  # Pick a random action
    return dqn_optimal_policy(env, q_net, s)  # Otherwise, play optimally

def dqn_decaying_epsilon_greedy_policy(env: gym.Env, q_net: Sequential, s: int, episode: int, max_episodes: int, max_eps: float = 0.95, min_eps: float = 0.01) -> int:
    """RL-policy for exploration/exploitation play.

    Args:
        env: Frozen-lake Environment
        q_net: q-network
        s: state
        episode: current timestep
        max_episodes: maximum timestep
        max_eps: max exploration chance
        min_eps: min exploration chance

    Returns:
        either random action, or optimal action for given state and q-table.
    """
    max_episodes = int(max_episodes * 0.9)  # Testing with "optimal play" for last 10% of episodes
    episode = min(episode, max_episodes)
    eps = min_eps + (max_eps - min_eps) * ((max_episodes - episode) / max_episodes)
    if np.random.rand() < eps:  # If a random number n is lower than eps:
        return env.action_space.sample()  # Pick a random action
    return dqn_optimal_policy(env, q_net, s)  # Otherwise, play optimally

In [None]:
# We can test the optimal-policy:
print(f"Optimal action: {dqn_optimal_policy(environment, q_net_model, 1)}") # Optimal action

### Playing with a DQN agent

In [None]:
# To play the game with a DQN-agent, we modify the "Play FrozenLake with a q-table agent", by replacing the policy with a DQN-policy:

def dqn_play(max_steps: int = 20):
    state, _ = environment.reset(return_info=True)  # Restart/initialize the environment
    print(environment.render(mode="ansi"))
    for _ in range(max_steps):
        action = dqn_optimal_policy(environment, q_net_model, state)  # Chose the optimal action based on values from the q-table
        # print(f"Action: {action}")
        new_state, reward, done, _ = environment.step(action)  # Play using that action
        print(environment.render(mode="ansi"))

        # We stop the game if we are finished
        if done:
            break

        state = new_state  # If not, replace the state with the new state before next step

dqn_play()

## Experience Replay

For training our network, we generally want to use batches sampled from a larger buffer of experiences

### Replay buffer
We can implement a buffer with the Experience class we implemented earlier:

In [None]:
from collections import deque
from random import sample

class ReplayBuffer:
    """Replay buffer.

    Stores and samples gameplay experiences
    """

    def __init__(self, max_size: int = 2000) -> None:
        self.buffer = deque(maxlen=max_size)

    def store(self, experience: Experience) -> None:
        """Store a gameplay experience in the buffer.

        Args:
            experience: gameplay experience to store

        Returns:
            None
        """
        self.buffer.append(experience)

    def sample(self, batch_size: int = 32) -> list[Experience]:
        """Samples a list of gameplay experiences of (max) size batch_size.

        Args:
            batch_size: maximum size of the batch to sample

        Returns:
            Sampled batch of gameplay experiences
        """
        batch_size = min(batch_size, len(self.buffer))
        return sample(self.buffer, batch_size)

### Storing experiences
We can store experiences in the buffer simply by playing the game, as we did in the "Playing with a DQN agent" step:

In [None]:
def collect_experiences(env: gym.Env, q_net: Sequential, buffer: ReplayBuffer, episode: int, max_episode: int, max_steps: int = 200) -> None:
    """Plays a single game/episode of the environment env, and stores all the transitions as experiences in the buffer.

    Args:
        env: OpenAI gym environment
        q_net: Q-network
        buffer: replay buffer
        episode: current episode number (for decaying eps-greedy)
        max_episode: max episode number (for decaying eps-greedy)
        max_steps: max steps to play for in the environment

    Returns:
        None
    """
    s, _ = environment.reset(return_info=True)  # Restart/initialize the environment
    for _ in range(max_steps):
        a = dqn_decaying_epsilon_greedy_policy(env, q_net, s, episode, max_episode)  # Chose the optimal action based on values from the q-table
        s_new, r, d, _ = environment.step(a)  # Play using that action
        if d and r == 0:
            r = -1
        experience = Experience(s, a, r, s_new, d)
        buffer.store(experience)

        # We stop the game if we are finished
        if d:
            break

        s = s_new  # If not, replace the state with the new state before next step

## Training the q-net

Now we need to be able to update the q-net, as we did with the q-table earlier in the notebook.
(NB: This is not part of the pensum, but left for completeness)

### Evaluating the agent/q-net
We should also be able to evaluate the q-net, so that we can say if it is doing well when training
and to compare different models etc

In [None]:
def evaluate_q_net(env: gym.Env, q_net: Sequential, episodes: int = 10, max_steps: int = 200) -> float:
    """Evaluates the performance of the given q-net.

    Plays n games/episodes of the given environment and calculates the average reward.
    Args:
        env: the game environment
        q_net: the q-net / agent
        episodes: number of episodes to play
        max_steps: max steps to play for in the environment

    Returns:
        average reward
    """
    t_reward = 0.0
    for _ in range(episodes):
        s, _ = environment.reset(return_info=True)  # Restart/initialize the environment
        ep_reward = 0.0
        for _ in range(max_steps):
            a = dqn_optimal_policy(env, q_net, s)  # Chose the optimal action
            s_new, r, d, _ = environment.step(a)  # Play using that action
            ep_reward += r
            # We stop the game if we are finished
            if d:
                break

            s = s_new  # If not, replace the state with the new state before next step
        t_reward += ep_reward
    return t_reward/episodes

### Q-Net Learning
Finally, the replacement for the q-learning method:

In [None]:
def dqn_utility(q_net: Sequential, s: int) -> int:
    """Utility function.

    Args:
        q_net: q-network
        s: state

    Returns:
        q-value of optimal action for given state and q-net.
    """
    s_tensor = tf.convert_to_tensor([s], dtype=tf.float32)
    q_values = q_net.predict(s_tensor)[0]
    return int(np.amax(q_values))  # Return the argument (element number) with the highest q-value

def train(q_net: Sequential, batch: list[Experience], gamma: float = 0.98) -> float:
    """

    Args:
        q_net: q-net
        batch: the batch to train on
        gamma: discount-value

    Returns:
        trained q-net
    """
    # We first create a list of all current q-values in the batch:
    batch_states = [experience.state for experience in batch]
    s_tensor = tf.convert_to_tensor(batch_states, dtype=tf.float32)
    q_values = q_net.predict(s_tensor)

    # We want to calculate the error over the q-values, so we make a copy to use as a target
    target_q = np.copy(q_values)

    # We then repeat for all utilities of the next states in the batch:
    batch_ns = [experience.new_state for experience in batch]
    ns_tensor = tf.convert_to_tensor(batch_ns, dtype=tf.float32)
    utilities = q_net.predict(ns_tensor)
    utilities = [np.amax(utility) for utility in utilities]

    for i in range(len(batch)):
        experience = batch[i]
        target = experience.reward
        if not experience.done:
            # Error is similar to q-learning
            target = experience.reward + gamma * utilities[i]

        # What we would have predicted

        # We update the prediction (to use as the error)
        target_q[i][experience.action] = target
    # Now we update the network, the fit function will take care of the rest of the update algorithm (learning-rate, error and gradient)
    target_q = tf.convert_to_tensor(target_q, dtype=tf.float32)
    training_history = q_net.fit(x=s_tensor, y=target_q, verbose=0)
    loss = training_history.history['loss']
    return loss

def dqn_learning(env: gym.Env, q_net: Sequential, buffer: ReplayBuffer, min_buffer: int = 100, n_episodes: int = 10000, max_steps: int = 200) -> Sequential:
    """dqn implementation to update a q-net.

	Args:
		env: gym environment
		q_net: agent/q-net
		buffer: The replay-buffer we will use
		min_buffer: minimum buffer size before we start training
		n_episodes: number of episodes to train on
		max_steps: maximum episode length

	Returns:
		updated q-table
    """
    # We first start by playing a few episodes so that we have some samples in our buffer
    for episode in range(n_episodes):
        collect_experiences(env, q_net, buffer, episode, n_episodes, max_steps=max_steps)  # Plays one episode and adds to buffer

        if episode >= min_buffer:  # We only start updating the q-net after we have enough experiences to sample from
            experience_batch = buffer.sample(256)
            loss = train(q_net, experience_batch)
            performance = evaluate_q_net(env, q_net)
            print(f"Episode: {episode}/{n_episodes}, the performance of the q-net is: {performance}, the loss is: {loss[0]}")
    return q_net

In [None]:
# Now to train:
replay_buffer = ReplayBuffer(max_size=512)
q_net_model = dqn_learning(environment, q_net_model, replay_buffer, n_episodes=5000)


We can now compare the q-table and the q-net:

In [None]:
def compare_q(q_net: Sequential, q_sa: np.array):
    for s in range(16):
        s_tensor = tf.convert_to_tensor([s], dtype=tf.float32)
        q_values = q_net.predict(s_tensor)
        print(f"State {s}: \n    q-table: {np.round(q_sa[s],2)} \n    q-net: {np.round(q_values, 2)}")
compare_q(q_net_model, q_table)