In [1]:
%matplotlib tk
# %matplotlib qt
import matplotlib.pyplot as plt
plt.xkcd()

<contextlib.ExitStack at 0x735681fc45b0>

Q-Learning Agent (Jax/Flax)
===========================

In [None]:
from typing import Any
from flax import nnx

class LanderNetwork(nnx.Module):
    def __init__(self, rngs: nnx.Rngs=None) -> None:
        if rngs is None: rngs = nnx.Rngs(0)
        self.layers = nnx.Sequential(
            nnx.Linear(8, 128, rngs=rngs),
            nnx.relu,
            nnx.Linear(128, 128, rngs=rngs),
            nnx.relu,
            nnx.Linear(128, 4, rngs=rngs)
        )
    def __call__(self, x):
        return self.layers(x)
        

In [None]:
import numpy as np
ln = LanderNetwork(nnx.Rngs(0))
ln(np.random.randn(8)).argmax(-1)

Array(1, dtype=int32)

In [None]:
from utils import Agent

class TrainableAgent(Agent):
    
    def __init__(self, network: nnx.Module) -> None:
        self.network = network
        self._training = False
    def training(self):
        self.network.train()
        self._training=True
    def eval(self):
        self.network.eval()
        self._training=False
    def act(self, observation, periphral=None):
        if self._training:
            return np.random.randint(4)
        return self.network(observation).argmax(-1).item()
    def record_observation(self, observation_old, action, reward, observation, terminated):
        pass

In [None]:
from utils import plot_one_run
import gymnasium as gym
env = gym.make("LunarLander-v3", render_mode='human')
plot_one_run(env, TrainableAgent(LanderNetwork()), )
env.close()


In [None]:
plt.close()

Q-Learning Agent (pytorch)
==========================

setup
-----

In [2]:
import gymnasium as gym
env = gym.make("LunarLander-v3", render_mode='human')
cont = None

In [3]:
env.observation_space.sample().shape

(8,)

In [4]:
env.action_space

Discrete(4)

In [5]:
import torch
import torch.nn as nn

class LanderNetwork(nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.layers = nn.Sequential(
            nn.Linear(8, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 4),
        )
    def forward(self, x):
        return self.layers(x)

Simple Network Agent
--------------------
This is just a simple agent that uses the network but does not train it.


In [10]:
from utils import Agent
class SimpleNetworkAgent(Agent):
    def __init__(self) -> None:
        super().__init__()
        self.network = LanderNetwork()
    def act(self, observation, periphral=None):
        obs = torch.from_numpy(observation)
        action = self.network(obs)
        return action.argmax().item()
    def record_observation(self, observation_old, action, reward, observation, terminated):
        return super().record_observation(observation_old, action, reward, observation, terminated)

sna = SimpleNetworkAgent()
cont = None
runs = [[]]

In [13]:
from utils import plot_one_run

plot_one_run(env, sna,)

In [14]:
env.close()
# plt.close()

In [15]:
import gymnasium as gym
env = gym.make("LunarLander-v3")
cont = None

In [17]:
from utils import run_upto_n_steps, plot_reward_and_episodes
from tqdm import trange

for round in trange(80):
    cont, runs = run_upto_n_steps(env, sna, 150, cont, runs)
    if round % 2 == 0:
        plt.clf()
        plot_reward_and_episodes(runs)
        plt.pause(0.2)

100%|██████████| 80/80 [00:09<00:00,  8.01it/s]


In [18]:
plt.close()

Trainable Network Agent
-----------------------
This agent will train the network by optimizing on the most recent few 
(Observaation, Action, Reward, Observation) quartuples


### Basic

In [19]:
import gymnasium as gym
env = gym.make("LunarLander-v3")
cont = None

In [20]:
import numpy as np
from utils import Agent
class TrainableNetworkAgent(Agent):
    def __init__(self, gamma = 0.9, lr=0.001, update_interval=32, epsilon=0.1) -> None:
        super().__init__()
        self.network = LanderNetwork()
        self.gamma = gamma
        self.update_interval = update_interval
        self.epsilon = epsilon
        self.optim = torch.optim.AdamW(self.network.parameters(), lr)
        self.loss = torch.tensor(0., requires_grad=True)
        self.thoughts = 0
        self.losses = []
        self.events = [] # List of dictionary of events
    def act(self, observation, periphral=None):
        if self.network.training and torch.rand((1,)).item() < self.epsilon:
            return torch.randint(self.network.layers[-1].out_features, (1,)).item()
        obs = torch.from_numpy(observation)
        with torch.no_grad():
            action = self.network(obs)
        return action.argmax().item()
    def record_observation(self, observation, action, reward, observation_next, terminated):
        if not self.network.training:
            return 
        self.thoughts += 1
        if isinstance(observation, np.ndarray):
            observation = torch.from_numpy(observation)
        if isinstance(observation_next, np.ndarray):
            observation_next = torch.from_numpy(observation_next)
        outputs = self.network(observation)
        cr_pred = outputs[action]
        cr_esti = reward + (self.gamma*self.network(observation_next).argmax() if not terminated else 0)

        loss = (cr_esti - cr_pred)**2
        self.loss = self.loss + loss
        self.losses.append(loss.item())
        self.events.append({
            "terminal": terminated,
            "reward": reward
        })
        if self.thoughts % self.update_interval == 0:
            self.update()
        
    def update(self):
        self.optim.zero_grad()
        self.loss.backward()
        self.optim.step()
        self.loss = torch.tensor(0., requires_grad=True)

In [21]:
tna = TrainableNetworkAgent(gamma=0.99, lr=0.01)
cont = None
runs = [[]]

In [25]:
from utils import run_upto_n_steps, plot_reward_and_episodes
from tqdm import trange
from itertools import count
tna.network.train()
training_steps  = 20_000
progbar = trange(training_steps)

initial_steps = tna.thoughts
breakout = False
for round in count():
    try:
        cont, runs = run_upto_n_steps(env, tna, 150, cont, runs)
        progbar.set_description(f"thoughts {tna.thoughts}")
        progbar.update(tna.thoughts-initial_steps-progbar.n)
        if progbar.n-initial_steps >= training_steps:
            breakout = True
        if round % 20 == 0 or breakout:
            # plot_agent(tna)
            plt.clf()
            plt.subplot(2,1,1)
            plt.scatter(*zip(*enumerate(tna.losses)), c=[i['terminal'] for i in tna.events], s=120, alpha=0.8)
            plt.subplot(2, 1, 2)
            plot_reward_and_episodes(runs)
            plt.pause(0.2)
        if round % 300 == 0 or breakout:
            tna.network.eval()
            enviz = gym.make("LunarLander-v3", render_mode='human')
            run_upto_n_steps(enviz, tna, 1_000)
            enviz.close()
            tna.network.train()
        if breakout:
            break
    except KeyboardInterrupt:
        break
progbar.close()

thoughts 380525: : 200268it [07:22, 452.17it/s]                         


In [28]:
from utils import plot_one_run
enviz = gym.make("LunarLander-v3", render_mode='human')
tna.network.eval()
plot_one_run(enviz, tna,)
enviz.close()


In [16]:
# plt.close()
plt.suptitle("Simple DQN")

Text(0.5, 0.98, 'Simple DQN')

### Multiple Parallel environments

This paper [Asynchronous Methods for Deep Reinforcement Learning](https://arxiv.org/abs/1602.01783) 
demonstrated a strategy of training the same agent in multiple different copies 
of the environment at the same time as a method to keep diversity in training 
data.

This section is my attempt to recreate that


steps:
- ~~Seperate runs from agent, make run an array returned along side cont~~
- ~~Verify nothing broke~~
- Implement env&run swap in training loop
- ...?
- Profit?

In [4]:
from utils import TrainableNetworkAgent
import gymnasium as gym
import numpy as np

tna = TrainableNetworkAgent(gamma=0.99, lr=0.01)
envs = [gym.make("LunarLander-v3") for _ in range(5)] 
num_envs = len(envs)
cont = [None]*num_envs
runs = [ [[]] ]*num_envs
# epsilons = np.linspace(0.9, 0.1, num_envs)
epsilons = [0.1]*num_envs

In [5]:
from utils import run_upto_n_steps, plot_reward_and_episodes
from tqdm import trange
from itertools import count

tna.network.train()
training_steps  = 80_000
progbar = trange(training_steps)

initial_steps = tna.thoughts
breakout = False
for round in count():
    try:
        env = round%num_envs
        tna.epsilon = epsilons[env]
        cont[env], runs[env] = run_upto_n_steps(envs[env], tna, 40, cont[env], runs[env])
        progbar.set_description(f"thoughts {tna.thoughts}")
        progbar.update(tna.thoughts-initial_steps-progbar.n)
        if progbar.n-initial_steps >= training_steps:
            breakout = True
        if round % 20 == 0:
            # plot_agent(tna)
            plt.clf()
            plt.subplot(2,1,1)
            plt.scatter(*zip(*enumerate(tna.losses)), c=[i['terminal'] for i in tna.events], s=10, alpha=0.8)
            plt.subplot(2, 1, 2)
            plot_reward_and_episodes(runs[-1])
            plt.pause(0.2)
        if round % 500 == 0:
            tna.network.eval()
            enviz = gym.make("LunarLander-v3", render_mode='human')
            run_upto_n_steps(enviz, tna, 1_000)
            enviz.close()
            tna.network.train()
        if breakout:
            break
    except KeyboardInterrupt:
        break
progbar.close

thoughts 80028: 100%|█████████▉| 79988/80000 [03:14<00:00, 214.38it/s]

<bound method tqdm.close of <tqdm.std.tqdm object at 0x7ee1a8aed450>>

In [6]:
enviz = gym.make("LunarLander-v3", render_mode='human')
tna.network.eval()
for i in trange(5):
    run_upto_n_steps(enviz, tna, 1_000)
# input()
enviz.close()


100%|██████████| 5/5 [00:21<00:00,  4.39s/it]                         


In [22]:
plt.close()

# Random Agent

In [1]:
%matplotlib tk
# %matplotlib qt

In [2]:
from utils import Agent, run_upto_n_steps
import numpy as np
class RandLander(Agent):
    def act(self, observation, periphral=None):
        return np.random.choice([0,1,2,3])
    def record_observation(self, observation_old, action, reward, observation, terminated):
        pass

rand_agent = RandLander()



In [3]:
from utils import Agent, run_upto_n_steps
import gymnasium as gym

env = gym.make("LunarLander-v3", render_mode="human")
cont = None

In [4]:
import logging
# logging.basicConfig(level=logging.INFO) # If your interested in some logs

In [5]:
from utils import run_and_plot

for round in range(100):
    cont = run_and_plot(env, rand_agent, 50, cont)

In [6]:
env.close()

In [7]:
import gymnasium as gym
env = gym.make("LunarLander-v3")
cont = None

In [10]:
from utils import run_and_plot

for round in range(100):
    cont = run_and_plot(env, rand_agent, 150, cont)

# Utils

In [3]:
import gymnasium as gym

env = gym.make("LunarLander-v3", render_mode="human")
observation, info = env.reset()

episode_over = False
while not episode_over:
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)

    episode_over = terminated or truncated

env.close()

define a function that takes a model and an environment, and run the model in the environment for *n* steps

In [22]:
import gymnasium as gym
from abc import ABC, abstractmethod

class Agent(ABC):
    @abstractmethod
    def act(self, observation, periphral=None):
        pass
    @abstractmethod
    def think(self, observation_old, action, reward, observation):
        pass

def run_n_steps(env, agent: Agent, n, continuation=None):
    if continuation is not None:
        observation, reward, terminated, truncated, info = continuation
    if continuation is None or terminated or truncated:
        print("Resetting")
        observation, info = env.reset()
        terminated = False
        truncated = False
        reward = 0
    step = 0
    # print((observation, reward, terminated, truncated, info))
    while not terminated and not truncated and step < n:
        action = agent.act(observation)
        observation_old = observation
        observation, reward, terminated, truncated, info = env.step(action)
        agent.think(observation_old, action, reward, observation)
        step += 1
    return (observation, reward, terminated, truncated, info)

In [None]:
env = gym.make("LunarLander-v3", render_mode='human')

class RandomLunar(Agent):
    def act(self, observation, periphral=None):
        return env.action_space.sample()
    def think(self, observation_old, action, reward, observation):
        return super().think(observation_old, action, reward, observation)
randAgent = RandomLunar()


cont = run_n_steps(env, randAgent, 10)

In [93]:
cont = run_n_steps(env, randAgent, 10, cont)

In [94]:
env.close()