<a href="https://colab.research.google.com/github/dane-meister/Machine-Learning-Algos/blob/main/reinforce_%26_actor_critic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# REINFORCE and Actor-Critic

---


The goal of this notebook is to implement two policy-gradient algorithms, REINFORCE and Actor-Critic.

We will be using 4 Gymnasium environments (InvertedPendulum-v4, Hopper-v4, HalfCheetah-v4, and image-based InvertedPendulum-v4).

In [None]:
!apt install xvfb -y
# Pin gymnasium below 1.0: this notebook imports PixelObservationWrapper and
# GrayScaleObservation, which are not available under those names in Gymnasium 1.x.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q "gymnasium[classic-control,atari,accept-rom-license,mujoco]<1.0"
%pip install -q opencv-python
%env MUJOCO_GL=egl

In [None]:
import imageio
from IPython.display import Image, display
from io import BytesIO

# Code for visualizing the episode

class GIFMaker:
    """Collects rendered frames and shows them inline as an animated GIF."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Drop every collected frame and start over with an empty output buffer."""
        self.images = []
        self.buffer = BytesIO()

    def append(self, img):
        """Add one frame (an image array in a form ``imageio.mimsave`` accepts)."""
        self.images.append(img)

    def display(self):
        """Encode the collected frames as a GIF, show it inline and return the Image."""
        # Rewind and clear the buffer before encoding. Without this a second
        # display() call appends a second GIF after the first one, so the
        # buffer keeps growing and holds concatenated files.
        self.buffer.seek(0)
        self.buffer.truncate()
        imageio.mimsave(self.buffer, self.images, format='gif')
        gif = Image(data=self.buffer.getvalue())
        display(gif)
        return gif

    def __len__(self):
        """Number of frames collected so far."""
        return len(self.images)

## Import packages we need

In [None]:
import os
import gc
import math
import random

from tqdm import tqdm
import matplotlib.pyplot as plt
from collections import deque

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions.normal import Normal

import cv2

## Env1 InvertedPendulum
Detailed information of this environment:
https://gymnasium.farama.org/environments/mujoco/inverted_pendulum/

In [None]:
import gymnasium as gym
# Roll out a random policy for at most 500 steps and show it as a GIF.
env = gym.make("InvertedPendulum-v4", render_mode="rgb_array")
g = GIFMaker() # visualization
try:
    # reset(seed=...) does not seed action_space.sample(); seed it separately
    # so the preview is reproducible.
    env.action_space.seed(42)
    observation, info = env.reset(seed=42)
    for i in range(500):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        g.append(env.render()) # save one frame
        if terminated or truncated:
            break
    g.display() # show GIF animation
finally:
    # release the environment even if the rollout raised
    env.close()

## Env2 Hopper-v4
Detailed information of this environment:
https://gymnasium.farama.org/environments/mujoco/hopper/

In [None]:
# Roll out a random policy for at most 200 steps and show it as a GIF.
env = gym.make("Hopper-v4", render_mode="rgb_array")
g = GIFMaker()
try:
    # reset(seed=...) does not seed action_space.sample(); seed it separately
    # so the preview is reproducible.
    env.action_space.seed(42)
    observation, info = env.reset(seed=42)
    for i in range(200):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        g.append(env.render())
        if terminated or truncated:
            break
    g.display()
finally:
    # release the environment even if the rollout raised
    env.close()

## Env3 HalfCheetah-v4
Detailed information of this environment: https://gymnasium.farama.org/environments/mujoco/half_cheetah/

In [None]:
# Roll out a random policy for at most 200 steps and show it as a GIF.
env = gym.make("HalfCheetah-v4", render_mode="rgb_array")
g = GIFMaker()
try:
    # reset(seed=...) does not seed action_space.sample(); seed it separately
    # so the preview is reproducible.
    env.action_space.seed(42)
    observation, info = env.reset(seed=42)
    for i in range(200):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        g.append(env.render())
        if terminated or truncated:
            break
    g.display()
finally:
    # release the environment even if the rollout raised
    env.close()

## Env4 *Image-based* InvertedPendulum
Detailed information of this environment: https://gymnasium.farama.org/environments/mujoco/inverted_pendulum/

In [None]:
from gymnasium import ObservationWrapper
from gymnasium.wrappers import (
    PixelObservationWrapper,
    GrayScaleObservation,
    ResizeObservation
)
from gymnasium.spaces import Box

class KeyObservationWrapper(ObservationWrapper):
    """Turns a dict observation into the single entry stored under ``'pixels'``."""

    def __init__(self, env: gym.Env):
        super().__init__(env)
        # Advertise only the sub-space of the 'pixels' entry.
        pixel_space = self.observation_space['pixels']
        self.observation_space = pixel_space

    def observation(self, observation):
        """Return the 'pixels' entry of the wrapped environment's observation."""
        pixels = observation['pixels']
        return pixels

class MakeChannelObservationWrapper(ObservationWrapper):
    """Prepends a length-1 axis to every observation and to the observation space."""

    def __init__(self, env: gym.Env):
        super().__init__(env)
        inner_space = self.observation_space
        # Same bounds and dtype as the wrapped space, with one extra leading axis.
        self.observation_space = Box(
            low=np.expand_dims(inner_space.low, axis=0),
            high=np.expand_dims(inner_space.high, axis=0),
            shape=(1, *inner_space.shape),
            dtype=inner_space.dtype,
        )

    def observation(self, observation):
        """Return the observation with a new leading axis of size 1."""
        return np.expand_dims(observation, axis=0)


def make_image_env(env_name):
    """Build `env_name` so that observations are rendered frames instead of state vectors.

    The wrappers are applied in this order: pixel observation, selection of the
    'pixels' entry, grayscale conversion, resize to 84x84, and a leading
    length-1 axis.
    """
    base_env = gym.make(env_name, render_mode="rgb_array")
    pixel_env = KeyObservationWrapper(PixelObservationWrapper(base_env))
    small_gray_env = ResizeObservation(GrayScaleObservation(pixel_env), (84, 84))
    return MakeChannelObservationWrapper(small_gray_env)

# Roll out a random policy on the image-based environment and show the
# observations themselves (not the RGB render) as a GIF.
env = make_image_env("InvertedPendulum-v4")
g = GIFMaker()
try:
    # reset(seed=...) does not seed action_space.sample(); seed it separately
    # so the preview is reproducible.
    env.action_space.seed(42)
    observation, info = env.reset(seed=42)
    print (observation.shape)
    for i in range(200):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)
        # this observation gives you (1, 84, 84), 1 means one gray channel
        g.append(observation.squeeze(0))    # see the image observations
        if terminated or truncated:
            break
    g.display()
finally:
    # release the environment even if the rollout raised
    env.close()

## REINFORCE

In [None]:
# The policy network for numerical states using MLP
class Policy(nn.Module):
    """MLP policy for vector states: state_dim -> 128 -> 256 -> action_dim.

    The output is squashed with tanh and is used by the agent as the mean of a
    normal distribution over actions.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, action_dim)

    def forward(self, x):
        """Map a batch of states to action means in [-1, 1]."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return torch.tanh(self.fc3(hidden))

# The policy network for **image observations** using CNN
class Policy_Image(nn.Module):
    """CNN policy for image observations.

    Three convolutions followed by two linear layers. The first linear layer
    expects 64 * 7 * 7 features, which is what the convolutions produce for
    84x84 inputs. The tanh-squashed output is used by the agent as the mean of
    a normal distribution over actions.
    """

    def __init__(self, image_channels, action_dim):
        super().__init__()
        self.conv1 = nn.Conv2d(image_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, action_dim)

    def forward(self, x):
        """Map a batch of images (batch, channels, H, W) to action means in [-1, 1]."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(conv(features))

        # one flat feature vector per batch element for the linear head
        flat = features.view(features.size(0), -1)

        hidden = torch.relu(self.fc1(flat))
        return torch.tanh(self.fc2(hidden))


# REINFORCE
class REINFORCE():
    """REINFORCE (Monte-Carlo policy gradient) with a Gaussian policy.

    The policy network outputs the mean of a normal distribution over actions.
    The variance is one scalar shared by all action dimensions; it is annealed
    linearly from 1 down to ``var`` over the first ``var_decay_steps`` training
    transitions. One gradient step is taken after every training episode,
    weighting each step's log-probability by its discounted reward-to-go.
    """

    def __init__(
        self,
        env,
        env_name,
        policy_network,
        var=0.1,
        var_decay_steps=20000,
        gamma=0.95,
        lr=0.01
    ):
        """
        Args:
            env: environment exposing ``observation_space``, ``action_space``,
                ``reset()``, ``step()`` and ``render()``.
            env_name: label used in the title of the training plot.
            policy_network: accepted so existing call sites keep working, but
                not used. The network class is chosen from the observation
                shape: ``Policy_Image`` for multi-dimensional observations,
                ``Policy`` otherwise.
            var: final variance of the Gaussian policy.
            var_decay_steps: number of training transitions over which the
                variance decays from 1 to ``var``.
            gamma: discount factor.
            lr: learning rate of the Adam optimizer.
        """
        # initialize env and env name
        self.env = env
        self.env_name = env_name

        # set state dimension and number of actions
        self.state_dim = env.observation_space.shape
        self.action_dim = env.action_space.shape[0]

        # create policy network:
        if len(self.state_dim) > 1:
            # image observations: the first axis is the channel axis
            self.policy_network = Policy_Image(image_channels=self.state_dim[0], action_dim=self.action_dim)
        else:
            # for numerical states
            self.state_dim = self.state_dim[0]
            self.policy_network = Policy(state_dim=self.state_dim, action_dim=self.action_dim)

        # assign gpu if available
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        # move the model to the selected device
        self.policy_network = self.policy_network.to(self.device)

        # (end) variance value for normal distribution
        self.var = var
        # how many training transitions the variance takes to decay from 1 to var
        self.var_decay_steps = var_decay_steps

        # set optimizer and make it use "lr" (the learning rate)
        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=lr)

        # mode is used to switch between optimization and evaluation
        self.mode = "eval"
        self.total_transitions = 0

        self.gamma = gamma
        self.g = None  # GIFMaker holding the frames of the most recent episode

    def _current_variance(self):
        """Policy variance for the current step: linear decay from 1 to ``self.var``."""
        if self.total_transitions < self.var_decay_steps:
            remaining = (self.var_decay_steps - self.total_transitions) / self.var_decay_steps
            return remaining * (1 - self.var) + self.var
        return self.var

    # function to choose action given a state
    # return the log probability of the action as well
    # return values are all tensors (on gpu if using gpu)
    def act(self, state):
        """Choose an action for a batched state tensor.

        In "eval" mode the action is the policy mean and the log-probability is
        a tensor of zeros. In "train" mode the action is sampled from the
        Gaussian policy and clipped to [-1, 1].

        Returns:
            (action, log_prob), both tensors shaped like the policy output.

        Raises:
            ValueError: if ``self.mode`` is neither "eval" nor "train".
        """
        if self.mode == "eval":
            with torch.no_grad():
                action = self.policy_network(state)
                log_prob = torch.zeros_like(action)
            return action, log_prob

        if self.mode != "train":
            raise ValueError(f"unknown mode {self.mode!r}; expected 'eval' or 'train'")

        # normal distribution whose mean comes from the policy network and whose
        # variance is the (decaying) constant; this is similar to epsilon decay in DQN
        mean = self.policy_network(state)
        var = torch.full((self.action_dim,), self._current_variance(), dtype=torch.float32, device=self.device)
        normal_dist = Normal(mean, var.sqrt())

        # sample from the normal distribution as the action
        action = normal_dist.sample()

        # clip the action to [-1, 1] in case the environment does not accept more
        action = torch.clip(action, -1, 1)

        # NOTE: the log-probability is evaluated at the clipped action
        log_prob = normal_dist.log_prob(action)
        return action, log_prob

    # REINFORCE update
    def optimization(self, log_probs, returns):
        """Take one policy-gradient step.

        Args:
            log_probs: tensor (T, action_dim) of per-dimension log-probabilities.
            returns: tensor (T, 1) of discounted reward-to-go values.
        """
        # make the model in train mode
        self.policy_network.train()

        # average over action dimensions, then weight every step by its return
        step_log_probs = log_probs.mean(dim=1, keepdim=True)
        loss = -(step_log_probs * returns).sum()

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def _discounted_returns(self, rewards):
        """Reward-to-go G_t = r_t + gamma * G_{t+1} for a list of (1,)-shaped reward tensors.

        Returns a tensor of shape (T, 1). The list is built back-to-front with
        append + reverse, which is linear in the episode length.
        """
        running = torch.zeros(1, dtype=torch.float32, device=self.device)
        returns = []
        for step_reward in reversed(rewards):
            running = step_reward + self.gamma * running
            returns.append(running)
        returns.reverse()
        return torch.cat(returns).unsqueeze(1)

    # run for one episode
    def run_one_episode(self, visualize=False):
        """Run one episode in the current mode and return its undiscounted return.

        In "train" mode the policy is updated once, after the episode ends.
        When ``visualize`` is true the rendered frames are stored in ``self.g``,
        which is replaced at the start of every episode.
        """
        self.g = GIFMaker()
        training = self.mode == "train"

        episode_return = 0
        # per-step rewards and log-probabilities, only needed for the update
        rewards = []
        log_probs = []

        # reset the environment
        state, info = self.env.reset()

        if visualize:
            # render the agent's own environment, not whatever global `env` is bound to
            self.g.append(self.env.render())

        done = False
        while not done:
            # convert state to tensor, make a batch dimension and move it to the device
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)

            # choose (sample) an action and get its log probability
            action_tensor, log_prob_tensor = self.act(state_tensor)

            # drop the batch dimension to get the action for the environment
            action = action_tensor.cpu().numpy()[0]

            # go to next state
            state, reward, terminated, truncated, info = self.env.step(action)
            episode_return += reward

            if training:
                log_probs.append(log_prob_tensor)
                # float32 explicitly, so the returns match the network's dtype
                rewards.append(torch.tensor([reward], dtype=torch.float32, device=self.device))
                self.total_transitions += 1

            # for rendering
            if visualize:
                self.g.append(self.env.render())

            done = terminated or truncated

        # optimize the policy after this episode ends
        if training:
            self.optimization(torch.cat(log_probs), self._discounted_returns(rewards))

        return episode_return

    def train(self):
        """Switch to train mode and run one training episode (one policy update)."""
        self.mode = "train"
        self.run_one_episode()

    # Evaluation of policy
    def eval(self, n, visualize=False):
        """Run ``n`` evaluation episodes and return their mean return.

        With ``visualize=True`` only the frames of the last episode remain in ``self.g``.
        """
        self.mode = "eval" # put in eval mode
        returns = [self.run_one_episode(visualize) for _ in range(n)]
        return np.mean(returns)

    # the function called to perform optimization and evaluation
    def execute(self, total_ep=5000, eval_freq=100, eval_ep=100):
        """Train for ``total_ep`` episodes, evaluating every ``eval_freq`` episodes.

        Each evaluation averages ``eval_ep`` episodes. The evaluation curve is
        plotted at the end.
        """
        rewards = []   # mean evaluation return at each checkpoint
        episodes = []  # number of training episodes completed at each checkpoint

        prog_bar = tqdm(range(0, total_ep))
        for i in prog_bar:
            self.train()                        # train
            if (i+1) % eval_freq == 0:          # evaluate using eval_ep episodes every eval_freq policy updates
                reward = self.eval(eval_ep)
                rewards.append(reward)
                episodes.append(i + 1)          # same episode count as the printed message
                print (f"Eval Reward: {reward:.2f} at ep {i+1}")

        plt.plot(episodes, rewards)   #plot evaluation reward vs episodes
        plt.xlabel('episodes')
        plt.ylabel('Rewards')
        plt.title('REINFORCE on '+self.env_name)
        plt.show()

    def visualize(self):
        """Display the frames recorded during the most recent episode."""
        self.g.display()

In [None]:
# InvertedPendulum-v4 REINFORCE
# gamma, var (variance of the policy) and lr are the knobs worth exploring.
gc.collect()  # release memory still held by earlier experiments
env = gym.make("InvertedPendulum-v4", render_mode="rgb_array")
reinforce = REINFORCE(
    env,
    "InvertedPendulum",
    Policy,
    var=0.01,
    lr=1e-3,
    gamma=0.95,
)
reinforce.execute(total_ep=12000, eval_freq=400, eval_ep=20)

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = reinforce.eval(20, visualize=True)
print (f"Final Eval: {final_return:.2f}")
reinforce.visualize()  # GIF of the last evaluation episode

In [None]:
# Hopper-v4 REINFORCE
# gamma, var (variance of the policy) and lr are the knobs worth exploring.
gc.collect()  # release memory still held by earlier experiments
env = gym.make("Hopper-v4", render_mode="rgb_array", max_episode_steps=500)
reinforce = REINFORCE(
    env,
    "Hopper",
    Policy,
    var=0.05,
    lr=1e-4,
    gamma=0.99,
)
reinforce.execute(total_ep=3000, eval_freq=50, eval_ep=20)

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = reinforce.eval(10, visualize=True)
print (f"Final Eval: {final_return:.2f}")
reinforce.visualize()  # GIF of the last evaluation episode

In [None]:
# HalfCheetah-v4 REINFORCE
# gamma, var (variance of the policy) and lr are the knobs worth exploring.
gc.collect()  # release memory still held by earlier experiments
env = gym.make("HalfCheetah-v4", render_mode="rgb_array", max_episode_steps=500)
reinforce = REINFORCE(
    env,
    "HalfCheetah",
    Policy,
    var=0.05,
    lr=1e-3,
    gamma=0.95,
)
reinforce.execute(total_ep=2000, eval_freq=50, eval_ep=10)

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = reinforce.eval(10, visualize=True)
print (f"Final Eval: {final_return:.2f}")
reinforce.visualize()  # GIF of the last evaluation episode

In [None]:
# image based InvertedPendulum-v4
gc.collect()  # release memory still held by earlier experiments
# Keep the image environment: rebinding `env` to a plain gym.make(...) here would
# hand REINFORCE vector observations, and it would then train the MLP policy on
# the numeric state instead of the CNN on pixels.
env = make_image_env("InvertedPendulum-v4")
reinforce = REINFORCE(env, "InvertedPendulum", Policy_Image, var=0.03, lr=1e-4, gamma=0.95) # you can explore different gamma, var (variance of your policy) and learning rate
reinforce.execute(total_ep=5000, eval_freq=200, eval_ep=10)

## Actor-Critic

In [None]:
# The actor network for numerical states using MLP
class Actor(nn.Module):
    """MLP actor for vector states: state_dim -> 128 -> 256 -> action_dim.

    The output is squashed with tanh and is used by the agent as the mean of a
    normal distribution over actions.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, action_dim)

    def forward(self, x):
        """Map a batch of states to action means in [-1, 1]."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return torch.tanh(self.fc3(hidden))

# The actor network for **image states** using CNN
class Actor_Image(nn.Module):
    """CNN actor for image states.

    Three convolutions followed by two linear layers. The first linear layer
    expects 64 * 7 * 7 features, which is what the convolutions produce for
    84x84 inputs. The tanh-squashed output is used by the agent as the mean of
    a normal distribution over actions.
    """

    def __init__(self, image_channels, action_dim):
        super().__init__()
        self.conv1 = nn.Conv2d(image_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, action_dim)

    def forward(self, x):
        """Map a batch of images (batch, channels, H, W) to action means in [-1, 1]."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(conv(features))

        # one flat feature vector per batch element for the linear head
        flat = features.view(features.size(0), -1)

        hidden = torch.relu(self.fc1(flat))
        return torch.tanh(self.fc2(hidden))


# The critic network for numerical states using MLP
class Critic(nn.Module):
    """MLP state-value function for vector states: state_dim -> 128 -> 256 -> 1."""

    def __init__(self, state_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, 1)  # single output: the state value

    def forward(self, x):
        """Return one unbounded value estimate per state, shape (batch, 1)."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        # no activation on the output: the value is a raw score
        return self.fc3(hidden)

# The critic network for **image states** using CNN
class Critic_Image(nn.Module):
    """CNN state-value function for image states.

    Three convolutions followed by two linear layers. The first linear layer
    expects 64 * 7 * 7 features, which is what the convolutions produce for
    84x84 inputs.
    """

    def __init__(self, image_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(image_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        self.fc1 = nn.Linear(64 * 7 * 7, 512)  # must match the conv output size
        self.fc2 = nn.Linear(512, 1)  # single output: the state value

    def forward(self, x):
        """Return one unbounded value estimate per image, shape (batch, 1)."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(conv(features))

        # one flat feature vector per batch element for the linear head
        flat = features.view(features.size(0), -1)

        hidden = torch.relu(self.fc1(flat))
        # no activation on the output: the value is a raw score
        return self.fc2(hidden)


# ActorCritic
class ActorCritic():
    """One-step advantage actor-critic with a Gaussian policy and a target critic.

    The actor outputs the mean of a normal distribution over actions. The
    variance is one scalar shared by all action dimensions; it is annealed
    linearly from 1 down to ``var`` over the first ``var_decay_steps`` training
    transitions. Transitions are buffered and both networks are updated every
    ``train_freq`` training transitions and at the end of each episode. TD
    targets bootstrap from a target critic that is synchronised with the critic
    every ``target_network_update_freq`` training transitions.
    """

    def __init__(
        self,
        env,
        env_name,
        actor,
        critic,
        var=0.1,
        var_decay_steps=20000,
        train_freq=4,
        target_network_update_freq=200,
        gamma=0.95,
        lr=1e-3
    ):
        """
        Args:
            env: environment exposing ``observation_space``, ``action_space``,
                ``reset()``, ``step()`` and ``render()``.
            env_name: label used in the title of the training plot.
            actor, critic: accepted so existing call sites keep working, but
                not used. The network classes are chosen from the observation
                shape: the ``*_Image`` variants for multi-dimensional
                observations, the MLP variants otherwise.
            var: final variance of the Gaussian policy.
            var_decay_steps: number of training transitions over which the
                variance decays from 1 to ``var``.
            train_freq: number of training transitions between updates.
            target_network_update_freq: number of training transitions between
                target-critic synchronisations.
            gamma: discount factor.
            lr: learning rate of both Adam optimizers.
        """
        # initialize env and env name
        self.env = env
        self.env_name = env_name

        # set state dimension and number of actions
        self.state_dim = env.observation_space.shape
        self.action_dim = env.action_space.shape[0]

        # create actor and critic networks:
        if len(self.state_dim) > 1:
            # image observations: the first axis is the channel axis
            self.actor = Actor_Image(image_channels=self.state_dim[0], action_dim=self.action_dim)
            self.critic = Critic_Image(image_channels=self.state_dim[0])
            self.critic_target = Critic_Image(image_channels=self.state_dim[0])
        else:
            # for numerical states
            self.state_dim = self.state_dim[0]
            self.actor = Actor(state_dim=self.state_dim, action_dim=self.action_dim)
            self.critic = Critic(state_dim=self.state_dim)
            self.critic_target = Critic(state_dim=self.state_dim)

        # initialize the target critic network using the weights from critic network
        self.critic_target.load_state_dict(self.critic.state_dict())

        # assign gpu if available
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        # move the models to the selected device
        self.actor = self.actor.to(self.device)
        self.critic = self.critic.to(self.device)
        self.critic_target = self.critic_target.to(self.device)

        # (end) variance value for normal distribution
        self.var = var
        # how many training transitions the variance takes to decay from 1 to var
        self.var_decay_steps = var_decay_steps

        # set optimizers, one for each network
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

        # mode is used to switch between optimization and evaluation
        self.mode = "eval"
        self.total_transitions = 0

        self.gamma = gamma
        self.g = None  # GIFMaker holding the frames of the most recent episode
        self.train_freq = train_freq
        self.target_network_update_freq = target_network_update_freq

    def _current_variance(self):
        """Policy variance for the current step: linear decay from 1 to ``self.var``."""
        if self.total_transitions < self.var_decay_steps:
            remaining = (self.var_decay_steps - self.total_transitions) / self.var_decay_steps
            return remaining * (1 - self.var) + self.var
        return self.var

    # function to choose action given a state
    # returns the log probability of the action as well
    # returned values are all tensors (on gpu if using gpu)
    def act(self, state):
        """Choose an action for a batched state tensor.

        In "eval" mode the action is the actor's output and the log-probability
        is a tensor of zeros. In "train" mode the action is sampled from the
        Gaussian policy and clipped to [-1, 1].

        Returns:
            (action, log_prob), both tensors shaped like the actor output.

        Raises:
            ValueError: if ``self.mode`` is neither "eval" nor "train".
        """
        if self.mode == "eval":
            with torch.no_grad():
                # The actor already squashes its output with tanh. Applying tanh
                # again here would evaluate a different policy from the one
                # whose mean is used during training.
                action = self.actor(state)
                log_prob = torch.zeros_like(action)
            return action, log_prob

        if self.mode != "train":
            raise ValueError(f"unknown mode {self.mode!r}; expected 'eval' or 'train'")

        # normal distribution whose mean comes from the actor and whose variance
        # is the *decayed* value (not the final self.var), so exploration really
        # anneals; this is similar to epsilon decay in DQN
        mean = self.actor(state)
        var = torch.full((self.action_dim,), self._current_variance(), dtype=torch.float32, device=self.device)
        normal_dist = Normal(mean, var.sqrt())

        # sample from the normal distribution as the action
        action = normal_dist.sample()

        # clip the action to [-1, 1] in case the environment does not accept more
        action = torch.clip(action, -1, 1)

        # NOTE: the log-probability is evaluated at the clipped action
        log_prob = normal_dist.log_prob(action)
        return action, log_prob

    # optimization
    def optimization(self, log_probs, states, next_states, rewards, not_dones):
        """Update actor and critic from a batch of N buffered transitions.

        Args:
            log_probs: list of N tensors (1, action_dim), attached to the actor graph.
            states, next_states: lists of N batched state tensors (1, ...).
            rewards: list of N tensors of shape (1,).
            not_dones: list of N tensors of shape (1,); 0 where the episode
                terminated, 1 otherwise.
        """
        # put the models into training mode
        self.actor.train()
        self.critic.train()

        log_probs = torch.cat(log_probs, dim=0)   # (N, action_dim)
        states = torch.cat(states)                # (N, ...)
        next_states = torch.cat(next_states)      # (N, ...)
        rewards = torch.cat(rewards)              # (N,)
        not_dones = torch.cat(not_dones)          # (N,)

        # The critics return (N, 1). Squeeze to (N,) so the arithmetic with the
        # (N,) rewards/not_dones stays element-wise; otherwise broadcasting
        # silently produces (N, N) advantages and targets.
        values = self.critic(states).squeeze(-1)
        with torch.no_grad():
            next_values = self.critic_target(next_states).squeeze(-1)

        # one-step TD target; no bootstrapping past a terminal state
        target_values = rewards + self.gamma * next_values * not_dones

        # actor loss: log-probability (averaged over action dimensions) weighted
        # by the advantage, which is detached so no gradient reaches the critic
        advantages = (target_values - values).detach()
        actor_loss = -(log_probs.mean(dim=1) * advantages).mean()

        # optimize the actor using actor loss
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # critic loss: regress v(s) onto the TD target
        critic_loss = F.mse_loss(values, target_values)

        # optimize the critic network (not the target critic!)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

    def _sync_target_if_due(self):
        """Copy the critic into the target critic every ``target_network_update_freq`` transitions."""
        if self.total_transitions % self.target_network_update_freq == 0:
            self.critic_target.load_state_dict(self.critic.state_dict())

    # run for one episode
    def run_one_episode(self, visualize=False):
        """Run one episode in the current mode and return its undiscounted return.

        In "train" mode transitions are buffered and ``optimization`` is called
        every ``train_freq`` transitions and when the episode ends. When
        ``visualize`` is true the rendered frames are stored in ``self.g``,
        which is replaced at the start of every episode.
        """
        self.g = GIFMaker()
        training = self.mode == "train"

        episode_return = 0

        # buffers for the transitions collected since the last update
        states, next_states, rewards, log_probs, not_dones = [], [], [], [], []

        # reset
        state, info = self.env.reset()

        if visualize:
            # render the agent's own environment, not whatever global `env` is bound to
            self.g.append(self.env.render())

        while True:
            # make state a tensor, add a batch dimension, and move it to the device
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)

            # select (sample) an action and get its log probability
            action_tensor, log_prob_tensor = self.act(state_tensor)

            # drop the batch dimension to get the action for the environment
            action = action_tensor.cpu().numpy()[0]

            # go to next state using that action
            next_state, reward, terminated, truncated, info = self.env.step(action)
            episode_return += reward

            # for rendering
            if visualize:
                self.g.append(self.env.render())

            done = terminated or truncated

            if training:
                self.total_transitions += 1

                # store the transition
                states.append(state_tensor)
                next_states.append(torch.tensor(next_state, dtype=torch.float32).unsqueeze(0).to(self.device))
                rewards.append(torch.tensor([reward], dtype=torch.float32, device=self.device))
                log_probs.append(log_prob_tensor)
                # only true termination stops bootstrapping; truncation does not
                not_dones.append(torch.tensor([1.0 - float(terminated)], device=self.device))

                # update every train_freq transitions, or when the trajectory ends
                if self.total_transitions % self.train_freq == 0 or done:
                    self.optimization(log_probs, states, next_states, rewards, not_dones)
                    states, next_states, rewards, log_probs, not_dones = [], [], [], [], []

                # checked on every transition so the schedule does not depend on
                # when updates happen
                self._sync_target_if_due()

            if done:
                return episode_return

            state = next_state

    def train(self):
        """Switch to train mode and run one training episode."""
        self.mode = "train"
        self.run_one_episode()

    # Evaluation of policy
    def eval(self, n, visualize=False):
        """Run ``n`` evaluation episodes and return their mean return.

        With ``visualize=True`` only the frames of the last episode remain in ``self.g``.
        """
        self.mode = "eval" # put in eval mode
        returns = [self.run_one_episode(visualize) for _ in range(n)]
        return np.mean(returns)

    # the function called to perform optimization and evaluation
    def execute(self, total_ep=5000, eval_freq=100, eval_ep=100):
        """Train for ``total_ep`` episodes, evaluating every ``eval_freq`` episodes.

        Each evaluation averages ``eval_ep`` episodes. The evaluation curve is
        plotted at the end.
        """
        rewards = []   # mean evaluation return at each checkpoint
        episodes = []  # number of training episodes completed at each checkpoint

        prog_bar = tqdm(range(0, total_ep))
        for i in prog_bar:
            self.train()                        # train
            if (i+1) % eval_freq == 0:          # evaluate using eval_ep episodes every eval_freq episodes
                reward = self.eval(eval_ep)
                rewards.append(reward)
                episodes.append(i + 1)          # same episode count as the printed message
                print (f"Eval Reward: {reward:.2f} at ep {i+1}")

        plt.plot(episodes, rewards)   #plot evaluation reward vs episodes
        plt.xlabel('episodes')
        plt.ylabel('Rewards')
        plt.title('Actor Critic on '+self.env_name)
        plt.show()

    def visualize(self):
        """Display the frames recorded during the most recent episode."""
        self.g.display()

In [None]:
# InvertedPendulum-v4 Actor-Critic
# gamma, var (variance of the policy) and lr are the knobs worth exploring.
gc.collect()  # release memory still held by earlier experiments
env = gym.make("InvertedPendulum-v4", render_mode="rgb_array")
ac = ActorCritic(
    env,
    "InvertedPendulum",
    Actor,
    Critic,
    var=0.06,
    lr=1e-3,
    train_freq=64,
)
# run at least 5000 episodes; roughly 25 minutes with these settings
ac.execute(total_ep=5000, eval_freq=200, eval_ep=20)

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = ac.eval(20, visualize=True)
print (f"Final Eval: {final_return:.2f}")
ac.visualize()  # GIF of the last evaluation episode

In [None]:
# Hopper-v4 Actor-Critic
# gamma, var (variance of the policy) and lr are the knobs worth exploring.
gc.collect()  # release memory still held by earlier experiments
env = gym.make("Hopper-v4", render_mode="rgb_array")
ac = ActorCritic(
    env,
    "Hopper",
    Actor,
    Critic,
    var=0.04,
    lr=1e-4,
    train_freq=32,
)
# run at least 5000 episodes; roughly 20 minutes with these settings
ac.execute(total_ep=5000, eval_freq=200, eval_ep=10)

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = ac.eval(10, visualize=True)
print (f"Final Eval: {final_return:.2f}")
ac.visualize()  # GIF of the last evaluation episode

In [None]:
# HalfCheetah-v4 Actor-Critic
# gamma, var (variance of the policy) and lr are the knobs worth exploring.
gc.collect()  # release memory still held by earlier experiments
env = gym.make("HalfCheetah-v4", render_mode="rgb_array")
ac = ActorCritic(
    env,
    "HalfCheetah",
    Actor,
    Critic,
    var=0.08,
    lr=1e-3,
    train_freq=32,
    gamma=0.99,
)
# run at least 600 episodes; roughly 50 minutes for total_ep=600, eval_freq=50, eval_ep=10
ac.execute(total_ep=850, eval_freq=50, eval_ep=10)

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = ac.eval(10, visualize=True)
print (f"Final Eval: {final_return:.2f}")
ac.visualize()  # GIF of the last evaluation episode

In [None]:
# image based InvertedPendulum-v4 Actor-Critic
gc.collect() # free some unused RAM, don't worry about this
# Keep the image environment: rebinding `env` to a plain gym.make(...) here would
# hand ActorCritic vector observations, and it would then train the MLP networks
# on the numeric state instead of the CNNs on pixels.
env = make_image_env("InvertedPendulum-v4")
ac = ActorCritic(env, "InvertedPendulum", Actor_Image, Critic_Image, var=0.02, lr=1e-3, train_freq=32, gamma=0.92) # you can explore different gamma, var (variance of your policy) and learning rate
ac.execute(total_ep=5000, eval_freq=200, eval_ep=10) # please train for at least 5000 episodes

In [None]:
# Evaluate the trained policy while recording frames (rendering makes this slow).
final_return = ac.eval(10, visualize=True)
print (f"Final Eval: {final_return:.2f}")
ac.visualize()  # GIF of the last evaluation episode