In [1]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
from collections import deque
import random
from itertools import count

In [2]:
class NeuralNet(nn.Module):
    """Fully connected network: two ReLU hidden layers followed by a linear output layer.

    Args:
        input_size: Number of input features.
        output_size: Number of output units; they are returned without an activation.
        hidden_size: Width of each of the two hidden layers.
    """

    def __init__(self, input_size, output_size, hidden_size=128):
        super().__init__()
        # Layers are created in input -> output order.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the raw outputs of the last linear layer for input `x`."""
        hidden = self.fc1(x).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

In [42]:
# Epsilon schedule read by select_action_eps:
#   eps = EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
# TAU and LR are not referenced by any of the cells shown in this notebook.
TAU = 0.005
LR = 1e-4
# Counter incremented by select_action_eps on every call. A `global` statement
# is only meaningful inside a function, so none is needed at module level.
steps_done = 0

In [61]:
def select_action(state, policy_net):
    """Sample an action from the softmax of the network's outputs.

    Args:
        state: Observation that ``torch.tensor`` can convert to a float32 tensor.
        policy_net: Callable mapping the state tensor to one score per action.

    Returns:
        ``(action, log_prob)``: the sampled action index as a Python int and the
        log-probability of that action as a tensor.
    """
    observation = torch.tensor(state, dtype=torch.float32)
    action_scores = policy_net(observation)
    distribution = Categorical(F.softmax(action_scores, dim=-1))
    sampled = distribution.sample()
    log_probability = distribution.log_prob(sampled)
    return sampled.item(), log_probability


def boltzmann_select_action(state, policy, temperature):
    """Sample an action from a Boltzmann (softmax) distribution over Q-values.

    The distribution is ``softmax(policy(state) / temperature)``. It is computed
    with ``log_softmax`` so that large Q-values or small temperatures do not
    overflow ``exp`` and turn the probabilities into NaN.

    Args:
        state: A single observation convertible to a 1-D float32 tensor.
        policy: Callable mapping the state tensor to one Q-value per action.
        temperature: Positive scaling factor; the Q-values are divided by it.

    Returns:
        ``(action, log_prob)``: the sampled action index as a Python int and the
        log-probability of that action (still attached to the autograd graph
        of ``policy``).
    """
    state = torch.as_tensor(state, dtype=torch.float32)
    # Numerically stable log of the Boltzmann probabilities.
    log_probs = F.log_softmax(policy(state) / temperature, dim=-1)
    # Sampling needs no gradient, so draw from a detached copy.
    action = torch.multinomial(log_probs.detach().exp(), 1).item()
    return action, log_probs[action]

def boltzmann_select_action_mod(state, policy_net, temperature):
    """Numerically stable Boltzmann action selection that keeps the gradient.

    The returned log-probability is taken from ``log_softmax`` of the network
    output, so it stays attached to the autograd graph of ``policy_net``. A
    log-probability computed from a detached tensor would give the policy
    optimizer nothing to update.

    Args:
        state: A single observation convertible to a 1-D float32 tensor.
        policy_net: Callable mapping the state tensor to one Q-value per action.
        temperature: Positive scaling factor; the Q-values are divided by it.

    Returns:
        ``(action, log_prob)``: the sampled action index as a Python int and the
        differentiable log-probability of that action.
    """
    state = torch.as_tensor(state, dtype=torch.float32)
    q_values = policy_net(state)

    # log_softmax subtracts the max internally, so no manual shift is needed.
    log_probs = F.log_softmax(q_values / temperature, dim=-1)

    # Sampling needs no gradient; work on a detached copy of the probabilities.
    probabilities = log_probs.detach().exp()

    # Best-effort guard: replace NaN or negative entries with a small positive number.
    invalid = torch.isnan(probabilities) | (probabilities < 0)
    if invalid.any():
        probabilities = probabilities.masked_fill(invalid, 1e-10)

    # Normalize again in case the probabilities no longer sum to 1.
    probabilities = probabilities / probabilities.sum()

    action = torch.multinomial(probabilities, 1).item()
    return action, log_probs[action]

def select_action_eps(state, policy_net):
    """Choose an action greedily, or sample from the policy with probability epsilon.

    Epsilon is ``EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)``;
    the module-level ``steps_done`` counter is incremented on every call.

    Both branches return the log-probability as a 0-dim tensor. If the two
    branches returned different shapes, an episode mixing them could not be
    stacked by ``update_policy``.

    Args:
        state: A single observation convertible to a 1-D float32 tensor.
        policy_net: Callable mapping the state tensor to one score per action.

    Returns:
        ``(action, log_prob)``: the action index as a Python int and the
        differentiable 0-dim log-probability of that action.
    """
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * np.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1

    state = torch.as_tensor(state, dtype=torch.float32)
    log_probs = F.log_softmax(policy_net(state), dim=-1)
    if sample > eps_threshold:
        # Exploit: take the most probable action.
        action = log_probs.argmax().item()
    else:
        # Explore: sample from the policy's own distribution.
        action = torch.multinomial(log_probs.detach().exp(), num_samples=1).item()
    # Indexing with a Python int always yields a 0-dim tensor.
    return action, log_probs[action]

def update_policy(rewards, log_probs, optimizer_policy, std_baseline_values = None, optimizer_value = None, gamma = 0.99):
    """Apply one REINFORCE-style update from a single finished episode.

    Args:
        rewards: Sequence of per-step rewards in the order they were received.
        log_probs: Per-step single-element log-probability tensors of the
            actions taken. They must be attached to the policy's autograd
            graph for the policy to be updated.
        optimizer_policy: Optimizer stepped on the policy loss.
        std_baseline_values: Optional tensor holding one baseline value per
            step. It is subtracted from the normalized returns and treated as
            a constant in the policy loss.
        optimizer_value: Optional optimizer stepped on the MSE between
            ``std_baseline_values`` and the normalized returns. Ignored when
            no baseline values are supplied.
        gamma: Discount factor used for the returns.

    Returns:
        None. The optimizers are stepped in place.
    """
    import warnings

    if len(rewards) == 0:
        # Nothing to learn from an empty episode.
        return

    # Discounted return-to-go, built back to front, then put in time order.
    returns = []
    discounted_reward = 0.0
    for r in reversed(rewards):
        discounted_reward = r + gamma * discounted_reward
        returns.append(discounted_reward)
    returns.reverse()
    returns = torch.tensor(returns, dtype=torch.float32)
    if returns.numel() > 1:
        # std() of a single element is NaN, so only normalize longer episodes.
        returns = (returns - returns.mean()) / (returns.std() + 1e-9) # normalize

    if std_baseline_values is not None:
        # Detach: the baseline is a constant for the policy gradient, so the
        # policy loss must not backpropagate into the value network.
        advantages = returns - std_baseline_values.detach().reshape(-1)
    else:
        advantages = returns

    # reshape(()) accepts both 0-dim and shape-(1,) log-probs.
    policy_loss = torch.stack(
        [-log_prob.reshape(()) * advantage for log_prob, advantage in zip(log_probs, advantages)]
    ).sum()

    if policy_loss.requires_grad:
        optimizer_policy.zero_grad()
        policy_loss.backward()
        optimizer_policy.step()
    else:
        warnings.warn(
            "update_policy: log_probs carry no gradient, so the policy was not updated"
        )

    if optimizer_value is not None and std_baseline_values is not None:
        value_loss = F.mse_loss(std_baseline_values.reshape(-1), returns)
        optimizer_value.zero_grad()
        value_loss.backward()
        optimizer_value.step()

In [64]:
# Training environment, plus a second instance created with on-screen rendering.
env_name = 'LunarLander-v2'
env = gym.make(env_name)
env_render = gym.make(env_name, render_mode="human")

# Both networks take the observation vector as input. The policy net emits one
# output per discrete action, the value net a single scalar.
policy_net, value_net = [
    NeuralNet(env.observation_space.shape[0], n_outputs)
    for n_outputs in (env.action_space.n, 1)
]

# One Adam optimizer per network, both with the same learning rate.
optimizer_policy, optimizer_value = [
    optim.Adam(net.parameters(), lr=0.001)
    for net in (policy_net, value_net)
]

In [65]:
# Train the policy with one update per finished episode.
use_baseline = True

num_episodes = 10000
avg_rewards = deque(maxlen=150)  # rolling window for the reported average
values = None
temperature = 1.0
for episode in range(num_episodes):
    state, info = env.reset()
    done = False
    rewards = []
    log_probs = []
    states = []
    for t in count():
        # Alternative selectors defined above: select_action,
        # boltzmann_select_action, select_action_eps.
        action, log_prob = boltzmann_select_action_mod(state, policy_net, temperature)
        next_state, reward, term, trunc, _ = env.step(action)
        rewards.append(reward)
        log_probs.append(log_prob)
        states.append(state)
        state = next_state
        done = term or trunc
        if done:
            break

    if use_baseline:
        # Stack into one ndarray first: building a tensor from a list of
        # ndarrays is slow and triggers a PyTorch warning.
        values = value_net(torch.as_tensor(np.array(states), dtype=torch.float32))
        if values.numel() > 1:
            # std() of a single element is NaN, so only normalize longer episodes.
            values = (values - values.mean()) / (values.std() + 1e-9) # normalize
        update_policy(rewards, log_probs, optimizer_policy, values, optimizer_value)
    else:
        # Without a baseline there is nothing for the value optimizer to fit.
        values = None
        update_policy(rewards, log_probs, optimizer_policy)

    episode_reward = sum(rewards)
    avg_rewards.append(episode_reward)

    if episode % 150 == 0:
        print(f"Episode {episode} - Reward: {episode_reward} - Avg Reward: {np.mean(avg_rewards)}")
    # The temperature is multiplied by 0.999 after every episode.
    temperature *= 0.999


Episode 0 - Reward: -222.02066719434805 - Avg Reward: -222.02066719434805
Episode 150 - Reward: -301.14937200604425 - Avg Reward: -176.56855014278923
Episode 300 - Reward: -115.2943792516303 - Avg Reward: -185.26372916682013
Episode 450 - Reward: -157.33550104088562 - Avg Reward: -168.07469438430218
Episode 600 - Reward: -226.7300520382715 - Avg Reward: -181.5900048880132
Episode 750 - Reward: -115.55054287417413 - Avg Reward: -186.34216698657525
Episode 900 - Reward: -201.97017312441767 - Avg Reward: -172.08914726320936
Episode 1050 - Reward: -103.57071141742203 - Avg Reward: -169.51934945903542
Episode 1200 - Reward: -75.66753591981183 - Avg Reward: -191.32307124125077
Episode 1350 - Reward: -133.1714607652639 - Avg Reward: -185.55744786947304
Episode 1500 - Reward: -135.1597003762604 - Avg Reward: -193.79809167527517
Episode 1650 - Reward: -340.7800349807271 - Avg Reward: -185.064790250999
Episode 1800 - Reward: -7.121707333457195 - Avg Reward: -194.0847782543694
Episode 1950 - Rewa

KeyboardInterrupt: 

In [23]:
# Save only the policy network's parameters; the file name embeds the environment id.
torch.save(obj=policy_net.state_dict(), f=f"policy_{env_name}.pth")

In [26]:
# Evaluate the current policy over 100 episodes.
# Only one environment is created. Creating a render_mode="human" environment
# and then rebinding the name would leave that first environment unclosed.
# To watch the agent, pass render_mode="human" to gym.make here instead.
env_render = gym.make(env_name)
deq = deque(maxlen=100)
try:
    for episode_idx in range(100):
        state, info = env_render.reset()
        total_reward = 0
        for t in count():
            # No learning happens here, so skip building autograd graphs.
            with torch.no_grad():
                action, _ = select_action(state, policy_net)
            state, reward, term, trunc, _ = env_render.step(action)
            total_reward += reward
            if term or trunc:
                break
        deq.append(total_reward)

    # total_reward holds the return of the last evaluated episode only.
    print(f"Total Reward: {total_reward}")
    print(f"Average Reward: {np.mean(deq)}")
finally:
    # Release the environment even if evaluation is interrupted.
    env_render.close()


Total Reward: 259.68489652746996
Average Reward: 176.7315248638158
