<a href="https://colab.research.google.com/github/Rick-01s/stack-0-matic/blob/main/stack_o_matic_assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gymnasium Box2D matplotlib torch


In [2]:
import gymnasium as gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from IPython import display
from IPython.display import clear_output
import base64

import io
from PIL import Image

In [3]:
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward'],
)


class ReplayMemory:
    """Bounded FIFO store of ``Transition`` records used for experience replay.

    Once ``capacity`` records are held, appending a new one drops the oldest.
    """

    def __init__(self, capacity):
        # A deque with ``maxlen`` evicts from the left automatically when full.
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def push(self, *args):
        """Build a ``Transition`` from ``args`` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn without replacement."""
        return random.sample(self.memory, k=batch_size)


In [4]:
class DQN(nn.Module):
    """Fully connected network producing one output value per action.

    The input of width ``n_observations`` passes through two hidden layers of
    128 units with ReLU activations, then a linear layer of width ``n_actions``.
    """

    def __init__(self, n_observations, n_actions):
        super().__init__()
        hidden_width = 128
        self.layer1 = nn.Linear(n_observations, hidden_width)
        self.layer2 = nn.Linear(hidden_width, hidden_width)
        self.layer3 = nn.Linear(hidden_width, n_actions)

    def forward(self, x):
        """Return the output-layer values for the input batch ``x``."""
        hidden = self.layer1(x).relu()
        hidden = self.layer2(hidden).relu()
        return self.layer3(hidden)


In [5]:
# --- Hyperparameters -------------------------------------------------------
# Number of transitions sampled from replay memory per optimization step.
BATCH_SIZE = 128
# Discount factor applied to the next-state value in the training target.
GAMMA = 0.99
# Epsilon-greedy schedule used by select_action: the random-action threshold
# starts at EPS_START and decays exponentially towards EPS_END, with EPS_DECAY
# as the decay scale measured in action-selection steps.
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
# NOTE(review): TAU is not referenced by any code visible in this notebook.
TAU = 0.005
# Learning rate handed to the AdamW optimizer below.
LR = 1e-4

# Training environment (created without a render mode).
env = gym.make('LunarLander-v3')

n_actions = env.action_space.n
# Reset once here only to measure the length of an observation.
state, info = env.reset()
n_observations = len(state)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The target network starts as an exact copy of the policy network and is put
# in evaluation mode; only policy_net's parameters are given to the optimizer.
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
# Replay buffer holding at most 10000 transitions.
memory = ReplayMemory(10000)

# Count of select_action calls so far; drives the epsilon decay.
steps_done = 0


In [6]:
def select_action(state):
    """Pick an action for ``state`` using an epsilon-greedy rule.

    The exploration threshold decays exponentially from ``EPS_START`` towards
    ``EPS_END`` as the global ``steps_done`` counter grows; the counter is
    incremented on every call.

    Args:
        state: Input passed unchanged to ``policy_net``.

    Returns:
        A ``(1, 1)`` ``torch.long`` tensor holding the chosen action index.
    """
    global steps_done
    draw = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    if draw <= eps_threshold:
        # Explore: take a uniformly sampled action from the environment.
        random_action = env.action_space.sample()
        return torch.tensor([[random_action]], device=device, dtype=torch.long)

    # Exploit: take the action with the largest network output.
    with torch.no_grad():
        q_values = policy_net(state)
        return q_values.max(1).indices.view(1, 1)


In [7]:
def optimize_model():
    """Run one optimization step of ``policy_net`` on a replay minibatch.

    Returns immediately (without touching the network) while ``memory`` holds
    fewer than ``BATCH_SIZE`` transitions. Otherwise it samples a minibatch,
    builds the target ``reward + GAMMA * max_a target_net(next_state)[a]``
    (the next-state term is zero for transitions whose ``next_state`` is
    ``None``), minimizes the Smooth L1 loss between that target and
    ``policy_net``'s value for the taken action, and clips every gradient
    element to ``[-1, 1]`` before the optimizer step.
    """
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose a list of Transitions into one Transition of per-field tuples.
    batch = Transition(*zip(*transitions))

    has_next_state = [s is not None for s in batch.next_state]
    non_final_mask = torch.tensor(has_next_state, device=device, dtype=torch.bool)
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Value predicted by the policy network for the action actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # torch.cat raises on an empty list, so skip the target-network pass when
    # every sampled transition has no next state.
    if any(has_next_state):
        non_final_next_states = torch.cat(
            [s for s in batch.next_state if s is not None])
        # The target is a constant for this step: no graph is needed.
        with torch.no_grad():
            next_state_values[non_final_mask] = (
                target_net(non_final_next_states).max(1).values)
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    # Element-wise clipping to [-1, 1]; unlike a manual loop over
    # ``param.grad`` this skips parameters that received no gradient.
    nn.utils.clip_grad_value_(policy_net.parameters(), 1)
    optimizer.step()


In [8]:

def render_lunar_lander(env):
    """Display the first frame of a freshly created LunarLander environment.

    A separate environment with ``render_mode='rgb_array'`` is created, reset,
    rendered once, shown through matplotlib/IPython, and closed again.

    Args:
        env: Accepted for call compatibility but not used; the displayed frame
            does not come from this environment.
    """
    render_env = gym.make('LunarLander-v3', render_mode='rgb_array')
    try:
        render_env.reset()
        img = render_env.render()
        plt.imshow(img)
        display.display(plt.gcf())
        # Clear the figure so the frame is not left on it for the next plot.
        plt.clf()
    finally:
        # Release the temporary environment even if rendering or plotting fails.
        render_env.close()

In [None]:
# Number of training episodes, and the per-episode statistics that the
# training loop appends to and the plotting helpers read.
num_episodes = 1000
episode_durations, episode_rewards = [], []

def plot_durations(show_result=False):
    """Plot episode durations on figure 1 with a 100-episode moving average.

    Reads the module-level ``episode_durations`` list.

    Args:
        show_result: When False (default) the figure is cleared, titled
            'Training...', and its notebook output is marked to be replaced by
            the next output, giving a live-updating view. When True the figure
            is titled 'Result' and its output is left in place.
    """
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    if len(durations_t) >= 100:
        # Mean of every window of 100 consecutive episodes; 99 leading zeros
        # pad the curve so index i lines up with episode i.
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())
    plt.pause(0.001)
    display.display(plt.gcf())
    if not show_result:
        # Only the live view should be overwritten by the next output; the
        # final result must stay visible.
        display.clear_output(wait=True)

def plot_rewards(show_result=False):
    """Plot total episode rewards on figure 2 with a 100-episode moving average.

    Reads the module-level ``episode_rewards`` list.

    Args:
        show_result: When False (default) the figure is cleared, titled
            'Training...', and its notebook output is marked to be replaced by
            the next output, giving a live-updating view. When True the figure
            is titled 'Result' and its output is left in place.
    """
    plt.figure(2)
    rewards_t = torch.tensor(episode_rewards, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.plot(rewards_t.numpy())
    if len(rewards_t) >= 100:
        # Mean of every window of 100 consecutive episodes; 99 leading zeros
        # pad the curve so index i lines up with episode i.
        means = rewards_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())
    plt.draw()
    plt.pause(0.001)
    display.display(plt.gcf())
    if not show_result:
        # Only the live view should be overwritten by the next output; the
        # final result must stay visible.
        display.clear_output(wait=True)

# Main DQN training loop: one iteration per episode. Every environment step
# stores a transition in replay memory and runs one optimization step; the
# target network is overwritten with the policy weights every 10 episodes.
for i_episode in range(num_episodes):
    state, info = env.reset()
    # Build the (1, len(state)) batch straight from the observation instead of
    # wrapping it in a Python list first, which is the slow conversion path.
    state = torch.tensor(state, device=device, dtype=torch.float).unsqueeze(0)
    total_reward = 0.0
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        total_reward += float(reward)
        # Pin the dtype so every stored reward tensor is float32 regardless of
        # the Python/NumPy type the environment returned.
        reward = torch.tensor([reward], device=device, dtype=torch.float)

        done = terminated or truncated

        # Only a real termination has no successor state. A truncated episode
        # (e.g. a step limit) keeps its next state so the training target still
        # bootstraps from it instead of treating the cut-off as terminal.
        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(
                observation, device=device, dtype=torch.float).unsqueeze(0)

        memory.push(state, action, next_state, reward)

        state = next_state

        optimize_model()

        if done:
            episode_durations.append(t + 1)
            episode_rewards.append(total_reward)
            plot_durations()
            plot_rewards()
            # NOTE(review): this displays a frame from a separate environment
            # and replaces the reward plot in the cell output — confirm that
            # this is wanted on every episode.
            render_lunar_lander(env)
            break

        # The former periodic env.render() call was dropped: ``env`` is created
        # without a render mode and the call's result was discarded, so it
        # displayed nothing.

    if i_episode % 10 == 0:
        # Hard update: copy the policy weights into the target network.
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.close()


In [None]:
# Final summary charts of the statistics gathered during training.
durations_t = torch.tensor(episode_durations, dtype=torch.float)
rewards_t = torch.tensor(episode_rewards, dtype=torch.float)

# One (figure number, title, y-axis label, series) entry per chart.
summary_charts = (
    (1, 'Training Complete - Duration over Episodes', 'Duration', durations_t),
    (2, 'Training Complete - Reward over Episodes', 'Total Reward', rewards_t),
)
for fig_number, chart_title, y_label, series in summary_charts:
    plt.figure(fig_number)
    plt.title(chart_title)
    plt.xlabel('Episode')
    plt.ylabel(y_label)
    plt.plot(series.numpy())

plt.show()


In [None]:
# Roll out one episode with uniformly random actions and show every frame.
env = gym.make('LunarLander-v3', render_mode='rgb_array')
try:
    # reset() returns an (observation, info) pair; unpack it rather than
    # storing the whole tuple as the state.
    state, info = env.reset()
    for t in range(1000):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        img = env.render()
        # Start from an empty figure so frames do not stack up on the same
        # axes when the figure persists between iterations.
        plt.clf()
        plt.imshow(img)
        display.display(plt.gcf())
        plt.pause(0.01)
        display.clear_output(wait=True)
        # Stop on either end-of-episode signal, not only on termination.
        if terminated or truncated:
            break
finally:
    # Release the environment even if the loop is interrupted or raises.
    env.close()