# Policy Gradient Model Demo

In [1]:
# Install Gym with the Atari extras (plus the ROM licence extra) and tensorboardx.
# NOTE(review): no versions are pinned; the cells below unpack env.step() into
# four values and treat env.reset() as returning only the observation, so
# confirm the installed gym release still behaves that way.
!pip install gym[atari] gym[accept-rom-license] tensorboardx



## Environment and Model

In [2]:
from gym.wrappers import (
    RecordVideo,
    RecordEpisodeStatistics,
    GrayScaleObservation,
    FrameStack,
    TransformReward,
    AtariPreprocessing,
)
import gym
import cv2
from collections import deque
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.nn import functional as F

%matplotlib inline

# Use one fixed seed for both NumPy's and PyTorch's global generators
seed = 2339

np.random.seed(seed)
torch.manual_seed(seed)

# Prefer the GPU when CUDA is usable; otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [3]:
import gym
import os
import warnings
import time

# Blanket filter: suppresses every Python warning (deprecation notices
# included) from this point on.
warnings.filterwarnings("ignore")


# create output folder
def get_output_folder(parent_dir, name):
    """Create and return a timestamped output directory.

    The directory is ``<parent_dir>/<name>-MM-DD-HH-MM`` built from the
    current local time. Missing parents are created, and an already existing
    directory is reused without error.

    Args:
        parent_dir: Directory under which the folder is created.
        name: Prefix of the folder name, placed before the timestamp.

    Returns:
        The path of the (now existing) output directory.
    """
    timestamp = time.strftime("-%m-%d-%H-%M")
    folder = os.path.join(parent_dir, name) + timestamp
    os.makedirs(folder, exist_ok=True)
    return folder


def always_true(x):
    """Return True regardless of ``x``.

    Wrap passes this function to RecordVideo as its ``episode_trigger``.
    """
    return True


class RepeatActionInFramesTakeMaxOfTwo(gym.Wrapper):
    """Repeat each action for several frames and return the max of the last two.

    Every observation is cropped to its first ``h`` rows. ``step`` applies the
    same action up to ``repeat`` times (stopping early when the episode ends),
    sums the rewards, and returns the element-wise maximum of the two most
    recently buffered cropped frames.

    Written against the classic Gym API: ``env.reset()`` returns only the
    observation and ``env.step()`` returns ``(obs, reward, done, info)``.

    Args:
        env: Environment to wrap. Its observation space must expose 3-D
            ``low``/``high`` arrays (the code indexes axes 1 and 2).
        h: Number of leading rows kept from every frame.
        repeat: How many times each action is applied per ``step`` call.

    Raises:
        ValueError: If ``repeat`` is smaller than 1.
    """

    def __init__(self, env, h=160, repeat=4):
        # Validate first so a bad argument never leaves a half-built wrapper.
        if repeat <= 0:
            raise ValueError("Repeat value needs to be 1 or higher")

        super().__init__(env)

        self.repeat = repeat
        self.h = h

        source_space = env.observation_space
        self.shape = (
            h,
            source_space.low.shape[1],
            source_space.low.shape[2],
        )
        # Advertise the cropped frame size so wrappers stacked on top of this
        # one derive their own spaces from what is actually emitted.
        self.observation_space = gym.spaces.Box(
            low=source_space.low[:h],
            high=source_space.high[:h],
            dtype=source_space.dtype,
        )
        # Only the two most recent cropped frames are ever needed.
        self.frames = deque(maxlen=2)

    def step(self, action):
        """Apply ``action`` up to ``repeat`` times.

        Returns:
            ``(observation, total_reward, done, info)`` where ``observation``
            is the element-wise max of the two newest buffered frames,
            ``total_reward`` is the sum over the executed frames, and
            ``done``/``info`` come from the last executed frame.
        """
        total_reward = 0
        done = False
        info = {}

        for _ in range(self.repeat):
            observation, reward, done, info = self.env.step(action)
            total_reward += reward
            self.frames.append(observation[: self.h, :, :])

            if done:
                break

        # The deque holds at most two frames. When it holds just one (step
        # called without a prior reset and the episode ended immediately),
        # both indices name the same frame, so the call is still valid.
        maximum_of_frames = np.maximum(self.frames[0], self.frames[-1])
        return maximum_of_frames, total_reward, done, info

    def reset(self, **kwargs):
        """Reset the wrapped environment and return its cropped first frame.

        Keyword arguments are forwarded unchanged to the wrapped ``reset``.
        """
        observation = self.env.reset(**kwargs)
        first_frame = observation[: self.h, :, :]
        # Drop frames left over from the previous episode.
        self.frames.clear()
        self.frames.append(first_frame)
        return first_frame


class NormResizeObservation(gym.ObservationWrapper):
    """Resize every observation to ``shape`` and rescale it by 1/255.

    Args:
        env: Environment to wrap.
            NOTE(review): assumes single-channel frames with 0-255 pixel
            values (the code divides by 255 and reshapes to a 2-D shape) —
            confirm against the wrapper placed underneath.
        shape: Target observation shape as ``(height, width)``.
    """

    def __init__(self, env, shape):
        super().__init__(env)

        self.shape = shape

        # Emitted observations are float32 values scaled into [0, 1].
        self.observation_space = gym.spaces.Box(
            low=0.0, high=1.0, shape=self.shape, dtype=np.float32
        )

    def observation(self, observation):
        """Resize ``observation`` and map its values from 0-255 to 0-1."""
        height, width = self.shape
        # cv2.resize expects dsize as (width, height), the reverse of the
        # (rows, cols) order used by NumPy shapes.
        resized = cv2.resize(
            observation, (width, height), interpolation=cv2.INTER_AREA
        )
        # Divide in double precision, then cast to the dtype the observation
        # space declares.
        return (resized / 255.0).astype(np.float32).reshape(self.shape)


class ClipRewardWrapper(gym.RewardWrapper):
    """Replace every reward by its sign: -1, 0 or 1."""

    def reward(self, reward):
        """Return the sign of ``reward`` as an integer (scalar or array).

        Negative values map to -1, positive values to 1, and everything else
        (zero, NaN) to 0. Works element-wise on arrays, including empty ones.
        """
        values = np.asarray(reward)
        # Two vectorised comparisons replace a per-call np.vectorize wrapper
        # around a Python closure; NaN fails both tests and therefore maps to 0.
        is_positive = (values > 0).astype(int)
        is_negative = (values < 0).astype(int)
        return is_positive - is_negative


def Wrap(env, video_dir):
    """Stack the preprocessing and recording wrappers around ``env``.

    Wrappers are applied innermost first:
    action repeat (4 frames) -> reward clipping -> grayscale ->
    resize/normalise to 84x84 -> stack of 4 frames -> episode statistics ->
    video recording into ``video_dir`` with ``always_true`` as the trigger.

    Args:
        env: The raw environment to wrap.
        video_dir: Directory handed to RecordVideo.

    Returns:
        The fully wrapped environment.
    """
    frame_size = (84, 84)
    stages = (
        lambda inner: RepeatActionInFramesTakeMaxOfTwo(inner, repeat=4),
        ClipRewardWrapper,
        GrayScaleObservation,
        lambda inner: NormResizeObservation(inner, frame_size),
        lambda inner: FrameStack(inner, num_stack=4),
        lambda inner: RecordEpisodeStatistics(inner, 100),
        lambda inner: RecordVideo(inner, video_dir, episode_trigger=always_true),
    )

    # Each stage wraps the result of the previous one.
    for stage in stages:
        env = stage(env)
    return env

  and should_run_async(code)


In [4]:
class PolicyPi(nn.Module):
    """Fully connected policy network that outputs one logit per action.

    The input is a flat vector of ``84 * 84 * 4`` features. Two hidden layers
    of ``hidden_dim`` and ``hidden_dim // 2`` units follow, each passed
    through ReLU and dropout (p=0.1), and a final linear layer emits 9 logits.

    Args:
        hidden_dim: Width of the first hidden layer.
    """

    def __init__(self, hidden_dim=64):
        super().__init__()
        flat_obs_size = 84 * 84 * 4
        half_hidden = hidden_dim // 2

        self.hidden = nn.Linear(flat_obs_size, hidden_dim)
        self.hidden2 = nn.Linear(hidden_dim, half_hidden)
        # Enduro has 9 discrete actions, hence 9 output logits
        self.output = nn.Linear(half_hidden, 9)
        self.dropout = nn.Dropout(0.1)

    def forward(self, s):
        """Map a batch of flattened states ``s`` to unnormalised action logits."""
        first = self.dropout(F.relu(self.hidden(s)))
        second = self.dropout(F.relu(self.hidden2(first)))
        return self.output(second)


# Build the policy with a 512-unit first hidden layer (so 256 units in the
# second) and move its parameters to the selected device.
policy_pi = PolicyPi(hidden_dim=512).to(device)

In [5]:
# Create the raw Enduro environment.
# NOTE(review): ALE "v5" environments reportedly apply their own frame skip by
# default, and Wrap adds a 4x action repeat on top — confirm this matches how
# the checkpoint was trained.
env = gym.make("ALE/Enduro-v5")
# Timestamped folder under gym_monitor/ that is handed to Wrap as the video directory.
output_dir = get_output_folder("gym_monitor", "PG")
# Layer the preprocessing and recording wrappers over the raw environment.
env = Wrap(env, output_dir)

## Testing

In [6]:
# torch.load unpickles the file, so only point it at checkpoints you trust.
# map_location remaps tensors saved on another device (e.g. a GPU) onto
# `device`, so the checkpoint also loads on a CPU-only machine.
model = torch.load("policy_pi.pth", map_location=device)
policy_pi.load_state_dict(model)
# Switch to evaluation mode: otherwise the dropout layers keep randomly
# zeroing activations while the policy is being tested.
policy_pi.eval()
policy_pi

PolicyPi(
  (hidden): Linear(in_features=28224, out_features=512, bias=True)
  (hidden2): Linear(in_features=512, out_features=256, bias=True)
  (output): Linear(in_features=256, out_features=9, bias=True)
  (dropout): Dropout(p=0.1, inplace=False)
)

In [7]:
def pick_sample(s):
    """Sample one action index for state ``s`` from the current policy.

    The state gets a leading batch axis, is converted to a float tensor on
    ``device`` and flattened to a single feature vector. ``policy_pi`` turns
    it into logits, softmax turns those into probabilities, and one action is
    drawn from that distribution. No gradients are tracked.

    Args:
        s: A single observation; anything ``np.expand_dims`` accepts.

    Returns:
        The sampled action as a Python int.
    """
    with torch.no_grad():
        # Add the batch axis, move to the device, then flatten per sample
        batched = torch.tensor(np.expand_dims(s, axis=0), dtype=torch.float)
        flat_state = batched.to(device).flatten(start_dim=1)

        # Logits for the single state in the batch
        action_logits = policy_pi(flat_state).squeeze(dim=0)

        # Turn logits into a categorical distribution and draw one action
        action_probs = F.softmax(action_logits, dim=-1)
        chosen = torch.multinomial(action_probs, num_samples=1)
        return chosen.item()


# Roll out one episode, sampling every action from the policy.
# `rewards` collects the per-step rewards; `frames` counts env.step() calls.
s = env.reset()
rewards = []
frames = 0
done = False

while True:
    a = pick_sample(s)
    s, r, term, _ = env.step(a)
    done = term

    rewards.append(r)
    frames += 1

    # Stop as soon as the environment reports the end of the episode
    if done:
        break

In [8]:
# Sum of the per-step rewards collected during the rollout
total_reward = np.sum(rewards)
# NOTE(review): step-count heuristic for the in-game score; the rationale for
# the 100-step offset and rounding is not shown in this notebook.
estimated_score = int((frames - 100) / 100) * 100

print("Total reward: {}; Total time: {};".format(total_reward, frames))
print("Estimated in-game final score: ~{}".format(estimated_score))

Total reward: 20; Total time: 833;
Estimated in-game final score: ~700
