Using DQN to solve a rocket trajectory optimization problem (LunarLander-v2)

In [None]:
! pip install gym[box2d] >> None

In [None]:
from gym import make
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from torch.optim import Adam
from collections import deque
import random
import copy

from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML, display
import glob
import base64, io

## Exploring the environment

In [None]:
%%html
<video controls autoplay><source src="https://huggingface.co/ThomasSimonini/ppo-LunarLander-v2/resolve/main/replay.mp4" type="video/mp4"></video>

In [None]:
# Environment initialization: create the LunarLander-v2 environment that the
# exploration cells below inspect and step through.
env = make("LunarLander-v2")

In [None]:
# Action space: four discrete actions, indexed from 0 --
#   0: do nothing, 1: fire left orientation engine,
#   2: fire main engine, 3: fire right orientation engine
# Printed explicitly: a bare expression is only displayed when it is the last line of a cell.
print(env.action_space)
print(f'Action space shape: {env.action_space.n}')

Discrete(4)

In [None]:
# Observation space: the coordinates of the lander in x & y, its linear
# velocities in x & y, its angle, its angular velocity, and two booleans that
# represent whether each leg is in contact with the ground or not.
print(env.observation_space)
# Length of the observation vector (first entry of the space's shape).
print(f'Observation space shape: {env.observation_space.shape[0]}')

Box([-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ], [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], (8,), float32)
Observation space shape 8


In [None]:
# rewards
# An episode is considered solved when it scores at least 200 points

In [None]:
# Interaction with the environment: reset it, take one randomly sampled action,
# and print what `step` returns.
env.reset()
action =  env.action_space.sample()
# `step` is unpacked as a 4-tuple here: (next_state, reward, done, info); info is discarded.
next_state, reward, done, _ = env.step(action)
print('Next state: ', next_state)
print('Reward: ', reward)
print('Is done: ', done)

Next state:  [-0.00882721  1.3892448  -0.4527549  -0.4946026   0.01215031  0.14158219
  0.          0.        ]
Reward:  -2.197294432446283
Is done:  False


More information can be found in the documentation: https://www.gymlibrary.dev/environments/box2d/lunar_lander/

## Train

In [None]:
# --- Training hyperparameters -------------------------------------------------
GAMMA = 0.99                                        # discount factor applied to bootstrapped Q-values
INITIAL_STEPS = 10_000                              # random-action transitions collected before learning
TRANSITIONS = 500_000                               # environment steps in the main training loop
STEPS_PER_UPDATE = 4                                # one gradient step every N calls to DQN.update
STEPS_PER_TARGET_UPDATE = 1000 * STEPS_PER_UPDATE   # target-network sync period, in DQN.update calls
BATCH_SIZE = 512                                    # transitions sampled from the replay buffer per step
LEARNING_RATE = 1e-4                                # Adam learning rate

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

class DQN:
    """Deep Q-Network agent with a replay buffer and a periodically synced target network.

    The online network (`self.model`) is trained on mini-batches sampled from the
    replay buffer (`self.reply`); bootstrap targets come from `self.target`, a copy
    of the online network that is refreshed every STEPS_PER_TARGET_UPDATE updates.
    """

    def __init__(self, state_dim, action_dim,
                 hid_size1=512, hid_size2=256, hid_size3=128, maxlen=10000):
        """Build the networks, optimizer and replay buffer.

        Args:
            state_dim: size of the observation vector fed to the network.
            action_dim: number of discrete actions (size of the network output).
            hid_size1, hid_size2, hid_size3: widths of the three hidden layers.
            maxlen: capacity of the replay buffer; the oldest transitions are
                dropped automatically once it is full.
        """
        self.steps = 0  # number of `update` calls performed so far
        self.model = nn.Sequential(
            nn.Linear(state_dim, hid_size1),
            nn.ReLU(),
            nn.Linear(hid_size1, hid_size2),
            nn.ReLU(),
            nn.Linear(hid_size2, hid_size3),
            nn.ReLU(),
            nn.Linear(hid_size3, action_dim)
        ).to(device)  # online Q-network
        self.target = copy.deepcopy(self.model)  # target Q-network
        self.reply = deque([], maxlen=maxlen)  # replay buffer
        self.optimizer = Adam(self.model.parameters(), lr=LEARNING_RATE)
        self.criterion = F.mse_loss
        # Best evaluation score checkpointed so far. Starting at -inf guarantees
        # that the first call to `save` always writes a checkpoint.
        self.fitness = float("-inf")

    def consume_transition(self, transition):
        """Append a (state, action, next_state, reward, done) tuple to the replay buffer."""
        self.reply.append(transition)

    def sample_batch(self):
        """Sample BATCH_SIZE transitions without replacement.

        Returns:
            A list of five tuples: states, actions, next states, rewards and done flags.

        Raises:
            ValueError: if the buffer holds fewer than BATCH_SIZE transitions.
        """
        sample = random.sample(self.reply, BATCH_SIZE)
        return list(zip(*sample))

    def _prepare_data(self, batch):
        """Convert a sampled batch into tensors on `device`."""
        state, action, next_state, reward, done = batch
        state = torch.tensor(np.array(state), device=device, dtype=torch.float32)
        action = torch.tensor(action, device=device, dtype=torch.int64)
        next_state = torch.tensor(np.array(next_state), device=device, dtype=torch.float32)
        reward = torch.tensor(reward, device=device, dtype=torch.float32)
        done = torch.tensor(done, device=device, dtype=torch.float32)
        return state, action, next_state, reward, done

    def train_step(self, batch):
        """Run one gradient step of the online network on `batch`."""
        state, action, next_state, reward, done = self._prepare_data(batch)

        with torch.no_grad():
            # One-step TD target; (1 - done) removes the bootstrap term on terminal transitions.
            next_q = self.target(next_state).max(dim=1).values
            q_target = reward + next_q * (1 - done) * GAMMA

        # Q-value of the action that was actually taken in each transition.
        q_model = self.model(state).gather(1, action.unsqueeze(1)).squeeze(1)
        loss = self.criterion(q_model, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_value_(self.model.parameters(), 5)
        self.optimizer.step()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        # load_state_dict copies values into the target's existing parameters,
        # so no intermediate deep copy or device move is required.
        self.target.load_state_dict(self.model.state_dict())

    def act(self, state, target=False):
        """Return the greedy action for `state` as a Python int.

        Args:
            state: a single observation (array-like).
            target: when True, query the target network instead of the online one.
        """
        network = self.target if target else self.model
        with torch.no_grad():
            state = torch.as_tensor(state, device=device, dtype=torch.float32)
            action = torch.argmax(network(state)).item()
        return action

    def update(self, transition):
        """Store `transition`, then train and sync the target network on schedule."""
        self.consume_transition(transition)
        # Skip the gradient step until a full batch can be sampled; otherwise
        # random.sample would raise ValueError on an under-filled buffer.
        if self.steps % STEPS_PER_UPDATE == 0 and len(self.reply) >= BATCH_SIZE:
            batch = self.sample_batch()
            self.train_step(batch)
        if self.steps % STEPS_PER_TARGET_UPDATE == 0:
            self.update_target_network()
        self.steps += 1

    def save(self, fitness):
        """Write the online network to "agent.pkl" if `fitness` is at least the best seen so far."""
        if fitness >= self.fitness:
            self.fitness = fitness
            torch.save(self.model, "agent.pkl")


def evaluate_policy(agent, episodes=5):
    """Run `agent` for a number of full episodes and collect the undiscounted returns.

    Args:
        agent: object exposing `act(state)` that returns an action for the environment.
        episodes: number of episodes to play.

    Returns:
        A list with one total reward per episode.
    """
    env = make("LunarLander-v2")
    returns = []
    try:
        for _ in range(episodes):
            done = False
            state = env.reset()
            total_reward = 0

            while not done:
                state, reward, done, _ = env.step(agent.act(state))
                total_reward += reward
            returns.append(total_reward)
    finally:
        # A fresh environment is created on every call, so release it every time,
        # including when an episode fails midway.
        env.close()
    return returns

In [None]:
env = make("LunarLander-v2")
dqn = DQN(state_dim=env.observation_space.shape[0], action_dim=env.action_space.n)
eps = 0.1
state = env.reset()

# Warm-up phase: fill the replay buffer with transitions from a uniformly random policy.
for _ in range(INITIAL_STEPS):
    action = env.action_space.sample()
    next_state, reward, done, _ = env.step(action)
    dqn.consume_transition((state, action, next_state, reward, done))

    if done:
        state = env.reset()
    else:
        state = next_state

# Training phase: act epsilon-greedily and let the agent learn from every transition.
eval_interval = TRANSITIONS // 100
for i in range(TRANSITIONS):
    explore = random.random() < eps
    action = env.action_space.sample() if explore else dqn.act(state)

    next_state, reward, done, _ = env.step(action)
    dqn.update((state, action, next_state, reward, done))

    if done:
        state = env.reset()
    else:
        state = next_state

    # Evaluate and checkpoint only at every hundredth of the training run.
    if (i + 1) % eval_interval != 0:
        continue
    rewards = evaluate_policy(dqn, 5)
    mean_reward = np.mean(rewards)
    print(f"Step: {i + 1}, Reward mean: {mean_reward}, Reward std: {np.std(rewards)}")
    dqn.save(mean_reward)


Step: 5000, Reward mean: -275.2288397325195, Reward std: 130.8994045809992
Step: 10000, Reward mean: -200.14622776888612, Reward std: 146.8308612656839
Step: 15000, Reward mean: -228.58417685190096, Reward std: 119.65154263476151
Step: 20000, Reward mean: 37.719320269682484, Reward std: 46.10377428169155
Step: 25000, Reward mean: -71.00798817488638, Reward std: 172.2059975229927
Step: 30000, Reward mean: -80.86016332419028, Reward std: 86.48147873506352
Step: 35000, Reward mean: -55.55695911128587, Reward std: 40.73076848302494
Step: 40000, Reward mean: -11.910865125028595, Reward std: 126.8329021483398
Step: 45000, Reward mean: -55.61083540991588, Reward std: 61.23155929297405
Step: 50000, Reward mean: -69.46612800001526, Reward std: 51.401250273804436
Step: 55000, Reward mean: -30.77222590557896, Reward std: 30.166374838048764
Step: 60000, Reward mean: -82.2421064390716, Reward std: 30.24517830231615
Step: 65000, Reward mean: -59.198252339708844, Reward std: 17.17759046426846
Step: 7

## Inference

In [None]:
import random
import numpy as np
import os
import torch


class Agent:
    """Inference-only wrapper that acts greedily with a saved Q-network."""

    def __init__(self, model_path="agent.pkl"):
        """Load the pickled network from `model_path` onto the available device.

        Args:
            model_path: path of the checkpoint file; defaults to "agent.pkl".
        """
        # Resolve the device locally so this class does not rely on a global
        # defined in another notebook cell.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # NOTE: the checkpoint is a fully pickled module, and unpickling can
        # execute arbitrary code -- only load files you created yourself.
        # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
        try:
            self.model = torch.load(model_path, map_location=self.device, weights_only=False)
        except TypeError:
            # Older PyTorch releases do not accept the `weights_only` keyword.
            self.model = torch.load(model_path, map_location=self.device)
        self.model.eval()

    def act(self, state):
        """Return the index of the highest-valued action for `state` as a Python int."""
        # Cast to float32 so observations of any numeric dtype match the network weights.
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device)
        with torch.no_grad():
            q_values = self.model(state)
        return int(torch.argmax(q_values).item())

In [None]:
def show_video(env_name):
    """Embed the recording stored at video/<env_name>.mp4 in the notebook output.

    Prints "Could not find video" and returns when that file does not exist.

    Args:
        env_name: environment id used as the base name of the mp4 file.
    """
    mp4 = 'video/{}.mp4'.format(env_name)
    try:
        # Read-only binary mode is enough here; the context manager closes the handle.
        with io.open(mp4, 'rb') as video_file:
            video = video_file.read()
    except FileNotFoundError:
        # Covers both an empty video/ directory and one that only holds other recordings.
        print("Could not find video")
        return
    encoded = base64.b64encode(video)
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

def show_video_of_model(agent, env_name):
    """Record one episode played by `agent` to video/<env_name>.mp4.

    Args:
        agent: object exposing `act(state)` that returns an action for the environment.
        env_name: environment id passed to `make`; also used as the video file name.
    """
    # Make sure the output directory exists before the recorder is pointed at it.
    os.makedirs("video", exist_ok=True)
    env = make(env_name)
    vid = video_recorder.VideoRecorder(env, path="video/{}.mp4".format(env_name))
    try:
        state = env.reset()
        done = False
        while not done:
            # Render call kept from the original loop; the recorder captures the frame on the next line.
            env.render(mode='rgb_array')
            vid.capture_frame()

            action = agent.act(state)

            state, _, done, _ = env.step(action)
    finally:
        # Close the recorder before the environment so the video is finalized even on errors.
        vid.close()
        env.close()

In [None]:
# Load the checkpoint written during training and record one episode of the
# trained agent; the recording is written to video/<env_name>.mp4.
trained_agent = Agent()
env_name = "LunarLander-v2"
show_video_of_model(agent=trained_agent, env_name=env_name)

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  f"Recording ability for environment {env.spec.id} initialized with `render_mode=None` is marked "
  f"{self.__class__} is marked as deprecated and will be removed in the future."
See here for more information: https://www.gymlibrary.ml/content/api/[0m
  "The argument mode in render method is deprecated; "


In [None]:
# Embed the recorded episode in the notebook output.
show_video(env_name)