In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

from tqdm.notebook import tqdm

import gymnasium as gym

from collections import deque

In [2]:
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy,self).__init__()

        self.fc1 = nn.Linear(s_size,h_size)
        self.fc2 = nn.Linear(h_size,a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)
    
    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    scores_deque = deque(maxlen=100)
    scores = []

    for i_episode in tqdm(range(1, n_training_episodes+1)):
        saved_log_prob = []
        rewards = []
        state, info = env.reset()

        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_prob.append(log_prob)
            state, reward, done, trunc, info = env.step(action)
            rewards.append(reward)
            if(done or trunc):
                break

        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        returns = deque(maxlen=max_t)
        n_stepes = len(rewards)

        for t in range(n_stepes)[::-1]:
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])

        eps = np.finfo(np.float32).eps.item()
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        policy_loss = []
        for log_prob, disc_return in zip(saved_log_prob, returns):
            policy_loss.append(-log_prob* disc_return)
        
        policy_loss = torch.cat(policy_loss).sum()

        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))
    return scores


def evaluate_agent(env, max_steps, n_eval_episodes, policy):

    episode_rewards = []
    for episode in tqdm(range(n_eval_episodes)):
        state, info = env.reset()
        step = 0
        done = False
        total_rewards_ep = 0

        for step in range(max_steps):
            action, _ =policy.act(state)
            new_state, reward, done,trunc,info = env.step(action)
            total_rewards_ep += reward

            if done or trunc:
                break

            state = new_state
        episode_rewards.append(total_rewards_ep)

    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward


In [3]:

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#env_id = "CartPole-v1"
env_id = "Pixelcopter-PLE-v0"

env = gym.make(env_id)

s_size = env.observation_space.shape[0]
a_size = env.action_space.n

cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes" : 100,
    "n_evaluation_episodes" : 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr":1e-2,
    "env_id":env_id,
    "state_space": s_size,
    "action_space": a_size
}




cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"]
).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])

NameNotFound: Environment `Pixelcopter-PLE` doesn't exist.

In [12]:
scores= reinforce(
    cartpole_policy,
    cartpole_optimizer,
    cartpole_hyperparameters["n_training_episodes"],
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["gamma"],
    100,
)

  0%|          | 0/100 [00:00<?, ?it/s]

Episode 100	Average Score: 260.21


In [13]:
evaluate_agent(
    env, cartpole_hyperparameters["max_t"], cartpole_hyperparameters["n_evaluation_episodes"], cartpole_policy
)

  0%|          | 0/10 [00:00<?, ?it/s]

(np.float64(500.0), np.float64(0.0))

In [14]:
import time
environement = gym.make(env_id, render_mode="human")
state, info = environement.reset()



while True:
    action, _ = cartpole_policy.act(state)

    state, reward, done, trunc, info = environement.step(action)

    
    

    if  done or  trunc:
        state, info = environement.reset()


KeyboardInterrupt: 

In [47]:
policy = Policy(s_size,a_size,64).to(device)
state, info = env.reset()
policy.act(state)

tensor([[0.5351, 0.4649]], grad_fn=<ToCopyBackward0>)
Categorical(probs: torch.Size([1, 2]))


(0, tensor([-0.6253], grad_fn=<SqueezeBackward1>))