# Pong with Advantage Actor Critic

## Step 1: Import the libraries

In [1]:
!pip install requirements.txt

In [4]:
!AutoROM -y

AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/usr/local/lib/python3.8/dist-packages/AutoROM/roms

Existing ROMs will be overwritten.
robotank.bin
time_pilot.bin
space_invaders.bin
et.bin
ms_pacman.bin
video_pinball.bin
laser_gates.bin
phoenix.bin
flag_capture.bin
name_this_game.bin
star_gunner.bin
private_eye.bin
freeway.bin
human_cannonball.bin
battle_zone.bin
pitfall.bin
beam_rider.bin
kaboom.bin
haunted_house.bin
road_runner.bin
mr_do.bin
riverraid.bin
warlords.bin
enduro.bin
chopper_command.bin
miniature_golf.bin
air_raid.bin
video_chess.bin
alien.bin
word_zapper.bin
turmoil.bin
breakout.bin
fishing_derby.bin
crazy_climber.bin
joust.bin
journey_escape.bin
kung_fu_master.bin
space_war.bin
entombed.bin
koolaid.bin
demon_attack.bin
gopher.bin
__pycache__
mario_bros.bin
lost_luggage.bin
asterix.bin
kangaroo.bin
tic_tac_toe_3d.bin
video_cube.bin
solaris.bin
ice_hockey.bin
tetris.bin
assault.bin
galaxian.bin
wizard_of_wor.bin
backgammon.bin
amidar.bin
klax.bin
k

In [5]:
import gymnasium as gym
import torch
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
from IPython.display import clear_output
from random import sample

%matplotlib inline

  logger.warn(f"Overriding environment {new_spec.id} already in registry.")
  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
#%cd '/content/drive/MyDrive/Reinforcement_learning/notebooks'
%cd 'drive/My Drive/Colab Notebooks/M2DS/Reinforcement_learning/notebooks'

/content/drive/My Drive/Colab Notebooks/M2DS/Reinforcement_learning/notebooks


In [8]:
import sys
sys.path.append('../../')
from a2c_agent import A2CAgent
from actor_critic_cnn import ActorCnn, CriticCnn

In [9]:
from gymnasium.wrappers import FrameStack

## Step 1: Create our environment

Initialize the environment in the code cell below.


In [10]:
env = gym.make("Pong-v4", mode=1, obs_type="grayscale")
env = gym.wrappers.ResizeObservation(env, (84, 84))
env = FrameStack(env, 4)

  logger.warn(


In [11]:
# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: ", device)

Device:  cuda


## Step 2: Creating our Agent

In [12]:
INPUT_SHAPE = (4, 84, 84)
ACTION_SIZE = env.action_space.n
SEED = 0
GAMMA = 0.99           # discount factor
ALPHA= 0.0001          # Actor learning rate
BETA = 0.0005          # Critic learning rate
UPDATE_EVERY = 5     # how often to update the network 

agent = A2CAgent(INPUT_SHAPE, ACTION_SIZE, SEED, device, GAMMA, ALPHA, BETA, UPDATE_EVERY, ActorCnn, CriticCnn)

## Step 3: Creating our HumanRewarder

In [13]:
from reward_function import HumanFeedBackRewardFunction

In [14]:
human_rewarder = HumanFeedBackRewardFunction(device)

## Step 4: Creating human choice

In [15]:
def getHumanChoice(obs_1, obs_2):
    obs_1 = (np.array(obs_1) * 255).astype(np.uint8)
    obs_2 = (np.array(obs_2) * 255).astype(np.uint8)

    # obs_1 = obs_1.reshape((obs_1.shape[0]*obs_1.shape[1], obs_1.shape[2], obs_1.shape[3]))
    # obs_2 = obs_2.reshape((obs_2.shape[0]*obs_2.shape[1], obs_2.shape[2], obs_2.shape[3]))

    imageio.mimsave(
        "./example1.gif", obs_1, fps=15  # output gif  # array of input frames
    )

    imageio.mimsave("./example2.gif", obs_2, fps=15)

    clear_output(True)

    print("Choose between 2 scenarios:")
    print(
        "0: They are equally good, 1: The 1st is better, 2: The 2nd is better, 3: None are good"
    )

    img1 = open("example1.gif", "rb").read()
    img2 = open("example2.gif", "rb").read()

    wi1 = widgets.Image(value=img1, format="png", width=300, height=400)
    wi2 = widgets.Image(value=img2, format="png", width=300, height=400)

    sidebyside = widgets.HBox([wi1, wi2])

    display(sidebyside)

    choice = input("Press 0 or 1 or 2 or 3:")

    if choice == 0:
        human_choice = [0.5, 0.5]
    elif choice == 1:
        human_choice = [1, 0]
    elif choice == 2:
        human_choice = [0, 1]
    else:
        human_choice = [0, 0]

    return human_choice

## Step 5: Initialize D space

In [16]:
def initialize_D_space(real_human_check=False):
    print("initialize_D_space: Start")

    D_space = []

    while len(D_space) < 500:
        obs, _ = env.reset()
        time_step = 0
        obs_1_list = []
        obs_2_list = []
        real_reward_1 = 0
        real_reward_2 = 0

        obs = np.array(obs._frames)
        obs = np.ascontiguousarray(obs, dtype=np.float32) / 255

        while len(D_space) < 500:
            time_step += 1

            action, log_prob, entropy = agent.act(obs)
            next_obs, reward, terminated, truncated, info = env.step(action)

            next_obs = np.array(next_obs._frames)
            next_obs = np.ascontiguousarray(next_obs, dtype=np.float32) / 255

            obs = next_obs

            if time_step <= 25:
                obs_1_list.append(
                    obs[-1]
                )  # We keep only the last frame since 1 step = 1 frame
                real_reward_1 += reward
            else:
                obs_2_list.append(
                    obs[-1]
                )  # We keep only the last frame since 1 step = 1 frame
                real_reward_2 += reward

            if time_step >= 50:
                if real_human_check:
                    human_choice = getHumanChoice(obs_1_list, obs_2_list)
                else:
                    # Here we fake the behavior of our real human assessor
                    if real_reward_1 > real_reward_2:
                        human_choice = [1, 0]
                    else:
                        human_choice = [0, 1]

                if human_choice != [0, 0]:
                    D_space.append(np.array([obs_1_list, obs_2_list, human_choice]))

                obs_1_list = []
                obs_2_list = []
                time_step = 0

    print("initialize_D_space: End")

    return D_space

## Step 6: Creating HumanRewarder Train Function

In [17]:
# We shall call this function each 10 timesteps
def trainHumanRewarder(D_space):
    i = 10  # We sample 10 triplets from D

    D_sampled = sample(D_space, i)

    fake_reward_1_list_all = torch.Tensor([])  # (i, 3) --> i segments and 3 rewarder
    fake_reward_2_list_all = torch.Tensor([])

    human_choice_list = []

    for triplet in D_sampled:
        fake_reward_1_list = []  # (1, 3) --> 1 segment duos and 3 rewarder
        fake_reward_2_list = []

        human_choice_list.append(triplet[2])

        for n in range(0, len(triplet[0]) - 1, 4):
            obs_1_i = np.array(triplet[0][n : n + 4])
            obs_2_i = np.array(triplet[1][n : n + 4])

            fake_reward_1_list.append(human_rewarder(obs_1_i, True))  # We append (1,3)
            fake_reward_2_list.append(human_rewarder(obs_2_i, True))

        # fake_reward_1_list = torch.sum(torch.Tensor(fake_reward_1_list), axis = 0) #From (6,3) to (1, 3)
        # fake_reward_2_list = torch.sum(torch.Tensor(fake_reward_2_list), axis = 0)
        fake_reward_1_list = torch.sum(
            fake_reward_1_list, axis=0
        )  # From (6,3) to (1, 3)
        fake_reward_2_list = torch.sum(fake_reward_2_list, axis=0)

        # fake_reward_1_list_all.append(fake_reward_1_list)
        if len(fake_reward_1_list_all) == 0:
            fake_reward_1_list_all = fake_reward_1_list.unsqueeze(0)
            fake_reward_2_list_all = fake_reward_2_list.unsqueeze(0)
        else:
            fake_reward_1_list_all = torch.cat(
                (fake_reward_1_list_all, fake_reward_1_list.unsqueeze(0)), 0
            )
            fake_reward_2_list_all = torch.cat(
                (fake_reward_2_list_all, fake_reward_2_list.unsqueeze(0)), 0
            )

        # fake_reward_2_list_all.append(fake_reward_2_list)

    human_choice_list = torch.Tensor(human_choice_list)
    fake_reward_1_list_all = torch.Tensor(fake_reward_1_list_all)
    fake_reward_2_list_all = torch.Tensor(fake_reward_2_list_all)

    # Update rewarders weights
    human_rewarder.update(
        fake_reward_1_list_all, fake_reward_2_list_all, human_choice_list
    )
  

## Step 7: Pretraining HumanRewarder

In the Atari domain we also pretrain the reward predictor for 200 epochs before beginning RL training, to reduce the likelihood of irreversibly learning a bad policy based on an untrained predictor.

In [18]:
def pretrainHumanRewarder(D_space):
    print("pretrainHumanRewarder: Start")
    for epoch in range(0, 200):
        trainHumanRewarder(D_space)

        if epoch % 20 == 0:
            print("pretrainHumanRewarder, Step:", epoch, "/200")

    print("pretrainHumanRewarder: End")


## Step 8: Feeding D Space

In [19]:
# We shall call this function each T+5e6/5e6 timesteps
def feedingDSpace(
    obs1, obs2, D_space, real_human_check=False, real_reward_1=[], real_reward_2=[]
):
    if real_human_check:
        human_choice = getHumanChoice(obs1, obs2)
    else:
        # Here we fake the behavior of our real human assessor
        if np.array(real_reward_1).sum() > np.array(real_reward_2).sum():
            human_choice = [1, 0]
        else:
            human_choice = [0, 1]

    if human_choice != [0, 0]:
        D_space.append(np.array([obs1, obs2, human_choice]))

    # We keep only the 3000 lastest triplets
    if len(D_space) > 3000:
        D_space.pop(0)

    return D_space

## Step 9: Train the Agent with A2C + HumanRewarder

In [20]:
import imageio
from IPython.display import Image
from IPython.display import display
from ipywidgets import  widgets, HBox

start_epoch = 0
scores = []
scores_window = deque(maxlen=20)
scores_fake = []

scores_mean = []
scores_fake_mean = []

In [22]:
def train_with_humanRewarder(n_episodes=1000, real_human_check=False):
    """
    Params
    ======
        n_episodes (int): maximum number of training episodes
    """

    # We initialize our D space
    D_space = initialize_D_space(real_human_check)

    # We pretrain our humanRewarder
    pretrainHumanRewarder(D_space)

    T = 1  # Timestep over all the episodes
    timestep = 0  # timestep over one episoded
    labelled_number = 0  # Number of time we've asked the human to label 2 segments

    scores = []
    scores_fake = []
    T_to_plot = []

    obs_list = []
    real_reward_list = []

    for i_episode in range(0, n_episodes + 1):
        obs, _ = env.reset()
        obs = np.array(obs._frames)
        obs = np.ascontiguousarray(obs, dtype=np.float32) / 255

        score = 0
        score_fake = 0
        timestep = 0

        while True:
            # Each 10 timestep we train our HumanRewarder
            if timestep % 10 == 0:
                trainHumanRewarder(D_space)

            # We take an action and our agent learn from it
            action, log_prob, entropy = agent.act(obs)
            next_obs, reward, terminated, truncated, info = env.step(action)
            fake_reward = human_rewarder(obs)
            score_fake += torch.Tensor(fake_reward).mean().cpu().detach().numpy()
            score += reward

            next_obs = np.array(next_obs._frames)
            next_obs = np.ascontiguousarray(next_obs, dtype=np.float32) / 255

            # Each 5 timestep we train our Agent
            agent.step(
                obs,
                log_prob,
                entropy,
                fake_reward[0].mean().cpu().detach().numpy(),
                terminated or truncated,
                next_obs,
            )

            obs = next_obs

            ####################
            # Part feed D_space

            obs_list.append(obs[-1])
            real_reward_list.append(reward)
            if len(obs_list) > 50:
                obs_list.pop(0)
                real_reward_list.pop(0)

            # We do T*4 because each timestep is composed of 4 frames ? Not sure
            if (1 / 2) ** (((T * 1e3) + 5e6) / 5e6) > labelled_number:
                if len(obs_list) == 50:
                    print("Feed D_space")
                    D_space = feedingDSpace(
                        obs_list[0:25],
                        obs_list[25:50],
                        D_space,
                        real_human_check,
                        real_reward_list[0:25],
                        real_reward_list[25:50],
                    )
                    obs_list = []
                    real_reward_list = []
                    labelled_number += 1

            timestep += 1
            T += 1

            if terminated or truncated:
                break

        # if i_episode%10 == 0:
        #  for i in range(0, 3):
        #    torch.save(human_rewarder.nets[i].state_dict(), './Human_rewarder_saved_model_' + str(i))
        #  torch.save(agent.actor_net.state_dict(), './actor_net_saved_model')
        #  torch.save(agent.critic_net.state_dict(), './critic_net_saved_model')

        scores.append(score)  # save most recent score
        scores_fake.append(score_fake)
        T_to_plot.append(T)

        clear_output(True)
        plt.title("pong")
        plt.plot(T_to_plot, scores, label="RL")
        plt.plot(T_to_plot, scores_fake, label="3k synthetic labels")
        plt.ylabel("reward")
        plt.xlabel("timestep")

        plt.legend(loc=4)

        plt.show()
        print("Episode:", i_episode)

    return scores

In [None]:
scores = train_with_humanRewarder(10000, False)

initialize_D_space: Start


  D_space.append(np.array([obs_1_list, obs_2_list, human_choice]))


In [None]:
#torch.save(human_rewarder.net.state_dict(), './Human_rewarder_saved_model')
#human_rewarder.net.load_state_dict(torch.load('Human_rewarder_saved_model'))
