In [1]:
!apt-get install -y xvfb python-opengl > /dev/null 2>&1

In [2]:
!pip install gym pyvirtualdisplay > /dev/null 2>&1

In [3]:
# ffmpeg backs the `writer='ffmpeg'` option used by createVideo when saving MP4s.
!apt-get install -y ffmpeg

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [None]:
# Global switch read by later cells: when True, frames, observations, agent
# checkpoints and videos are also written under /content/drive/MyDrive/...
mountDrive = True
if mountDrive:
  # Imported only when mounting is requested.
  # NOTE(review): presumably google.colab is available only on Colab runtimes - confirm.
  from google.colab import drive
  drive.mount('/content/drive')

In [None]:
import gym
import numpy as np
if not hasattr(np, "bool8"):
    np.bool8 = np.bool_
import matplotlib.pyplot as plt
from matplotlib import animation
from matplotlib.patches import Patch, FancyArrowPatch
from IPython import display as ipythondisplay
import torch
import torch.nn as nn
import torch.optim as optim
import math
import pickle
import copy

In [None]:
# Start a 400x300 virtual display (visible=0) before any environment is rendered.
# NOTE(review): the global name `display` shadows IPython's built-in display()
# helper in this notebook; the later cells call `ipythondisplay.display` instead.
from pyvirtualdisplay import Display
display = Display(visible=0, size=(400, 300))
display.start()

In [None]:
# Naive baseline: apply uniformly random actions until the episode ends, recording
# every rendered frame in `test_frames` and keeping the final observation in `obs`
# (both are consumed by the createVideo call in a later cell).
env = gym.make('CartPole-v1', render_mode="rgb_array", new_step_api=True)
try:
  env.reset()
  prev_screen = env.render()
  # Keep a single image artist and update its data each step; calling
  # plt.imshow() per step would stack a new image on the axes every iteration.
  frame_image = plt.imshow(prev_screen[0])
  test_frames = [prev_screen[0]]

  for i in range(50000):
    action = env.action_space.sample()
    print("step i",i,"action=",action)
    obs, reward, done, truncated, info = env.step(action)
    print("obs=",obs,"reward=",reward,"done=",done,"truncated=", truncated,"info=",info)
    screen = env.render()

    frame_image.set_data(screen[0])
    test_frames.append(screen[0])
    ipythondisplay.clear_output(wait=True)
    ipythondisplay.display(plt.gcf())

    if done or truncated:
      break
finally:
  # Release the environment even if a step or render call raised.
  env.close()

ipythondisplay.clear_output(wait=True)

# `i` is the zero-based index of the last step, so the number of steps is i + 1.
print("Iterations that were run:", i + 1)
print("obs=",obs,"reward=",reward,"done=",done,"truncated=", truncated,"info=",info)


if mountDrive:
    with open("/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/frames_test.pkl", "wb") as f:
        pickle.dump(test_frames, f)
    print("Frames saved to frames_test.pkl")

    with open("/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/obs_test.pkl", "wb") as f:
        pickle.dump(obs, f)
    print("Final observation saved to obs_test.pkl")

In [6]:
def createVideo(vid_frames, true_obs, title):
    """Save an annotated MP4 of an episode, holding the last frame with overlays.

    The recorded frames are played back and the final frame is then held for
    ``fps * hold_annotation`` extra frames while a text summary, a dotted
    vertical line at the cart centre, a theta label and a velocity arrow are shown.

    Args:
        vid_frames: Non-empty sequence of image frames accepted by ``imshow``;
            ``vid_frames[0].shape[1]`` is used as the frame width.
        true_obs: Final observation, indexed as printed below: [0] position (m),
            [1] linear velocity (m/s), [2] pole angle (rad), [3] angular
            velocity (rad/s).
        title: Output file name. The video is written to the working directory
            and, when the global ``mountDrive`` is truthy, also to the Drive
            exercises folder.

    Raises:
        IndexError: If ``vid_frames`` is empty.
    """
    r_obs = np.round(true_obs, 2)
    summary = (f"Pole reached critical tipping position at angle: {r_obs[2]*180/math.pi:.2f}\u00B0,"
               f"\nposition: {r_obs[0]:.2f}m, ang. vel.: {r_obs[3]*180/math.pi:.2f}\u00B0/s, lin. vel.: {r_obs[1]:.2f} m/s")
    print(summary)
    fps = 8
    hold_annotation = 6
    # Map the cart position to a pixel column (125 px per unit, centred at x=300).
    cart_centre = int(true_obs[0]*125.0+300)
    print(f"cart centre: {cart_centre}")
    frames_extended = list(vid_frames) + [vid_frames[-1]] * fps*hold_annotation
    last_real_frame = len(vid_frames) - 1

    # Static figure content
    fig, ax = plt.subplots(figsize=(6, 4), dpi=400)
    im = ax.imshow(vid_frames[0], interpolation='auto')
    ax.axis('off')
    ax.set_title("Cartpole Demo of REINFORCE Algorithm,\nPolicy-based Learning")
    pole_color = np.mean(np.array([202, 152, 101]).reshape(-1, 3), axis=0) / 255

    # Add legend
    legend_elements = [
        Patch(facecolor='black', edgecolor='black', label='Cart'),
        Patch(facecolor=pole_color, label='Pole')
    ]
    ax.legend(handles=legend_elements, loc='upper right')
    y_pos = 300
    arrow = FancyArrowPatch(
        (-5, y_pos),                        # start point
        (vid_frames[0].shape[1]+5, y_pos),     # end point
        arrowstyle='<->',
        mutation_scale=15,
        color='black',
        linewidth=3
    )
    ax.add_patch(arrow)

    # End-of-episode annotations are created exactly once and only toggled in
    # update(). Creating them inside update() would add a fresh set of artists
    # on every held frame, and they would still be on the axes (visible from
    # frame 0) when the animation is rendered a second time for the local copy.
    text = ax.text(vid_frames[0].shape[1]//2, 130,
                   summary,
                   color='red', fontsize=10, ha='center', va='top',
                   fontweight='light')
    vline = ax.axvline(x=cart_centre, ymin=0.27, ymax=0.58, color='blue', linestyle=':', linewidth=2)
    th_diff = 6 if true_obs[2] > 0 else -12
    v_sign = 1 if true_obs[1] > 0 else -1
    theta_text = ax.text((cart_centre+th_diff), 200, r'$\theta$',
                         color='blue', fontsize=8, fontweight='bold')
    v_arrow = FancyArrowPatch(
        posA=(cart_centre-v_sign*20, 300),
        posB=(cart_centre+v_sign*25, 300),
        arrowstyle='->',
        color='yellow',
        linewidth=2,
        mutation_scale=10
    )
    ax.add_patch(v_arrow)
    annotations = [text, vline, theta_text, v_arrow]
    for artist in annotations:
        artist.set_visible(False)

    def update(frame_idx):
        """Show frame ``frame_idx``; reveal the overlays from the last real frame on."""
        im.set_array(frames_extended[frame_idx])
        show_annotations = frame_idx >= last_real_frame
        for artist in annotations:
            artist.set_visible(show_annotations)
        return [im, *annotations]

    ani = animation.FuncAnimation(fig, update, frames=len(frames_extended), interval=50, blit=True)
    if mountDrive:
        ani.save(
            '/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/'+title,
            writer='ffmpeg',
            dpi=400,
            bitrate=5000,
            fps=fps
        )
    ani.save(
        title,
        writer='ffmpeg',
        dpi=400,
        bitrate=5000,
        fps=fps
    )

    print("MP4 Video successfully saved")
    plt.close(fig)

In [None]:
# Turn the random-policy rollout recorded above into an annotated video.
createVideo(
    vid_frames=test_frames,
    true_obs=obs,
    title='RL_REINFORCE_CartpoleTest.mp4',
)

In [7]:
class PolicyNetwork(nn.Module):
    """Softmax policy: a 2-hidden-layer (128 units each) MLP with its own Adam optimizer.

    Args:
        state_dim: Size of the state vector fed to the first linear layer.
        action_dim: Number of discrete actions (size of the output distribution).
        alpha: Learning rate passed to Adam.
    """

    def __init__(self, state_dim, action_dim, alpha):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)
        self.optimizer = optim.Adam(self.parameters(), lr=alpha)

    def _logits(self, state):
        """Return the unnormalised action scores for a float tensor ``state``."""
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

    def forward(self, state):
        """Return action probabilities (softmax over the last dimension)."""
        return torch.softmax(self._logits(state), dim=-1)

    def predict(self, state):
        """Return action probabilities for ``state`` as a NumPy array.

        Runs without gradient tracking, since the result is only used for
        sampling or argmax and is never back-propagated through.
        """
        state = torch.as_tensor(state, dtype=torch.float32)
        with torch.no_grad():
            action_probs = self.forward(state)
        return action_probs.detach().numpy()

    def gradient_log_prob(self, state, action):
        """Return d log pi(action | state) / d theta, one tensor per parameter.

        The log-probability is taken with ``log_softmax`` on the logits rather
        than ``log(softmax(...))``: once the policy saturates, the softmax
        output underflows to exactly 0, whose log is -inf and whose backward
        pass yields NaN gradients.

        Side effect: clears and then repopulates the parameters' ``.grad``.
        """
        state = torch.as_tensor(state, dtype=torch.float32)
        log_probs = torch.log_softmax(self._logits(state), dim=-1)
        log_prob = log_probs[action]
        self.optimizer.zero_grad()
        log_prob.backward()
        return [param.grad for param in self.parameters()]

    def update_policy(self, grad_log_prob, G, t, gamma):
        """Take one optimizer step along ``gamma**t * G * grad_log_prob``.

        Args:
            grad_log_prob: Per-parameter gradients from ``gradient_log_prob``.
            G: Return that scales the gradient.
            t: Time step, used for the ``gamma ** t`` factor.
            gamma: Discount factor.
        """
        for param, grad in zip(self.parameters(), grad_log_prob):
            # Negated because the optimizer minimises, while the return is maximised.
            param.grad = -grad * (gamma ** t) * G
        self.optimizer.step()

In [8]:
class REINFORCE:
    """Monte-Carlo policy-gradient (REINFORCE) training loop around a policy network.

    The policy network must provide ``predict(state)``,
    ``gradient_log_prob(state, action)`` and ``update_policy(grad, G, t, gamma)``.
    """

    def __init__(self, alpha, gamma, policy_network, num_episodes):
        self.alpha = alpha  # stored for reference; not read by this class
        self.gamma = gamma  # discount factor used for the returns
        self.policy_network = policy_network
        self.num_episodes = num_episodes
        self.ep_lengths = []  # number of actions taken in each generated episode
        # Snapshot of the agent that produced the longest episode so far;
        # stays None until reinforce() has generated at least one episode.
        self.best_agent = None

    def generate_episode(self, env):
        """Roll out one episode by sampling actions from the current policy.

        Expects ``env.reset()`` to return the initial state and ``env.step()``
        to return ``(next_state, reward, done, truncated, info)``.

        Returns:
            ``(episode, actions_taken)`` where ``episode`` is a list of
            ``(state, action, reward)`` tuples.
        """
        episode = []
        state = env.reset()
        done = False
        truncated = False
        actions_taken = 0
        while not (done or truncated):
            action_probs = self.policy_network.predict(state)
            action = np.random.choice(len(action_probs), p=action_probs)
            next_state, reward, done, truncated, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state
            actions_taken += 1
        self.ep_lengths.append(actions_taken)
        return episode, actions_taken

    @staticmethod
    def _discounted_returns(rewards, gamma):
        """Return ``G_t = sum_k gamma**(k - t) * r_k`` for every t, in one backward pass."""
        returns = [0.0] * len(rewards)
        running = 0.0
        for t in range(len(rewards) - 1, -1, -1):
            running = rewards[t] + gamma * running
            returns[t] = running
        return returns

    def _snapshot_for_best(self):
        """Return a frozen copy of this agent that carries no nested ``best_agent``.

        Deep-copying ``self`` wholesale would embed the previous snapshot (and,
        through it, every earlier one) in each new snapshot, so memory use and
        the pickled checkpoint would grow with every improvement.
        """
        snapshot = copy.copy(self)
        snapshot.policy_network = copy.deepcopy(self.policy_network)
        snapshot.ep_lengths = list(self.ep_lengths)
        snapshot.best_agent = None
        return snapshot

    def reinforce(self, env):
        """Train for ``num_episodes`` episodes with one policy update per time step.

        Keeps a snapshot of the agent whenever a new longest episode is generated
        (taken before that episode's updates are applied) and, when the global
        ``mountDrive`` is truthy, pickles the agent to Drive every 500 episodes.
        """
        longest_episode = 0
        for episode in range(self.num_episodes):
            episode_data, actions_taken = self.generate_episode(env)
            if actions_taken > longest_episode:
                longest_episode = actions_taken
                print(f"Longest episode so far: {longest_episode}")
                self.best_agent = self._snapshot_for_best()

            # All returns are computed once per episode instead of re-summing
            # the remaining rewards for every time step.
            returns = self._discounted_returns(
                [reward for _, _, reward in episode_data], self.gamma
            )
            for t, ((state, action, _), G) in enumerate(zip(episode_data, returns)):
                grad_log_prob = self.policy_network.gradient_log_prob(state, action)
                self.policy_network.update_policy(grad_log_prob, G, t, self.gamma)

            if (episode % 500 == 0) and mountDrive:
                with open("/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/reinforce_agent.pkl", "wb") as f:
                    pickle.dump(self, f)
                print(f"Episode {episode} completed, saved reinforce_agent checkpoint")

    def test_policy(self, version):
        """Run one greedy (argmax) episode, record its frames and export a video.

        Builds its own CartPole environment (10,000-step limit). When the global
        ``mountDrive`` is truthy, the frames and the final observation are also
        pickled to Drive with ``version`` in the file names.

        Args:
            version: Label inserted into the output file names.
        """
        env = gym.make("CartPole-v1", max_episode_steps=10_000, new_step_api=True, render_mode='rgb_array')
        frames = []
        try:
            obs = env.reset()
            prev_screen = env.render()
            # One image artist is reused for the live preview; calling
            # plt.imshow() per step would stack a new image on the axes each time.
            frame_image = plt.imshow(prev_screen[0])
            frames.append(prev_screen[0])
            ipythondisplay.display(plt.gcf())

            for i in range(50000):
                action_probs = self.policy_network.predict(obs)
                action = np.argmax(action_probs)
                obs, reward, done, truncated, info = env.step(action)

                print(f"Step {i}: Action={action}, Obs={obs}, Reward={reward}, Done={done}, Truncated={truncated}, Info={info}")

                screen = env.render()
                frame_image.set_data(screen[0])
                frames.append(screen[0])
                ipythondisplay.clear_output(wait=True)
                ipythondisplay.display(plt.gcf())

                if done or truncated:
                    break
        finally:
            # Release the environment even if a step or render call raised.
            env.close()

        ipythondisplay.clear_output(wait=True)
        # `i` is the zero-based index of the last step, so the step count is i + 1.
        print(f"Iterations that were run: {i + 1}")

        if mountDrive:
            with open(f"/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/frames_{version}.pkl", "wb") as f:
                pickle.dump(frames, f)
            print(f"Frames saved to frames_{version}.pkl")

            with open(f"/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/obs_{version}.pkl", "wb") as f:
                pickle.dump(obs, f)
            print(f"Final observation saved to obs_{version}.pkl")

        createVideo(frames, obs,f'RL_REINFORCE_Cartpole_{version}.mp4')


In [None]:
# Set up the environment and hyperparameters
env = gym.make("CartPole-v1", max_episode_steps=10_000_000, new_step_api=True)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
alpha = 0.01
gamma = 0.99
num_episodes = 10_000

# Create the policy network
policy_network = PolicyNetwork(state_dim, action_dim, alpha)

# Create and run the REINFORCE algorithm
reinforce_agent = REINFORCE(alpha, gamma, policy_network, num_episodes)
reinforce_agent.reinforce(env)

# Persist the fully trained agent. The periodic checkpoints written inside
# reinforce() only happen every 500 episodes, so without this final dump the
# last episodes of training would never reach the file.
if mountDrive:
    with open("/content/drive/MyDrive/Academia&Work/projects/Masters/APS1080 (RL)/Exercises/reinforce_agent.pkl", "wb") as f:
        pickle.dump(reinforce_agent, f)
    print("Model saved to reinforce_agent.pkl")
else:
    print("Drive is not mounted: trained agent kept in memory only, reinforce_agent.pkl was not written")

In [None]:
# Optional: test the final (latest) trained policy; uncomment the lines below to run it.
# env = gym.make("CartPole-v1", max_episode_steps=10_000, new_step_api=True, render_mode='rgb_array')
# reinforce_agent.test_policy("latest_model")

In [None]:
# Roll out and record the best snapshot kept during training (`best_agent` is
# set by REINFORCE.reinforce, so the training cell must have been run first).
# NOTE(review): test_policy() creates its own environment internally, so the
# `env` built on the next line is not passed to it.
env = gym.make("CartPole-v1", max_episode_steps=10_000, new_step_api=True, render_mode='rgb_array')
reinforce_agent.best_agent.test_policy("best_model")