In [1]:
#!pip install gym[atari]

In [2]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import cv2


In [3]:
# Helper function for Pong: frame preprocessing
def preprocess(image):
    """Convert a 210x160x3 Pong frame into an 80x80 float32 binary mask.

    The frame is cropped to rows 35..194, downsampled by 2 along both spatial
    axes, and reduced to its first colour channel. Pixels equal to 0, 144 or
    109 (the two background colours) become 0.0; every other pixel (paddles,
    ball) becomes 1.0.

    The input frame is NOT modified: the mask is built from comparisons
    instead of assigning into slices, which are views of the caller's array.

    Args:
        image: array-like frame indexable as [row, col, channel].

    Returns:
        np.ndarray of shape (80, 80) and dtype float32 with values in {0, 1}.
    """
    # Crop + downsample + take channel 0 in one slicing step (still a view).
    playfield = np.asarray(image)[35:195:2, ::2, 0]
    # Foreground = anything that is neither already 0 nor a background colour.
    foreground = (playfield != 0) & (playfield != 144) & (playfield != 109)
    return np.reshape(foreground.astype(np.float32), [80, 80])

In [4]:
# Simple fully connected policy network
class PolicyNetwork(nn.Module):
    """Two-layer fully connected policy producing action probabilities.

    Architecture: Linear(input_size, 200) -> ReLU -> Linear(200, output_size)
    -> Softmax over the last dimension.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_units = 200
        layers = [
            nn.Linear(input_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, output_size),
            nn.Softmax(dim=-1),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten all leading dimensions into one batch axis and run the MLP."""
        flat = x.view(-1, x.shape[-1])
        return self.fc(flat)

In [5]:
#public methods
def select_action(state):
    """Sample an action index from the policy's distribution for `state`.

    Uses the module-level `policy_net`. The forward pass runs under
    `torch.no_grad()` because only the sampled index is returned, so no
    autograd graph is needed here.

    Args:
        state: a single observation accepted by `torch.FloatTensor`
            (a batch dimension is added before the forward pass).

    Returns:
        int: the sampled action index.
    """
    cu_state = torch.FloatTensor(state).unsqueeze(0)
    with torch.no_grad():
        probs = policy_net(cu_state)
    return torch.multinomial(probs, 1).item()

############
# apply baseline to rewards
############
def compute_discounted_rewards(rewards, gamma, baseline=None):
    """Compute the discounted return G_t = r_t + gamma * G_{t+1} for each step.

    Args:
        rewards: sequence of per-step rewards for one episode (may be empty).
        gamma: discount factor.
        baseline: optional value subtracted from every return to reduce
            variance; `None` leaves the returns unchanged.

    Returns:
        torch.FloatTensor with one entry per reward.
    """
    rewards = list(rewards)
    # Pre-allocated float64 buffer: filling it back-to-front is linear time
    # (no list.insert(0, ...)), and the float dtype keeps the in-place
    # baseline subtraction valid even when rewards and gamma are integers.
    returns = np.zeros(len(rewards), dtype=np.float64)
    running = 0.0
    for t in range(len(rewards) - 1, -1, -1):
        running = rewards[t] + gamma * running
        returns[t] = running

    # Apply baseline, reduce variance
    if baseline is not None:
        returns -= baseline

    return torch.FloatTensor(returns)

In [6]:
# Training code: choose between CartPole and Pong, and set gamma, whether to use a baseline, and the number of episodes

def train_policy_network(run='cartpole', num_episodes=1000, gamma=0.95, use_baseline=False, baseline_value=0.0):
    """Train the module-level `policy_net` with the REINFORCE policy gradient.

    Relies on the module-level `env`, `policy_net` and `optimizer`. One
    gradient step is taken per episode, using standardised discounted returns.

    Args:
        run: 'pong' enables frame preprocessing, 4-frame stacking and the
            two-action mapping; any other value feeds raw observations and
            raw action indices to the environment.
        num_episodes: number of training episodes.
        gamma: discount factor passed to `compute_discounted_rewards`.
        use_baseline: when True, `baseline_value` is subtracted from the
            returns; when False no baseline is applied.
        baseline_value: constant baseline used when `use_baseline` is True.

    Returns:
        list with the undiscounted total reward of every episode.
    """
    is_pong = run == 'pong'
    all_rewards = []

    for episode in range(num_episodes):
        observation = env.reset()[0]
        if is_pong:
            frame = preprocess(observation)
            # Start the stack with four copies of the first frame.
            stacked_frames = deque([frame] * 4, maxlen=4)
            state = np.stack(stacked_frames, axis=0).flatten()
        else:
            state = np.asarray(observation, dtype=np.float32)

        rewards = []
        log_probs = []

        ############
        # interact with env
        ############
        while True:
            # A single forward pass provides both the sampling distribution
            # and the differentiable log-probability of the chosen action.
            probs = policy_net(torch.FloatTensor(state).unsqueeze(0))
            action = torch.multinomial(probs.detach(), 1).item()
            log_probs.append(torch.log(probs[0, action]))

            if is_pong:
                mapped_action = action + 2  # Map 0 -> 2 (RIGHT), 1 -> 3 (LEFT)
                observation, reward, terminated, truncated, _ = env.step(mapped_action)
                stacked_frames.append(preprocess(observation))
                state = np.stack(stacked_frames, axis=0).flatten()
            else:
                observation, reward, terminated, truncated, _ = env.step(action)
                state = np.asarray(observation, dtype=np.float32)

            rewards.append(reward)

            # The episode is over on a terminal state OR a time-limit cut-off;
            # checking only the first flag keeps stepping a truncated env.
            if terminated or truncated:
                break

        ############
        # optimize the model
        ############
        episode_reward = sum(rewards)
        all_rewards.append(episode_reward)

        baseline = baseline_value if use_baseline else None

        discounted_rewards = compute_discounted_rewards(rewards, gamma, baseline)
        # Standardise the returns. Note this centring cancels any constant
        # baseline. Skipped for one-step episodes, where std() is NaN.
        if discounted_rewards.numel() > 1:
            discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)

        loss = -torch.sum(torch.stack(log_probs) * discounted_rewards)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if episode % 100 == 0:
            print(f'Episode {episode}, Total Reward: {episode_reward}')

    return all_rewards

In [7]:
# Evaluation code: choose between CartPole and Pong

def evaluate_policy(run='cartpole', num_episodes=500):
    """Roll out the current policy and report the distribution of returns.

    Relies on the module-level `env` and on `select_action` (which samples
    from the policy). Shows a histogram of the per-episode total reward and
    prints its mean and standard deviation.

    Args:
        run: 'pong' enables frame preprocessing, 4-frame stacking and the
            two-action mapping; any other value feeds raw observations and
            raw action indices to the environment.
        num_episodes: number of evaluation episodes.
    """
    is_pong = run == 'pong'
    rewards = []

    for episode in range(num_episodes):
        observation = env.reset()[0]
        if is_pong:
            frame = preprocess(observation)
            # Start the stack with four copies of the first frame.
            stacked_frames = deque([frame] * 4, maxlen=4)
            state = np.stack(stacked_frames, axis=0).flatten()
        else:
            state = np.asarray(observation, dtype=np.float32)

        total_reward = 0

        while True:
            action = select_action(state)
            if is_pong:
                mapped_action = action + 2  # Map 0 -> 2 (RIGHT), 1 -> 3 (LEFT)
                observation, reward, terminated, truncated, _ = env.step(mapped_action)
                stacked_frames.append(preprocess(observation))
                state = np.stack(stacked_frames, axis=0).flatten()
            else:
                observation, reward, terminated, truncated, _ = env.step(action)
                state = np.asarray(observation, dtype=np.float32)

            total_reward += reward

            # Stop on a terminal state OR a time-limit cut-off.
            if terminated or truncated:
                break

        rewards.append(total_reward)

    plt.figure(figsize=(8, 6))
    plt.hist(rewards, bins=20, edgecolor='black', alpha=0.7)
    plt.xlabel('Total Reward')
    plt.ylabel('Frequency')
    # Report the actual episode count instead of a hard-coded 500.
    plt.title(f'Histogram of Total Rewards Over {num_episodes} Episodes')
    plt.show()

    mean_reward = np.mean(rewards)
    std_reward = np.std(rewards)
    print(f'Mean Reward: {mean_reward}')
    print(f'Standard Deviation of Reward: {std_reward}')

In [None]:
# --- Experiment settings -----------------------------------------------------
run = 'cartpole'            # 'pong' selects Atari Pong; anything else runs CartPole
if_use_bl = False           # forwarded as `use_baseline` to the training loop
num_train_episodes = 1000
num_eval_episodes = 500
moving_avg_window = 100

# --- Environment and network dimensions --------------------------------------
if run == 'pong':
    # The environment must be bound to `env`: the training and evaluation
    # functions read it as a global.
    env = gym.make("ALE/Pong-v5")
    input_size = 80 * 80 * 4   # four stacked 80x80 preprocessed frames
    output_size = 2  # We only use left or right
    the_gamma = 0.99
else:
    env = gym.make('CartPole-v1')
    input_size = env.observation_space.shape[0]
    output_size = env.action_space.n
    the_gamma = 0.95

policy_net = PolicyNetwork(input_size=input_size, output_size=output_size)
optimizer = optim.Adam(policy_net.parameters(), lr=0.01)

# Train network
all_rewards = train_policy_network(run=run, num_episodes=num_train_episodes, gamma=the_gamma, use_baseline=if_use_bl, baseline_value=0.0)
print("train down")

# Plot rewards
plt.figure(figsize=(10, 6))
plt.plot(all_rewards, label='Episode Reward')
# Shrink the window for short runs so the x-range and the averaged series
# always have the same length.
window = min(moving_avg_window, len(all_rewards))
if window > 0:
    moving_avg = np.convolve(all_rewards, np.ones(window) / window, mode='valid')
    plt.plot(range(window - 1, len(all_rewards)), moving_avg, label=f'{window}-Episode Moving Average', color='red')
plt.xlabel('Episodes')
plt.ylabel('Total Reward')
plt.title('Episode Reward vs Number of Episodes')
plt.legend()
plt.show()

# Evaluate the trained policy
evaluate_policy(run=run, num_episodes=num_eval_episodes)

Episode 0, Total Reward: 12.0
Episode 100, Total Reward: 26.0
Episode 200, Total Reward: 79.0
Episode 300, Total Reward: 131.0
