In [5]:
import gymnasium as gym 
import torch
import numpy as np
import random

episodes = 1000
lr = 1e-3
discount = 0.9
batch = 10
resets = 50
epsilon = 0.2
c1 = 0.5                
c2 = 0.01
lamb = 0.95  

env = gym.make("CartPole-v1")

#actor inputs the state and outputs a single action
actor = torch.nn.Sequential(
        torch.nn.Linear(4, 32), # 4 dimensional state vector input
        torch.nn.ReLU(),
        torch.nn.Linear(32, 32),
        torch.nn.ReLU(),
        torch.nn.Linear(32, env.action_space.n),
        torch.nn.Softmax(dim=-1)
)

#critc inputs state and outputs a value for each possible state
critic  = torch.nn.Sequential(
    torch.nn.Linear(4,32),
    torch.nn.ReLU(),
    torch.nn.Linear(32,32),
    torch.nn.ReLU(),
    torch.nn.Linear(32,1),

)

optimizer = torch.optim.Adam(list(actor.parameters()) + list(critic.parameters()), lr=lr)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [7]:
for ep in range(episodes):
    obs, _ = env.reset()
    obs = torch.tensor(obs, dtype=torch.float32).to(device)
    done = False
    totalrew = 0
    step = 0

    states = []
    actions = []
    rewards = []
    dones = []
    log_probs = []
    values = []

    while not done:
        #actor outputs action
        dist = torch.distributions.Categorical(actor(obs))
        action = dist.sample()
        logprob = dist.log_prob(action)
        
        #critc outputs value from obs
        vals = critic(obs)

        #act in environment
        next_obs, reward, terminated, truncated, info = env.step(action.item())
        done = terminated or truncated

        #collect data from rollout/trajectory - replay buffer
        log_probs.append(logprob)
        states.append(obs)
        values.append(vals.item())
        actions.append(action)
        rewards.append(reward)
        totalrew += reward
        obs = torch.tensor(next_obs, dtype=torch.float32).to(device)

    #Calculate GAE Advantages and returns
    T = len(rewards)
    advs = torch.zeros(T, device=device)
    returns = torch.zeros(T, device=device)
    lastgaelam = 0.0

    values.append(0.0)
    for t in reversed(range(T)):
        delta = rewards[t] + discount * values[t + 1] - values[t]
        lastgaelam = delta + discount * lamb * lastgaelam
        advs[t] = lastgaelam
        returns[t] = lastgaelam + values[t]
    
    Lclip = 0
    Lvf = 0
    Lent = 0

    #update actor policy
    for t in range(T):
        #calculate ratio
        dist = torch.distributions.Categorical(actor(states[t])) # current policy prediction
        newlogprob = dist.log_prob(actions[t])
        ratio = torch.exp(newlogprob - log_probs[t]) #sample old probs from replay buffer

        advantage = advs[t]
        
        #clip loss calc for actor
        clipped = torch.clamp(ratio, 1 - epsilon, 1 + epsilon)
        Lclip += -torch.min(ratio*advantage, clipped*advantage)

        #squared error value loss for critic
        pred = critic(states[t]) #new val pred from critc
        Lvf += (pred - returns[t])**2

        #entropy bonus
        Lent += -dist.entropy() #negative because log loss

    loss = ((Lclip + c1 * Lvf + c2 * Lent) / T)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if ep%100 == 0:
        print(ep, ":", totalrew)


0 : 13.0
100 : 17.0


KeyboardInterrupt: 

In [3]:
env = gym.make("CartPole-v1", render_mode="human")

for episode in range(5):  # Show 3 episodes
    obs, _ = env.reset()
    done = False
    total_reward = 0

    while not done:
        obs_tensor = torch.tensor(obs, dtype=torch.float32).to(device)
        probs = actor(obs_tensor)
        action = torch.argmax(probs).item()

        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

    print(f"Episode {episode+1} finished with reward {total_reward}")

env.close()

Episode 1 finished with reward 12.0
Episode 2 finished with reward 19.0
Episode 3 finished with reward 16.0
Episode 4 finished with reward 16.0
Episode 5 finished with reward 18.0
