<a href="https://colab.research.google.com/github/alisa-fh/reinforcement-learning-gravitar/blob/main/A2C_Atari.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# https://github.com/victor-armegioiu/Deep-RL---Policy-Gradients/blob/f7cb3850019148c71d81439aefe0055de203fb1b/Actor%20Critic/actor_critic.py
import gym
import numpy as np
from itertools import count
from collections import namedtuple
from PIL import Image

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import random

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# --- Hyperparameters and logging cadence -------------------------------
# NOTE(review): `size` is not referenced by any code visible in this file.
size = (82, 82)
video_every = 25      # record a video every this many episodes
print_every = 5
gamma = 0.98          # discount factor applied to future rewards
learning_rate = 3e-2  # step size handed to the optimizer

# --- Environment --------------------------------------------------------
# Alternative game that was tried: 'Atlantis-v0'.
env = gym.make('Gravitar-v0')

# --- Reproducibility ----------------------------------------------------
# Seed torch, the environment, Python's `random`, NumPy and the action
# space with the same value.
seed = 742
torch.manual_seed(seed)
env.seed(seed)
random.seed(seed)
np.random.seed(seed)
env.action_space.seed(seed)


def _record_this_episode(episode_id):
    """Return True when `episode_id` is a multiple of `video_every`."""
    return (episode_id % video_every) == 0


# Wrap the seeded environment so selected episodes are written to ./video;
# force=True is passed so an existing ./video directory does not block it.
env = gym.wrappers.Monitor(env, "./video", video_callable=_record_this_episode, force=True)


In [None]:
# One record per environment step: the log-probability of the action that
# was sampled and the critic's value estimate produced alongside it.
SavedAction = namedtuple('SavedAction', ('log_prob', 'value'))

In [None]:
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single one."""

    def forward(self, input):
        """Return `input` viewed with shape (input.size(0), -1)."""
        leading = input.shape[0]
        return input.view(leading, -1)

In [None]:
class Policy(nn.Module):
    """
    Actor and critic implemented as one model with a shared trunk.

    Three convolutions and one fully connected layer feed two linear
    heads: `action_head` (one output per action) and `value_head` (a
    single state-value output). The instance also carries the per-episode
    buffers `saved_actions` and `rewards` that the training code fills.

    Args:
        n_actions: number of discrete actions the actor chooses between.
            When omitted it is read from the module-level environment
            (`env.action_space.n`), which is the original behaviour.
    """
    def __init__(self, n_actions=None):
        super().__init__()

        if n_actions is None:
            n_actions = env.action_space.n

        self.conv1 = nn.Conv2d(3, 5, kernel_size=(3, 3), stride=(1, 1))
        self.conv2 = nn.Conv2d(5, 2, kernel_size=(4, 4), stride=(2, 2))
        self.conv3 = nn.Conv2d(2, 2, kernel_size=(5, 5), stride=(2, 2))

        # nn.Flatten keeps the batch dimension and flattens the rest.
        self.flatten = nn.Flatten()
        # 3700 = 2 channels * 50 * 37, which is what the three convolutions
        # above produce for a 210 x 160 input.
        # NOTE(review): presumably the raw Atari frame size - confirm before
        # feeding frames of any other resolution.
        self.linear = nn.Linear(in_features=3700, out_features=800, bias=True)

        # actor's layer
        self.action_head = nn.Linear(800, n_actions)

        # critic's layer
        self.value_head = nn.Linear(800, 1)

        # action & reward buffer
        self.saved_actions = []
        self.rewards = []

    def forward(self, x):
        """
        Forward pass of both actor and critic.

        Args:
            x: batch of channels-first images with 3 channels, sized so
                that the convolutions yield 3700 features per sample.

        Returns:
            A tuple `(action_prob, state_values)`:
            1. `action_prob`: probabilities over the action space, one
               row per sample (softmax over the last dimension).
            2. `state_values`: the critic's value estimate, one column
               per sample.
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = self.flatten(x)
        x = F.relu(self.linear(x))

        # actor: chooses the action to take from state s_t by returning
        # the probability of each action
        action_prob = F.softmax(self.action_head(x), dim=-1)

        # critic: evaluates being in the state s_t
        state_values = self.value_head(x)

        return action_prob, state_values

In [None]:
# A single network holds both heads: the actor's action probabilities and
# the critic's value estimate for the state s_t.
model = Policy()
# The learning rate is worth experimenting with; it is set via `learning_rate`.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)
# float32 machine epsilon as a plain Python float.
eps = float(np.finfo(np.float32).eps)

In [None]:
def select_action(state):
    """
    Sample an action for `state` from the current policy and record it.

    Args:
        state: observation as a NumPy array laid out
            height x width x channels; it is converted to float and
            permuted to channels-first before the forward pass.

    Returns:
        int: index of the sampled action.

    Side effects:
        Appends `SavedAction(log_prob, value)` for this step to
        `model.saved_actions`.
    """
    # Put the observation on whatever device the model's weights live on,
    # so the forward pass works wherever the model has been placed.
    model_device = next(model.parameters()).device
    frame = torch.from_numpy(state).float().to(model_device)

    # HWC -> CHW, then add a leading batch dimension of size 1.
    batch = frame.permute(2, 0, 1).unsqueeze(0)
    probs, state_value = model(batch)

    # create a categorical distribution over the action probabilities
    dist = Categorical(probs)

    # and sample an action using the distribution
    action = dist.sample()

    # save to action buffer
    model.saved_actions.append(SavedAction(dist.log_prob(action), state_value))

    # index of the action to take
    return action.item()

In [None]:
def finish_episode():
    """
    Training code. Calculates actor and critic loss and performs backprop.

    Builds the discounted return of every recorded step from
    `model.rewards`, standardises the returns, and takes one `optimizer`
    step on the sum of the policy loss (-log_prob * advantage) and the
    critic's smooth-L1 loss against those returns. `model.rewards` and
    `model.saved_actions` are emptied afterwards. Returns nothing.
    """
    saved_actions = model.saved_actions
    rewards = model.rewards

    # Nothing recorded -> nothing to learn from. Stacking empty loss lists
    # further down would raise, so just leave the buffers empty and return.
    if not saved_actions or not rewards:
        del model.rewards[:]
        del model.saved_actions[:]
        return

    # Discounted return G_t = r_t + gamma * G_{t+1}, accumulated back to
    # front. Appending and reversing once is linear in the episode length,
    # whereas inserting at the head of the list on every step is quadratic.
    returns = []
    R = 0.0
    for r in reversed(rewards):
        R = r + gamma * R
        returns.append(R)
    returns.reverse()

    # Scale rewards/returns
    print("returns")
    # Keep the targets on the same device (and dtype family) as the network
    # outputs they are compared with, wherever the model happens to live.
    target_device = saved_actions[0].value.device
    returns = torch.tensor(returns, dtype=torch.float32, device=target_device)

    # Standardise the returns. The sample standard deviation of a single
    # element is NaN, which would propagate into the loss and the weights,
    # so a one-step episode is only centred.
    centred = returns - returns.mean()
    if returns.numel() > 1:
        returns = centred / (returns.std() + np.finfo(np.float32).eps)
    else:
        returns = centred

    policy_losses = []  # list to save actor (policy) loss
    value_losses = []  # list to save critic (value) loss
    for (log_prob, value), R in zip(saved_actions, returns):
        # Give prediction and target the same 1-D shape so the loss does
        # not rely on broadcasting a (1, 1) input against a (1,) target.
        value = value.reshape(-1)
        target = R.reshape(1)

        # The advantage is a constant for the actor: no gradient flows
        # from the policy loss into the critic.
        advantage = target - value.detach()

        # calculate actor (policy) loss
        policy_losses.append(-log_prob * advantage)

        # calculate critic (value) loss using L1 smooth loss
        value_losses.append(F.smooth_l1_loss(value, target))

    # reset gradients
    optimizer.zero_grad()

    # sum up all the values of policy_losses and value_losses
    loss = torch.stack(policy_losses).sum() + torch.stack(value_losses).sum()
    # perform backprop
    loss.backward()
    optimizer.step()
    print("backprop")

    # reset rewards and action buffer (in place, so the model keeps the
    # same list objects)
    del model.rewards[:]
    del model.saved_actions[:]

In [None]:
def main():
    """
    Train the policy on `env` until interrupted.

    Each episode is played with actions from `select_action` for at most
    9999 steps, the rewards are stored on `model.rewards`, and
    `finish_episode` performs one update. A running reward (exponential
    moving average with weight 0.05, starting from 10) is logged every
    `print_every` episodes. The loop has no exit condition.
    """
    running_reward = 10

    # run episodes until interrupted; numbering starts at 0
    for i_episode in count():

        # reset environment and episode reward
        state = env.reset()
        ep_reward = 0

        # for each episode, only run 9999 steps so that we don't
        # infinite loop while learning
        for _step in range(9999):

            # select action from policy
            action = select_action(state)

            # take the action
            state, reward, done, _info = env.step(action)

            # save reward
            model.rewards.append(reward)
            ep_reward += reward
            if done:
                break

        # update cumulative reward
        running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward

        # perform backprop
        finish_episode()

        # log results
        if i_episode % print_every == 0:
            print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                  i_episode, ep_reward, running_reward))

In [None]:
# Start training only when this file is executed directly (a notebook cell
# also runs with __name__ == "__main__"), not when it is imported.
if __name__ == "__main__":
    main()

backprop
Episode 1150	Last reward: 100.00	Average reward: 151.92


KeyboardInterrupt: ignored