# Q-Learning and Deep Q-Learning

This notebook is dedicated to Q-learning, one of the most successful techniques to have emerged in reinforcement learning. One of the most crucial definitions we need for Q-learning is the value of a state. In general, the state value is the expected cumulative reward an agent can collect starting from that state and following a given policy. In other words, it measures how good it is for the agent to be in that state. The keyword here is "expected", since the definition has to remain consistent even if the environment is probabilistic by nature (for example, the Frozen Lake environment).

The state value for deterministic environments can be calculated using the following equation: <br>
*V(s) = max_a(R(s, a, s') + γV(s'))*

V(s) - state value <br>
R(s, a, s') - reward for taking action a in state s and landing in state s' <br>
γ - discount factor; set it to 1.0 if no discounting is wanted <br>
V(s') - value of the next state <br>
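
To make the deterministic formula concrete, here is a minimal sketch on a made-up three-state chain. The `transitions` table, the action names and the `backup` helper are assumptions for illustration only and are not part of the agent implemented later in this notebook.

```python
GAMMA = 0.9

# Hypothetical deterministic environment: 0 -> 1 -> 2, where state 2 is terminal.
# transitions[state][action] = (next_state, reward)
transitions = {
    0: {"stay": (0, 0.0), "right": (1, 0.0)},
    1: {"left": (0, 0.0), "right": (2, 1.0)},
}

def backup(values, state):
    """One Bellman backup: V(s) = max_a [R(s, a, s') + gamma * V(s')]."""
    return max(
        reward + GAMMA * values[next_state]
        for next_state, reward in transitions[state].values()
    )

# The terminal state keeps value 0; the others are refined by repeated sweeps.
values = {0: 0.0, 1: 0.0, 2: 0.0}
for _ in range(50):
    for state in transitions:
        values[state] = backup(values, state)

print(values)
```

State 1 is one step away from the reward, so its value settles at 1.0, while state 0 is two steps away and ends up discounted once, at 0.9.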

For probabilistic environments: <br>
*V(s) = max_a ∑_s' P(s' | s, a) [R(s, a, s') + γV(s')]*

P(s' | s, a) - probability of reaching state s' after taking action a in state s <br>
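
The probabilistic version only adds a weighted sum over the possible next states. The sketch below evaluates it for one state with two actions. The state names, probabilities and values are invented for the example, loosely imitating a slippery step on ice.

```python
GAMMA = 0.9

def action_value(outcomes, values, gamma=GAMMA):
    """Expected one-step return: sum over s' of P * (R + gamma * V(s'))."""
    return sum(p * (r + gamma * values[s_next]) for s_next, p, r in outcomes)

# Assumed current value estimates of the neighbouring states
values = {"goal": 0.0, "ice": 0.5, "hole": 0.0}

# Each outcome is (next_state, probability, reward); probabilities sum to 1
outcomes_by_action = {
    "forward": [("goal", 1 / 3, 1.0), ("ice", 1 / 3, 0.0), ("hole", 1 / 3, 0.0)],
    "back": [("ice", 2 / 3, 0.0), ("hole", 1 / 3, 0.0)],
}

q = {a: action_value(o, values) for a, o in outcomes_by_action.items()}
state_value = max(q.values())  # the max over actions from the formula
print(q, state_value)
```

Here "forward" is worth about 0.483 and "back" 0.3, so the state value is the value of "forward".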

The intuitive interpretation stays the same whichever formula is used.
We can use this formula to create an agent which chooses actions based on its knowledge of state values. After all, if the agent always picks the action leading to the state with the highest value, it plays optimally. In reality, however, we do not know the state values and can only estimate them. One of the simplest ways to do so is to perform a set number of random steps with the agent, and then choose actions in a series of test episodes according to the state values determined during this training period. This method is implemented in the agent class further down. This is just the start of Q-learning, though, and the next sections use more interesting techniques built on the ideas presented here.

## Imports and installs

In [1]:
!pip install tensorboardX
!pip install gymnasium
!pip install pyvirtualdisplay > /dev/null 2>&1

In [2]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import torch.optim as optim
from tensorboardX import SummaryWriter
from collections import namedtuple
import gymnasium as gym
import numpy as np
from gymnasium import wrappers
from IPython import display as ipythondisplay
import os
import pyvirtualdisplay
import base64
import io
import imageio
from datetime import datetime
from IPython.display import HTML
from gymnasium import Wrapper
import warnings
import cv2
from typing import TypeVar
import random
import gymnasium
import collections

## Video Recording

In [None]:
def render_as_image(env):
    '''
    Renders the environment as an image using Matplotlib.

    Arguments:
    - env: The environment object to render.

    Returns:
    None
    '''
    plt.imshow(env.render())
    plt.axis('off')
    plt.show()

def embed_video(file_path):
    '''
    Embeds a video file into HTML for display.

    Arguments:
    - file_path: The path to the video file.

    Returns:
    - HTML: HTML code for embedding the video.
    '''
    # Read the file inside a context manager so the handle is closed promptly
    with open(file_path, "rb") as f:
        video_file = f.read()
    video_url = f"data:video/mp4;base64,{base64.b64encode(video_file).decode()}"
    return HTML(f"""<video width="640" height="480" controls><source src="{video_url}" type="video/mp4"></video>""")

def random_filename():
    '''
    Generates a timestamp-based filename in the format "YYYY_MM_DD_HH_MM_SS.mp4".

    Returns:
    - str: Filename derived from the current date and time.
    '''
    return datetime.now().strftime('%Y_%m_%d_%H_%M_%S.mp4')

class VideoRecorder:
    '''
    Utility class for recording video of an environment.

    Methods:
    - __init__: Initializes the video recorder.
    - record_frame: Records a frame from the environment.
    - close: Closes the video writer.
    - play: Plays the recorded video.
    - __enter__: Enters the context manager.
    - __exit__: Exits the context manager.
    '''
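    def __init__(self, filename=None, fps=30):
        '''
        Initializes the VideoRecorder.

        Arguments:
        - filename: The filename to save the recorded video. Defaults to a
          fresh timestamp-based name generated when the recorder is created.
        - fps: Frames per second of the recorded video.
        '''
        # A default of random_filename() in the signature would be evaluated
        # only once, at definition time, so every recorder would share one file.
        self.filename = filename if filename is not None else random_filename()
        self.writer = imageio.get_writer(self.filename, fps=fps)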

    def record_frame(self, env, target_width = 608, target_height=400):
        '''
        Records a frame from the environment.

        Arguments:
        - env: The environment object to record.
        - target_width: Width of the target frame.
        - target_height: Height of the target frame.

        Returns:
        None
        '''
        frame = env.render()
        resized_frame = cv2.resize(frame, (target_width, target_height))
        self.writer.append_data(resized_frame)

    def close(self, *args, **kwargs):
        '''
        Closes the video writer.

        Arguments:
        None

        Returns:
        None
        '''
        self.writer.close(*args, **kwargs)

    def play(self):
        '''
        Closes the writer and returns the recorded video for display.

        Arguments:
        None

        Returns:
        - HTML: HTML code for embedding the video.
        '''
        self.close()
        return embed_video(self.filename)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
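        # Finalize the file and display it explicitly; a value that is merely
        # computed inside a method is not rendered by the notebook.
        self.close()
        ipythondisplay.display(embed_video(self.filename))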

## Q-learning based on Bellman Equation

Below is the implementation of an agent using the ideas described above: it calculates state values and chooses the action which leads to the state with the best expected value. The agent is trained and tested in the non-deterministic Frozen Lake environment.

The agent first takes n random steps in the environment; we set n to 100 for this task. From these random steps we gather information about the state values. Along the way we keep track of the reward received for each (state, action, new state) triple (rewards), as well as the number of times each new state was reached from a given state and action (transits). We use these tables to calculate the state values in the value_iteration method. For every state, the method calculates the expected value of each available action and then sets the value of the state to the maximum of those. The per-action part is done by calc_action_value, which calculates the expected value of a given action in a given state.
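
The step from visit counts to an expected value can be traced on a tiny example. The sketch below uses the same table layout as the agent (`transits`, `rewards`, `values`), but the state numbers, counts and the value assigned to state 10 are made up for illustration.

```python
import collections

GAMMA = 0.9

transits = collections.defaultdict(collections.Counter)
rewards = collections.defaultdict(float)
values = collections.defaultdict(float)

# Pretend action 2 was tried ten times in state 14 with these observed results
observed = [(15, 1.0)] * 3 + [(14, 0.0)] * 4 + [(10, 0.0)] * 3
for next_state, reward in observed:
    transits[(14, 2)][next_state] += 1
    rewards[(14, 2, next_state)] = reward

values[10] = 0.4  # assumed current estimate for state 10

# Counts divided by their total give the estimated transition probabilities
counts = transits[(14, 2)]
total = sum(counts.values())
probs = {s: c / total for s, c in counts.items()}

# Probability-weighted sum of (reward + discounted value of the next state)
expected = sum(
    p * (rewards[(14, 2, s)] + GAMMA * values[s]) for s, p in probs.items()
)
print(probs, expected)
```

With these numbers the estimated probabilities are 0.3, 0.4 and 0.3, and the action value comes out at roughly 0.408.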

After training the agent, we run a series of test episodes (20 here), in which we use the state values gained in the training phase to pick the best actions. If the average total reward over the test episodes exceeds 0.8, we declare the task solved. Otherwise, we print the new score if the agent improved and go back to the training phase. Please keep in mind how difficult this environment was for the previously tested reinforcement learning techniques. As you will be able to see, this task should be much simpler for Q-learning.

In [None]:
rec = VideoRecorder()

In [None]:
ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
TEST_EPISODES = 20

In [None]:
class Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME, render_mode = "rgb_array")
        self.state = self.env.reset()[0]
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        for _ in range(count):
            action = self.env.action_space.sample()
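            new_state, reward, terminated, truncated, info = self.env.step(action)
            # An episode ends on termination or when the time limit is hit
            is_done = terminated or truncated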
            self.rewards[(self.state, action, new_state)] = reward
            self.transits[(self.state, action)][new_state] += 1
            self.state = (self.env.reset()[0] if is_done else new_state)

    def calc_action_value(self, state, action):
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            val = reward + GAMMA * self.values[tgt_state]
            action_value += (count / total) * val
        return action_value

    def select_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()[0]
        while True:
            rec.record_frame(env)
            action = self.select_action(state)
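            new_state, reward, terminated, truncated, info = env.step(action)
            # An episode ends on termination or when the time limit is hit
            is_done = terminated or truncated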
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.values[state] = max(state_values)

In [None]:
test_env = gym.make(ENV_NAME, render_mode = "rgb_array")
agent = Agent()
writer = SummaryWriter(comment="-v-iteration")

iter_no = 0
best_reward = 0.0

while True:
    iter_no += 1
    agent.play_n_random_steps(100)
    agent.value_iteration()
    reward = 0.0
    for _ in range(TEST_EPISODES):
        reward += agent.play_episode(test_env)
    reward /= TEST_EPISODES
    writer.add_scalar("reward", reward, iter_no)
    if reward > best_reward:
        print(f"Change of reward value: {best_reward} -> {reward}")
        best_reward = reward
    if reward > 0.80:
        print(f"Solved in {iter_no} iterations!")
        break
writer.close()

In [None]:
rec.close()
embed_video(rec.filename)

## Q-learning redefined

For the sake of convenience, instead of defining the value of a state V(s), we can define the problem of choosing the best action in terms of the value of an action in a given state, in other words Q(s,a). This redefinition does not bring anything new compared to the previously defined formula. However, the different Q-learning algorithms originate from exactly this mathematical reformulation (this is where the name Q-learning comes from, since we are calculating the value of an action in a given state, denoted mathematically as Q(s,a)).

The formula redefined for Q(s,a) can be presented as follows:

Q(s,a) = r(s,a) + γ max_a' Q(s',a')

Q(s,a) - value of a given action for a certain state <br>
r(s,a) - reward for taking action a in state s <br>
Q(s',a') - value of action a' in the next state s' <br>
γ - discount factor <br>
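
The two views are tied together by V(s) = max_a Q(s,a): once the action values are known, both the state value and the greedy action can be read straight from the table. A minimal sketch with an invented Q-table for a single state follows; `greedy_action` and `state_value` are hypothetical helper names.

```python
import collections

N_ACTIONS = 4

# Q-table keyed by (state, action), the same layout the agents in this notebook use
q_values = collections.defaultdict(float)
q_values[(0, 0)] = 0.1
q_values[(0, 1)] = 0.3
q_values[(0, 2)] = 0.7
q_values[(0, 3)] = 0.2

def greedy_action(q_values, state, n_actions=N_ACTIONS):
    """Return the action with the highest Q(s, a) in the given state."""
    return max(range(n_actions), key=lambda a: q_values[(state, a)])

def state_value(q_values, state, n_actions=N_ACTIONS):
    """V(s) = max_a Q(s, a)."""
    return max(q_values[(state, a)] for a in range(n_actions))

best = greedy_action(q_values, 0)
value = state_value(q_values, 0)
print(best, value)
```

Storing Q(s,a) directly means that choosing an action no longer needs the transition counts, only a lookup over the actions of the current state.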

The main difference in the code can be seen in the value_iteration method, which this time does not need the calc_action_value method, making the implementation slightly simpler. Apart from that, the implementation follows the formula above.

In [None]:
rec = VideoRecorder()

In [None]:
class Q_Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state = self.env.reset()[0]
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count):
        for _ in range(count):
            action = self.env.action_space.sample()
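            new_state, reward, terminated, truncated, info = self.env.step(action)
            # An episode ends on termination or when the time limit is hit
            is_done = terminated or truncated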
            self.rewards[(self.state, action, new_state)] = reward
            self.transits[(self.state, action)][new_state] += 1
            self.state = self.env.reset()[0] if is_done else new_state

    def select_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action

    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()[0]
        while True:
            rec.record_frame(env)
            action = self.select_action(state)
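            new_state, reward, terminated, truncated, info = env.step(action)
            # An episode ends on termination or when the time limit is hit
            is_done = terminated or truncated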
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                action_value = 0.0
                target_counts = self.transits[(state, action)]
                total = sum(target_counts.values())
                for tgt_state, count in target_counts.items():
                    key = (state, action, tgt_state)
                    reward = self.rewards[key]
                    best_action = self.select_action(tgt_state)
                    val = reward + GAMMA * self.values[(tgt_state, best_action)]
                    action_value += (count / total) * val
                self.values[(state, action)] = action_value

In [None]:
test_env = gym.make(ENV_NAME, render_mode="rgb_array")
agent = Q_Agent()
writer = SummaryWriter(comment="-q-iteration")

iter_no = 0
best_reward = 0.0

while True:
    iter_no += 1
    agent.play_n_random_steps(100)
    agent.value_iteration()
    reward = 0.0
    for _ in range(TEST_EPISODES):
        reward += agent.play_episode(test_env)
    reward /= TEST_EPISODES
    writer.add_scalar("reward", reward, iter_no)
    if reward > best_reward:
        print(f"Change of reward value: {best_reward} -> {reward}")
        best_reward = reward
    if reward > 0.80:
        print(f"Solved in {iter_no} iterations!")
        break
writer.close()

In [None]:
rec.close()
embed_video(rec.filename)

In [None]:
rec = VideoRecorder()

## Tabular Q-Learning

Work in progress.
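
The `value_update` method in the cells below blends the old estimate with a new target: Q(s,a) ← (1 - α)·Q(s,a) + α·(r + γ·max_a' Q(s',a')). The sketch below traces that single step by hand. `blended_update` is a hypothetical helper and the reward and next-state value are made-up numbers, so treat it as an illustration of the rule rather than part of the agent.

```python
GAMMA = 0.9
ALPHA = 0.2

def blended_update(old_q, reward, best_next_q, alpha=ALPHA, gamma=GAMMA):
    """Move the old estimate a fraction alpha of the way towards the target."""
    target = reward + gamma * best_next_q
    return (1 - alpha) * old_q + alpha * target

# Repeatedly observe the same transition: reward 1.0 into a state valued 0.0
q = 0.0
history = []
for _ in range(3):
    q = blended_update(q, reward=1.0, best_next_q=0.0)
    history.append(q)

print(history)
```

Each update closes 20% of the remaining gap to the target of 1.0, giving roughly 0.2, 0.36 and 0.488. A smaller α makes learning slower but less sensitive to a single noisy transition.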

In [None]:
ENV_NAME = "FrozenLake-v1"
GAMMA = 0.9
ALPHA = 0.2
TEST_EPISODES = 20

In [None]:
class TQ_Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME, render_mode = "rgb_array")
        self.state = self.env.reset()[0]
        self.values = collections.defaultdict(float)

    def sample_env(self):
        action = self.env.action_space.sample()
        old_state = self.state
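        new_state, reward, terminated, truncated, info = self.env.step(action)
        # An episode ends on termination or when the time limit is hit
        is_done = terminated or truncated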
        self.state = self.env.reset()[0] if is_done else new_state
        return old_state, action, reward, new_state

    def best_value_and_action(self, state):
        best_value, best_action = None, None
        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_value, best_action

    def value_update(self, s, a, r, next_s):
        best_v, _ = self.best_value_and_action(next_s)
        new_v = r + GAMMA * best_v
        old_v = self.values[(s, a)]
        self.values[(s, a)] = old_v * (1-ALPHA) + new_v * ALPHA

    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()[0]
        while True:
            rec.record_frame(env)
            _, action = self.best_value_and_action(state)
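            new_state, reward, terminated, truncated, info = env.step(action)
            # Without the truncation check a greedy policy that never reaches
            # a terminal state would keep this loop running forever
            is_done = terminated or truncated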
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

In [None]:
test_env = gym.make(ENV_NAME, render_mode="rgb_array")
agent = TQ_Agent()
writer = SummaryWriter(comment="-tq-learning")

iter_no = 0
best_reward = 0.0
while True:
    iter_no += 1
    s, a, r, next_s = agent.sample_env()
    agent.value_update(s, a, r, next_s)
    reward = 0.0
    for _ in range(TEST_EPISODES):
        reward += agent.play_episode(test_env)
    reward /= TEST_EPISODES
    writer.add_scalar("reward", reward, iter_no)
    if reward > best_reward:
        print(f"Change of reward value: {best_reward} -> {reward}")
        best_reward = reward
    if reward > 0.80:
        print(f"Solved in {iter_no} iterations!")
        break
writer.close()

In [None]:
rec.close()
embed_video(rec.filename)

In [None]:
ENV_NAME = "Taxi-v3"
GAMMA = 0.8
ALPHA = 0.3
TEST_EPISODES = 50

In [None]:
test_env = gym.make(ENV_NAME, render_mode="rgb_array")
agent = TQ_Agent()
writer = SummaryWriter(comment="-tq-learning")

iter_no = 0
# Taxi-v3 episode returns are mostly negative early on, so start below any score
best_reward = float("-inf")
while True:
    iter_no += 1
    s, a, r, next_s = agent.sample_env()
    agent.value_update(s, a, r, next_s)
    reward = 0.0
    for _ in range(TEST_EPISODES):
        reward += agent.play_episode(test_env)
    reward /= TEST_EPISODES
    writer.add_scalar("reward", reward, iter_no)
    if reward > best_reward:
        print(f"Change of reward value: {best_reward} -> {reward}")
        best_reward = reward
    if reward > 0.80:
        print(f"Solved in {iter_no} iterations!")
        break
writer.close()

In [None]:
rec.close()
embed_video(rec.filename)

In [None]:
%load_ext tensorboard
%tensorboard --logdir=runs

In [None]:
#!/usr/bin/env python3
from lib import wrappers
from lib import dqn_model

import argparse
import time
import numpy as np
import collections

import torch
import torch.nn as nn
import torch.optim as optim

from tensorboardX import SummaryWriter


DEFAULT_ENV_NAME = "PongNoFrameskip-v4"
MEAN_REWARD_BOUND = 19

GAMMA = 0.99
BATCH_SIZE = 32
REPLAY_SIZE = 10000
LEARNING_RATE = 1e-4
SYNC_TARGET_FRAMES = 1000
REPLAY_START_SIZE = 10000

EPSILON_DECAY_LAST_FRAME = 150000
EPSILON_START = 1.0
EPSILON_FINAL = 0.01


Experience = collections.namedtuple(
    'Experience', field_names=['state', 'action', 'reward',
                               'done', 'new_state'])


class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size,
                                   replace=False)
        states, actions, rewards, dones, next_states = \
            zip(*[self.buffer[idx] for idx in indices])
        return np.array(states), np.array(actions), \
               np.array(rewards, dtype=np.float32), \
               np.array(dones, dtype=np.uint8), \
               np.array(next_states)


class Agent:
    def __init__(self, env, exp_buffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self._reset()

    def _reset(self):
        # Use the agent's own environment rather than the global `env`
        self.state = self.env.reset()
        self.total_reward = 0.0

    @torch.no_grad()
    def play_step(self, net, epsilon=0.0, device="cpu"):
        done_reward = None

        if np.random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            # np.asarray avoids copy=False, which raises in NumPy 2 when a copy is needed
            state_a = np.asarray([self.state])
            state_v = torch.tensor(state_a).to(device)
            q_vals_v = net(state_v)
            _, act_v = torch.max(q_vals_v, dim=1)
            action = int(act_v.item())

        # take a step in the environment
        new_state, reward, is_done, _ = self.env.step(action)
        self.total_reward += reward

        exp = Experience(self.state, action, reward,
                         is_done, new_state)
        self.exp_buffer.append(exp)
        self.state = new_state
        if is_done:
            done_reward = self.total_reward
            self._reset()
        return done_reward


def calc_loss(batch, net, tgt_net, device="cpu"):
    states, actions, rewards, dones, next_states = batch

    states_v = torch.tensor(np.array(
        states, copy=False)).to(device)
    next_states_v = torch.tensor(np.array(
        next_states, copy=False)).to(device)
    actions_v = torch.tensor(actions).to(device)
    rewards_v = torch.tensor(rewards).to(device)
    done_mask = torch.BoolTensor(dones).to(device)

    state_action_values = net(states_v).gather(
        1, actions_v.unsqueeze(-1)).squeeze(-1)
    with torch.no_grad():
        next_state_values = tgt_net(next_states_v).max(1)[0]
        next_state_values[done_mask] = 0.0
        next_state_values = next_state_values.detach()

    expected_state_action_values = next_state_values * GAMMA + \
                                   rewards_v
    return nn.MSELoss()(state_action_values,
                        expected_state_action_values)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--cuda", default=False,
                        action="store_true", help="Use CUDA")
    parser.add_argument("--env", default=DEFAULT_ENV_NAME,
                        help="Name of the environment. Default=" +
                             DEFAULT_ENV_NAME)
    args = parser.parse_args()
    device = torch.device("cuda" if args.cuda else "cpu")

    env = wrappers.make_env(args.env)

    net = dqn_model.DQN(env.observation_space.shape,
                        env.action_space.n).to(device)
    tgt_net = dqn_model.DQN(env.observation_space.shape,
                            env.action_space.n).to(device)
    writer = SummaryWriter(comment="-" + args.env)
    print(net)

    buffer = ExperienceBuffer(REPLAY_SIZE)
    agent = Agent(env, buffer)
    epsilon = EPSILON_START

    optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
    total_rewards = []
    frame_idx = 0
    ts_frame = 0
    ts = time.time()
    best_m_reward = None

    while True:
        frame_idx += 1
        epsilon = max(EPSILON_FINAL, EPSILON_START -
                      frame_idx / EPSILON_DECAY_LAST_FRAME)

        reward = agent.play_step(net, epsilon, device=device)
        if reward is not None:
            total_rewards.append(reward)
            speed = (frame_idx - ts_frame) / (time.time() - ts)
            ts_frame = frame_idx
            ts = time.time()
            m_reward = np.mean(total_rewards[-100:])
            print("%d: games - %d, reward %.3f, "
                  "eps %.2f, %.2f fps" % (
                frame_idx, len(total_rewards), m_reward, epsilon,
                speed
            ))
            writer.add_scalar("epsilon", epsilon, frame_idx)
            writer.add_scalar("speed", speed, frame_idx)
            writer.add_scalar("reward_100", m_reward, frame_idx)
            writer.add_scalar("reward", reward, frame_idx)
            if best_m_reward is None or best_m_reward < m_reward:
                torch.save(net.state_dict(), args.env +
                           "-best_%.0f.dat" % m_reward)
                if best_m_reward is not None:
                    print("Best reward updated: %.3f -> %.3f" % (
                        best_m_reward, m_reward))
                best_m_reward = m_reward
            if m_reward > MEAN_REWARD_BOUND:
                print("Solved in %d frames!" % frame_idx)
                break

        if len(buffer) < REPLAY_START_SIZE:
            continue

        if frame_idx % SYNC_TARGET_FRAMES == 0:
            tgt_net.load_state_dict(net.state_dict())

        optimizer.zero_grad()
        batch = buffer.sample(BATCH_SIZE)
        loss_t = calc_loss(batch, net, tgt_net, device=device)
        loss_t.backward()
        optimizer.step()
    writer.close()
