# Let's dive into the world of Reinforcement Learning: from concepts to code

> We are what we repeatedly do, therefore, excellence is not an act, but a habit. - Aristotle

In [2]:
import os 
from collections import deque
from IPython.display import Image

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

In [3]:
seed = 777

torch.manual_seed(seed)
np.random.seed(seed)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

  return torch._C._cuda_getDeviceCount() > 0


'cpu'

In [4]:
env_id = {
    'Cart Pole': 'CartPole-v1', 
    'Frozen Lake': 'FrozenLake-v1',
    'Taxi': 'Taxi-v3'
}

render_mode = 'human'
n_training_episodes = 3000
n_eval_episodes = 100
lr = 0.05
max_steps = 99
gamma = 0.95
eval_seed = range(n_eval_episodes)
min_epsilon, max_epsilon = 0.05, 1.0
decay_rate = 0.0005

In [5]:
env = gym.make(env_id['Frozen Lake'], render_mode='rgb_array')

print(f"The environment's observation space: {env.observation_space.n}")
print(f"The environment's action space: {env.action_space.n}")

The environment's observation space: 16
The environment's action space: 4


In [9]:
env.reset()

(0, {'prob': 1})

# Policy Gradient

In [6]:
class PolicyNN(nn.Module): 
    def __init__(self, state_size, action_size, hidden_size):
        super().__init__()
        self.state_size = state_size

        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size*2)
        self.fc3 = nn.Linear(hidden_size*2, action_size)

    def forward(self, x): 
        x = F.relu(self.fc1(x)) 
        x = F.relu(self.fc2(x)) 
        x = self.fc3(x) 
        x = F.softmax(x, dim=1)
        return x

    def act(self, state): 
        # state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        state = torch.tensor(state, dtype=torch.float).unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()

        return action.item(), m.log_prob(action)


In [7]:
env.reset()

(0, {'prob': 1})

In [8]:
state_space = env.observation_space.n
action_space = env.action_space.n
deadline = 100

policy = PolicyNN(state_size=state_space, 
                action_size=action_space,
                hidden_size=258).to(device)

optimizer = optim.Adam(policy.parameters(), lr=lr)
scores_deque = deque(maxlen=100)
scores = []

for i_episode in tqdm(range(n_training_episodes)): 
    save_log_probs = []
    rewards = []
    state = env.reset()

    # selected_state = np.zeros(shape=state_space)
    # selected_state[state[0]] = state[1]['prob']

    for t in range(max_steps): 
        action, log_prob = policy.act(state[0])
        save_log_probs.append(log_prob)
        state, reward, terminated, truncated, info = env.step(action)
        rewards.append(reward)

        if terminated or truncated: 
            break

    scores_deque.append(sum(rewards))
    scores.append(sum(rewards))

    returns = deque(maxlen=max_steps)
    n_steps = len(rewards)

    for t in range(n_steps)[::-1]: 
        disc_return_t = returns[0] if len(returns) > 0 else 0
        returns.appendleft(gamma*disc_return_t + rewards[t])

    eps = np.finfo(np.float32).eps.item()

    returns = torch.tensor(returns)
    returns = (returns - returns.mean()) / (returns.std() + eps)

    policy_loss = []
    for log_prob, disc_return in zip(save_log_probs, returns): 
        policy_loss.append(-log_prob*disc_return)

    policy_loss = torch.cat(policy_loss).sum()

    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

    if i_episode % deadline == 0: 
        print(f"Episode {i_episode}\tAverage Score: {np.mean(scores_deque):.4f}")

  0%|          | 0/3000 [00:00<?, ?it/s]

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x1 and 16x258)