In [1]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
from tqdm import tqdm

# Pick the compute device once: the GPU when PyTorch can see CUDA, the CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

def preprocess(image):
    """Convert a 210x160x3 uint8 frame into an 80x80 binary float32 array.

    Rows 35..194 are kept, every second pixel is sampled in both directions,
    and only channel 0 is used. Pixels equal to 144 or 109 (the two background
    colours) or already 0 become 0.0; every other pixel (paddles, ball)
    becomes 1.0.

    The input frame is never modified: the mask is computed from a read-only
    view instead of assigning into the sliced array, which would write through
    to the caller's frame.

    Args:
        image: Array indexable as ``image[rows, cols, channel]``; a 210x160x3
            frame yields the 80x80 result.

    Returns:
        ``np.ndarray`` of shape (80, 80) and dtype float32 holding 0.0 / 1.0.

    Raises:
        ValueError: If the cropped, downsampled frame does not hold exactly
            80 * 80 values.
    """
    # Crop and 2x-downsample in a single slice; this is a view, so only read from it.
    channel = image[35:195:2, ::2, 0]
    foreground = (channel != 0) & (channel != 144) & (channel != 109)
    return foreground.astype(np.float32).reshape(80, 80)

class PolicyNetwork(nn.Module):
    """Convolutional policy that outputs probabilities over two actions.

    Three stride-2 3x3 convolutions (padding 1) each halve the spatial size,
    so a ``(N, 1, 80, 80)`` input reaches the head as ``(N, 32, 10, 10)``,
    which matches the ``32 * 10 * 10`` input width of the first linear layer.
    """

    def __init__(self):
        super().__init__()
        feature_layers = [
            nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*feature_layers)

        head_layers = [
            nn.Linear(32 * 10 * 10, 256),
            nn.ReLU(),
            nn.Linear(256, 2),  # Only LEFT and RIGHT actions
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return per-sample action probabilities (each row sums to 1)."""
        features = self.conv(x)
        # Collapse channel and spatial axes, keeping the batch axis.
        flat = features.view(features.size(0), -1)
        logits = self.fc(flat)
        return logits.softmax(dim=-1)

def run_episode(env, policy):
    """Play one episode by sampling actions from ``policy`` and record the trajectory.

    At every step the policy sees the difference between the current and the
    previous preprocessed frame (all zeros on the first step), one of its two
    actions is sampled, and that action index plus 2 is sent to the
    environment.

    Args:
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step()`` returns the 5-tuple
            ``(observation, reward, terminated, truncated, info)``.
        policy: Callable mapping a ``(1, 1, 80, 80)`` float tensor to action
            probabilities accepted by ``Categorical``.

    Returns:
        ``(log_probs, rewards)``: two equally long lists holding, per step,
        the log-probability tensor of the sampled action (still attached to
        the autograd graph) and the reward returned by the environment.
    """
    state, _ = env.reset()
    prev_state = None
    log_probs = []
    rewards = []

    done = False
    while not done:
        curr_state = preprocess(state)
        if prev_state is None:
            state_diff = np.zeros_like(curr_state)
        else:
            state_diff = curr_state - prev_state
        prev_state = curr_state

        # Add batch and channel axes: (80, 80) -> (1, 1, 80, 80).
        state_diff = torch.as_tensor(state_diff, dtype=torch.float32, device=device)
        state_diff = state_diff.unsqueeze(0).unsqueeze(0)

        action_probs = policy(state_diff)
        dist = Categorical(action_probs)
        action = dist.sample()
        log_probs.append(dist.log_prob(action))

        # +2 to map to RIGHT, LEFT actions
        state, reward, terminated, truncated, _ = env.step(action.item() + 2)
        rewards.append(reward)

        # An episode ends on either flag; ignoring `truncated` would keep
        # stepping an environment that has already hit its step limit.
        done = terminated or truncated

    return log_probs, rewards

def train(env, policy, optimizer, num_episodes, gamma):
    """Train ``policy`` with the REINFORCE policy-gradient update, one episode per step.

    For each episode the discounted return-to-go is computed for every time
    step, the loss ``sum_t -log_prob_t * return_t`` is back-propagated, and the
    optimizer takes a single step.

    Args:
        env: Environment passed through to ``run_episode``.
        policy: Policy network being optimised.
        optimizer: Optimizer constructed over ``policy``'s parameters.
        num_episodes: Number of episodes (and optimizer steps) to run.
        gamma: Discount factor applied to future rewards.

    Returns:
        List with the undiscounted total reward of every episode, in order.
    """
    episode_rewards = []

    # Use tqdm for progress tracking
    for episode in tqdm(range(num_episodes), desc="Training Progress"):
        log_probs, rewards = run_episode(env, policy)

        # Discounted return-to-go, accumulated back to front. Appending and
        # reversing once is linear in episode length; inserting at index 0
        # on every step is quadratic, which matters for long episodes.
        returns = []
        running_return = 0.0
        for r in reversed(rewards):
            running_return = r + gamma * running_return
            returns.append(running_return)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32, device=device)

        # Flatten the per-step log-probabilities to shape (T,) so they line
        # up element-wise with `returns`.
        step_log_probs = torch.stack(log_probs).reshape(-1)
        loss = -(step_log_probs * returns).sum()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        episode_rewards.append(sum(rewards))

        # Every 100 episodes, log the mean reward of the last (up to) 100
        # episodes above the progress bar.
        if episode % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            tqdm.write(f'Episode {episode}, Average Reward: {avg_reward:.2f}')

    return episode_rewards

# Main execution
# NOTE(review): the recorded run of this cell stopped with
# "AttributeError: module 'numpy' has no attribute 'float'". Nothing in this
# cell references np.float, so the error is raised inside an imported
# library; the installed NumPy is presumably too new for one of the
# dependencies -- confirm and align the package versions.
env_name = 'PongNoFrameskip-v4'
env = gym.make(env_name, render_mode="rgb_array")
policy = PolicyNetwork().to(device)
optimizer = optim.Adam(policy.parameters(), lr=0.01)

num_episodes = 1000
gamma = 0.99

rewards = train(env, policy, optimizer, num_episodes, gamma)

# Plot results
import matplotlib.pyplot as plt

smoothing_window = 100
plt.plot(rewards, label='Episode reward')
# A 'valid' moving average only exists from episode `smoothing_window - 1`
# onward, so place it at those x positions instead of starting at 0, and skip
# it when there are not enough episodes to fill one window.
if len(rewards) >= smoothing_window:
    moving_avg = np.convolve(rewards, np.ones(smoothing_window) / smoothing_window, mode='valid')
    plt.plot(
        np.arange(smoothing_window - 1, len(rewards)),
        moving_avg,
        label=f'{smoothing_window}-episode moving average',
    )
plt.title('Episode Rewards')
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.legend()
plt.show()

# Evaluate the trained model
def evaluate(env, policy, num_episodes=500):
    """Run ``policy`` greedily for ``num_episodes`` episodes and collect total rewards.

    The observation handling matches training: the policy sees the difference
    between consecutive preprocessed frames (zeros on the first step). Unlike
    training, the most probable action is always taken instead of sampling,
    and no gradients are tracked.

    Args:
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step()`` returns the 5-tuple
            ``(observation, reward, terminated, truncated, info)``.
        policy: Callable mapping a ``(1, 1, 80, 80)`` float tensor to action
            probabilities.
        num_episodes: Number of evaluation episodes to play.

    Returns:
        List with the total reward of each episode, in order.
    """
    episode_rewards = []
    for _ in tqdm(range(num_episodes), desc="Evaluation Progress"):
        state, _ = env.reset()
        prev_state = None
        total_reward = 0
        done = False
        while not done:
            curr_state = preprocess(state)
            if prev_state is None:
                state_diff = np.zeros_like(curr_state)
            else:
                state_diff = curr_state - prev_state
            prev_state = curr_state

            # Add batch and channel axes: (80, 80) -> (1, 1, 80, 80).
            state_diff = torch.as_tensor(state_diff, dtype=torch.float32, device=device)
            state_diff = state_diff.unsqueeze(0).unsqueeze(0)

            # Inference only: skip building an autograd graph for every frame.
            with torch.no_grad():
                action_probs = policy(state_diff)
            action = torch.argmax(action_probs).item()

            # +2 to map to RIGHT, LEFT actions
            state, reward, terminated, truncated, _ = env.step(action + 2)
            total_reward += reward

            # Stop on either flag so a truncated episode is not stepped further.
            done = terminated or truncated
        episode_rewards.append(total_reward)

    return episode_rewards

# Score the trained policy, report summary statistics, and show how the
# per-episode totals are distributed.
eval_rewards = evaluate(env, policy)
eval_scores = np.asarray(eval_rewards)
print(f'Mean Reward: {eval_scores.mean():.2f}')
print(f'Standard Deviation: {eval_scores.std():.2f}')

plt.hist(eval_rewards, bins=50)
plt.xlabel('Reward')
plt.ylabel('Frequency')
plt.title('Histogram of Episode Rewards')
plt.show()

Using device: cuda


A.L.E: Arcade Learning Environment (version 0.8.1+53f58b7)
[Powered by Stella]
Training Progress:   0%|          | 0/1000 [00:00<?, ?it/s]


AttributeError: module 'numpy' has no attribute 'float'.
`np.float` was a deprecated alias for the builtin `float`. To avoid this error in existing code, use `float` by itself. Doing this will not modify any behavior and is safe. If you specifically wanted the numpy scalar type, use `np.float64` here.
The aliases was originally deprecated in NumPy 1.20; for more details and guidance see the original release note at:
    https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations