# PPO

Promixal Policy Optimization

https://arxiv.org/pdf/1707.06347.pdf

In [37]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

import operator
import numpy as np
import random
from collections import namedtuple, deque
import matplotlib.pyplot as plt
import imageio

is_ipython = 'inline' in plt.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

import gym

## Parallel Env

In [349]:
# taken from openai/baseline
# with minor edits
# see https://github.com/openai/baselines/baselines/common/vec_env/subproc_vec_env.py
# 


import numpy as np
import gym
from multiprocessing import Process, Pipe
from abc import ABC, abstractmethod

class CloudpickleWrapper(object):
    """
    Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
    """

    def __init__(self, x):
        self.x = x

    def __getstate__(self):
        import cloudpickle
        return cloudpickle.dumps(self.x)

    def __setstate__(self, ob):
        import pickle
        self.x = pickle.loads(ob)

class VecEnv(ABC):
    """
    An abstract asynchronous, vectorized environment.
    """

    def __init__(self, num_envs, observation_space, action_space):
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.action_space = action_space

    @abstractmethod
    def reset(self):
        """
        Reset all the environments and return an array of
        observations, or a dict of observation arrays.
        If step_async is still doing work, that work will
        be cancelled and step_wait() should not be called
        until step_async() is invoked again.
        """
        pass

    @abstractmethod
    def step_async(self, actions):
        """
        Tell all the environments to start taking a step
        with the given actions.
        Call step_wait() to get the results of the step.
        You should not call this if a step_async run is
        already pending.
        """
        pass

    @abstractmethod
    def step_wait(self):
        """
        Wait for the step taken with step_async().
        Returns (obs, rews, dones, infos):
         - obs: an array of observations, or a dict of
                arrays of observations.
         - rews: an array of rewards
         - dones: an array of "episode done" booleans
         - infos: a sequence of info objects
        """
        pass

    @abstractmethod
    def close(self):
        """
        Clean up the environments' resources.
        """
        pass

    def step(self, actions):
        """
        Step the environments synchronously.
        This is available for backwards compatibility.
        """
        self.step_async(actions)
        return self.step_wait()

    def render(self, mode='human'):
        #logger.warn('Render not defined for %s' % self)
        pass
        
    @property
    def unwrapped(self):
        if isinstance(self, VecEnvWrapper):
            return self.venv.unwrapped
        else:
            return self


def worker(remote, parent_remote, env_fn_wrapper):
    parent_remote.close()
    env = env_fn_wrapper.x
    while True:
        cmd, data = remote.recv()
        if cmd == 'step':
            ob, reward, done, info = env.step(data)
            if done:
                ob = env.reset()
            remote.send((ob, reward, done, info))
        elif cmd == 'reset':
            ob = env.reset()
            remote.send(ob)
        elif cmd == 'reset_task':
            ob = env.reset_task()
            remote.send(ob)
        elif cmd == 'close':
            remote.close()
            break
        elif cmd == 'get_spaces':
            remote.send((env.observation_space, env.action_space))
        else:
            raise NotImplementedError


class parallelEnv(VecEnv):
    def __init__(self, env_name='PongDeterministic-v4',
                 n=4, seed=None,
                 spaces=None):

        env_fns = [ gym.make(env_name) for _ in range(n) ]

        if seed is not None:
            for i,e in enumerate(env_fns):
                e.seed(i+seed)
        
        """
        envs: list of gym environments to run in subprocesses
        adopted from openai baseline
        """
        self.waiting = False
        self.closed = False
        nenvs = len(env_fns)
        self.remotes, self.work_remotes = zip(*[Pipe() for _ in range(nenvs)])
        self.ps = [Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
            for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
        for p in self.ps:
            p.daemon = True # if the main process crashes, we should not cause things to hang
            p.start()
        for remote in self.work_remotes:
            remote.close()

        self.remotes[0].send(('get_spaces', None))
        observation_space, action_space = self.remotes[0].recv()
        VecEnv.__init__(self, len(env_fns), observation_space, action_space)

    def step_async(self, actions):
        for remote, action in zip(self.remotes, actions):
            remote.send(('step', action))
        self.waiting = True

    def step_wait(self):
        results = [remote.recv() for remote in self.remotes]
        self.waiting = False
        obs, rews, dones, infos = zip(*results)
        return np.stack(obs), np.stack(rews), np.stack(dones), infos

    def reset(self):
        for remote in self.remotes:
            remote.send(('reset', None))
        return np.stack([remote.recv() for remote in self.remotes])

    def reset_task(self):
        for remote in self.remotes:
            remote.send(('reset_task', None))
        return np.stack([remote.recv() for remote in self.remotes])

    def close(self):
        if self.closed:
            return
        if self.waiting:
            for remote in self.remotes:            
                remote.recv()
        for remote in self.remotes:
            remote.send(('close', None))
        for p in self.ps:
            p.join()
        self.closed = True


## Policy

In [363]:
class Policy(nn.Module):
    def __init__(self,seed,nS,nA,hidden_dims=(32,32)):
        super(Policy,self).__init__()
        self.seed = torch.manual_seed(seed)
        self.hidden_dims = hidden_dims
        self.nA = nA
        self.nS = nS
        self.size = hidden_dims[-1] * hidden_dims[-2]
        
        self.input_layer = nn.Linear(nS,hidden_dims[0])
        self.hidden_layers = nn.ModuleList()
        for i in range(1,len(self.hidden_dims)):
            hidden_layer = nn.Linear(hidden_dims[i-1],hidden_dims[i])
            self.hidden_layers.append(hidden_layer)
        self.output_layer = nn.Linear(hidden_dims[-1],nA)
        self.sig = nn.Sigmoid()
            
    def forward(self,state):
        x = state
        if not isinstance(state,torch.Tensor):
            x = torch.tensor(x,dtype=torch.float32) #device = self.device,
            x = x.unsqueeze(0)
        x = F.relu(self.input_layer(x))
        for hidden_layer in self.hidden_layers:
            x = F.relu(hidden_layer(x))
        # flatten the tensor
#         x = x.view(-1,self.size)
        return F.softmax(self.output_layer(x),dim=1)
#         return self.sig(self.output_layer(x))
    
    # Return the action along with the probability of the action. For weighting the reward garnered by the action.
    def act(self,state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.forward(state)
        m = Categorical(probs)
        action = m.sample()
#         print('action',action)
        return action.item(),m.log_prob(action)

## Training single Env

- Randomize starting position
- Gather trajectories
- Discount the rewards
- Update policy to maximize rewards

In [447]:
def initialize_env(env):
    # seed the environment with N step random actions prior
    nrand_actions = int(np.random.random() * 3)
    state = env.reset()
    for act in range(nrand_actions):
        state,reward,done,_ = env.step(env.action_space.sample())
    return state,env

# convert states to probability, passing through the policy
def states_to_prob(policy, states,actions):
    policy_input = torch.tensor(states,dtype=torch.float32)
    return policy(policy_input)
    

def collect_trajectories(env,policy,tmax):
    rewards = []
    dones = []
    states = []
    actions = []
    a_probs = []
#     state,env = initialize_env(env)
    state = env.reset()
    for t in range(tmax):
        probs = policy(state).cpu().detach().numpy()[0]
        action = np.random.choice([0,1],p=probs)
        state,reward,done,_ = env.step(action)

        actions.append(action)
        a_probs.append(probs)
        states.append(state)
        rewards.append(reward)
        dones.append(done)
        if done:
            break
    return actions,np.vstack(a_probs),states,rewards,dones

def clipped_surrogate(policy, old_probs, states, actions, rewards,
                      discount = 0.995, epsilon=0.1, beta=0.01):
#     print('rewards',rewards)
#     print('old_probs',old_probs)
#     print('actions',actions)
    # discount and take future rewards
    discounts = discount**np.arange(len(rewards))
    future_r = [rewards[i:]*discounts[:-i] if i>0 else rewards*discounts for i in range(len(rewards))]
    rewards_future = [sum(future_r[i]) for i in range(len(future_r))]
#     print('rewards_future',rewards_future)
    mean = np.mean(rewards_future)
    std = np.std(rewards_future) + 1.0e-10

    rewards_normalized = (rewards_future - mean)/std
    
    
    
    # convert states to policy (or probability)
    new_probs = states_to_prob(policy,states,actions)
    
#     print('actions',actions)
#     print('new_probs',new_probs)
#     print('old_probs',old_probs)
#     print('new_probs',new_probs.shape)
#     print('old_probs',old_probs.shape)
    # slice both according to the actions taken
    index = np.arange(new_probs.shape[0])
    new_probs = new_probs[index,np.array(actions)]
    old_probs = old_probs[index,np.array(actions)]
#     print('new_probs',new_probs.shape)
#     print('old_probs',old_probs.shape)
    
    
    # convert everything into pytorch tensors and move to gpu if available
    old_probs = torch.tensor(old_probs, dtype=torch.float, device=device)
    actions = torch.tensor(actions, dtype=torch.int8, device=device)
    rewards = torch.tensor(rewards_future, dtype=torch.float, device=device)
    
#     policy_input = torch.tensor(states,dtype=torch.float32)
#     new_probs = policy(policy_input).squeeze(-1)
#     new_probs = torch.where(actions == 0, new_probs[:,0], new_probs[:,1])
#     print('new_probs',new_probs)
    ratio = new_probs/old_probs

    clip = torch.clamp(ratio,1-epsilon,1+epsilon)
    clipped_surrogate = torch.min(ratio*rewards, clip*rewards)
    
    
    entropy = -(new_probs*torch.log(old_probs+1.e-10)+ \
        (1.0-new_probs)*torch.log(1.0-old_probs+1.e-10))
#     print('entropy',entropy)
#     print('ratio',ratio)
#     print('clip',clip)
#     print('ratio*rewards',ratio*rewards)
#     print('clip*rewards',clip*rewards)
#     print('clipped_surrogate',clipped_surrogate)
    return torch.mean(clipped_surrogate + entropy*beta)

# Single agent
def train(env,policy,optimizer,episodes,discount,epsilon,beta,tmax,SGD_epoch):
    total_rewards = []
    for i_episode in range(1,episodes+1):
        # get trajectories
        actions,a_probs,states,rewards,dones = collect_trajectories(env,policy,tmax)
        for _ in range(SGD_epoch):
            # Surrogate
            L = clipped_surrogate(policy,a_probs,states,actions,rewards,discount,epsilon,beta)

            optimizer.zero_grad()
            # for batch loss
#             print('L',L)
            L.backward()
            optimizer.step()
            del L
        
        # the clipping parameter reduces as time goes on
        epsilon*=.999

        # the regulation term also reduces
        # this reduces exploration in later runs
        beta*=.995

        # get the average reward of the parallel environments
        total_rewards.append(sum(rewards))

        # display some progress every 20 iterations
        if i_episode % 50 == 0:
            print("Episode: {0:d}, score: {1:f}".format(i_episode,total_rewards[-1]))
    
def main():
    seed = 1234
    env = gym.make('CartPole-v0')
    env.seed(seed)
    nA = env.action_space.n
#     nA = 1
    nS = env.observation_space.shape[0]
    policy = Policy(seed,nS,nA).to(device)
    optimizer = optim.Adam(policy.parameters(), lr = 1e-2)
    
    discount_rate = .995
    epsilon = 0.1
    beta = .01
    tmax = 200
    SGD_epoch = 4
    episodes = 1000
    
    train(env,policy,optimizer,episodes,discount_rate,epsilon,beta,tmax,SGD_epoch)

In [448]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
main()

Episode: 50, score: 25.000000
Episode: 100, score: 56.000000
Episode: 150, score: 21.000000
Episode: 200, score: 22.000000
Episode: 250, score: 37.000000
Episode: 300, score: 28.000000
Episode: 350, score: 32.000000
Episode: 400, score: 26.000000
Episode: 450, score: 29.000000
Episode: 500, score: 49.000000
Episode: 550, score: 39.000000
Episode: 600, score: 61.000000
Episode: 650, score: 36.000000
Episode: 700, score: 30.000000
Episode: 750, score: 25.000000
Episode: 800, score: 39.000000
Episode: 850, score: 68.000000
Episode: 900, score: 34.000000
Episode: 950, score: 29.000000
Episode: 1000, score: 41.000000


## Train Multiproccessing

In [418]:
RIGHT = 1
LEFT = 0
def collect_multi_trajectories(env,policy,tmax):
    n = 8
    nrand = 2
    # number of parallel instances
    n=len(envs.ps)

    #initialize returning lists and start the game!
    state_list=[]
    reward_list=[]
    prob_list=[]
    action_list=[]

    envs.reset()
    
    # start all parallel agents
    envs.step([1]*n)
    
    # perform nrand random steps
    for _ in range(nrand):
        fr1, re1, _, _ = envs.step(np.random.choice([RIGHT, LEFT],n))
        fr2, re2, _, _ = envs.step([0]*n)
    
    for t in range(tmax):

        # prepare the input
        # preprocess_batch properly converts two frames into 
        # shape (n, 2, 80, 80), the proper input for the policy
        # this is required when building CNN with pytorch
        batch_input = torch.tensor([fr1,fr2],dtype=torch.float32)
        
        # probs will only be used as the pi_old
        # no gradient propagation is needed
        # so we move it to the cpu
        probs = policy(batch_input).squeeze().cpu().detach().numpy()
        
        ###
        print('probs',probs.shape)
        print('np.random.rand(n)',np.random.rand(n).shape)
        action = np.where(np.random.rand(n) < probs, RIGHT, LEFT)
        probs = np.where(action==RIGHT, probs, 1.0-probs)
        
        
        # advance the game (0=no action)
        # we take one action and skip game forward
        fr1, re1, is_done, _ = envs.step(action)
        fr2, re2, is_done, _ = envs.step([0]*n)

        reward = re1 + re2
        
        # store the result
        state_list.append(batch_input)
        reward_list.append(reward)
        prob_list.append(probs)
        action_list.append(action)
        
        # stop if any of the trajectories is done
        # we want all the lists to be retangular
        if is_done.any():
            break


    # return pi_theta, states, actions, rewards, probability
    return prob_list, state_list, \
        action_list, reward_list

def clipped_surrogate_multi(policy, old_probs, states, actions, rewards,
                      discount = 0.995, epsilon=0.1, beta=0.01):
    discount = discount**np.arange(len(rewards))
    rewards = np.asarray(rewards)*discount[:,np.newaxis]
    
    # convert rewards to future rewards
    rewards_future = rewards[::-1].cumsum(axis=0)[::-1]
    
    mean = np.mean(rewards_future, axis=1)
    std = np.std(rewards_future, axis=1) + 1.0e-10

    rewards_normalized = (rewards_future - mean[:,np.newaxis])/std[:,np.newaxis]
    
    # convert everything into pytorch tensors and move to gpu if available
    actions = torch.tensor(actions, dtype=torch.int8, device=device)
    old_probs = torch.tensor(old_probs, dtype=torch.float, device=device)
    rewards = torch.tensor(rewards_normalized, dtype=torch.float, device=device)

    # convert states to policy (or probability)
    new_probs = states_to_prob(policy, states)
    new_probs = torch.where(actions == RIGHT, new_probs, 1.0-new_probs)
    
    # ratio for clipping
    ratio = new_probs/old_probs

    # clipped function
    clip = torch.clamp(ratio, 1-epsilon, 1+epsilon)
    clipped_surrogate = torch.min(ratio*rewards, clip*rewards)

    # include a regularization term
    # this steers new_policy towards 0.5
    # add in 1.e-10 to avoid log(0) which gives nan
    entropy = -(new_probs*torch.log(old_probs+1.e-10)+ \
        (1.0-new_probs)*torch.log(1.0-old_probs+1.e-10))
    
    print('entropy',entropy)
    print('clipped_surrogate',clipped_surrogate)
    # this returns an average of all the entries of the tensor
    # effective computing L_sur^clip / T
    # averaged over time-step and number of trajectories
    # this is desirable because we have normalized our rewards
    return torch.mean(clipped_surrogate + beta*entropy)

# Single agent
def train_multi(env,policy,optimizer,episodes,discount,epsilon,beta,tmax,SGD_epoch):
    mean_rewards = []
    for i_episode in range(1,episodes+1):
        # get trajectories
        actions,a_probs,states,rewards,dones = collect_multi_trajectories(env,policy,tmax)
        for _ in range(SGD_epoch):
            # Surrogate
            L = -clipped_surrogate_multi(policy,a_probs,states,actions,rewards,discount,epsilon,beta)

            optimizer.zero_grad()
            # for batch loss
            print('L',L)
            L.backward()
            optimizer.step()
            del L
        
        # the clipping parameter reduces as time goes on
        epsilon*=.999

        # the regulation term also reduces
        # this reduces exploration in later runs
        beta*=.995

        # get the average reward of the parallel environments
        total_rewards.append(sum(rewards))

        # display some progress every 20 iterations
        if i_episode % 20 == 0:
            print("Episode: {0:d}, score: {1:f}".format(i_episode,total_rewards[-1]))
    
def main_multi():
    seed = 1234
    envs = parallelEnv('CartPole-v0', n=8, seed=1234)
    env = gym.make('CartPole-v0')
    nA = env.action_space.n
#     nA = 1
    nS = env.observation_space.shape[0]
    del env
    policy = Policy(seed,nS,nA).to(device)
    optimizer = optim.Adam(policy.parameters(), lr = 1e-2)
    
    discount_rate = .995
    epsilon = 0.1
    beta = .01
    tmax = 200
    SGD_epoch = 4
    episodes = 500
    
    train_multi(envs,policy,optimizer,episodes,discount_rate,epsilon,beta,tmax,SGD_epoch)

In [362]:
main_multi()

probs (2, 8, 2)
np.random.rand(n) (8,)


ValueError: operands could not be broadcast together with shapes (8,) (2,8,2) 

Process Process-54:
Process Process-53:
Process Process-51:
Process Process-55:
Process Process-56:
Traceback (most recent call last):
  File "/anaconda3/envs/torch/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/anaconda3/envs/torch/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-349-d097a606b668>", line 104, in worker
    cmd, data = remote.recv()
  File "/anaconda3/envs/torch/lib/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/anaconda3/envs/torch/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/anaconda3/envs/torch/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Process Process-50:
Process Process-49:
Traceback (most recent call last):
Process Process-52:
Traceback (most recent call last)