# Lunar Lander (CS7642 - P2)


In [None]:
%pip install swig -q
%pip install -q -U gymnasium[box2d]
%pip install --upgrade ipykernel -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m19.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m117.1/117.1 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the f

In [None]:
import numpy as np
import gymnasium as gym

In [None]:
import tensorflow as tf

In [None]:
import matplotlib.animation
import matplotlib.pyplot as plt

# Notebook-wide matplotlib defaults: font sizes for text, axes, legend and
# ticks, plus the HTML representation used for animations.
plt.rcParams.update({
    'font.size': 14,
    'axes.labelsize': 14,
    'axes.titlesize': 14,
    'legend.fontsize': 14,
    'xtick.labelsize': 10,
    'ytick.labelsize': 10,
    'animation.html': 'jshtml',
})

In [None]:
def plot_environment(env, figsize=(5, 4)):
    """Render ``env`` once and show the result in a new, axis-free figure.

    Args:
        env: Object exposing ``render()``; its result is handed to
            ``plt.imshow`` (presumably an image array from an environment
            created with ``render_mode="rgb_array"`` -- confirm with caller).
        figsize: Size passed to ``plt.figure``.

    Returns:
        Whatever ``env.render()`` returned.
    """
    plt.figure(figsize=figsize)
    rendered_frame = env.render()
    plt.imshow(rendered_frame)
    plt.axis("off")
    return rendered_frame

In [None]:
# Mount Google Drive at /content/drive; the checkpoint JSON and the
# after-training plot in this notebook are written under that path.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import json

## Simple Policy

In [None]:
def basic_policy(obs):
    """Pick an action from the value stored at index 5 of the observation.

    Args:
        obs: Indexable observation; only ``obs[5]`` is read (the original
            code names this value ``angle``).

    Returns:
        0 (do nothing) while the value is strictly within +/-0.1,
        otherwise 1 when it is negative and 2 when it is not.
    """
    tilt = obs[5]
    if abs(tilt) < 0.1:
        return 0    # do nothing
    if tilt < 0:
        return 1
    return 2

In [None]:
def update_scene(num, frames, patch):
    """Animation callback: show frame number ``num`` on ``patch``.

    Args:
        num: Index of the frame to display.
        frames: Indexable collection of frames.
        patch: Object exposing ``set_data``; it receives ``frames[num]``.

    Returns:
        A one-element tuple containing ``patch``.
    """
    current_frame = frames[num]
    patch.set_data(current_frame)
    return (patch,)

def plot_animation(frames, repeat=False, interval=40):
    """Build a matplotlib animation that steps through ``frames``.

    Args:
        frames: Non-empty sequence of images; the first one initialises
            the displayed image.
        repeat: Forwarded to ``FuncAnimation`` as ``repeat``.
        interval: Forwarded to ``FuncAnimation`` as ``interval``.

    Returns:
        The ``matplotlib.animation.FuncAnimation`` object.
    """
    figure = plt.figure()
    image_artist = plt.imshow(frames[0])
    plt.axis('off')
    animation_options = dict(
        fargs=(frames, image_artist),
        frames=len(frames),
        repeat=repeat,
        interval=interval,
    )
    animation = matplotlib.animation.FuncAnimation(
        figure, update_scene, **animation_options)
    # Close the pyplot figure now that the animation object holds it.
    plt.close()
    return animation

def show_one_episode(policy, n_max_steps=200, seed=42):
    """Run one LunarLander episode with ``policy`` and return it as an animation.

    Args:
        policy: Callable taking the current observation and returning the
            action passed to ``env.step``.
        n_max_steps: Upper bound on the number of environment steps.
        seed: Seed applied to NumPy's global generator and to ``env.reset``.

    Returns:
        The animation built by ``plot_animation`` from the rendered frames
        (one frame is captured before every step).
    """
    frames = []
    env = gym.make("LunarLander-v2", render_mode="rgb_array")
    try:
        np.random.seed(seed)
        obs, info = env.reset(seed=seed)
        for step in range(n_max_steps):
            frames.append(env.render())
            action = policy(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                break
    finally:
        # Release the environment even if the policy or a step raises.
        env.close()
    return plot_animation(frames)

# Animate one episode driven by the hand-written policy; the returned
# animation is this cell's final expression.
show_one_episode(basic_policy)

In [None]:
def simple_policy_stats(n_episodes=500, n_max_steps=300):
    """Run ``basic_policy`` for many episodes and collect the total rewards.

    Episode ``i`` resets the environment with ``seed=i``.

    Args:
        n_episodes: Number of episodes to play.
        n_max_steps: Upper bound on the number of steps per episode.

    Returns:
        A list with one summed reward per episode.
    """
    totals_per_episode = []
    env = gym.make("LunarLander-v2", render_mode="rgb_array")
    try:
        for episode in range(n_episodes):
            episode_rewards = 0
            obs, info = env.reset(seed=episode)
            for step in range(n_max_steps):
                action = basic_policy(obs)
                obs, reward, terminated, truncated, info = env.step(action)
                episode_rewards += reward
                if terminated or truncated:
                    break

            totals_per_episode.append(episode_rewards)
    finally:
        # The environment was previously never closed; always release it.
        env.close()

    return totals_per_episode

In [None]:
totals_per_episode = simple_policy_stats()

# Summary statistics of the per-episode totals for the hand-written policy.
mean = np.mean(totals_per_episode)
std = np.std(totals_per_episode)
min_value = min(totals_per_episode)
max_value = max(totals_per_episode)

# Two-column text table: a header, a rule, then one row per statistic.
print(f"Description {'Value':>10}")
print("-" * 30)
print(f"Mean        {mean:.2f}")
print(f"Std         {std:.2f}")
print(f"Min         {min_value}")
print(f"Max         {max_value}")

Description      Value
------------------------------
Mean        -178.48
Std         130.99
Min         -703.6407525557729
Max         33.77475252143975


## Replay Buffer

In [None]:
class ReplayBuffer:
    """Fixed-capacity circular buffer of arbitrary Python objects.

    Once ``max_size`` items have been stored, each new item overwrites the
    oldest one. Sampling is uniform with replacement over the stored items
    and draws from NumPy's global random generator.
    """

    def __init__(self, max_size):
        """Allocate an empty buffer able to hold ``max_size`` objects."""
        self.max_size = max_size
        self.buffer = np.empty(max_size, dtype=object)
        self.size = 0       # number of slots currently filled
        self.index = 0      # slot the next append writes to

    def append(self, obj):
        """Store ``obj`` in the current slot and advance the write position."""
        slot = self.index
        self.buffer[slot] = obj
        if self.size < self.max_size:
            self.size += 1
        self.index = (slot + 1) % self.max_size

    def sample(self, batch_size):
        """Return an object array of ``batch_size`` stored items, drawn with replacement."""
        picks = np.random.randint(self.size, size=batch_size)
        return self.buffer[picks]

## Implementation

In [None]:
# Seed NumPy's and TensorFlow's global random generators with a fixed value.
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
class DQNAgent:
    """DQN agent with experience replay and a softly-updated target network.

    Args:
        state_size: Size of one observation vector (stored, not otherwise used here).
        num_actions: Number of discrete actions.
        gamma: Discount factor.
        model: Keras model mapping a batch of states to one Q-value per action.
        optimizer: Optimizer used to update ``model``.
        loss_fn: Callable ``loss_fn(targets, predictions)``; its result is
            averaged over the batch.
        batch_size: Number of experiences sampled per training step.
        buffer_size: Capacity of the replay buffer.
        tau: Soft-update rate; after every training step the target weights
            become ``(1 - tau) * target + tau * online``.
    """

    def __init__(self, state_size, num_actions, gamma, model, optimizer, loss_fn, batch_size,
                 buffer_size=50_000, tau=0.001):
        self.replay_buffer = ReplayBuffer(buffer_size)
        self.state_size = state_size
        self.num_actions = num_actions
        self.gamma = gamma      # discount factor
        self.tau = tau          # soft target-update rate

        # NN hparams
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.batch_size = batch_size

        self.target_model = tf.keras.models.clone_model(self.model)  # clone the model's architecture
        self.target_model.set_weights(self.model.get_weights())  # copy the weights

    def select_epsilon_greedy_action(self, state, epsilon=0) -> int:
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if np.random.rand() < epsilon:
            return np.random.randint(self.num_actions)  # random action
        q_values = self.model(state[np.newaxis])[0]
        # Convert the NumPy scalar to a plain int to honour the annotation.
        return int(tf.math.argmax(q_values).numpy())

    def do_one_step(self, env, state, epsilon):
        """Takes an action, adds experience to buffer, returns experience"""
        action = self.select_epsilon_greedy_action(state, epsilon)
        next_state, reward, done, truncated, info = env.step(action)
        self.replay_buffer.append((state, action, reward, next_state, done, truncated))
        return next_state, reward, done, truncated, info

    def sample_experiences(self, batch_size):
        """Returns [states, actions, rewards, next_states, dones, truncateds]

            Where each element is an array of size batch_size.
            For example, element 0 (i.e., states) is an array of
            shape (batch_size, state_size). actions is of shape (batch_size,)
        """
        batch = self.replay_buffer.sample(batch_size)
        return [
            np.array([experience[field_index] for experience in batch])
            for field_index in range(6)
        ]

    def soft_update_target_model(self):
        """Move every target-network weight a fraction ``tau`` towards the online network."""
        blended_weights = [
            (1.0 - self.tau) * target_weight + self.tau * online_weight
            for target_weight, online_weight in zip(
                self.target_model.get_weights(), self.model.get_weights())
        ]
        self.target_model.set_weights(blended_weights)

    def training_step(self):
        """Does one Q-learning step over a sampled batch, then soft-updates the target network"""
        experiences = self.sample_experiences(self.batch_size)
        states, actions, rewards, next_states, dones, _truncateds = experiences
        next_Q_values = self.target_model(next_states).numpy()                  # shape = (batch_size, num_actions)
        max_next_Q_values = next_Q_values.max(axis=1)                           # shape = (batch_size,)
        # Only a real terminal state has zero future value. A time-limit
        # truncation just stops the rollout, so the target must still
        # bootstrap from next_state in that case.
        not_terminal = 1.0 - dones                                              # shape = (batch_size,)
        target_Q_values = rewards + (not_terminal * self.gamma * max_next_Q_values)  # shape = (batch_size,)
        target_Q_values = target_Q_values.reshape(-1, 1)                        # shape = (batch_size, 1)
        mask = tf.one_hot(actions, self.num_actions)  # = 1 at index of action. shape = (batch_size, num_actions)

        with tf.GradientTape() as tape:
            all_Q_values = self.model(states)                                    # shape = (batch_size, num_actions)
            Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True) # shape = (batch_size, 1)
            loss = tf.reduce_mean(self.loss_fn(target_Q_values, Q_values))

        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

        self.soft_update_target_model()

In [None]:
def checkpoint(run_name: str, episode_idx: int, run_info: dict, total_reward_per_episode: list,
               average_episode_reward_over_last_100: list,
               output_dir: str = "/content/drive/My Drive/classes/cs7642"):
    """Checkpoints the current info in the file ``<output_dir>/<run_name>.json``.

    ``run_info`` is updated in place and then written out in full, so the
    file has the form:
    {
        episode_idx: [109, 109, 107]
        average_episode_reward_over_last_100: [...]
    }
    with one ``episode_idx`` key per checkpoint taken so far (JSON stores
    the integer keys as strings).

    Args:
        run_name: Base name of the JSON file.
        episode_idx: Episode index used as the key for this snapshot.
        run_info: Accumulating dictionary shared across checkpoints.
        total_reward_per_episode: Rewards of all episodes played so far.
        average_episode_reward_over_last_100: Recorded running averages.
        output_dir: Existing directory the JSON file is written to.
    """
    # Store copies: the caller keeps appending to these lists, and keeping
    # the live list would make every earlier snapshot mirror the latest one.
    run_info[episode_idx] = list(total_reward_per_episode)
    run_info["average_episode_reward_over_last_100"] = list(average_episode_reward_over_last_100)
    with open(f"{output_dir}/{run_name}.json", "w") as f:
        json.dump(run_info, f)

In [None]:
def train_agent(run_name: str, num_episodes: int = 2000, steps_per_episode: int = 1000,
                target_reward: float = 215):
    """Train a DQN agent on LunarLander-v2 with epsilon-greedy exploration.

    Epsilon starts at 1.0 and is multiplied by 0.995 after every episode
    (floored at 0.01). A training step runs every 4th environment step once
    the first three episodes have been played. Progress is checkpointed
    every 50 episodes and once more on exit.

    Args:
        run_name: Name handed to ``checkpoint`` for the output file.
        num_episodes: Maximum number of training episodes.
        steps_per_episode: Maximum number of environment steps per episode.
        target_reward: Training stops early once the mean reward of the
            most recent 100 episodes exceeds this value.

    Returns:
        ``(total_reward_per_episode, agent)``: the list of per-episode
        rewards and the trained ``DQNAgent``.
    """
    # Set Up
    env_ = gym.make("LunarLander-v2", render_mode="rgb_array")
    input_shape = env_.observation_space.shape
    n_outputs = env_.action_space.n

    batch_size = 64
    gamma = 0.99    # discount_factor
    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-4)
    loss_fn = tf.keras.losses.mean_squared_error

    model_ = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation="relu", input_shape=input_shape),
        tf.keras.layers.Dense(64, activation="relu"),
        tf.keras.layers.Dense(n_outputs)
    ])
    train_every_n_steps = 4
    rolling_window = 100

    # Do training
    total_reward_per_episode = []
    average_episode_reward_over_last_100 = []
    run_info = {}

    agent = DQNAgent(input_shape[0], n_outputs, gamma, model_, optimizer, loss_fn, batch_size)
    epsilon = 1.0
    overall_step = 0

    try:
        for episode_idx in range(num_episodes):
            state, info = env_.reset()
            episode_reward = 0
            steps_taken = 0
            for step in range(steps_per_episode):
                state, reward, done, truncated, info = agent.do_one_step(env_, state, epsilon)
                episode_reward += reward
                steps_taken = step + 1

                if episode_idx > 2 and overall_step % train_every_n_steps == 0:
                    agent.training_step()
                overall_step += 1

                if done or truncated:
                    break

            epsilon = max(epsilon * 0.995, 0.01)

            # Record results and log info
            total_reward_per_episode.append(episode_reward)
            rolling_mean = np.mean(total_reward_per_episode[-rolling_window:])
            if episode_idx % 100 == 0:
                average_episode_reward_over_last_100.append(rolling_mean)
            if episode_idx % 50 == 0:
                checkpoint(run_name, episode_idx, run_info, total_reward_per_episode, average_episode_reward_over_last_100)

            print(f"\rEpisode: {episode_idx + 1}, Steps: {steps_taken}, eps: {epsilon:.3f}, episode_reward: {episode_reward}", end="")

            # Judge convergence on the current rolling mean over a full
            # window; the recorded averages are refreshed only every 100
            # episodes (and start with a single-episode "average").
            if len(total_reward_per_episode) >= rolling_window and rolling_mean > target_reward:
                print(f"\nConverged: Achieved an average reward >{target_reward} on episode {episode_idx+1}")
                break

        # Persist the episodes played since the last periodic checkpoint.
        last_episode_idx = len(total_reward_per_episode) - 1
        if last_episode_idx >= 0 and last_episode_idx % 50 != 0:
            checkpoint(run_name, last_episode_idx, run_info, total_reward_per_episode, average_episode_reward_over_last_100)
    finally:
        # Release the environment even if training is interrupted.
        env_.close()

    return total_reward_per_episode, agent

In [None]:
# Name used for this run's output files (checkpoint JSON and saved plots).
run_name = "run1"

In [None]:
# Train the agent; yields the per-episode rewards and the trained agent.
total_reward_per_episode, agent = train_agent(run_name)

In [None]:
# Learning curve: total reward of every training episode, saved next to
# the notebook as "<run_name>_during_training.png" and then displayed.
plt.figure(figsize=(8, 4))
plt.plot(total_reward_per_episode)
plt.grid(True)
plt.ylabel("Total reward", fontsize=14)
plt.xlabel("Episode", fontsize=14)
plt.savefig(f"{run_name}_during_training.png")
plt.show()

Episode: 602, Steps: 109, eps: 0.049, episode_reward: 50.03284723840454

In [None]:
def play_n_episodes(num_episodes: int, agent: "DQNAgent"):
    """After training, simply play n episodes using the greedy action.

    Args:
        num_episodes: Number of evaluation episodes.
        agent: Trained agent; stepping goes through ``agent.do_one_step``
            with ``epsilon=0``.
            NOTE: ``do_one_step`` also appends each transition to the
            agent's replay buffer.

    Returns:
        A list with the total reward of every episode.
    """
    total_reward_per_episode = []
    env_ = gym.make("LunarLander-v2", render_mode="rgb_array")
    try:
        for episode_idx in range(num_episodes):
            state, info = env_.reset()
            terminated = truncated = False
            episode_reward = 0
            # Stop on truncation as well: stepping past the time limit would
            # otherwise continue until the episode happened to terminate.
            while not (terminated or truncated):
                state, reward, terminated, truncated, info = agent.do_one_step(env_, state, epsilon=0)
                episode_reward += reward

            total_reward_per_episode.append(episode_reward)

            print(f"\rEpisode: {episode_idx + 1}, episode_reward: {episode_reward}", end="")
    finally:
        env_.close()

    return total_reward_per_episode

NameError: name 'DQNAgent' is not defined

In [None]:
# Evaluate the trained agent for 100 episodes.
total_reward_per_episode = play_n_episodes(100, agent)

# Plot the evaluation rewards, save the figure to Drive, then display it.
plt.figure(figsize=(8, 4))
plt.plot(total_reward_per_episode)
plt.grid(True)
plt.ylabel("Total reward", fontsize=14)
plt.xlabel("Episode", fontsize=14)
plt.savefig(f"/content/drive/My Drive/classes/cs7642/{run_name}_after_training.png")
plt.show()

In [None]:
# To try: TD error clipping to [-1, 1], Double DQN, np.vstack, checkpointing, stop when converged to 200, extend num_steps