In [None]:
!pip install gym 
!pip install 'gym[box2d]'
!pip install atari_py
!pip install ale-py
!pip install "gym[atari, accept-rom-license]"
!pip install gym[atari]
!pip install autorom[accept-rom-license]
!pip install matplotlib.
!pip install opencv-python
!pip install gym[atari]
!pip install torch torchvision
!pip install ale-py

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import gym
import numpy as np

def preprocess_state(state):
    """Turn a raw colour frame into a normalized grayscale network input.

    Rows 35-194 of the frame are kept, every second row and column is
    retained, the channel axis is averaged away, and the result is divided
    by 255.

    Args:
        state: NumPy-style array indexed as (row, column, channel).

    Returns:
        A float32 tensor with two singleton dimensions prepended, i.e. shape
        (1, 1, rows, cols).
    """
    # Crop and 2x downsample in a single strided slice.
    frame = state[35:195:2, ::2, :]
    grayscale = frame.mean(axis=2) / 255.0
    as_tensor = torch.tensor(grayscale, dtype=torch.float32)
    return as_tensor[None, None]


class ActorCritic(nn.Module):
    """Convolutional actor-critic network with a shared feature trunk.

    A three-layer convolutional stack feeds a 512-unit fully connected layer.
    Two linear heads sit on top of it: the actor emits action probabilities
    and the critic emits one value per input sample.
    """

    def __init__(self, num_actions):
        """Build the layers.

        Args:
            num_actions: Number of outputs of the actor head.
        """
        super().__init__()

        conv_layers = [
            nn.Conv2d(1, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*conv_layers)

        # Measure the conv output width with a dummy pass rather than
        # hard-coding it.
        self._flat_size = self._get_flat_size()

        self.fc = nn.Sequential(nn.Linear(self._flat_size, 512), nn.ReLU())
        self.actor = nn.Linear(512, num_actions)
        self.critic = nn.Linear(512, 1)

    def _get_flat_size(self):
        """Return the per-sample feature count of the conv stack for a 1x80x80 input."""
        probe = torch.zeros(1, 1, 80, 80)
        return self.conv(probe).flatten(start_dim=1).shape[1]

    def forward(self, x):
        """Map a batch of frames to ``(action_prob, value)``.

        Args:
            x: Input batch; the conv stack expects one channel per sample.

        Returns:
            ``action_prob``: softmax over the actor outputs, one row per sample.
            ``value``: critic output, one column.
        """
        features = self.conv(x)
        hidden = self.fc(features.view(features.size(0), -1))
        action_prob = self.actor(hidden).softmax(dim=-1)
        return action_prob, self.critic(hidden)

def ppo_iter(mini_batch_size, states, actions, log_probs, returns, advantage):
    """Yield random mini-batches drawn from one rollout.

    Indices are drawn uniformly with replacement, so within one pass a sample
    may appear several times or not at all. ``batch_size // mini_batch_size``
    mini-batches are produced, i.e. none when the rollout is shorter than one
    mini-batch.

    Args:
        mini_batch_size: Number of samples per yielded mini-batch.
        states, actions, log_probs, returns, advantage: Rollout tensors that
            share their first dimension; ``states.size(0)`` is the batch size.

    Yields:
        5-tuples ``(states, actions, log_probs, returns, advantage)`` indexed
        with the same random indices.
    """
    total = states.size(0)
    rollout = (states, actions, log_probs, returns, advantage)
    num_batches = total // mini_batch_size
    for _ in range(num_batches):
        picks = np.random.randint(0, total, mini_batch_size)
        yield tuple(field[picks] for field in rollout)


def ppo_update(model, optimizer, ppo_epochs, mini_batch_size, states, actions, log_probs, returns, advantages, clip_param=0.2):
    """Optimize ``model`` with the PPO clipped-surrogate objective.

    For each of ``ppo_epochs`` passes, mini-batches are drawn with ``ppo_iter``
    and one optimizer step is taken per mini-batch on

        actor_loss + 0.5 * critic_loss - 0.001 * entropy

    Args:
        model: Callable returning ``(action_prob, value)`` for a batch of states.
        optimizer: Optimizer over ``model``'s parameters.
        ppo_epochs: Number of passes over the rollout.
        mini_batch_size: Samples per mini-batch.
        states: Rollout states, one per sample along dim 0.
        actions: Sampled action indices, one per sample.
        log_probs: Log-probabilities of ``actions`` under the policy that
            collected the rollout.
        returns: Value targets, one per sample.
        advantages: Advantage estimates, one per sample.
        clip_param: Half-width of the clipping interval around ratio 1.

    Per-sample tensors may be shaped ``(N,)`` or ``(N, 1)``; they are flattened
    before the loss is formed.
    """
    for _ in range(ppo_epochs):
        for state, action, old_log_probs, return_, advantage in ppo_iter(mini_batch_size, states, actions, log_probs, returns, advantages):
            action_prob, value = model(state)
            dist = Categorical(action_prob)
            entropy = dist.entropy().mean()

            # Bring every per-sample quantity to shape (batch,). Mixing (batch,)
            # log-probs with (batch, 1) advantages/values would broadcast to a
            # (batch, batch) matrix and pair each ratio with every advantage.
            action = action.reshape(-1)
            old_log_probs = old_log_probs.reshape(-1)
            advantage = advantage.reshape(-1)
            return_ = return_.reshape(-1)
            value = value.reshape(-1)

            new_log_probs = dist.log_prob(action)

            ratio = (new_log_probs - old_log_probs).exp()
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1.0 - clip_param, 1.0 + clip_param) * advantage

            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = (return_ - value).pow(2).mean()

            optimizer.zero_grad()
            # 0.5 weights the value loss; 0.001 weights the entropy bonus.
            (actor_loss + 0.5 * critic_loss - 0.001 * entropy).backward()
            optimizer.step()
            
def compute_gae(next_value, rewards, masks, values, gamma, tau):
    """Compute Generalized Advantage Estimation returns for one rollout.

    Walking the rollout backwards, each step accumulates

        delta = r_t + gamma * V_{t+1} * mask_t - V_t
        gae   = delta + gamma * tau * mask_t * gae

    and the return for step t is ``gae + V_t``.

    Args:
        next_value: Value estimate for the state after the last step.
        rewards: Per-step rewards.
        masks: Per-step multipliers; a 0 removes the bootstrap term and resets
            the accumulated advantage at that step.
        values: List of per-step value estimates (not modified).
        gamma: Discount factor.
        tau: GAE smoothing factor.

    Returns:
        List of returns, one per step, in rollout order.
    """
    # Extended copy so index t + 1 is valid at the final step; the caller's
    # list is left untouched.
    bootstrapped = values + [next_value]
    running_gae = 0
    reversed_returns = []
    for t in range(len(rewards) - 1, -1, -1):
        delta = rewards[t] + gamma * bootstrapped[t + 1] * masks[t] - bootstrapped[t]
        running_gae = delta + gamma * tau * masks[t] * running_gae
        reversed_returns.append(running_gae + bootstrapped[t])
    reversed_returns.reverse()
    return reversed_returns

In [None]:
import torchvision.transforms.functional as TF
import matplotlib.pyplot as plt

# Render one preprocessed frame as a visual sanity check of preprocess_state.
env = gym.make('ALE/Pong-v5')

# gym >= 0.26 returns (observation, info) from reset(); older releases return
# only the observation. Accept both.
reset_result = env.reset()
state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
env.close()  # only the first frame is needed here

# Preprocess the state using the provided preprocessing function
preprocessed_state = preprocess_state(state)

# Drop the two leading singleton dimensions and convert to a PIL image
preprocessed_image = TF.to_pil_image(preprocessed_state.squeeze(0).squeeze(0))

# Display the preprocessed image
plt.imshow(preprocessed_image, cmap='gray')
plt.show()

In [None]:
import os

def save_model(model, save_path='models', model_name='ppo_pong.pt'):
    """Write ``model.state_dict()`` to ``save_path/model_name``.

    Args:
        model: Object exposing ``state_dict()``.
        save_path: Target directory; created (with parents) when missing.
        model_name: File name of the checkpoint inside ``save_path``.
    """
    # exist_ok=True creates the directory only when needed and avoids the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs(save_path, exist_ok=True)

    checkpoint_path = os.path.join(save_path, model_name)
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Model saved as {checkpoint_path}")

def ppo(env_name='ALE/Pong-v5', num_actions=3, num_epochs=10000, num_steps=1000,
        mini_batch_size=64, ppo_epochs=4, gamma=0.99, tau=0.95,
        lr=1e-3, clip_param=0.2, log_interval=10, save_interval=100):
    """Train an ``ActorCritic`` policy with PPO.

    Every epoch resets the environment, collects up to ``num_steps``
    transitions (stopping early when the episode ends), turns them into GAE
    returns and runs ``ppo_update`` on that rollout.

    Args:
        env_name: Gym environment id.
        num_actions: Size of the policy output. The action mapping below only
            has entries for outputs 0-2.
        num_epochs: Number of collect-then-update iterations.
        num_steps: Maximum number of environment steps per rollout.
        mini_batch_size: Mini-batch size passed to ``ppo_update``.
        ppo_epochs: Optimization passes per rollout.
        gamma: Discount factor.
        tau: GAE smoothing factor.
        lr: Adam learning rate.
        clip_param: PPO clipping parameter.
        log_interval: Print and record the rollout reward every this many epochs.
        save_interval: Save a checkpoint every this many epochs.

    Returns:
        ``(model, logged_rewards)`` where ``logged_rewards`` holds the summed
        rollout reward (a float) of every ``log_interval``-th epoch.
    """
    logged_rewards = []

    env = gym.make(env_name)
    device = torch.device("cpu")
    model = ActorCritic(num_actions).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Policy output index -> environment action id.
    # NOTE(review): presumably NOOP / up / down for Pong - confirm against the
    # environment's action set.
    action_mapping = {
        0: 0,
        1: 2,
        2: 3
    }

    try:
        # Training loop
        for epoch in range(num_epochs):
            # gym >= 0.26 returns (observation, info); older releases return
            # only the observation.
            reset_result = env.reset()
            frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result
            state = preprocess_state(frame).to(device)

            log_probs = []
            values = []
            rewards = []
            masks = []
            actions = []
            states = []
            episode_reward = 0.0

            for _ in range(num_steps):
                # Rollout statistics are treated as constants by the update, so
                # no autograd graph is needed while collecting them.
                with torch.no_grad():
                    action_prob, value = model(state)
                    dist = Categorical(action_prob)
                    action = dist.sample()
                    log_prob = dist.log_prob(action)

                # gym >= 0.26 returns a 5-tuple with separate terminated /
                # truncated flags; older releases return a 4-tuple with `done`.
                step_result = env.step(action_mapping[action.item()])
                if len(step_result) == 5:
                    next_frame, reward, terminated, truncated, _ = step_result
                    done = bool(terminated or truncated)
                else:
                    next_frame, reward, done, _ = step_result
                    done = bool(done)
                next_state = preprocess_state(next_frame).to(device)

                episode_reward += float(reward)
                log_probs.append(log_prob)
                values.append(value)
                rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
                masks.append(torch.tensor([0.0 if done else 1.0], dtype=torch.float, device=device))
                actions.append(action)
                states.append(state)

                state = next_state

                if done:
                    break

            # Bootstrap value for the state following the last collected step.
            with torch.no_grad():
                next_value = model(state)[1]
            returns = compute_gae(next_value, rewards, masks, values, gamma, tau)

            returns = torch.cat(returns)
            log_probs = torch.cat(log_probs)
            values = torch.cat(values)
            actions = torch.cat(actions)
            states = torch.cat(states)
            advantages = returns - values

            ppo_update(model, optimizer, ppo_epochs, mini_batch_size, states, actions, log_probs, returns, advantages, clip_param)

            if epoch % log_interval == 0:
                print(f'Epoch: {epoch}, Reward: {episode_reward}')
                logged_rewards.append(episode_reward)

            if epoch % save_interval == 0:
                save_model(model)
    finally:
        # Release the emulator even when training is interrupted.
        env.close()

    return model, logged_rewards

In [None]:
import matplotlib.pyplot as plt

# Train with ppo()'s default hyperparameters; keep the trained network and the
# rewards it logged for the cells that follow.
trained_model, logged_rewards = ppo()

In [None]:
def moving_average(data, window_size):
    """Trailing moving average with the same length as ``data``.

    ``out[i]`` is the sum of the ``window_size`` most recent samples ending at
    index ``i``, divided by ``window_size``. Samples before the start of
    ``data`` count as zero, so the first ``window_size - 1`` outputs are
    damped toward zero.

    Args:
        data: Sequence of numbers.
        window_size: Positive window length.

    Returns:
        1-D float ndarray with one element per input sample.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    samples = np.asarray(data, dtype=float).ravel()
    # window_size leading zeros: window_size - 1 pad the partial windows, and
    # one more makes the cumulative sum start from an explicit 0. Without that
    # extra zero the first window is dropped and the result is one short.
    cumsum = np.cumsum(np.concatenate((np.zeros(window_size), samples)))
    return (cumsum[window_size:] - cumsum[:-window_size]) / float(window_size)

window_size = 5  # Adjust this value to your preference
smoothed_rewards = moving_average(logged_rewards, window_size)

# ppo() records one reward every `log_interval` epochs (10 by default), so
# scale the sample index back for the x-axis.
LOG_INTERVAL = 10
episodes = np.arange(len(smoothed_rewards)) * LOG_INTERVAL

plt.figure(figsize=(12, 6))
plt.plot(episodes, smoothed_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Smoothed Training Rewards per Episode')
plt.show()

In [None]:
def evaluate_trained_model(trained_model, env_name='ALE/Pong-v5', num_episodes=500):
    """Play full episodes with ``trained_model`` and collect their total rewards.

    Actions are sampled from the policy's output distribution (not taken
    greedily). Each episode runs until the environment reports that it ended.

    Args:
        trained_model: Callable returning ``(action_prob, value)`` for a
            preprocessed state.
        env_name: Gym environment id.
        num_episodes: Number of episodes to play.

    Returns:
        List with the summed reward of every episode, in play order.
    """
    env = gym.make(env_name)
    device = torch.device("cpu")
    rewards = []

    # Policy output index -> environment action id (same mapping as training).
    action_mapping = {
        0: 0,
        1: 2,
        2: 3
    }

    try:
        for episode in range(num_episodes):
            # gym >= 0.26 returns (observation, info); older releases return
            # only the observation.
            reset_result = env.reset()
            frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result
            state = preprocess_state(frame).to(device)
            episode_reward = 0.0

            while True:
                with torch.no_grad():
                    action_prob, _ = trained_model(state)
                    dist = Categorical(action_prob)
                    action = dist.sample()

                # gym >= 0.26 returns a 5-tuple with separate terminated /
                # truncated flags; older releases return a 4-tuple with `done`.
                step_result = env.step(action_mapping[action.item()])
                if len(step_result) == 5:
                    next_frame, reward, terminated, truncated, _ = step_result
                    done = bool(terminated or truncated)
                else:
                    next_frame, reward, done, _ = step_result
                    done = bool(done)
                state = preprocess_state(next_frame).to(device)

                episode_reward += float(reward)

                if done:
                    rewards.append(episode_reward)
                    print(f'Episode: {episode}, Reward: {episode_reward}')
                    break
    finally:
        # Release the emulator even when evaluation is interrupted.
        env.close()

    return rewards

# Evaluate the trained policy, then smooth and plot the per-episode rewards.
evaluation_rewards = evaluate_trained_model(trained_model)
window_size = 5  # Moving-average window, in episodes
smoothed_evaluation_rewards = moving_average(evaluation_rewards, window_size)

fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(smoothed_evaluation_rewards)
ax.set_xlabel('Episode')
ax.set_ylabel('Reward')
ax.set_title('Evaluation Rewards per Episode')
ax.grid(True)
plt.show()