<a href="https://colab.research.google.com/github/ShaliniAvindya/Deep-Q-Learning-for-Cartpole-env-in-gymnasium/blob/main/%F0%9F%8C%92_DQN_and_Variants_on_Lunar_Lander_RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Deep Q-Learning and Variants in Gym's Lunar Lander Environment

In this notebook, we will explore the implementation of a Deep Q-Learning (DQN) agent to navigate Gym's Lunar Lander environment.

We will use apply four variants of the DQN algorithm:
- The Classic DQN (Mihn et al 2013)
- Double DQN (Hasselt et al 2015)
- Dueling DQN (Wang et al 2015)
- Double Dueling DQN

In the Lunar Lander environment, the agent's task is to learn how to land a lunar module safely on the moon's surface. This requires the agent to balance fuel efficiency and safety considerations. The agent needs to learn from its past experiences, developing a strategy to approach the landing pad while minimizing its speed and using as little fuel as possible.

All reinforcement learning (RL) methods will be built from scratch, providing a comprehensive understanding of their workings and we will use PyTorch to build our neural network model.

Let's initialize a LunarLander-v2 environmnet, make random actions in the environment, then view a recording of it.

In [None]:
import os
os.environ["SDL_VIDEODRIVER"] = "dummy"

In [None]:
# To use in Kaggle we need to install these two packages
!pip install swig
!pip install gym[box2d]

In [None]:
import warnings
from typing import TYPE_CHECKING, Optional
import pickle
import numpy as np
# from DQNAgent import DQNAgent, DQN, ReplayBuffer
import gym
from gym import error, spaces
from gym.error import DependencyNotInstalled
from gym.utils import EzPickle, colorize
from gym.utils.step_api_compatibility import step_api_compatibility

try:
    import Box2D
    from Box2D.b2 import (
        circleShape,
        contactListener,
        edgeShape,
        fixtureDef,
        polygonShape,
        revoluteJointDef,
    )
except ImportError:
    raise DependencyNotInstalled("box2d is not installed, run `pip install gym[box2d]`")


if TYPE_CHECKING:
    import pygame


FPS = 50
SCALE = 30.0  # affects how fast-paced the game is, forces should be adjusted as well

MAIN_ENGINE_POWER = 13.0
SIDE_ENGINE_POWER = 0.6

INITIAL_RANDOM = 1000.0  # Set 1500 to make game harder

LANDER_POLY = [(-14, +17), (-17, 0), (-17, -10), (+17, -10), (+17, 0), (+14, +17)]
LEG_AWAY = 20
LEG_DOWN = 18
LEG_W, LEG_H = 2, 8
LEG_SPRING_TORQUE = 40

SIDE_ENGINE_HEIGHT = 14.0
SIDE_ENGINE_AWAY = 12.0

VIEWPORT_W = 600
VIEWPORT_H = 400


class LunarLander(gym.Env, EzPickle):
    """
    ### Description
    This environment is a classic rocket trajectory optimization problem.
    According to Pontryagin's maximum principle, it is optimal to fire the
    engine at full throttle or turn it off. This is the reason why this
    environment has discrete actions: engine on or off.

    There are two environment versions: discrete or continuous.
    The landing pad is always at coordinates (0,0). The coordinates are the
    first two numbers in the state vector.
    Landing outside of the landing pad is possible. Fuel is infinite, so an agent
    can learn to fly and then land on its first attempt.

    To see a heuristic landing, run:
    ```
    python gym/envs/box2d/lunar_lander.py
    ```
    <!-- To play yourself, run: -->
    <!-- python examples/agents/keyboard_agent.py LunarLander-v2 -->

    ### Action Space
    There are four discrete actions available: do nothing, fire left
    orientation engine, fire main engine, fire right orientation engine.

    ### Observation Space
    The state is an 8-dimensional vector: the coordinates of the lander in `x` & `y`, its linear
    velocities in `x` & `y`, its angle, its angular velocity, and two booleans
    that represent whether each leg is in contact with the ground or not.

    ### Rewards
    After every step a reward is granted. The total reward of an episode is the
    sum of the rewards for all the steps within that episode.

    For each step, the reward:
    - is increased/decreased the closer/further the lander is to the landing pad.
    - is increased/decreased the slower/faster the lander is moving.
    - is decreased the more the lander is tilted (angle not horizontal).
    - is increased by 10 points for each leg that is in contact with the ground.
    - is decreased by 0.03 points each frame a side engine is firing.
    - is decreased by 0.3 points each frame the main engine is firing.

    The episode receive an additional reward of -100 or +100 points for crashing or landing safely respectively.

    An episode is considered a solution if it scores at least 200 points.

    ### Starting State
    The lander starts at the top center of the viewport with a random initial
    force applied to its center of mass.

    ### Episode Termination
    The episode finishes if:
    1) the lander crashes (the lander body gets in contact with the moon);
    2) the lander gets outside of the viewport (`x` coordinate is greater than 1);
    3) the lander is not awake. From the [Box2D docs](https://box2d.org/documentation/md__d_1__git_hub_box2d_docs_dynamics.html#autotoc_md61),
        a body which is not awake is a body which doesn't move and doesn't
        collide with any other body:
    > When Box2D determines that a body (or group of bodies) has come to rest,
    > the body enters a sleep state which has very little CPU overhead. If a
    > body is awake and collides with a sleeping body, then the sleeping body
    > wakes up. Bodies will also wake up if a joint or contact attached to
    > them is destroyed.

    ### Arguments
    To use to the _continuous_ environment, you need to specify the
    `continuous=True` argument like below:
    ```python
    import gym
    env = gym.make(
        "LunarLander-v2",
        continuous: bool = False,
        gravity: float = -10.0,
        enable_wind: bool = False,
        wind_power: float = 15.0,
        turbulence_power: float = 1.5,
    )
    ```
    If `continuous=True` is passed, continuous actions (corresponding to the throttle of the engines) will be used and the
    action space will be `Box(-1, +1, (2,), dtype=np.float32)`.
    The first coordinate of an action determines the throttle of the main engine, while the second
    coordinate specifies the throttle of the lateral boosters.
    Given an action `np.array([main, lateral])`, the main engine will be turned off completely if
    `main < 0` and the throttle scales affinely from 50% to 100% for `0 <= main <= 1` (in particular, the
    main engine doesn't work  with less than 50% power).
    Similarly, if `-0.5 < lateral < 0.5`, the lateral boosters will not fire at all. If `lateral < -0.5`, the left
    booster will fire, and if `lateral > 0.5`, the right booster will fire. Again, the throttle scales affinely
    from 50% to 100% between -1 and -0.5 (and 0.5 and 1, respectively).

    `gravity` dictates the gravitational constant, this is bounded to be within 0 and -12.

    If `enable_wind=True` is passed, there will be wind effects applied to the lander.
    The wind is generated using the function `tanh(sin(2 k (t+C)) + sin(pi k (t+C)))`.
    `k` is set to 0.01.
    `C` is sampled randomly between -9999 and 9999.

    `wind_power` dictates the maximum magnitude of linear wind applied to the craft. The recommended value for `wind_power` is between 0.0 and 20.0.
    `turbulence_power` dictates the maximum magnitude of rotational wind applied to the craft. The recommended value for `turbulence_power` is between 0.0 and 2.0.

    ### Version History
    - v2: Count energy spent and in v0.24, added turbulance with wind power and turbulence_power parameters
    - v1: Legs contact with ground added in state vector; contact with ground
        give +10 reward points, and -10 if then lose contact; reward
        renormalized to 200; harder initial random push.
    - v0: Initial version

    <!-- ### References -->

    ### Credits
    Created by Oleg Klimov
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": FPS,
    }

    def __init__(
        self,
        render_mode: Optional[str] = None,
        continuous: bool = False,
        gravity: float = 0.0,
        enable_wind: bool = False,
        wind_power: float = 15.0,
        turbulence_power: float = 1.5,
        start_pos: tuple = (2, 7),
        end_pos: tuple = (16, 7),
    ):
        EzPickle.__init__(
            self,
            render_mode,
            continuous,
            gravity,
            enable_wind,
            wind_power,
            turbulence_power,
        )

        # assert (
        #     -12.0 < gravity and gravity < 0.0
        # ), f"gravity (current value: {gravity}) must be between -12 and 0"
        self.gravity = gravity

        if 0.0 > wind_power or wind_power > 20.0:
            warnings.warn(
                colorize(
                    f"WARN: wind_power value is recommended to be between 0.0 and 20.0, (current value: {wind_power})",
                    "yellow",
                ),
            )
        self.wind_power = wind_power

        if 0.0 > turbulence_power or turbulence_power > 2.0:
            warnings.warn(
                colorize(
                    f"WARN: turbulence_power value is recommended to be between 0.0 and 2.0, (current value: {turbulence_power})",
                    "yellow",
                ),
            )
        self.turbulence_power = turbulence_power
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.enable_wind = enable_wind
        self.wind_idx = np.random.randint(-9999, 9999)
        self.torque_idx = np.random.randint(-9999, 9999)

        self.screen: pygame.Surface = None
        self.clock = None
        self.isopen = True
        self.world = Box2D.b2World(gravity=[0.0, 0.0])
        self.moon = None
        self.lander: Optional[Box2D.b2Body] = None
        self.particles = []

        self.prev_reward = None

        self.continuous = continuous

        low = np.array(
            [
                -1.5,
                -1.5,
                -5.0,
                -5.0,
                -np.pi,
                -5.0,
                0.0,
            ]
        ).astype(np.float32)
        high = np.array(
            [
                1.5,
                1.5,
                5.0,
                5.0,
                np.pi,
                5.0,
                np.pi,
            ]
        ).astype(np.float32)

        # useful range is -1 .. +1, but spikes can be higher
        self.observation_space = spaces.Box(low, high)

        if self.continuous:
            # Action is two floats [main engine, left-right engines].
            # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power. Engine can't work with less than 50% power.
            # Left-right:  -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off
            self.action_space = spaces.Box(-1, +1, (2,), dtype=np.float32)
        else:
            # Nop, fire left engine, main engine, right engine
            self.action_space = spaces.Discrete(4)

        self.render_mode = render_mode

    def _destroy(self):
        if not self.moon:
            return
        self.world.contactListener = None
        self._clean_particles(True)
        self.world.DestroyBody(self.moon)
        self.moon = None
        self.world.DestroyBody(self.lander)
        self.lander = None

    def _is_within_bounds(self):
        if 0 < self.lander.position[0] < 20 and 0 < self.lander.position[1] < 13:
            return True
        return False

    def _goal_pos_reached(self):
        if (int(self.lander.position[0]), int(self.lander.position[1])) == self.end_pos:
            return True
        return False

    def _generate_random_start_and_end_pos(self, grid_size=(20, 13), boundary_distance=3):
        # Randomly select start position near the boundaries
        start_pos_x = np.random.choice([0, grid_size[0]-1])
        start_pos_y = np.random.randint(boundary_distance, grid_size[1]-boundary_distance)
        start_pos = (start_pos_x, start_pos_y)

        # Determine initial angle of the lander based on the start position
        if start_pos_x == 0:
            init_angle = np.random.uniform(-np.pi, 0)
        else:
            init_angle = np.random.uniform(0, np.pi)

        # Select end position on the opposite boundary
        if start_pos_x == 0:
            end_pos_x = grid_size[0] - 1
        else:
            end_pos_x = 0
        end_pos_y = np.random.randint(boundary_distance, grid_size[1]-boundary_distance)
        end_pos = (end_pos_x, end_pos_y)

        self.start_pos = (int(start_pos[0]), int(start_pos[1]))
        self.end_pos = (int(end_pos[0]), int(end_pos[1]))
        self.init_angle = init_angle


    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        super().reset(seed=seed)
        self._destroy()
        self.prev_shaping = None

        W = VIEWPORT_W / SCALE
        H = VIEWPORT_H / SCALE

        self.moon = self.world.CreateStaticBody(
            shapes=edgeShape(vertices=[(0, 0), (W, 0)])
        )
        self.moon.color1 = (0.0, 0.0, 0.0)
        self.moon.color2 = (0.0, 0.0, 0.0)

        self._generate_random_start_and_end_pos()

        self.lander: Box2D.b2Body = self.world.CreateDynamicBody(
            # position=(VIEWPORT_W / SCALE / 2, initial_y),
            position = self.start_pos,
            angle=self.init_angle,
            fixtures=fixtureDef(
                shape=polygonShape(
                    vertices=[(x / SCALE, y / SCALE) for x, y in LANDER_POLY]
                ),
                density=5.0,
                friction=0.1,
                categoryBits=0x0010,
                maskBits=0x0000,  # do not collide with any object
                restitution=0.0,
            ),  # 0.99 bouncy
        )
        self.lander.color1 = (128, 102, 230)
        self.lander.color2 = (77, 77, 128)
        # self.lander.ApplyForceToCenter(
        #     (
        #         self.np_random.uniform(-INITIAL_RANDOM, INITIAL_RANDOM),
        #         self.np_random.uniform(-INITIAL_RANDOM, INITIAL_RANDOM),
        #     ),
        #     True,
        # )

        self.drawlist = [self.lander]

        if self.render_mode == "human":
            self.render()
        return self.step(np.array([0, 0]) if self.continuous else 0)[0], {}

    def _create_particle(self, mass, x, y, ttl):
        p = self.world.CreateDynamicBody(
            position=(x, y),
            angle=0.0,
            fixtures=fixtureDef(
                shape=circleShape(radius=2 / SCALE, pos=(0, 0)),
                density=mass,
                friction=0.1,
                categoryBits=0x0100,
                maskBits=0x001,  # collide only with ground
                restitution=0.3,
            ),
        )
        p.ttl = ttl
        self.particles.append(p)
        self._clean_particles(False)
        return p

    def _clean_particles(self, all):
        while self.particles and (all or self.particles[0].ttl < 0):
            self.world.DestroyBody(self.particles.pop(0))

    def _angle_with_goal(self, pos):
        agent_orientation = np.array([np.sin(self.lander.angle), np.cos(self.lander.angle)])
        goal_vector = np.array([self.end_pos[0] - pos[0], self.end_pos[1] - pos[1]])
        angle_with_goal = np.arccos(np.dot(agent_orientation, goal_vector) / (np.linalg.norm(agent_orientation) * np.linalg.norm(goal_vector)))
        angle_with_goal -= np.pi
        return -angle_with_goal

    def step(self, action):
        assert self.lander is not None

        # Update wind
        assert self.lander is not None, "You forgot to call reset()"
        if self.enable_wind:
            # the function used for wind is tanh(sin(2 k x) + sin(pi k x)),
            # which is proven to never be periodic, k = 0.01
            wind_mag = (
                np.tanh(
                    np.sin(0.02 * self.wind_idx)
                    + (np.sin(np.pi * 0.01 * self.wind_idx))
                )
                * self.wind_power
            )
            self.wind_idx += 1
            self.lander.ApplyForceToCenter(
                (wind_mag, 0.0),
                True,
            )

            # the function used for torque is tanh(sin(2 k x) + sin(pi k x)),
            # which is proven to never be periodic, k = 0.01
            torque_mag = np.tanh(
                np.sin(0.02 * self.torque_idx)
                + (np.sin(np.pi * 0.01 * self.torque_idx))
            ) * (self.turbulence_power)
            self.torque_idx += 1
            self.lander.ApplyTorque(
                (torque_mag),
                True,
            )

        if self.continuous:
            action = np.clip(action, -1, +1).astype(np.float32)
        else:
            assert self.action_space.contains(
                action
            ), f"{action!r} ({type(action)}) invalid "

        # Engines
        tip = (np.sin(self.lander.angle), np.cos(self.lander.angle))
        side = (-tip[1], tip[0])
        dispersion = [self.np_random.uniform(-1.0, +1.0) / SCALE for _ in range(2)]

        m_power = 0.0
        if (self.continuous and action[0] > 0.0) or (
            not self.continuous and action == 2
        ):
            # Main engine
            if self.continuous:
                m_power = (np.clip(action[0], 0.0, 1.0) + 1.0) * 0.5  # 0.5..1.0
                assert m_power >= 0.5 and m_power <= 1.0
            else:
                m_power = 1.0
            # 4 is move a bit downwards, +-2 for randomness
            ox = tip[0] * (4 / SCALE + 2 * dispersion[0]) + side[0] * dispersion[1]
            oy = -tip[1] * (4 / SCALE + 2 * dispersion[0]) - side[1] * dispersion[1]
            impulse_pos = (self.lander.position[0] + ox, self.lander.position[1] + oy)
            p = self._create_particle(
                3.5,  # 3.5 is here to make particle speed adequate
                impulse_pos[0],
                impulse_pos[1],
                m_power,
            )  # particles are just a decoration
            p.ApplyLinearImpulse(
                (ox * MAIN_ENGINE_POWER * m_power, oy * MAIN_ENGINE_POWER * m_power),
                impulse_pos,
                True,
            )
            self.lander.ApplyLinearImpulse(
                (-ox * MAIN_ENGINE_POWER * m_power, -oy * MAIN_ENGINE_POWER * m_power),
                impulse_pos,
                True,
            )

        s_power = 0.0
        if (self.continuous and np.abs(action[1]) > 0.5) or (
            not self.continuous and action in [1, 3]
        ):
            # Orientation engines
            if self.continuous:
                direction = np.sign(action[1])
                s_power = np.clip(np.abs(action[1]), 0.5, 1.0)
                assert s_power >= 0.5 and s_power <= 1.0
            else:
                direction = action - 2
                s_power = 1.0
            ox = tip[0] * dispersion[0] + side[0] * (
                3 * dispersion[1] + direction * SIDE_ENGINE_AWAY / SCALE
            )
            oy = -tip[1] * dispersion[0] - side[1] * (
                3 * dispersion[1] + direction * SIDE_ENGINE_AWAY / SCALE
            )
            impulse_pos = (
                self.lander.position[0] + ox - tip[0] * 17 / SCALE,
                self.lander.position[1] + oy + tip[1] * SIDE_ENGINE_HEIGHT / SCALE,
            )
            p = self._create_particle(0.7, impulse_pos[0], impulse_pos[1], s_power)
            p.ApplyLinearImpulse(
                (ox * SIDE_ENGINE_POWER * s_power, oy * SIDE_ENGINE_POWER * s_power),
                impulse_pos,
                True,
            )
            self.lander.ApplyLinearImpulse(
                (-ox * SIDE_ENGINE_POWER * s_power, -oy * SIDE_ENGINE_POWER * s_power),
                impulse_pos,
                True,
            )

        self.world.Step(1.0 / FPS, 6 * 30, 2 * 30)

        pos = self.lander.position
        vel = self.lander.linearVelocity
        state = [
            (pos.x - self.end_pos[0]),
            (pos.y - self.end_pos[1]),
            vel.x * (VIEWPORT_W / SCALE / 2) / FPS,
            vel.y * (VIEWPORT_H / SCALE / 2) / FPS,
            self.lander.angle,
            20.0 * self.lander.angularVelocity / FPS,
            self._angle_with_goal(pos),
        ]
        assert len(state) == 7

        reward = 0
        shaping = (
            -100 * np.sqrt(state[0] * state[0] + state[1] * state[1])
            - 100 * np.sqrt(state[2] * state[2] + state[3] * state[3])
            - 100 * abs(state[4])
        )  # And ten points for legs contact, the idea is if you
        # lose contact again after landing, you get negative reward
        if self.prev_shaping is not None:
            reward = shaping - self.prev_shaping
        self.prev_shaping = shaping

#         reward -= (
#             m_power * 0.30
#         )  # less fuel spent is better, about -30 for heuristic landing
#         reward -= s_power * 0.03

        terminated = False
        if not self._is_within_bounds():
            terminated = True
            reward = -100
        if self._goal_pos_reached():
            terminated = True
            reward = +100

        if self.render_mode == "human":
            self.render()
        return np.array(state, dtype=np.float32), reward, terminated, False, {}

    def render(self):
        if self.render_mode is None:
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
            )
            return

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gym[box2d]`"
            )

        if self.screen is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.screen = pygame.display.set_mode((VIEWPORT_W, VIEWPORT_H))
        if self.clock is None:
            self.clock = pygame.time.Clock()
        self.surf = pygame.Surface((VIEWPORT_W, VIEWPORT_H))

        pygame.transform.scale(self.surf, (SCALE, SCALE))
        pygame.draw.rect(self.surf, (255, 255, 255), self.surf.get_rect())

        # Draw start and end markers
        pygame.draw.circle(self.surf, (255, 0, 0), (self.start_pos[0]*SCALE, self.start_pos[1]*SCALE), 10)
        pygame.draw.circle(self.surf, (0, 255, 0), (self.end_pos[0]*SCALE, self.end_pos[1]*SCALE), 10)

        for obj in self.particles:
            obj.ttl -= 0.15
            obj.color1 = (
                int(max(0.2, 0.15 + obj.ttl) * 255),
                int(max(0.2, 0.5 * obj.ttl) * 255),
                int(max(0.2, 0.5 * obj.ttl) * 255),
            )
            obj.color2 = (
                int(max(0.2, 0.15 + obj.ttl) * 255),
                int(max(0.2, 0.5 * obj.ttl) * 255),
                int(max(0.2, 0.5 * obj.ttl) * 255),
            )

        self._clean_particles(False)

        for obj in self.particles + self.drawlist:
            for f in obj.fixtures:
                trans = f.body.transform
                if type(f.shape) is circleShape:
                    pygame.draw.circle(
                        self.surf,
                        color=obj.color1,
                        center=trans * f.shape.pos * SCALE,
                        radius=f.shape.radius * SCALE,
                    )
                    pygame.draw.circle(
                        self.surf,
                        color=obj.color2,
                        center=trans * f.shape.pos * SCALE,
                        radius=f.shape.radius * SCALE,
                    )

                else:
                    path = [trans * v * SCALE for v in f.shape.vertices]
                    pygame.draw.polygon(self.surf, color=obj.color1, points=path)
                    gfxdraw.aapolygon(self.surf, path, obj.color1)
                    pygame.draw.aalines(
                        self.surf, color=obj.color2, points=path, closed=True
                    )

        self.surf = pygame.transform.flip(self.surf, False, True)

        if self.render_mode == "human":
            assert self.screen is not None
            self.screen.blit(self.surf, (0, 0))
            pygame.event.pump()
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()
        elif self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.surf)), axes=(1, 0, 2)
            )

    def close(self):
        if self.screen is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
            self.isopen = False


def heuristic(env, s):
    """
    The heuristic for
    1. Testing
    2. Demonstration rollout.

    Args:
        env: The environment
        s (list): The state. Attributes:
            s[0] is the horizontal coordinate
            s[1] is the vertical coordinate
            s[2] is the horizontal speed
            s[3] is the vertical speed
            s[4] is the angle
            s[5] is the angular speed
            s[6] 1 if first leg has contact, else 0
            s[7] 1 if second leg has contact, else 0

    Returns:
         a: The heuristic to be fed into the step function defined above to determine the next step and reward.
    """

    angle_targ = s[0] * 0.5 + s[2] * 1.0  # angle should point towards center
    if angle_targ > 0.4:
        angle_targ = 0.4  # more than 0.4 radians (22 degrees) is bad
    if angle_targ < -0.4:
        angle_targ = -0.4
    hover_targ = 0.55 * np.abs(
        s[0]
    )  # target y should be proportional to horizontal offset

    angle_todo = (angle_targ - s[4]) * 0.5 - (s[5]) * 1.0
    hover_todo = (hover_targ - s[1]) * 0.5 - (s[3]) * 0.5

    if env.continuous:
        a = np.array([hover_todo * 20 - 1, -angle_todo * 20])
        a = np.clip(a, -1, +1)
    else:
        a = 0
        if hover_todo > np.abs(angle_todo) and hover_todo > 0.05:
            a = 2
        elif angle_todo < -0.05:
            a = 3
        elif angle_todo > +0.05:
            a = 1
    return a


def demo_heuristic_lander(env, seed=None, render=False):
    total_reward = 0
    steps = 0
    s, info = env.reset(seed=seed)
    while True:
        # a = heuristic(env, s)
        a = env.action_space.sample()
        s, r, terminated, truncated, info = step_api_compatibility(env.step(a), True)
        total_reward += r

        if render:
            still_open = env.render()
            if still_open is False:
                break

        if steps % 20 == 0 or terminated or truncated:
            print("observations:", " ".join([f"{x:+0.2f}" for x in s]))
            print(f"step {steps} reward {r:+0.2f} total_reward {total_reward:+0.2f}")
        steps += 1
        if terminated or truncated:
            break
    if render:
        env.close()
    return total_reward


class LunarLanderContinuous:
    def __init__(self):
        raise error.Error(
            "Error initializing LunarLanderContinuous Environment.\n"
            "Currently, we do not support initializing this mode of environment by calling the class directly.\n"
            "To use this environment, instead create it by specifying the continuous keyword in gym.make, i.e.\n"
            'gym.make("LunarLander-v2", continuous=True)'
        )


# if __name__ == "__main__":
#     demo_heuristic_lander(LunarLander(render_mode = 'human', enable_wind=False), render=True)

def play_DQN_episode(env, agent):
    score = 0
    state, _ = env.reset(seed=42)

    while True:
        # eps=0 for predictions
        action = agent.act(state, 0)
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        score += reward

        env.render()

        # End the episode if done
        if done:
            break

    return score


![](https://i.imgur.com/tQ3zeQA.gif)

## General Information
This information is from the official Gym documentation.

https://www.gymlibrary.dev/environments/box2d/lunar_lander/

| Feature Category  | Details                                |
|-------------------|----------------------------------------|
| Action Space      | Discrete(4)                            |
| Observation Shape | (8,)                                   |
| Observation High  | [1.5 1.5 5. 5. 3.14 5. 1. 1. ]         |
| Observation Low   | [-1.5 -1.5 -5. -5. -3.14 -5. -0. -0. ] |
| Import            | `gym.make("LunarLander-v2")`           |

## Description of Environment

This environment is a classic rocket trajectory optimization problem. According to Pontryagin’s maximum principle, it is optimal to fire the engine at full throttle or turn it off. This is the reason why this environment has discrete actions: engine on or off.

There are two environment versions: discrete or continuous. The landing pad is always at coordinates `(0,0)`. The coordinates are the first two numbers in the state vector. Landing outside of the landing pad is possible. Fuel is infinite, so an agent could learn to fly and then land on its first attempt.

## Action Space
There are four discrete actions available: do nothing, fire left orientation engine, fire main engine, fire right orientation engine.

| Action  | Result                          |
|---------|---------------------------------|
| 0       | Do nothing                      |
| 1       | Fire left orientation engine    |
| 2       | Fire main engine                |
| 3       | Fire right orientation engine   |

## Observation Space
The state is an 8-dimensional vector: the coordinates of the lander in `x` & `y`, its linear velocities in `x` & `y`, its angle, its angular velocity, and two booleans that represent whether each leg is in contact with the ground or not.

| Observation  | Value                                   |
|--------------|-----------------------------------------|
| 0            | `x` coordinate (float)                  |
| 1            | `y` coordinate (float)                  |
| 2            | `x` linear velocity (float)             |
| 3            | `y` linear velocity (float)             |
| 4            | Angle in radians from -π to +π (float)  |
| 5            | Angular velocity (float)                |
| 6            | Left leg contact (bool)                 |
| 7            | Right leg contact (bool)                |

## Rewards
Reward for moving from the top of the screen to the landing pad and coming to rest is about 100-140 points. If the lander moves away from the landing pad, it loses reward. If the lander crashes, it receives an additional -100 points. If it comes to rest, it receives an additional +100 points. Each leg with ground contact is +10 points. Firing the main engine is -0.3 points each frame. Firing the side engine is -0.03 points each frame. Solved is 200 points.

## Starting State
The lander starts at the top center of the viewport with a random initial force applied to its center of mass.

## Episode Termination
The episode finishes if:

1. The lander crashes (the lander body gets in contact with the moon);

2. The lander gets outside of the viewport (`x` coordinate is greater than 1);

3. The lander is not awake. From the Box2D docs, a body which is not awake is a body which doesn’t move and doesn’t collide with any other body:

---

## The Safe Agent
We're going to implement a simple agent 'The Safe Agent' who will thrust upward if and only if the lander's `y` position is less than 0.5.

In theory this agent shouldn't hit the ground as we have unlimited fuel, but let's see.

![](https://i.imgur.com/qFNn9ai.gif)

#### Observations:
- The safe agent may not have hit the ground, but it didn't take long to fly off screen, due to its inability to use the side engines.

---

## The Stable Agent
Let's try to define and agent that can remain stable in the air.

It will operate via the following rules:

1. If below height of 1: action = 2 (main engine)
2. If angle is above π/50: action = 1 (fire right engine)
3. If angle is above π/50: action = 1 (fire left engine)
4. If x distance is above 0.4: action = 3 (fire left engine)
5. If x distance is below -0.4: action = 1 (fire left engine)
6. If below height of 1.5: action = 2 (main engine)
6. Else: action = 0 (do nothing)

The idea is the lander will always use its main engine if it falls below a certain height, next it will prioritize stabilizing the angle of the lander, then the distance, then keeping it above another height.

Let's see how this approach does:

![](https://i.imgur.com/Bdq1Hdl.gif)

#### Observations:
- Crafting a straightforward set of rules to guide the lunar lander is more challenging than anticipated.
- Our initial efforts achieved some stability, but eventually, the lander lost control.

---

# Deep Reinforcement Learning
To address this challenge, we'll use deep reinforcement learning techniques to train an agent to land the spacecraft.

Simpler tabular methods are limited to discrete observation spaces, meaning there are a finite number of possible states. In `LunarLander-v2` however, we're dealing with a continuous range of states across 8 different parameters, meaning there are a near-infinite number of possible states. We could try to bin similar values into groups, but due to the sensitive controls of the game, even slight errors can lead to significant missteps.

To get around this, we'll use a `neural network Q-function approximator`. This lets us predict the best actions to take for a given state, even when dealing with a vast number of potential states. It's a much better match for our complex landing challenge.

## The DQN Algorithm:

This breakthrough algorithm was used by Mihn et al in 2015 to achieve human-level performance on several Atari 2600 games.

The original paper published in Nature can be viewed here:

https://www.deepmind.com/publications/human-level-control-through-deep-reinforcement-learning

The algorithm:

1. **Initialization**: Begin by initializing the parameters for two neural networks, $Q(s,a)$ (referred to as the online network) and $\hat{Q}(s,a)$ (known as the target network), with random weights. Both networks serve the function of mapping a state-action pair to a Q-value, which is an estimate of the expected return from that pair. Also, set the exploration probability $\epsilon$ to 1.0, and create an empty replay buffer to store past transition experiences.
2. **Action Selection**: Utilize an epsilon-greedy strategy for action selection. With a probability of $\epsilon$, select a random action $a$, but in all other instances, choose the action $a$ that maximizes the Q-value, i.e., $a = argmax_aQ(s,a)$.
3. **Experience Collection**: Execute the chosen action $a$ within the environment emulator and observe the resulting immediate reward $r$ and the next state $s'$.
4. **Experience Storage**: Store the transition $(s,a,r,s')$ in the replay buffer for future reference.
5. **Sampling**: Randomly sample a mini-batch of transitions from the replay buffer for training the online network.
6. **Target Computation**: For every transition in the sampled mini-batch, compute the target value $y$. If the episode has ended at this step, $y$ is simply the reward $r$. Otherwise, $y$ is the sum of the reward and the discounted estimated optimal future Q-value, i.e.,  $y = r + \gamma \max_{a' \in A} \hat{Q}(s', a')$
7. **Loss Calculation**: Compute the loss, which is the squared difference between the Q-value predicted by the online network and the computed target, i.e., $\mathcal{L} = (Q(s,a) - y)^2$
8. **Online Network Update**: Update the parameters of the online network $Q(s,a)$ using Stochastic Gradient Descent (SGD) to minimize the loss.
9. **Target Network Update**: Every $N$ steps, update the target network by copying the weights from the online network to the target network $\hat{Q}(s,a)$.
10. **Iterate**: Repeat the process from step 2 until convergence.

### Defining the Deep Q-Network
Our network will be a simple feedforward neural network that takes the state as input and produces Q-values for each action as output. For `LunarLander-v2` the state is an 8-dimensional vector and there are 4 possible actions.


In [None]:
import torch

class DQN(torch.nn.Module):
    '''
    This class defines a deep Q-network (DQN), a type of artificial neural network used in reinforcement learning.
    The DQN is used to estimate the Q-values, which represent the expected return for each action in each state.

    Parameters
    ----------
    state_size: int, default=8
        The size of the state space.
    action_size: int, default=4
        The size of the action space.
    hidden_size: int, default=64
        The size of the hidden layers in the network.
    '''
    def __init__(self, state_size=7, action_size=4, hidden_size=64):
        '''
        Initialize a network with the following architecture:
            Input layer (state_size, hidden_size)
            Hidden layer 1 (hidden_size, hidden_size)
            Output layer (hidden_size, action_size)
        '''
        super(DQN, self).__init__()
        self.layer1 = torch.nn.Linear(state_size, hidden_size)
        self.layer2 = torch.nn.Linear(hidden_size, hidden_size)
        self.layer3 = torch.nn.Linear(hidden_size, action_size)

    def forward(self, state):
        '''
        Define the forward pass of the DQN. This function is called when the network is called to estimate Q-values.

        Parameters
        ----------
        state: torch.Tensor
            The state for which to estimate the Q-values.

        Returns
        -------
        torch.Tensor
            The estimated Q-values for each action in the input state.
        '''
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))
        return self.layer3(x)

### Defining the Replay Buffer
In the context of RL, we employ a structure known as the replay buffer, which utilizes a deque. The replay buffer stores and samples experiences, which helps us overcome the problem of *step correlation*.

A *deque* (double-ended queue) is a data structure that enables the addition or removal of elements from both its ends, hence the name. It is particularly useful when there is a need for fast append and pop operations from either end of the container, which it provides at O(1) time complexity. In contrast, a list offers these operations at O(n) time complexity, making the deque a preferred choice in cases that necessitate more efficient operations.

Moreover, a deque allows setting a maximum size. Once this maximum size is exceeded during an insertion (push) operation at the front, the deque automatically ejects the item at the rear, thereby maintaining its maximum length.

In the replay buffer, the `push` method is utilized to add an experience. If adding this experience exceeds the maximum buffer size, the oldest (rear-most) experience is automatically removed. This approach ensures that the replay buffer always contains the most recent experiences up to its capacity.

The `sample` method, on the other hand, is used to retrieve a random batch of experiences from the replay buffer. This randomness is critical in breaking correlations within the sequence of experiences, which leads to more robust learning.

This combination of recency and randomness allows us to learn on new training data, without training samples being highly correlated.

In [None]:
import numpy as np
import random
from collections import deque

class ReplayBuffer:
    '''
    This class represents a replay buffer, a type of data structure commonly used in reinforcement learning algorithms.
    The buffer stores past experiences in the environment, allowing the agent to sample and learn from them at later times.
    This helps to break the correlation of sequential observations and stabilize the learning process.

    Parameters
    ----------
    buffer_size: int, default=10000
        The maximum number of experiences that can be stored in the buffer.
    '''
    def __init__(self, buffer_size=10000):
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        '''
        Add a new experience to the buffer. Each experience is a tuple containing a state, action, reward,
        the resulting next state, and a done flag indicating whether the episode has ended.

        Parameters
        ----------
        state: array-like
            The state of the environment before taking the action.
        action: int
            The action taken by the agent.
        reward: float
            The reward received after taking the action.
        next_state: array-like
            The state of the environment after taking the action.
        done: bool
            A flag indicating whether the episode has ended after taking the action.
        '''
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        '''
        Randomly sample a batch of experiences from the buffer. The batch size must be smaller or equal to the current number of experiences in the buffer.

        Parameters
        ----------
        batch_size: int
            The number of experiences to sample from the buffer.

        Returns
        -------
        tuple of numpy.ndarray
            A tuple containing arrays of states, actions, rewards, next states, and done flags.
        '''
        states, actions, rewards, next_states, dones = zip(*random.sample(self.buffer, batch_size))
        return np.stack(states), actions, rewards, np.stack(next_states), dones

    def __len__(self):
        '''
        Get the current number of experiences in the buffer.

        Returns
        -------
        int
            The number of experiences in the buffer.
        '''
        return len(self.buffer)

### Define the DQN Agent
The DQN agent handles the interaction with the environment, selecting actions, collecting experiences, storing them in the replay buffer, and using these experiences to train the network. Let's walk through each part of this process:

#### Initialisation
The `__init__` function sets up the agent:

- `self.device`: We start by checking whether a GPU is available, and, if so, we use it, otherwise, we fall back to CPU.
- `self.gamma`: This is the discount factor for future rewards, used in the Q-value update equation.
- `self.batch_size`: This is the number of experiences we'll sample from the memory when updating the model.
- `self.q_network` and `self.target_network`: These are two instances of the Q-Network. The first is the network we're actively training, and the second is a copy that gets updated less frequently. This helps to stabilize learning.
- `self.optimizer`: This is the optimization algorithm used to update the Q-Network's parameters.
- `self.memory`: This is a replay buffer that stores experiences. It's an instance of the `ReplayBuffer` class.

#### Step Function
The `step` function is called after each timestep in the environment:

- The function starts by storing the new experience in the replay buffer.
- If enough experiences have been stored, it calls `self.update_model()`, which triggers a learning update.

#### Action Selection
The act function is how the agent selects an action:

- If a randomly drawn number is greater than $\epsilon$, it selects the action with the highest predicted Q-value. This is known as exploitation: the agent uses what it has learned to select the best action.
- If the random number is less than $\epsilon$, it selects an action randomly. This is known as exploration: the agent explores the environment to learn more about it.

#### Model Update
The `update_model` function is where the learning happens:

- It starts by sampling a batch of experiences from the replay buffer.
- It then calculates the current Q-values for the sampled states and actions, and the expected - Q-values based on the rewards and next states.
- It calculates the loss, which is the mean squared difference between the current and expected Q-values.
- It then backpropagates this loss through the Q-Network and updates the weights using the optimizer.

#### Target Network Update
Finally, the `update_target_network` function copies the weights from the Q-Network to the Target Network. This is done periodically (not every step), to stabilize the learning process. Without this, the Q-Network would be trying to follow a moving target, since it's learning from estimates produced by itself.

In [None]:
class DQNAgent:
    '''
    This class represents a Deep Q-Learning agent that uses a Deep Q-Network (DQN) and a replay memory to interact
    with its environment.

    Parameters
    ----------
    state_size: int, default=8
        The size of the state space.
    action_size: int, default=4
        The size of the action space.
    hidden_size: int, default=64
        The size of the hidden layers in the network.
    learning_rate: float, default=1e-3
        The learning rate for the optimizer.
    gamma: float, default=0.99
        The discount factor for future rewards.
    buffer_size: int, default=10000
        The maximum size of the replay memory.
    batch_size: int, default=64
        The batch size for learning from the replay memory.
    '''
    def __init__(self, state_size=7, action_size=4, hidden_size=64,
                 learning_rate=1e-3, gamma=0.99, buffer_size=10000, batch_size=64):
        # Select device to train on (if CUDA available, use it, otherwise use CPU)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Discount factor for future rewards
        self.gamma = gamma

        # Batch size for sampling from the replay memory
        self.batch_size = batch_size

        # Number of possible actions
        self.action_size = action_size

        # Initialize the Q-Network and Target Network with the given state size, action size and hidden layer size
        # Move the networks to the selected device
        self.q_network = DQN(state_size, action_size, hidden_size).to(self.device)
        self.target_network = DQN(state_size, action_size, hidden_size).to(self.device)

        # Set weights of target network to be the same as those of the q network
        self.target_network.load_state_dict(self.q_network.state_dict())

        # Set target network to evaluation mode
        self.target_network.eval()

        # Initialize the optimizer for updating the Q-Network's parameters
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Initialize the replay memory
        self.memory = ReplayBuffer(buffer_size)

    def step(self, state, action, reward, next_state, done):
        '''
        Perform a step in the environment, store the experience in the replay memory and potentially update the Q-network.

        Parameters
        ----------
        state: array-like
            The current state of the environment.
        action: int
            The action taken by the agent.
        reward: float
            The reward received after taking the action.
        next_state: array-like
            The state of the environment after taking the action.
        done: bool
            A flag indicating whether the episode has ended after taking the action.
        '''
        # Store the experience in memory
        self.memory.push(state, action, reward, next_state, done)

        # If there are enough experiences in memory, perform a learning step
        if len(self.memory) > self.batch_size:
            self.update_model()

    def act(self, state, eps=0.):
        '''
        Choose an action based on the current state and the epsilon-greedy policy.

        Parameters
        ----------
        state: array-like
            The current state of the environment.
        eps: float, default=0.
            The epsilon for the epsilon-greedy policy. With probability eps, a random action is chosen.

        Returns
        -------
        int
            The chosen action.
        '''
        # If a randomly chosen value is greater than eps
        if random.random() > eps:
            # Convert state to a PyTorch tensor and set network to evaluation mode
            state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
            self.q_network.eval()

            # With no gradient updates, get the action values from the DQN
            with torch.no_grad():
                action_values = self.q_network(state)

            # Revert to training mode and return action
            self.q_network.train()
            return np.argmax(action_values.cpu().data.numpy())
        else:
            # Return a random action for random value > eps
            return random.choice(np.arange(self.action_size))

    def update_model(self):
        '''
        Update the Q-network based on a batch of experiences from the replay memory.
        '''
        # Sample a batch of experiences from memory
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # Convert numpy arrays to PyTorch tensors
        states = torch.from_numpy(states).float().to(self.device)
        actions = torch.from_numpy(np.array(actions)).long().to(self.device)
        rewards = torch.from_numpy(np.array(rewards)).float().to(self.device)
        next_states = torch.from_numpy(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().to(self.device)

        # Get Q-values for the actions that were actually taken
        q_values = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # Get maximum Q-value for the next states from target network
        next_q_values = self.target_network(next_states).max(1)[0].detach()

        # Compute the expected Q-values
        expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        # Compute the loss between the current and expected Q values
        loss = torch.nn.MSELoss()(q_values, expected_q_values)

        # Zero all gradients
        self.optimizer.zero_grad()

        # Backpropagate the loss
        loss.backward()

        # Step the optimizer
        self.optimizer.step()

    def update_target_network(self):
        '''
        Update the weights of the target network to match those of the Q-network.
        '''
        self.target_network.load_state_dict(self.q_network.state_dict())

### Training the Agent

Training the agent involves having the agent interact with the `LunarLander-v2` environment over a sequence of steps. Over each step, the agent receives a state from the environment, selects an action, receives a reward and the next state, and then updates its understanding of the environment (the Q-table in the case of Q-Learning).

The `train` function orchestrates this process over a defined number of episodes, using the methods defined in the DQNAgent class. Here's how it works:

#### Initial Setup
- `scores`: This list stores the total reward obtained in each episode.
- `scores_window`: This is a double-ended queue with a maximum length of 100. It holds the scores of the most recent 100 episodes and is used to monitor the agent's performance.
-`eps`: This is the epsilon for epsilon-greedy action selection. It starts from `eps_start` and decays after each episode until it reaches `eps_end`.

#### Episode Loop
The training process runs over a fixed number of episodes. In each episode:

- The environment is reset to its initial state.
- he agent then interacts with the environment until the episode is done (when a terminal state is reached).

#### Step Loop
In each step of an episode:

- The agent selects an action using the current policy (the act method in `DQNAgent`).
The selected action is applied to the environment using the step method, which returns the next state, the reward, and a boolean indicating whether the episode is done.
- The agent's step method is called to update the agent's knowledge. This involves adding the experience to the replay buffer and, if enough experiences have been collected, triggering a learning update.
- The state is updated to the next state, and the reward is added to the score.

After each episode:

- The score for the episode is added to `scores` and `scores_window`.
- Epsilon is decayed according to `eps_decay`.
- If the episode is a multiple of `target_update`, the target network is updated with the latest weights from the Q-Network.
- Finally, every 100 episodes, the average score over the last 100 episodes is printed.

The function returns the list of scores for all episodes.

This training process, which combines experiences from the replay buffer and separate target and Q networks, helps to stabilize the learning and leads to a more robust policy.

In [None]:
def train(agent, env, n_episodes=2000, eps_start=1.0, eps_end=0.01, eps_decay=0.995, target_update=10):
    '''
    Train a DQN agent.

    Parameters
    ----------
    agent: DQNAgent
        The agent to be trained.
    env: gym.Env
        The environment in which the agent is trained.
    n_episodes: int, default=2000
        The number of episodes for which to train the agent.
    eps_start: float, default=1.0
        The starting epsilon for epsilon-greedy action selection.
    eps_end: float, default=0.01
        The minimum value that epsilon can reach.
    eps_decay: float, default=0.995
        The decay rate for epsilon after each episode.
    target_update: int, default=10
        The frequency (number of episodes) with which the target network should be updated.

    Returns
    -------
    list of float
        The total reward obtained in each episode.
    '''

    # Initialize the scores list and scores window
    scores = []
    scores_window = deque(maxlen=100)
    eps = eps_start

    # Loop over episodes
    for i_episode in range(1, n_episodes + 1):

        # Reset environment and score at the start of each episode
        state, _ = env.reset()
        score = 0

        # Loop over steps
        while True:

            # Select an action using current agent policy then apply in environment
            action = agent.act(state, eps)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Update the agent, state and score
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward

            # End the episode if done
            if done:
                break

        # At the end of episode append and save scores
        scores_window.append(score)
        scores.append(score)

        # Decrease epsilon
        eps = max(eps_end, eps_decay * eps)

        # Print some info
        print(f"\rEpisode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}", end="")

        # Update target network every target_update episodes
        if i_episode % target_update == 0:
            agent.update_target_network()

        # Print average score every 100 episodes
        if i_episode % 100 == 0:
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))

        # This environment is considered to be solved for a mean score of 200 or greater, so stop training.
#         if i_episode % 100 == 0 and np.mean(scores_window) >= 200:
#             break


    return scores


# Make an environment
env = LunarLander()
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Initilize a DQN agent
agent = DQNAgent(state_size, action_size)

# Train it
scores = train(agent, env)

In [None]:
import matplotlib.pyplot as plt

plt.plot(scores)

In [None]:
# save model
import pickle
with open('agent.pkl', 'wb') as f:
    pickle.dump(agent, f)

#### Observations:
- Our DQN agent is able to solve the game typically after playing around 1200 episodes.
- Let's watch a video of this agent's performance:

In [None]:
env = LunarLander(render_mode='human')

def play_DQN_episode(env, agent):
    score = 0
    state, _ = env.reset(seed=42)

    while True:
        # eps=0 for predictions
        action = agent.act(state, 0)
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        score += reward

        # End the episode if done
        if done:
            break

    return score

score = play_DQN_episode(env, agent)
print("Score obtained:", score)

![](https://i.imgur.com/NAg48Qk.gif)

## Double DQN (DDQN)
The Double Deep Q-Network (DDQN) algorithm is a modification of the standard Deep Q-Network (DQN) algorithm, which reduces the overestimation bias in the Q-values, thereby improving the stability of the learning process. You can read the original publication by Hasselt et al from late 2015 here:

https://arxiv.org/abs/1509.06461

### The DDQN Algorithm

1. **Initialization**: Similar to DQN, initialize the parameters of two neural networks, $Q(s,a)$ (online network) and $\hat{Q}(s,a)$ (target network), with random weights. Both networks estimate Q-values from state-action pairs. Also, set the exploration probability $\epsilon$ to 1.0, and create an empty replay buffer.

2. **Action Selection**: Use an epsilon-greedy strategy, just like in DQN. With a probability of $\epsilon$, select a random action $a$, otherwise, select the action $a$ that yields the highest Q-value, i.e., $a = argmax_aQ(s,a)$.

3. **Experience Collection**: Carry out the selected action $a$ in the environment to get the immediate reward $r$ and the next state $s'$.

4. **Experience Storage**: Store the transition tuple $(s,a,r,s')$ in the replay buffer.

5. **Sampling:** Randomly sample a mini-batch of transitions from the replay buffer.

6. **Target Computation**: Here comes the primary difference from DQN. For every transition in the sampled mini-batch, compute the target value $y$. If the episode has ended, $y = r$. Otherwise, unlike DQN that uses the max operator to select the action from the target network, DDQN uses the online network to select the best action, and uses its Q-value estimate from the target network, i.e., $y = r + \gamma \hat{Q}(s', argmax_{a' \in A} Q(s', a'))$. This double estimator approach helps to reduce overoptimistic value estimates.

7. **Loss Calculation**: Compute the loss as the squared difference between the predicted Q-value from the online network and the computed target, i.e., $\mathcal{L} = (Q(s,a) - y)^2$.

8. **Online Network Update**: Perform Stochastic Gradient Descent (SGD) on the online network to minimize the loss.

9. **Target Network Update**: Every $N$ steps, update the target network by copying the weights from the online network.

10. **Iterate**: Repeat the process from step 2 until convergence.

In summary, the key difference in DDQN lies in the way the target Q-value is calculated for non-terminal states during the update. DDQN chooses the action using the online network and estimates the Q-value for this action using the target network. This modification helps mitigate the issue of overestimation present in standard DQN.


In [None]:
class DDQNAgent:
    def __init__(self, state_size=8, action_size=4, hidden_size=64,
                 learning_rate=1e-3, gamma=0.99, buffer_size=10000, batch_size=64):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.gamma = gamma
        self.batch_size = batch_size
        self.action_size = action_size
        self.q_network = DQN(state_size, action_size, hidden_size).to(self.device)
        self.target_network = DQN(state_size, action_size, hidden_size).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.memory = ReplayBuffer(buffer_size)

    def step(self, state, action, reward, next_state, done):
        self.memory.push(state, action, reward, next_state, done)
        if len(self.memory) > self.batch_size:
            self.update_model()

    def act(self, state, eps=0.):
        if random.random() > eps:
            state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
            self.q_network.eval()
            with torch.no_grad():
                action_values = self.q_network(state)
            self.q_network.train()
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    def update_model(self):
        '''
        Update the Q-network based on a batch of experiences from the replay memory.
        '''
        # Sample a batch of experiences from memory
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # Convert numpy arrays to PyTorch tensors
        states = torch.from_numpy(states).float().to(self.device)
        actions = torch.from_numpy(np.array(actions)).long().to(self.device)
        rewards = torch.from_numpy(np.array(rewards)).float().to(self.device)
        next_states = torch.from_numpy(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().to(self.device)

        # Get Q-values for the actions that were actually taken
        q_values = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # Get the action values from the online network
        next_action_values = self.q_network(next_states).max(1)[1].unsqueeze(-1)

        # Get the Q-values from the target network for the actions chosen by the Q-network
        next_q_values = self.target_network(next_states).gather(1, next_action_values).detach().squeeze(-1)

        # Compute the expected Q-values
        expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        # Compute the loss between the current and expected Q values
        loss = torch.nn.MSELoss()(q_values, expected_q_values)

        # Zero all gradients
        self.optimizer.zero_grad()

        # Backpropagate the loss
        loss.backward()

        # Step the optimizer
        self.optimizer.step()

    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

# Make an environment
env = gym.make('LunarLander-v2')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Initilize a DQN agent
agent = DDQNAgent(state_size, action_size)

# Train it
scores = train(agent, env)

# Play a demonstration episode
score = play_DQN_episode(env, agent)
print("Score obtained:", score)

![](https://i.imgur.com/rrfB9Vl.gif)

## Dueling Deep Q-Networks (Dueling DQN)
The Dueling Deep Q-Network (Dueling DQN) algorithm is an extension of the standard Deep Q-Network (DQN) algorithm, which aims to improve the estimation of the state-value function and thus enhance the quality of the policy. The dueling architecture was proposed by Wang et al in 2015, and you can find their original paper here:

https://arxiv.org/abs/1511.06581

### The Dueling DQN Algorithm
1. **Initializatin**: In Dueling DQN, initialize the parameters of two neural networks, $Q(s,a)$ (online network) and $\hat{Q}(s,a)$ (target network), with random weights. Unlike the traditional DQN, each network in Dueling DQN splits into two separate streams at some point - one for estimating the state-value function $V(s)$ and the other for estimating the advantage function $A(s,a)$. Also, set the exploration probability $\epsilon$ to 1.0, and create an empty replay buffer.

2. **Action Selection**: The action selection process is the same as DQN. Use an epsilon-greedy strategy. With a probability of $\epsilon$, select a random action $a$, otherwise, select the action $a$ that yields the highest Q-value, i.e., $a = argmax_aQ(s,a)$.

3. **Experience Collection**: Carry out the selected action $a$ in the environment to obtain the immediate reward $r$ and the next state $s'$.

4. **Experience Storage**: Store the transition tuple $(s,a,r,s')$ in the replay buffer.

5. **Sampling**: Randomly sample a mini-batch of transitions from the replay buffer.

6. **Target Computation**: For each transition in the sampled mini-batch, compute the target value $y$. If the episode has ended, $y = r$. Otherwise, compute $y$ as $y = r + \gamma \hat{Q}(s', argmax_{a' \in A} Q(s', a'))$.

7. **Loss Calculation**: Compute the loss as the squared difference between the predicted Q-value from the online network and the computed target, i.e., $\mathcal{L} = (Q(s,a) - y)^2$.

8. **Online Network Update**: Use Stochastic Gradient Descent (SGD) or another optimization algorithm to update the online network and minimize the loss.

9. **Target Network Update**: Every $N$ steps, update the target network by copying the weights from the online network.

10. **Iterate**: Repeat the process from step 2 until convergence.

Dueling DQN indeed introduces a novel network architecture for approximating the Q-value function. It separates the Q-value into two parts: the state-value function $V(s)$, which estimates the value of a state regardless of the actions, and the advantage function $A(s,a)$, which measures the relative advantage of taking an action in a state compared to the other actions.

At first glance, it might seem logical to compute the Q-value simply by adding the state-value and the advantage: $Q(s,a) = V(s) + A(s,a)$. However, this equation presents an issue: it's underdetermined. There are infinite possible combinations of $V(s)$ and $A(s,a)$ that satisfy this equation for a given $Q(s,a)$. For instance, if the actual value of $Q(s,a)$ is 10, we would have the equation $10 = V(s) + A(s,a)$, for which there are infinite solutions.

The authors of the Dueling DQN paper propose a clever way to overcome this issue: they force the advantage function to have zero advantage at the chosen action. This means that the highest advantage, $A(s,a)$, is 0, and other advantages are negative or zero, thus providing a unique solution. To implement this, they modify the equation as follows:

$$ Q(s,a) = V(s)+(A(s,a) − \max_{a'}A(s, a') $$

This equation means that the Q-value is computed as the state-value $V(s)$ plus the difference between the advantage of the action $a$ and the maximum advantage over all possible actions in state $s$. In other words, the Q-value is now the value of the state plus the relative advantage of taking the action $a$ over the other actions. This mechanism provides a clear way to train the network and allows Dueling DQN to learn efficiently about state values and action advantages.

To implement this, we can use the original DQN algorithm and our original DQNAgent class, we just need to change the DQN it uses, in total just 2 lines of code changes in the agent class.

In [None]:
class DuelingDQN(torch.nn.Module):

    def __init__(self, state_size=8, action_size=4, hidden_size=64):

        super(DuelingDQN, self).__init__()

        # Common layers
        self.layer1 = torch.nn.Linear(state_size, hidden_size)
        self.layer2 = torch.nn.Linear(hidden_size, hidden_size)

        # Advantage layer
        self.advantage = torch.nn.Linear(hidden_size, action_size)

        # Value layer
        self.value = torch.nn.Linear(hidden_size, 1)

    def forward(self, state):

        # Common part of the network
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))

        # Streams split here
        advantage = self.advantage(x)
        value = self.value(x)

        # Recombine advantage and value for Q
        return value + (advantage - advantage.max(dim=1, keepdim=True)[0])


class DuelingDQNAgent:

    def __init__(self, state_size=8, action_size=4, hidden_size=64,
                 learning_rate=1e-3, gamma=0.99, buffer_size=10000, batch_size=64):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.gamma = gamma
        self.batch_size = batch_size
        self.action_size = action_size

        # Use the dueling DQN networks instead
        self.q_network = DuelingDQN(state_size, action_size, hidden_size).to(self.device)
        self.target_network = DuelingDQN(state_size, action_size, hidden_size).to(self.device)

        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.memory = ReplayBuffer(buffer_size)

    def step(self, state, action, reward, next_state, done):
        self.memory.push(state, action, reward, next_state, done)
        if len(self.memory) > self.batch_size:
            self.update_model()

    def act(self, state, eps=0.):
        if random.random() > eps:
            state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
            self.q_network.eval()
            with torch.no_grad():
                action_values = self.q_network(state)
            self.q_network.train()
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    def update_model(self):
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = torch.from_numpy(states).float().to(self.device)
        actions = torch.from_numpy(np.array(actions)).long().to(self.device)
        rewards = torch.from_numpy(np.array(rewards)).float().to(self.device)
        next_states = torch.from_numpy(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().to(self.device)
        q_values = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
        next_q_values = self.target_network(next_states).max(1)[0].detach()
        expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)
        loss = torch.nn.MSELoss()(q_values, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

# Make an environment
env = gym.make('LunarLander-v2')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Initilize a DuelingDQN agent
agent = DuelingDQNAgent(state_size, action_size)

# Train it
scores = train(agent, env)

# Play a demonstration episode
score = play_DQN_episode(env, agent)
print("Score obtained:", score)

![](https://i.imgur.com/QNCmfd3.gif)

## Double Dueling Deep Q-Networks (Dueling DQN)

We can also use the DDQN training trick to prevent the overestimation of Q-values from Dueling DQN. We can call this algorithm Dueling Double Deep Q-Network, or D3QN.

To use this, we just need to change the code in our DuelingDQN agent's `update_model` method so it uses the DDQN trick to prevent Q-value overestimation:

In [None]:
class D3QNAgent:
    def __init__(self, state_size=8, action_size=4, hidden_size=64,
                 learning_rate=1e-3, gamma=0.99, buffer_size=10000, batch_size=64):

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.gamma = gamma
        self.batch_size = batch_size
        self.action_size = action_size
        self.q_network = DuelingDQN(state_size, action_size, hidden_size).to(self.device)
        self.target_network = DuelingDQN(state_size, action_size, hidden_size).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.memory = ReplayBuffer(buffer_size)

    def step(self, state, action, reward, next_state, done):
        self.memory.push(state, action, reward, next_state, done)
        if len(self.memory) > self.batch_size:
            self.update_model()

    def act(self, state, eps=0.):
        if random.random() > eps:
            state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
            self.q_network.eval()
            with torch.no_grad():
                action_values = self.q_network(state)

            self.q_network.train()
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

    def update_model(self):
        '''
        Update the Q-network based on a batch of experiences from the replay memory.
        '''
        # Sample a batch of experiences from memory
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # Convert numpy arrays to PyTorch tensors
        states = torch.from_numpy(states).float().to(self.device)
        actions = torch.from_numpy(np.array(actions)).long().to(self.device)
        rewards = torch.from_numpy(np.array(rewards)).float().to(self.device)
        next_states = torch.from_numpy(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().to(self.device)

        # Get Q-values for the actions that were actually taken
        q_values = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # Get the action values from the online network
        next_action_values = self.q_network(next_states).max(1)[1].unsqueeze(-1)

        # Get the Q-values from the target network for the actions chosen by the Q-network
        next_q_values = self.target_network(next_states).gather(1, next_action_values).detach().squeeze(-1)

        # Compute the expected Q-values
        expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        # Compute the loss between the current and expected Q values
        loss = torch.nn.MSELoss()(q_values, expected_q_values)

        # Zero all gradients
        self.optimizer.zero_grad()

        # Backpropagate the loss
        loss.backward()

        # Step the optimizer
        self.optimizer.step()

    def update_target_network(self):
        '''
        Update the weights of the target network to match those of the Q-network.
        '''
        self.target_network.load_state_dict(self.q_network.state_dict())

# Make an environment
env = gym.make('LunarLander-v2')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Initilize a D3QN agent
agent = D3QNAgent(state_size, action_size)

# Train it
scores = train(agent, env)

# Play a demonstration episode
score = play_DQN_episode(env, agent)
print("Score obtained:", score)