In [1]:
%matplotlib tk
# %matplotlib qt
import matplotlib.pyplot as plt
plt.xkcd()

<contextlib.ExitStack at 0x7982ef5e4760>

# Q-Learning Agent

In [2]:
import gymnasium as gym
env = gym.make("LunarLander-v3", render_mode='human')
cont = None

In [3]:
env.observation_space.sample().shape

(8,)

In [4]:
env.action_space

Discrete(4)

In [5]:
import torch
import torch.nn as nn

class LanderNetwork(nn.Module):
    """Small fully-connected network: 8 inputs -> 128 -> 128 -> 4 outputs.

    The input width matches the 8-element observation and the output width
    matches the 4 discrete actions of the environment used in this notebook.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Create the linear layers in input-to-output order, then wrap them.
        # nn.Sequential numbers children by position, so the submodule
        # indices (0, 2, 4 for the linear layers) are what callers index into.
        input_layer = nn.Linear(8, 128)
        hidden_layer = nn.Linear(128, 128)
        output_layer = nn.Linear(128, 4)
        stack = [input_layer, nn.ReLU(), hidden_layer, nn.ReLU(), output_layer]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Feed ``x`` through the layer stack and return the raw outputs."""
        outputs = self.layers(x)
        return outputs

Simple Network Agent
--------------------
This is just a simple agent that uses the network but does not train it.


In [6]:
from utils import Agent
class SimpleNetworkAgent(Agent):
    """Agent that acts greedily on an untrained ``LanderNetwork``.

    The network is created once in ``__init__`` and never optimised;
    ``think`` simply defers to the base class.
    """

    def __init__(self) -> None:
        super().__init__()
        self.network = LanderNetwork()

    def act(self, observation, periphral=None):
        """Return the index of the largest network output for ``observation``.

        ``observation`` is handed to ``torch.from_numpy`` and therefore has to
        be a NumPy array. ``periphral`` is accepted but not used.
        """
        obs = torch.from_numpy(observation)
        # Inference only: skip autograd bookkeeping instead of building a
        # graph that is thrown away on every step (TrainableNetworkAgent.act
        # does the same).
        with torch.no_grad():
            action = self.network(obs)
        return action.argmax().item()

    def think(self, observation_old, action, reward, observation, terminated):
        """No learning happens here; delegate to ``Agent.think``."""
        return super().think(observation_old, action, reward, observation, terminated)

sna = SimpleNetworkAgent()
cont = None

In [7]:
from utils import run_upto_n_steps, plot_reward_and_episodes
from tqdm import trange

# Drive the untrained agent for 20 rounds (150 requested steps each), feeding
# the returned continuation back in so an episode can span several rounds.
# The loop variable is deliberately not called `round`: at notebook top level
# that would shadow the builtin `round()` for the rest of the kernel session.
for round_idx in trange(20):
    cont = run_upto_n_steps(env, sna, 150, cont)
    if round_idx % 2 == 0:
        # Redraw the reward/episode plot every other round.
        plt.clf()
        plot_reward_and_episodes(sna)
        plt.pause(0.2)

100%|██████████| 20/20 [00:32<00:00,  1.62s/it]


In [8]:
env.close()
# plt.close()

In [9]:
import gymnasium as gym
env = gym.make("LunarLander-v3")
cont = None

In [10]:
from utils import run_upto_n_steps, plot_reward_and_episodes
from tqdm import trange

# Same loop as before but against the non-rendering environment, so 200 rounds
# (150 requested steps each) are affordable.
# The loop variable is deliberately not called `round`: at notebook top level
# that would shadow the builtin `round()` for the rest of the kernel session.
for round_idx in trange(200):
    cont = run_upto_n_steps(env, sna, 150, cont)
    if round_idx % 2 == 0:
        # Redraw the reward/episode plot every other round.
        plt.clf()
        plot_reward_and_episodes(sna)
        plt.pause(0.2)

100%|██████████| 200/200 [00:34<00:00,  5.74it/s]


In [11]:
plt.close()

Trainable Network Agent
-----------------------
This agent trains the network by optimizing on the most recent few
(Observation, Action, Reward, Next Observation) quadruples.

In [3]:
import gymnasium as gym
env = gym.make("LunarLander-v3")
cont = None

In [12]:
import numpy as np
from utils import Agent
class TrainableNetworkAgent(Agent):
    """Epsilon-greedy agent that trains a ``LanderNetwork`` with one-step Q-learning.

    Every ``think`` call (while the network is in training mode) adds a
    squared TD error to a running loss. Every ``update_interval`` calls the
    accumulated loss is back-propagated and one AdamW step is taken.

    Args:
        gamma: Discount factor applied to the bootstrapped next-state value.
        lr: Learning rate handed to ``torch.optim.AdamW``.
        update_interval: Number of ``think`` calls between optimiser steps.
        epsilon: Probability of taking a uniformly random action while the
            network is in training mode.
        terminal_multiplier: Weight of the extra penalty that pushes the
            network's outputs for a terminal observation towards zero.
    """

    def __init__(self, gamma = 0.9, lr=0.001, update_interval=32, epsilon=0.1, terminal_multiplier=1) -> None:
        super().__init__()
        self.network = LanderNetwork()
        self.gamma = gamma
        self.update_interval = update_interval
        self.epsilon = epsilon
        self.terminal_multiplier = terminal_multiplier
        self.optim = torch.optim.AdamW(self.network.parameters(), lr)
        # Running sum of per-step losses since the last optimiser step.
        self.loss = torch.tensor(0., requires_grad=True)
        self.thoughts = 0  # think() calls made while in training mode
        self.losses = []  # per-step loss values (floats), one per think() call
        self.events = []  # one {"terminal", "reward"} dict per think() call

    def act(self, observation, periphral=None):
        """Choose an action for ``observation`` (a NumPy array).

        In training mode a random action is returned with probability
        ``epsilon``; otherwise the action with the largest network output is
        returned. ``periphral`` is accepted but not used.
        """
        if self.network.training and torch.rand((1,)).item() < self.epsilon:
            # Explore: sample uniformly over the network's output units.
            return torch.randint(self.network.layers[-1].out_features, (1,)).item()
        obs = torch.from_numpy(observation)
        with torch.no_grad():
            action = self.network(obs)
        return action.argmax().item()

    def think(self, observation, action, reward, observation_next, terminated):
        """Accumulate the TD loss for one transition; step the optimiser periodically.

        Does nothing when the network is in eval mode, so evaluation runs do
        not alter the weights or the recorded statistics.
        """
        if not self.network.training:
            return
        self.thoughts += 1
        if isinstance(observation, np.ndarray):
            observation = torch.from_numpy(observation)
        if isinstance(observation_next, np.ndarray):
            observation_next = torch.from_numpy(observation_next)
        outputs = self.network(observation)
        next_outputs = self.network(observation_next)
        cr_pred = outputs[action]

        # TD target: reward + gamma * max_a' Q(s', a').
        # - It must use the *value* of the best next action (max), not its
        #   index (argmax), otherwise the target is an action id in {0..3}.
        # - The target is detached so it acts as a constant in the loss.
        # - Nothing is bootstrapped past a terminal transition.
        if terminated:
            bootstrap = 0.0
        else:
            bootstrap = next_outputs.max().detach()
        cr_esti = float(reward) + self.gamma * bootstrap

        loss = (cr_esti - cr_pred)**2
        if terminated:
            # Extra regulariser: drive the outputs at terminal observations to zero.
            loss = loss + self.terminal_multiplier * (next_outputs**2).sum()
        self.loss = self.loss + loss
        self.losses.append(loss.item())
        self.events.append({
            "terminal": terminated,
            "reward": reward
        })
        if self.thoughts % self.update_interval == 0:
            self.update()

    def update(self):
        """Back-propagate the accumulated loss, take one optimiser step, reset the sum."""
        self.optim.zero_grad()
        self.loss.backward()
        self.optim.step()
        # Start a fresh accumulator so the old graph can be released.
        self.loss = torch.tensor(0., requires_grad=True)

In [13]:
tna = TrainableNetworkAgent(terminal_multiplier=1, gamma=0.99)
cont = None

In [14]:
from utils import run_upto_n_steps, plot_reward_and_episodes
from tqdm import trange
# Training mode enables epsilon-greedy exploration in act() and learning in think().
tna.network.train()
# The loop variable is deliberately not called `round`: at notebook top level
# that would shadow the builtin `round()` for the rest of the kernel session.
for round_idx in trange(2000):
    try:
        cont = run_upto_n_steps(env, tna, 150, cont)
    except KeyboardInterrupt:
        # Interrupting the kernel ends training early; the agent keeps its weights.
        break
    if round_idx % 20 == 0:
        plt.clf()
        # Top panel: per-step loss, coloured by whether the step was terminal.
        plt.subplot(2, 1, 1)
        # Explicit x values also work when no loss has been recorded yet,
        # whereas unpacking zip(*enumerate([])) would pass no arguments at all.
        plt.scatter(
            range(len(tna.losses)),
            tna.losses,
            c=[event['terminal'] for event in tna.events],
            s=10,
            alpha=0.8,
        )
        # Bottom panel: reward / episode summary from utils.
        plt.subplot(2, 1, 2)
        plot_reward_and_episodes(tna)
        plt.pause(0.2)

100%|██████████| 2000/2000 [06:15<00:00,  5.32it/s]


In [18]:
# Watch the trained agent in a rendered environment.
enviz = gym.make("LunarLander-v3", render_mode='human')
# Eval mode: act() stops exploring and think() returns without learning.
tna.network.eval()
try:
    for demo_idx in trange(5):
        run_upto_n_steps(enviz, tna, 1_000)
    # Keep the window open until Enter is pressed.
    input()
finally:
    # Always release the render window, even if a rollout raises or the
    # kernel is interrupted.
    enviz.close()


100%|██████████| 5/5 [01:02<00:00, 12.49s/it]


# Random Agent

In [1]:
%matplotlib tk
# %matplotlib qt

In [2]:
from utils import Agent, run_upto_n_steps
import numpy as np
class RandLander(Agent):
    """Baseline agent that ignores its observation and never learns."""

    def act(self, observation, periphral=None):
        """Pick one of the four action indices uniformly at random."""
        candidate_actions = [0, 1, 2, 3]
        chosen = np.random.choice(candidate_actions)
        return chosen

    def think(self, observation_old, action, reward, observation, terminated):
        """Deliberately a no-op: a random policy has nothing to update."""
        return None

rand_agent = RandLander()



In [3]:
from utils import Agent, run_upto_n_steps
import gymnasium as gym

env = gym.make("LunarLander-v3", render_mode="human")
cont = None

In [4]:
import logging
# logging.basicConfig(level=logging.INFO)  # Uncomment if you're interested in some logs

In [5]:
from utils import run_and_plot

# 100 rounds with 50 as the step argument; the value returned by run_and_plot
# is passed back in on the next round.
# The loop variable is deliberately not called `round`: at notebook top level
# that would shadow the builtin `round()` for the rest of the kernel session.
for round_idx in range(100):
    cont = run_and_plot(env, rand_agent, 50, cont)

In [6]:
env.close()

In [7]:
import gymnasium as gym
env = gym.make("LunarLander-v3")
cont = None

In [10]:
from utils import run_and_plot

# 100 rounds with 150 as the step argument against the non-rendering
# environment; the value returned by run_and_plot is passed back in each round.
# The loop variable is deliberately not called `round`: at notebook top level
# that would shadow the builtin `round()` for the rest of the kernel session.
for round_idx in range(100):
    cont = run_and_plot(env, rand_agent, 150, cont)

# Utils

In [3]:
import gymnasium as gym

# Play a single episode with uniformly sampled actions in a rendered window.
env = gym.make("LunarLander-v3", render_mode="human")
try:
    observation, info = env.reset()

    episode_over = False
    while not episode_over:
        # Stand-in for an agent policy, which would use the observation and info.
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)

        # The episode ends on termination or truncation.
        episode_over = terminated or truncated
finally:
    # Close the window even if a step raises or the kernel is interrupted.
    env.close()

Define a function that takes an agent and an environment, and runs the agent in the environment for up to *n* steps.

In [22]:
import gymnasium as gym
from abc import ABC, abstractmethod

class Agent(ABC):
    """Interface that every agent driven by ``run_n_steps`` has to implement.

    Both methods are abstract; their base implementations do nothing and
    return ``None``, so subclasses may safely call ``super()``.
    """

    @abstractmethod
    def act(self, observation, periphral=None):
        """Choose and return the action to take for ``observation``."""

    @abstractmethod
    def think(self, observation_old, action, reward, observation):
        """Receive the transition that resulted from the last action."""

def run_n_steps(env, agent: Agent, n, continuation=None):
    """Advance ``agent`` in ``env`` by at most ``n`` steps.

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        agent: Supplies actions via ``act`` and is told about every
            transition via ``think``.
        n: Upper bound on the number of steps taken in this call.
        continuation: Tuple returned by a previous call, or ``None`` to start
            from a fresh reset.

    Returns:
        ``(observation, reward, terminated, truncated, info)`` as of the last
        step taken (or as of the reset when no step was taken). Pass it back
        in as ``continuation`` to carry on.
    """
    needs_reset = continuation is None
    if not needs_reset:
        observation, reward, terminated, truncated, info = continuation
        # A finished episode cannot be resumed; start a new one instead.
        needs_reset = terminated or truncated
    if needs_reset:
        print("Resetting")
        observation, info = env.reset()
        reward = 0
        terminated = truncated = False

    steps_left = n
    while steps_left > 0 and not (terminated or truncated):
        previous_observation = observation
        action = agent.act(previous_observation)
        observation, reward, terminated, truncated, info = env.step(action)
        agent.think(previous_observation, action, reward, observation)
        steps_left -= 1
    return (observation, reward, terminated, truncated, info)

In [None]:
env = gym.make("LunarLander-v3", render_mode='human')

class RandomLunar(Agent):
    """Agent that samples its actions from the module-level ``env``'s action space."""

    def act(self, observation, periphral=None):
        """Return a sampled action; ``observation`` and ``periphral`` are ignored."""
        sampled_action = env.action_space.sample()
        return sampled_action

    def think(self, observation_old, action, reward, observation):
        """Nothing to learn; hand the transition to ``Agent.think`` unchanged."""
        transition = (observation_old, action, reward, observation)
        return super().think(*transition)
randAgent = RandomLunar()


cont = run_n_steps(env, randAgent, 10)

In [93]:
cont = run_n_steps(env, randAgent, 10, cont)

In [94]:
env.close()