In [2]:
import numpy as np
import gymnasium as gym
import os
import tqdm
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from collections import deque
from IPython.display import Image
from matplotlib import animation

In [3]:
# Run on the GPU when CUDA is usable, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cpu


### Initialize the environment

In [4]:
# Build the LunarLander environment (pass render_mode="human" to gym.make to watch it).
env = gym.make('LunarLander-v3')

# Read both sizes first; the policy network is dimensioned from them.
state_space = env.observation_space.shape[0]
action_space = env.action_space.n

print('State Space:', state_space)
print('Action Space:', action_space)

State Space: 8
Action Space: 4


### Policy

In [6]:
# Policy Network
class Policy(nn.Module):
    """Stochastic policy: a three-layer MLP mapping a state to action probabilities.

    Layer widths are ``s_size -> h_size -> 2 * h_size -> a_size``; the output
    is passed through a softmax so it can parameterise a ``Categorical``.

    Args:
        s_size: Number of state features.
        a_size: Number of discrete actions.
        h_size: Width of the first hidden layer (the second is twice as wide).
    """

    def __init__(self, s_size, a_size, h_size):
        super().__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size * 2)
        self.fc3 = nn.Linear(h_size * 2, a_size)

    def forward(self, x):
        """Return action probabilities for ``x``.

        Args:
            x: Float tensor whose last dimension has ``s_size`` features;
                both batched ``(N, s_size)`` and unbatched ``(s_size,)``
                inputs are accepted.

        Returns:
            Tensor of the same leading shape with ``a_size`` probabilities
            along the last dimension.
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        # Normalise over the action dimension; dim=-1 also works for 1-D input.
        return F.softmax(x, dim=-1)

    def act(self, state):
        """Sample an action for a single state.

        Args:
            state: One observation (NumPy array or other array-like) with
                ``s_size`` entries.

        Returns:
            Tuple ``(action, log_prob)`` where ``action`` is a Python int and
            ``log_prob`` is a CPU tensor of shape ``(1,)`` that keeps the
            autograd graph needed for the policy-gradient update.
        """
        # Put the input on the same device as the weights, otherwise the
        # forward pass fails as soon as the model lives on the GPU.
        model_device = next(self.parameters()).device
        state = torch.as_tensor(
            state, dtype=torch.float32, device=model_device
        ).unsqueeze(0)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        # Sample (rather than argmax) so the policy keeps exploring.
        action = m.sample()
        return action.item(), m.log_prob(action)

### REINFORCE 
Initialize the policy parameter $\theta$ at random. <br>
**for** each episode $\{s_1, a_1, r_2, s_2, a_2, \dots, s_T\} $ **do** <br>
&nbsp; &nbsp; &nbsp; &nbsp;     **for** $t = 1, 2, \dots, T$ **do** <br>
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Calculate the Return $G_t$ <br>
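&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Update policy parameters $\theta \leftarrow \theta + \alpha \gamma^t G_t \nabla_\theta \log \pi_\theta(a_t \vert s_t)$ <br>
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; *(The implementation below drops the $\gamma^t$ factor, normalises the returns within each episode, and applies all $T$ updates as one summed gradient step per episode.)* <br>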
&nbsp; &nbsp; &nbsp; &nbsp; **end for** <br>
**end for**

Visually Explained: https://towardsdatascience.com/reinforcement-learning-explained-visually-part-6-policy-gradients-step-by-step-f9f448e73754

In [12]:
# Training Function
def _discounted_returns(rewards, gamma):
    """Return the discounted return G_t for every step of one episode."""
    returns = []
    running = 0.0
    # Walk backwards so each G_t reuses G_{t+1}: G_t = r_t + gamma * G_{t+1}.
    for reward in reversed(rewards):
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


def reinforce(
        policy,
        optimizer,
        n_training_episodes,
        max_steps,
        gamma,
        print_every,
        environment=None,
        ):
    """Train ``policy`` with the REINFORCE (Monte-Carlo policy gradient) algorithm.

    One gradient step is taken per episode, using the summed loss
    ``-log pi(a_t | s_t) * G_t`` over all steps of that episode. Returns are
    standardised within the episode when it has more than one step.

    Args:
        policy: Object exposing ``act(state) -> (action, log_prob)`` where
            ``log_prob`` is a tensor of shape ``(1,)`` attached to the graph.
        optimizer: Optimizer over the policy's parameters.
        n_training_episodes: Number of episodes to run.
        max_steps: Upper bound on environment steps per episode.
        gamma: Discount factor.
        print_every: Print the 100-episode moving average every this many
            episodes.
        environment: Gymnasium-style environment to train on. When omitted,
            the module-level ``env`` is used.

    Returns:
        List with the undiscounted total reward of every episode.
    """
    if environment is None:
        environment = env

    scores_deque = deque(maxlen=100)
    scores = []
    eps = np.finfo(np.float32).eps.item()

    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = environment.reset()[0]

        # Roll out one episode.
        for _ in range(max_steps):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, terminated, truncated, _info = environment.step(action)
            rewards.append(reward)
            # Stop on a true terminal state and on a time-limit truncation.
            if terminated or truncated:
                break

        episode_score = sum(rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)

        # A zero-step episode (max_steps == 0) gives nothing to learn from.
        if saved_log_probs:
            log_probs = torch.cat(saved_log_probs)
            returns = torch.tensor(
                _discounted_returns(rewards, gamma),
                dtype=torch.float32,
                device=log_probs.device,
            )
            # The sample std of a single value is NaN, which would poison every
            # parameter on the next optimizer step, so only standardise when
            # there are at least two returns.
            if returns.numel() > 1:
                returns = (returns - returns.mean()) / (returns.std() + eps)

            policy_loss = -(log_probs * returns).sum()

            optimizer.zero_grad()
            policy_loss.backward()
            optimizer.step()

        if i_episode % print_every == 0:
            print(" Episode {}\tAverage Score : {: f}".format(i_episode, np.mean(scores_deque)))
    return scores


In [17]:
# Hyperparameters
# h_size: passed to Policy as the width of its first hidden layer.
h_size = 128
# lr: learning rate handed to optim.Adam.
lr = 0.001
# n_training_episodes: number of episodes reinforce() runs.
n_training_episodes = 10000
# max_steps: upper bound on environment steps per episode in reinforce().
# NOTE(review): 10 steps looks far too short for a LunarLander landing
# attempt -- presumably a leftover debug value; confirm the intended cap.
max_steps = 10
# gamma: discount factor used when computing the per-step returns.
gamma = 0.99

In [18]:
# Build the policy network from the environment's dimensions and place it on
# the selected device before handing its parameters to the optimizer.
policy = Policy(s_size=state_space, a_size=action_space, h_size=h_size)
policy = policy.to(device)

# Adam over all policy parameters with the configured learning rate.
optimizer = optim.Adam(policy.parameters(), lr=lr)

In [19]:
# Run training; progress (100-episode moving average) is printed every 100 episodes.
scores = reinforce(
    policy=policy,
    optimizer=optimizer,
    n_training_episodes=n_training_episodes,
    max_steps=max_steps,
    gamma=gamma,
    print_every=100,
)

 Episode 100\ tAverage Score : -6.552787
 Episode 200\ tAverage Score :  1.124283
 Episode 300\ tAverage Score :  5.369712
 Episode 400\ tAverage Score :  8.096942
 Episode 500\ tAverage Score :  6.389607
 Episode 600\ tAverage Score :  8.509075
 Episode 700\ tAverage Score :  7.315725
 Episode 800\ tAverage Score :  7.333497
 Episode 900\ tAverage Score :  8.363511
 Episode 1000\ tAverage Score :  8.120340
 Episode 1100\ tAverage Score :  7.459752
 Episode 1200\ tAverage Score :  7.431425
 Episode 1300\ tAverage Score :  7.472951
 Episode 1400\ tAverage Score :  8.398130
 Episode 1500\ tAverage Score :  9.744566
 Episode 1600\ tAverage Score :  8.730456
 Episode 1700\ tAverage Score :  7.127023
 Episode 1800\ tAverage Score :  6.767623
 Episode 1900\ tAverage Score :  8.039899
 Episode 2000\ tAverage Score :  8.467052
 Episode 2100\ tAverage Score :  9.913705
 Episode 2200\ tAverage Score :  8.912401
 Episode 2300\ tAverage Score :  7.508261
 Episode 2400\ tAverage Score :  8.590199
 