# Implementation of the REINFORCE algorithm

Adapted from Miguel's book (see the notebook linked below), combined with my own older code.

https://github.com/mimoralea/gdrl/blob/master/notebooks/chapter_11/chapter-11.ipynb

In [1]:
# Uses some of Miguel's import statements, but not all of them.
import gym
import time
import math
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image
import copy
import gc

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchinfo import summary

In [4]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("device", device)

# Build the CartPole environment and cache the sizes the rest of the
# notebook needs: observation shape, observation length, action count.
env = gym.make("CartPole-v1")
input_shape = env.observation_space.shape
n_states = input_shape[0]
n_action = env.action_space.n

print("action space:", env.action_space)
print("observation space:", env.observation_space)
print("input shape:", input_shape)

# Reset once; as the last expression of the cell its result is displayed.
env.reset()

device cuda
action space: Discrete(2)
observation space: Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
input shape: (4,)


array([-0.01358154, -0.01906393, -0.02498913, -0.02085484], dtype=float32)

In [6]:
class DenseNetwork(nn.Module):
    """Fully connected policy network mapping a state to action logits.

    The network is ``input_layer -> hidden_layers... -> output_layer``. The
    activation is applied after the input layer and after every hidden layer,
    but NOT after the output layer: the raw outputs are used as the logits of
    a categorical distribution and must be free to take negative values.

    Args:
        in_features: Size of the state vector.
        out_features: Number of discrete actions (one logit per action).
        hidden_features: Width of each hidden layer; must contain at least
            one entry.
        activation: Callable applied after every non-output layer.
    """

    def __init__(self, in_features,
                 out_features,
                 hidden_features=(32, 32),
                 activation=F.relu):
        super().__init__()
        self.activation = activation
        self.input_layer = nn.Linear(in_features, hidden_features[0])
        # One linear layer per consecutive pair of hidden widths, i.e.
        # len(hidden_features) - 1 layers in total.
        self.hidden_layers = nn.ModuleList(
            nn.Linear(n_in, n_out)
            for n_in, n_out in zip(hidden_features[:-1], hidden_features[1:])
        )
        self.output_layer = nn.Linear(hidden_features[-1], out_features)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def _format(self, state):
        """Return ``state`` as a tensor on the model's device.

        Non-tensor input (list, numpy array, ...) is converted to float32 and
        given a leading batch dimension. Tensor input is assumed to be already
        batched and is only moved to the model's device.
        """
        x = state
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(x,
                             device=self.device,
                             dtype=torch.float32)
            x = x.unsqueeze(0)
        else:
            x = x.to(self.device)
        return x

    def forward(self, state):
        """Compute the (unnormalised) action logits for ``state``."""
        x = self._format(state)
        x = self.activation(self.input_layer(x))
        for layer in self.hidden_layers:
            x = self.activation(layer(x))
        # No activation here: the logits go straight into the softmax.
        return self.output_layer(x)

    def full_pass(self, state):
        """Sample an action and return everything needed for training.

        Returns:
            A tuple ``(action, is_exploratory, logpa, entropy)`` where
            ``action`` is the sampled action as a Python int,
            ``is_exploratory`` is True when the sampled action differs from
            the greedy (arg-max) one, ``logpa`` is the log-probability of the
            sampled action and ``entropy`` is the entropy of the action
            distribution. Both tensors have a trailing dimension of size 1.
        """
        logits = self.forward(state)
        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()
        logpa = dist.log_prob(action).unsqueeze(-1)
        entropy = dist.entropy().unsqueeze(-1)
        # .cpu() is required before .numpy() when the model lives on the GPU.
        greedy_action = np.argmax(logits.detach().cpu().numpy())
        sampled_action = action.item()
        is_exploratory = bool(sampled_action != greedy_action)
        return sampled_action, is_exploratory, logpa, entropy

    def select_action(self, state):
        """Sample an action from the policy and return it as a Python int."""
        logits = self.forward(state)
        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()
        return action.item()

    def select_greedy_action(self, state):
        """Return the index of the action with the largest logit."""
        logits = self.forward(state)
        # .cpu() is required before .numpy() when the model lives on the GPU.
        return np.argmax(logits.detach().cpu().numpy())


In [None]:
class REINFORCE:
    """Monte-Carlo policy-gradient (REINFORCE) agent.

    The agent rolls out one full episode with the current policy, then makes
    a single gradient step on the discounted-return-weighted log-probabilities
    of the actions it took.

    Args:
        policy_model_fn: Callable ``(n_states, n_action) -> model``. The model
            must provide ``full_pass(state)`` returning
            ``(action, is_exploratory, logpa, entropy)``.
        policy_optimizer_fn: Callable ``(model, lr) -> optimizer``.
        policy_optimizer_lr: Learning rate handed to ``policy_optimizer_fn``.

    Per-episode statistics accumulate in ``episode_reward``,
    ``episode_timestep`` and ``episode_exploration`` (one entry per episode).
    They are created once in ``__init__`` and are not cleared by ``train``.
    """

    def __init__(self, policy_model_fn, policy_optimizer_fn, policy_optimizer_lr):
        self.n_action = None
        self.n_states = None
        self.policy_optimizer = None
        self.policy_model = None
        self.env = None
        self.gamma = None
        self.seed = None
        self.make_env_kargs = None
        self.make_env_fn = None
        # Per-episode buffers, re-created at the start of every episode.
        self.logpas = None
        self.rewards = None
        self.policy_model_fn = policy_model_fn
        self.policy_optimizer_fn = policy_optimizer_fn
        self.policy_optimizer_lr = policy_optimizer_lr
        # Running statistics, one entry per episode.
        self.episode_reward = []
        self.episode_timestep = []
        self.episode_exploration = []
        self.iterator = count(start=0, step=1)

    def optimize_model(self):
        """Take one policy-gradient step from the episode just collected.

        Reads ``self.rewards`` and ``self.logpas`` (one entry per time step)
        and leaves both untouched. Does nothing when the episode is empty.
        """
        T = len(self.rewards)
        if T == 0:
            return
        # discounts[k] = gamma ** k for k = 0 .. T-1
        discounts = np.logspace(0, T, num=T, base=self.gamma, endpoint=False)
        # returns[t] = sum_k gamma ** k * reward[t + k]
        returns = np.array([np.sum(discounts[:T-t] * self.rewards[t:]) for t in range(T)])

        logpas = torch.cat(self.logpas)
        # Build the weights on the same device as the log-probabilities so
        # the product below also works when the policy lives on the GPU.
        discounts = torch.as_tensor(discounts, dtype=torch.float32,
                                    device=logpas.device).unsqueeze(1)
        returns = torch.as_tensor(returns, dtype=torch.float32,
                                  device=logpas.device).unsqueeze(1)

        policy_loss = -(discounts * returns * logpas).mean()
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

    def interaction_step(self, state):
        """Act once in the environment and record the transition.

        Expects ``self.env.step(action)`` to return the 4-tuple
        ``(new_state, reward, is_terminal, info)``.

        Returns:
            ``(new_state, is_terminal)`` so the caller can continue the
            episode from the new state.
        """
        action, is_exploratory, logpa, _ = self.policy_model.full_pass(state)
        new_state, reward, is_terminal, _ = self.env.step(action)

        self.logpas.append(logpa)
        self.rewards.append(reward)

        self.episode_reward[-1] += reward
        self.episode_timestep[-1] += 1
        self.episode_exploration[-1] += int(is_exploratory)
        return new_state, is_terminal

    def train(self, env, gamma, max_minutes, max_episodes, goal_mean_100_reward):
        """Train the policy on ``env``.

        Training stops as soon as one of these holds at the end of an episode:
        ``max_episodes`` episodes were run, ``max_minutes`` of wall-clock time
        elapsed, or the mean training reward over the last (up to) 100
        recorded episodes reached ``goal_mean_100_reward``.

        Args:
            env: Environment exposing ``observation_space.shape``,
                ``action_space.n``, ``reset()`` returning the initial state
                and ``step(action)`` returning a 4-tuple.
            gamma: Discount factor.
            max_minutes: Wall-clock training budget in minutes.
            max_episodes: Maximum number of episodes to run.
            goal_mean_100_reward: Target mean reward that ends training early.
        """
        training_start = time.time()

        self.env = env
        self.gamma = gamma

        self.n_states, self.n_action = env.observation_space.shape[0], env.action_space.n
        self.policy_model = self.policy_model_fn(self.n_states, self.n_action)
        self.policy_optimizer = self.policy_optimizer_fn(self.policy_model, self.policy_optimizer_lr)

        for _ in range(max_episodes):
            state, is_terminal = self.env.reset(), False
            self.episode_reward.append(0.0)
            self.episode_timestep.append(0)
            self.episode_exploration.append(0)
            next(self.iterator)

            # Roll out one complete episode with the current policy.
            self.logpas, self.rewards = [], []
            while not is_terminal:
                state, is_terminal = self.interaction_step(state)
            gc.collect()

            self.optimize_model()

            if time.time() - training_start >= max_minutes * 60:
                break
            if np.mean(self.episode_reward[-100:]) >= goal_mean_100_reward:
                break



