<a href="https://colab.research.google.com/github/Leo-Lifeblood/Projects/blob/main/VPG_and_A2C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [None]:
import gymnasium as gym
import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_
import numpy as np

# This worked out of the box; the changes I made are purely technical.

# Per-step discount factor used by discount_rewards.
GAMMA = 0.99
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 0.001
# Number of episodes collected between two optimizer steps.
BATCH_SIZE = 4
# Device on which the model and all trajectory tensors are placed.
DEVICE = torch.device('cpu')


class XavierLinear(nn.Linear):
    """Fully connected layer whose weight is re-initialised Xavier-uniform.

    The constructor takes the same arguments as ``nn.Linear``; after the
    regular ``nn.Linear`` initialisation the weight matrix is overwritten
    in place by ``xavier_uniform_``. The bias keeps its default
    initialisation.
    """

    def __init__(self, in_features: int, out_features: int, bias: bool = True, device=None, dtype=None) -> None:
        super().__init__(
            in_features,
            out_features,
            bias=bias,
            device=device,
            dtype=dtype,
        )
        xavier_uniform_(self.weight)


class VPG(nn.Module):
    """Policy network for vanilla policy gradient.

    A single hidden layer of 128 ReLU units followed by a linear layer
    with ``output_size`` outputs; ``forward`` turns those outputs into
    action probabilities with a softmax.

    Args:
        input_size: Number of features in one observation.
        output_size: Number of discrete actions.
    """

    def __init__(self, input_size, output_size):
        super(VPG, self).__init__()
        self.net = nn.Sequential(
            XavierLinear(input_size, 128),
            nn.ReLU(),
            XavierLinear(128, output_size),
        )

    def forward(self, x):
        """Return action probabilities for ``x``.

        The softmax is taken over the last axis, so a single 1-D
        observation yields the same result as before, while a batch of
        observations is normalised per row (``dim=0`` would have
        normalised across the batch instead).
        """
        return F.softmax(self.net(x), dim=-1)


def run_episode(model, env):
    """Play one episode with the current policy and record the trajectory.

    Args:
        model: Callable mapping a 1-D observation tensor to a 1-D tensor
            of action probabilities.
        env: Environment whose ``reset()`` returns a sequence with the
            observation first and whose ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.

    Returns:
        A tuple ``(rewards, probs, actions)``: a 1-D tensor with one
        reward per step, a ``(steps, n_actions)`` tensor of the
        probabilities the policy produced at each step (still attached to
        the autograd graph), and the list of sampled action indices.
    """
    # Reset exactly once; a second reset would only throw away the first
    # initial observation.
    obs = torch.Tensor(env.reset()[0]).to(DEVICE)
    terminated = truncated = False
    rewards, outputs, actions = [], [], []
    while not (terminated or truncated):
        probs = model(obs)
        action = probs.multinomial(1).item()
        obs, reward, terminated, truncated, _ = env.step(action)
        obs = torch.Tensor(obs).to(DEVICE)
        if terminated or truncated:
            # The final step gets no reward.
            # Here I also tried -1 if the pole falls and 0 otherwise
            reward = 0
        rewards.append(reward)
        outputs.append(probs)
        actions.append(action)
    # Stacking keeps one row per step without hard-coding the number of actions.
    return torch.Tensor(rewards).to(DEVICE), torch.stack(outputs), actions

def discount_rewards(rewards):
    """Compute the discounted return for every step of one episode.

    Walks the reward tensor from the last step to the first, carrying a
    running sum that is multiplied by ``GAMMA`` before each reward is
    added.

    Args:
        rewards: 1-D tensor of per-step rewards.

    Returns:
        Tensor shaped like ``rewards`` holding the discounted returns,
        moved to ``DEVICE``.
    """
    returns = torch.zeros_like(rewards)
    running = 0
    for step in reversed(range(len(rewards))):
        running = GAMMA * running + rewards[step]
        returns[step] = running
    return returns.to(DEVICE)

def loss_function(discounted_r, probs, actions):
    """Policy-gradient loss for a batch of recorded steps.

    Args:
        discounted_r: 1-D tensor of discounted returns, one per step.
        probs: ``(steps, n_actions)`` tensor of action probabilities.
        actions: Sequence of the action index taken at each step.

    Returns:
        Scalar tensor: the negated mean of ``log pi(a|s)`` weighted by the
        standardised returns.
    """
    eps = 1e-8
    logprobs = torch.log(probs)
    selected = logprobs[range(probs.shape[0]), actions]
    # Standardise the returns. The sample std is undefined for a single
    # element and zero for constant returns; both would turn the loss into
    # NaN, so fall back to 0 and add a small epsilon to the denominator.
    centered = discounted_r - discounted_r.mean()
    spread = discounted_r.std() if discounted_r.numel() > 1 else discounted_r.new_zeros(())
    normalized = centered / (spread + eps)
    weighted = selected * normalized
    # Mean rather than sum, so the loss scale does not grow with the number
    # of steps in the batch (the author noted slightly slower convergence).
    return -weighted.mean()

# The actual training loop:
# Episodes are played until the mean of the last 100 recorded episode lengths
# reaches TARGET_REWARD; the policy is updated once every BATCH_SIZE episodes.

episode_total_reward = 0
batch_losses = torch.Tensor().to(DEVICE)  # NOTE(review): never read in the visible code
batch_actions = []  # actions taken during the episodes of the current batch
batch_disc_r = torch.Tensor().to(DEVICE)  # discounted returns of the current batch
batch_probs = torch.Tensor().to(DEVICE)  # per-step action probabilities of the current batch
best_ep_reward = 0  # NOTE(review): never read in the visible code
# The length history starts with a single 0 entry, which takes part in the running mean.
losses, ep_total_lenghts = [], [0]

episodes = 0
TARGET_REWARD = 100

env = gym.make("CartPole-v1")
# The number of actions is passed as the literal 2.
model = VPG(env.observation_space.shape[0],
            2).to(DEVICE)
optim = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

while np.array(ep_total_lenghts)[-100:].mean() < TARGET_REWARD:
    rewards, probs, actions = run_episode(model, env)
    discounted_r = discount_rewards(rewards)
    # The "reward" tracked here is the number of steps in the episode.
    episode_total_reward = rewards.shape[0]
    ep_total_lenghts.append(episode_total_reward)
    episodes += 1
    # Append this episode's trajectory to the current batch.
    batch_actions += actions
    batch_disc_r = torch.concatenate([batch_disc_r, discounted_r])
    batch_probs = torch.concatenate([batch_probs, probs])

    if episodes % BATCH_SIZE == 0:
        # One gradient step on everything collected since the last update.
        loss = loss_function(batch_disc_r, batch_probs, batch_actions)
        losses.append(loss.item())
        model.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.001) # Not strictly required; added for stability (clips the total gradient norm to 0.001).
        optim.step()
        # Start a fresh batch.
        batch_actions = []
        batch_disc_r = torch.Tensor().to(DEVICE)
        batch_probs = torch.Tensor().to(DEVICE)
        print(f"Episode {episodes}. Loss: {loss}. Reward: {episode_total_reward}")
# Reports the most recent batch loss and the length of the last episode.
print(f"Success in {episodes} episodes. Loss: {loss}. Reward: {episode_total_reward}")

Episode 4. Loss: -0.008065112866461277. Reward: 22
Episode 8. Loss: 0.015668725594878197. Reward: 24
Episode 12. Loss: 0.048676639795303345. Reward: 14
Episode 16. Loss: -0.03799426183104515. Reward: 13
Episode 20. Loss: -0.061613745987415314. Reward: 11
Episode 24. Loss: 0.005115847568958998. Reward: 23
Episode 28. Loss: 0.009838522411882877. Reward: 38
Episode 32. Loss: 0.005891811568289995. Reward: 13
Episode 36. Loss: 0.007423750590533018. Reward: 14
Episode 40. Loss: -0.001050323247909546. Reward: 28
Episode 44. Loss: 0.00690933782607317. Reward: 28
Episode 48. Loss: 0.007801996544003487. Reward: 15
Episode 52. Loss: 0.008426588959991932. Reward: 21
Episode 56. Loss: -0.014807883650064468. Reward: 37
Episode 60. Loss: 0.010726138018071651. Reward: 15
Episode 64. Loss: -0.04424520209431648. Reward: 17
Episode 68. Loss: 0.0066779600456357. Reward: 25
Episode 72. Loss: -0.004824013914912939. Reward: 26
Episode 76. Loss: -6.550866964971647e-05. Reward: 30
Episode 80. Loss: -0.01940113

In [None]:
#this is what you wrote as A2C instead

import gymnasium as gym
import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_
import numpy as np

# Per-step discount factor used by discount_rewards.
GAMMA = 0.99
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 0.001
# Number of episodes collected between two optimizer steps.
BATCH_SIZE = 4
# Device on which the model and all trajectory tensors are placed.
DEVICE = torch.device('cpu')


class XavierLinear(nn.Linear):
    """``nn.Linear`` variant with a Xavier-uniform weight matrix.

    Arguments are forwarded unchanged to ``nn.Linear``; once the parent
    constructor has run, ``xavier_uniform_`` overwrites the weight in
    place. The bias is left as ``nn.Linear`` initialised it.
    """

    def __init__(self, in_features: int, out_features: int, bias: bool = True, device=None, dtype=None) -> None:
        linear_kwargs = {"bias": bias, "device": device, "dtype": dtype}
        super().__init__(in_features, out_features, **linear_kwargs)
        xavier_uniform_(self.weight)


class VPG(nn.Module):
    """Network with a shared trunk feeding a policy head and a value head.

    The trunk is two 128-unit ReLU layers. ``policy_head`` maps the trunk
    features to ``output_size`` outputs, ``value_head`` maps them to a
    single output.

    Args:
        input_size: Number of features in one observation.
        output_size: Number of discrete actions.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden = 128
        trunk_layers = [
            XavierLinear(input_size, hidden),
            nn.ReLU(),
            XavierLinear(hidden, hidden),
            nn.ReLU(),
        ]
        self.net = nn.Sequential(*trunk_layers)
        self.policy_head = XavierLinear(hidden, output_size)
        self.value_head = XavierLinear(hidden, 1)

    def forward(self, x):
        """Return ``(action_probabilities, value)`` for ``x``."""
        features = self.net(x)
        action_probs = F.softmax(self.policy_head(features), dim=-1)
        state_value = self.value_head(features)
        return action_probs, state_value


def run_episode(model, env):
    """Play one episode with the current actor-critic and record the trajectory.

    Args:
        model: Callable mapping a 1-D observation tensor to a pair
            ``(action_probabilities, value)``.
        env: Environment whose ``reset()`` returns a sequence with the
            observation first and whose ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.

    Returns:
        A tuple ``(rewards, values, probs, actions)``: a 1-D tensor with
        one reward per step, a 1-D tensor with the value predicted at each
        step, a ``(steps, n_actions)`` tensor of the action probabilities
        at each step, and the list of sampled action indices. ``values``
        and ``probs`` stay attached to the autograd graph.
    """
    # Reset exactly once; a second reset would only throw away the first
    # initial observation.
    obs = torch.Tensor(env.reset()[0]).to(DEVICE)
    terminated = truncated = False
    rewards, values, outputs, actions = [], [], [], []
    while not (terminated or truncated):
        probs, value = model(obs)
        action = probs.multinomial(1).item()
        obs, reward, terminated, truncated, _ = env.step(action)
        obs = torch.Tensor(obs).to(DEVICE)
        if terminated or truncated:
            # The final step gets no reward.
            # Here I also tried -1 if the pole falls and 0 otherwise
            reward = 0
        rewards.append(reward)
        values.append(value)
        outputs.append(probs)
        actions.append(action)
    # Stack the value predictions instead of rebuilding them with
    # torch.Tensor(...): the latter copies the numbers into a new tensor
    # with no autograd history, so the value loss could never update the
    # value head.
    stacked_values = torch.stack(values).reshape(len(rewards))
    # Stacking also avoids hard-coding the number of actions.
    return torch.Tensor(rewards).to(DEVICE), stacked_values, torch.stack(outputs), actions

def discount_rewards(rewards):
    """Turn per-step rewards into discounted returns.

    Iterates backwards over ``rewards``; at each step the accumulated
    value is scaled by ``GAMMA`` and the step's reward is added.

    Args:
        rewards: 1-D tensor of per-step rewards.

    Returns:
        Tensor shaped like ``rewards`` with the discounted returns, moved
        to ``DEVICE``.
    """
    result = torch.zeros_like(rewards)
    accumulated = 0
    position = len(rewards) - 1
    while position >= 0:
        accumulated = GAMMA * accumulated + rewards[position]
        result[position] = accumulated
        position -= 1
    return result.to(DEVICE)

def loss_function(discounted_r, values, probs, actions):
    """Combined actor-critic loss for a batch of recorded steps.

    Args:
        discounted_r: 1-D tensor of discounted returns, one per step.
        values: 1-D tensor of value predictions, one per step.
        probs: ``(steps, n_actions)`` tensor of action probabilities.
        actions: Sequence of the action index taken at each step.

    Returns:
        Scalar tensor: the policy loss (negated mean of ``log pi(a|s)``
        times the advantage) plus 0.5 times the mean squared error between
        ``values`` and the standardised returns.
    """
    eps = 1e-8
    logprobs = torch.log(probs)
    selected = logprobs[range(probs.shape[0]), actions]
    # Standardise the returns. The sample std is undefined for a single
    # element and zero for constant returns; both would turn the loss into
    # NaN, so fall back to 0 and add a small epsilon to the denominator.
    centered = discounted_r - discounted_r.mean()
    spread = discounted_r.std() if discounted_r.numel() > 1 else discounted_r.new_zeros(())
    normalized = centered / (spread + eps)
    # The advantage uses a detached copy of the values so the policy term
    # does not push gradients into the value predictions.
    advantage = normalized - values.detach()
    weighted = selected * advantage

    # The value predictions are regressed onto the standardised returns.
    # mse_loss already reduces to a scalar mean.
    value_loss = F.mse_loss(values, normalized.detach())

    return -weighted.mean() + (value_loss * 0.5)

# The actual training loop:
# Episodes are played until the mean of the last 100 recorded episode lengths
# reaches TARGET_REWARD; the model is updated once every BATCH_SIZE episodes.

episode_total_reward = 0
batch_losses = torch.Tensor().to(DEVICE)  # NOTE(review): never read in the visible code
batch_actions = []  # actions taken during the episodes of the current batch
batch_disc_r = torch.Tensor().to(DEVICE)  # discounted returns of the current batch
batch_values = torch.Tensor().to(DEVICE)  # per-step value predictions of the current batch
batch_probs = torch.Tensor().to(DEVICE)  # per-step action probabilities of the current batch
best_ep_reward = 0  # NOTE(review): never read in the visible code
# The length history starts with a single 0 entry, which takes part in the running mean.
losses, ep_total_lenghts = [], [0]

episodes = 0
TARGET_REWARD = 100#400

env = gym.make("CartPole-v1")
# The number of actions is passed as the literal 2.
model = VPG(env.observation_space.shape[0],
            2).to(DEVICE)
optim = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

while np.array(ep_total_lenghts)[-100:].mean() < TARGET_REWARD:
    rewards, values, probs, actions = run_episode(model, env)
    discounted_r = discount_rewards(rewards)
    # The "reward" tracked here is the number of steps in the episode.
    episode_total_reward = rewards.shape[0]
    ep_total_lenghts.append(episode_total_reward)
    episodes += 1
    # Append this episode's trajectory to the current batch.
    batch_actions += actions
    batch_disc_r = torch.concatenate([batch_disc_r, discounted_r])
    batch_values = torch.concatenate([batch_values, values])
    batch_probs = torch.concatenate([batch_probs, probs])

    if episodes % BATCH_SIZE == 0:
        # One gradient step on everything collected since the last update.
        loss = loss_function(batch_disc_r, batch_values, batch_probs, batch_actions)
        losses.append(loss.item())
        model.zero_grad()
        loss.backward()
        # Clips the total gradient norm to 0.001 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.001)
        optim.step()
        # Start a fresh batch.
        batch_actions = []
        batch_disc_r = torch.Tensor().to(DEVICE)
        batch_values = torch.Tensor().to(DEVICE)
        batch_probs = torch.Tensor().to(DEVICE)
        print(f"Episode {episodes}. Loss: {np.round(loss.detach().item(), decimals=3)}. Mean reward: {np.round(np.array(ep_total_lenghts)[-100:].mean(), decimals=2)}. Reward: {episode_total_reward}.")
# Reports the most recent batch loss and the length of the last episode.
print(f"Success in {episodes} episodes. Loss: {np.round(loss.detach().item(), decimals=5)}. Reward: {episode_total_reward}.")


Episode 4. Loss: 0.56. Mean reward: 17.4. Reward: 36.
Episode 8. Loss: 0.574. Mean reward: 15.78. Reward: 16.
Episode 12. Loss: 0.57. Mean reward: 18.46. Reward: 32.
Episode 16. Loss: 0.557. Mean reward: 18.35. Reward: 14.
Episode 20. Loss: 0.564. Mean reward: 19.29. Reward: 15.
Episode 24. Loss: 0.552. Mean reward: 19.72. Reward: 15.
Episode 28. Loss: 0.582. Mean reward: 24.0. Reward: 84.
Episode 32. Loss: 0.539. Mean reward: 24.67. Reward: 26.
Episode 36. Loss: 0.523. Mean reward: 24.49. Reward: 36.
Episode 40. Loss: 0.595. Mean reward: 23.71. Reward: 15.
Episode 44. Loss: 0.555. Mean reward: 23.53. Reward: 13.
Episode 48. Loss: 0.551. Mean reward: 25.31. Reward: 24.
Episode 52. Loss: 0.529. Mean reward: 25.42. Reward: 20.
Episode 56. Loss: 0.551. Mean reward: 26.51. Reward: 23.
Episode 60. Loss: 0.523. Mean reward: 26.84. Reward: 17.
Episode 64. Loss: 0.534. Mean reward: 27.86. Reward: 24.
Episode 68. Loss: 0.521. Mean reward: 27.39. Reward: 31.
Episode 72. Loss: 0.562. Mean reward:

In [None]:
import gymnasium as gym
import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_
import numpy as np
from torch.distributions import Categorical

# Per-step discount factor used by discount_rewards.
GAMMA = 0.99
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 0.001
# Number of episodes collected between two optimizer steps.
BATCH_SIZE = 4
# Device on which the model and all trajectory tensors are placed.
DEVICE = torch.device('cpu')


class XavierLinear(nn.Linear):
    """Linear layer that replaces the default weight init with Xavier-uniform.

    Accepts exactly the arguments of ``nn.Linear``. The parent constructor
    runs first; ``xavier_uniform_`` then rewrites ``self.weight`` in
    place, leaving the bias untouched.
    """

    def __init__(self, in_features: int, out_features: int, bias: bool = True, device=None, dtype=None) -> None:
        super().__init__(
            in_features=in_features,
            out_features=out_features,
            bias=bias,
            device=device,
            dtype=dtype,
        )
        xavier_uniform_(self.weight)


class VPG(nn.Module):
    """Policy network for vanilla policy gradient.

    A single hidden layer of 128 ReLU units followed by a linear layer
    with ``output_size`` outputs; ``forward`` turns those outputs into
    action probabilities with a softmax.

    Args:
        input_size: Number of features in one observation.
        output_size: Number of discrete actions.
    """

    def __init__(self, input_size, output_size):
        super(VPG, self).__init__()
        self.net = nn.Sequential(
            XavierLinear(input_size, 128),
            nn.ReLU(),
            XavierLinear(128, output_size),
        )

    def forward(self, x):
        """Return action probabilities for ``x``.

        The softmax is taken over the last axis, so a single 1-D
        observation yields the same result as before, while a batch of
        observations is normalised per row (``dim=0`` would have
        normalised across the batch instead).
        """
        return F.softmax(self.net(x), dim=-1)


def run_episode(model, env):
    """Play one episode with the current policy and record the trajectory.

    Args:
        model: Callable mapping a 1-D observation tensor to a 1-D tensor
            of action probabilities.
        env: Environment whose ``reset()`` returns a sequence with the
            observation first and whose ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.

    Returns:
        A tuple ``(rewards, probs, actions)``: a 1-D tensor with one
        reward per step, a ``(steps, n_actions)`` tensor of the
        probabilities the policy produced at each step (still attached to
        the autograd graph), and the list of sampled action indices.
    """
    # Reset exactly once; a second reset would only throw away the first
    # initial observation.
    obs = torch.Tensor(env.reset()[0]).to(DEVICE)
    terminated = truncated = False
    rewards, outputs, actions = [], [], []
    while not (terminated or truncated):
        probs = model(obs)
        action = probs.multinomial(1).item()
        obs, reward, terminated, truncated, _ = env.step(action)
        obs = torch.Tensor(obs).to(DEVICE)
        if terminated or truncated:
            # The final step gets no reward.
            # Here I also tried -1 if the pole falls and 0 otherwise
            reward = 0
        rewards.append(reward)
        outputs.append(probs)
        actions.append(action)
    # Stacking keeps one row per step without hard-coding the number of actions.
    return torch.Tensor(rewards).to(DEVICE), torch.stack(outputs), actions

def discount_rewards(rewards):
    """Return the discounted return at every step of an episode.

    The rewards are visited last-to-first; the carried total is multiplied
    by ``GAMMA`` and then increased by the current reward.

    Args:
        rewards: 1-D tensor of per-step rewards.

    Returns:
        Tensor shaped like ``rewards`` with the discounted returns, moved
        to ``DEVICE``.
    """
    out = torch.zeros_like(rewards)
    carry = 0
    for offset in range(len(rewards)):
        i = len(rewards) - 1 - offset
        carry = GAMMA * carry + rewards[i]
        out[i] = carry
    return out.to(DEVICE)

def loss_function(discounted_r, probs, actions):
    """Policy-gradient loss for a batch of recorded steps (summed form).

    Args:
        discounted_r: 1-D tensor of discounted returns, one per step.
        probs: ``(steps, n_actions)`` tensor of action probabilities.
        actions: Sequence of the action index taken at each step.

    Returns:
        Scalar tensor: the negated sum of ``log pi(a|s)`` weighted by the
        standardised returns.
    """
    eps = 1e-8
    # Build integer action indices on the same device as ``probs`` rather
    # than a float tensor on the global device.
    action_idx = torch.as_tensor(actions, dtype=torch.long, device=probs.device)
    selected = Categorical(probs).log_prob(action_idx)
    # Standardise the returns. The sample std is undefined for a single
    # element and zero for constant returns; both would turn the loss into
    # NaN, so fall back to 0 and add a small epsilon to the denominator.
    centered = discounted_r - discounted_r.mean()
    spread = discounted_r.std() if discounted_r.numel() > 1 else discounted_r.new_zeros(())
    normalized = centered / (spread + eps)
    weighted = selected * normalized
    return -weighted.sum()

# The actual training loop:
# Episodes are played until the mean of the last 100 recorded episode lengths
# reaches TARGET_REWARD; the policy is updated once every BATCH_SIZE episodes.

episode_total_reward = 0
batch_losses = torch.Tensor().to(DEVICE)  # NOTE(review): never read in the visible code
batch_actions = []  # actions taken during the episodes of the current batch
batch_disc_r = torch.Tensor().to(DEVICE)  # discounted returns of the current batch
batch_probs = torch.Tensor().to(DEVICE)  # per-step action probabilities of the current batch
best_ep_reward = 0  # NOTE(review): never read in the visible code
# The length history starts with a single 0 entry, which takes part in the running mean.
losses, ep_total_lenghts = [], [0]

episodes = 0
TARGET_REWARD = 100

env = gym.make("CartPole-v1")
# The number of actions is passed as the literal 2.
model = VPG(env.observation_space.shape[0],
            2).to(DEVICE)
optim = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

while np.array(ep_total_lenghts)[-100:].mean() < TARGET_REWARD:
    rewards, probs, actions = run_episode(model, env)
    discounted_r = discount_rewards(rewards)
    # The "reward" tracked here is the number of steps in the episode.
    episode_total_reward = rewards.shape[0]
    ep_total_lenghts.append(episode_total_reward)
    episodes += 1
    # Append this episode's trajectory to the current batch.
    batch_actions += actions
    batch_disc_r = torch.concatenate([batch_disc_r, discounted_r])
    batch_probs = torch.concatenate([batch_probs, probs])

    if episodes % BATCH_SIZE == 0:
        # One gradient step on everything collected since the last update
        # (no gradient clipping in this variant).
        loss = loss_function(batch_disc_r, batch_probs, batch_actions)
        losses.append(loss.item())
        model.zero_grad()
        loss.backward()
        optim.step()
        # Start a fresh batch.
        batch_actions = []
        batch_disc_r = torch.Tensor().to(DEVICE)
        batch_probs = torch.Tensor().to(DEVICE)
        print(f"Episode {episodes}. Loss: {loss}. Reward: {episode_total_reward}")
# Reports the most recent batch loss and the length of the last episode.
print(f"Success in {episodes} episodes. Loss: {loss}. Reward: {episode_total_reward}")


Episode 4. Loss: 0.5146181583404541. Reward: 17
Episode 8. Loss: 0.34052616357803345. Reward: 12
Episode 12. Loss: -1.1095318794250488. Reward: 12
Episode 16. Loss: -0.1488935649394989. Reward: 19
Episode 20. Loss: -0.3533565104007721. Reward: 16
Episode 24. Loss: -0.4083758592605591. Reward: 29
Episode 28. Loss: -0.6365123987197876. Reward: 12
Episode 32. Loss: -0.4132089614868164. Reward: 46
Episode 36. Loss: -0.38385558128356934. Reward: 27
Episode 40. Loss: -2.031681776046753. Reward: 23
Episode 44. Loss: -0.150732159614563. Reward: 16
Episode 48. Loss: -1.3684639930725098. Reward: 32
Episode 52. Loss: -0.0780143141746521. Reward: 10
Episode 56. Loss: -1.5128607749938965. Reward: 27
Episode 60. Loss: -0.8497529029846191. Reward: 21
Episode 64. Loss: -1.2733001708984375. Reward: 18
Episode 68. Loss: -5.7394819259643555. Reward: 13
Episode 72. Loss: -1.0529409646987915. Reward: 22
Episode 76. Loss: 0.9459303617477417. Reward: 19
Episode 80. Loss: -2.1868011951446533. Reward: 12
Episo