In [None]:
import gym
import sys
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import itertools
import shutil
import threading
import multiprocessing
from tensorflow.keras.layers import Input,Dense,Conv2D,Flatten
from tensorflow.keras import Model
import tensorflow_probability as tfp
from datetime import datetime as time


In [None]:
IM_SIZE = 84  # side length (pixels) of the square preprocessed frame fed to the networks
ENV_NAME = 'Breakout-v0'  # gym environment id passed to gym.envs.make
MAX_GLOBAL_STEPS = 5e6  # total env steps (summed over all workers) after which training stops
STEPS_PER_UPDATE = 5  # t_max: env steps each worker collects before one gradient update


<h2>Frame Processing</h2>

In [None]:
class ImageTransformer:
    """Shrink raw RGB observations (210x160x3) to single-channel IM_SIZE x IM_SIZE frames.

    Pipeline: grayscale -> crop the 160x160 region starting at row 34 ->
    nearest-neighbour resize to IM_SIZE x IM_SIZE -> drop size-1 axes.
    """

    def transform(self, state):
        # Run the preprocessing on the CPU regardless of available accelerators.
        with tf.device('/cpu:0'):
            gray = tf.image.rgb_to_grayscale(state)
            cropped = tf.image.crop_to_bounding_box(
                gray,
                offset_height=34,
                offset_width=0,
                target_height=160,
                target_width=160,
            )
            resized = tf.image.resize(
                cropped,
                size=[IM_SIZE, IM_SIZE],
                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR,
            )
            # Remove the trailing channel axis left over from grayscale conversion.
            return tf.squeeze(resized)

In [None]:
def shift_frames(state, obs_small):
    """Advance the frame stack by one: discard the oldest frame, append `obs_small`.

    Both arguments are coerced to uint8 tensors before calling the
    graph-compiled helper `shift_frames_`.
    """
    return shift_frames_(
        tf.convert_to_tensor(state, dtype=tf.uint8),
        tf.convert_to_tensor(obs_small, dtype=tf.uint8),
    )


@tf.function
def shift_frames_(state, obs_small):
    """Concatenate state[:, :, 1:] with obs_small (given a new axis 2) along axis 2."""
    newest = obs_small[:, :, tf.newaxis]
    kept = state[:, :, 1:]
    return tf.concat([kept, newest], axis=2)


In [None]:
def repeat_frame(frame, num_frames=4):
    """Build an initial frame stack by repeating one frame along a new axis 2.

    Used at episode start, when there is no history yet to fill the stack.

    Args:
        frame: a 2-D frame (array-like; a tf tensor is converted by numpy).
        num_frames: number of copies to stack. The default of 4 matches the
            4-channel input expected by the networks.

    Returns:
        np.ndarray of shape (H, W, num_frames) for a 2-D frame of shape (H, W).

    Raises:
        ValueError: if num_frames < 1.
    """
    if num_frames < 1:
        raise ValueError("num_frames must be >= 1, got {}".format(num_frames))
    return np.stack([frame] * num_frames, axis=2)

In [None]:
class step:
    """Plain record of one environment transition (s, a, r, s', done)."""

    def __init__(self, state, action, reward, next_state, done):
        self.state, self.action, self.reward = state, action, reward
        self.next_state, self.done = next_state, done

<h2>Policy & Value Models</h2>

In [None]:
# feature extractor shared between value & policy models
def create_body():
    """Build the convolutional feature trunk shared by the policy and value nets.

    Input: (IM_SIZE, IM_SIZE, 4) frame stacks, scaled by 1/255.
    Output: a 256-unit ReLU feature vector; each head adds its own final Dense.
    """
    frames = Input(shape=(IM_SIZE, IM_SIZE, 4))
    features = frames / 255.0
    # (filters, kernel_size, strides) for each conv layer, in build order.
    for n_filters, kernel, stride in ((16, 8, 4), (32, 4, 2)):
        features = Conv2D(n_filters, kernel, stride, activation='relu')(features)
    features = Flatten()(features)
    features = Dense(256, activation='relu')(features)
    return Model(inputs=frames, outputs=features)


In [None]:
def create_policy_net(body):
    """Attach a policy head to the shared `body`.

    The model outputs [action, probs]:
      * action - one sample from a Categorical over NUM_ACTIONS logits
        (used to act in the environment);
      * probs  - softmax of the same logits (used by the policy loss).
    """
    frames = Input(shape=(IM_SIZE, IM_SIZE, 4))
    logits = Dense(NUM_ACTIONS, activation=None)(body(frames))
    probs = tf.nn.softmax(logits)
    sampled_action = tfp.distributions.Categorical(logits=logits).sample()
    return Model(inputs=frames, outputs=[sampled_action, probs])


In [None]:
def create_value_net(body):
    """Attach a scalar state-value head to the shared `body`.

    The Dense(1) output goes through tf.squeeze with no axis, so every
    size-1 axis is removed: a batch of N > 1 gives shape (N,), and a
    batch of 1 collapses to a scalar.
    """
    frames = Input(shape=(IM_SIZE, IM_SIZE, 4))
    value = tf.squeeze(Dense(1, activation=None)(body(frames)))
    return Model(inputs=frames, outputs=value)


In [None]:
# since both value and policy nets share same body
# create them together in one function
def create_networks():
    """Return (policy_net, value_net) built on a single shared feature body."""
    shared = create_body()
    return create_policy_net(shared), create_value_net(shared)



In [None]:
def copy_weights(from_, to):
    """Load a numpy snapshot of every weight of `from_` into `to`.

    Expects both models to share the same architecture, so the weight
    lists line up in order and shape.
    """
    snapshot = list(map(lambda variable: variable.numpy(), from_.weights))
    to.set_weights(snapshot)

<h2>Workers</h2>

In [None]:
class Worker:
    """A3C worker: acts with local networks and updates the shared global networks.

    Each worker owns an environment and local copies of the policy/value
    networks. On every iteration it:
      1. copies the global weights into its local networks,
      2. collects up to `t_max` environment steps,
      3. computes n-step returns and advantages,
      4. applies gradients computed on the local networks to the global
         networks' trainable weights.

    Args:
        name: label printed in episode logs.
        env: gym-style environment used only by this worker.
        policy_net: global policy network (outputs [action, probs]).
        value_net: global value network.
        global_counter: iterator shared by all workers; `next()` on it yields
            the global step number.
        returns_list: list shared by all workers; each finished episode's
            total reward is appended to it.
        discount_factor: gamma used for the n-step returns.
        max_global_steps: once the global step reaches this value the worker
            asks the coordinator to stop; None disables the limit.
    """

    def __init__(self, name, env, policy_net, value_net, global_counter, returns_list, discount_factor=0.99, max_global_steps=None):
        self.name = name
        self.env = env
        self.global_policy_net = policy_net
        self.global_value_net = value_net
        self.global_counter = global_counter
        self.discount_factor = discount_factor
        self.max_global_steps = max_global_steps
        # NOTE(review): not read anywhere in this class; kept for compatibility.
        self.global_step = tf.compat.v1.train.get_global_step()
        self.img_transformer = ImageTransformer()

        # Local networks: gradients are computed on these, then applied to the globals.
        self.local_policy_net, self.local_value_net = create_networks()
        self.state = None  # current frame stack fed to the networks
        self.total_reward = 0  # reward accumulated in the current episode
        self.returns_list = returns_list  # global returns list shared amongst all workers
        # Keras RMSprop(learning_rate, rho, momentum, epsilon); one optimizer per head.
        self.p_opt = tf.keras.optimizers.RMSprop(0.00025, 0.99, 0.0, 1e-6)
        self.v_opt = tf.keras.optimizers.RMSprop(0.00025, 0.99, 0.0, 1e-6)

    def run(self, coor, t_max):
        """Thread entry point: sync, collect, update until the coordinator stops.

        Args:
            coor: tf.train.Coordinator shared by all worker threads.
            t_max: maximum number of environment steps per rollout (>= 1).

        Any exception other than tf.errors.CancelledError is reported to
        `coor`, which stops the other workers and re-raises it from
        `coor.join(...)` instead of letting this thread die silently.
        """
        try:
            if t_max < 1:
                raise ValueError("t_max must be >= 1, got {}".format(t_max))
            self.state = repeat_frame(self.img_transformer.transform(self.env.reset()))
            while not coor.should_stop():
                # Copy weights from the global networks to the local networks.
                copy_weights(self.global_policy_net, self.local_policy_net)
                copy_weights(self.global_value_net, self.local_value_net)

                # Collect experience.
                steps, global_step = self.run_n_steps(t_max)

                # Stop every worker once the global step budget is used up.
                if self.max_global_steps is not None and global_step >= self.max_global_steps:
                    coor.request_stop()
                    return

                # Update the global networks using local gradients.
                self.update(steps)
        except tf.errors.CancelledError:
            return
        except Exception as exc:
            coor.request_stop(exc)

    @tf.function
    def p_train_step(self, inputs, actions, advantages):
        """One policy-gradient step; gradients from the local net go to the global net.

        loss = -sum(log pi(a|s) * advantage + 0.01 * entropy(pi(.|s)))
        Returns the scalar loss.
        """
        with tf.GradientTape() as tape:
            probs = self.local_policy_net(inputs, training=True)[1]
            # Clip before the log so a probability that underflows to 0 gives a
            # large finite value instead of -inf (and 0 * -inf = NaN in the entropy).
            log_probs = tf.math.log(tf.clip_by_value(probs, 1e-20, 1.0))
            entropy = -tf.reduce_sum(probs * log_probs, axis=1)
            selected_log_probs = tf.reduce_sum(log_probs * tf.one_hot(actions, NUM_ACTIONS), axis=1)
            loss_value = -tf.reduce_sum(selected_log_probs * advantages + 0.01 * entropy)
        grads = tape.gradient(loss_value, self.local_policy_net.trainable_weights)
        clipped_grads, _ = tf.clip_by_global_norm(grads, 5.0)
        self.p_opt.apply_gradients(zip(clipped_grads, self.global_policy_net.trainable_weights))
        return loss_value

    @tf.function
    def v_train_step(self, inputs, targets):
        """One value-regression step (sum of squared errors); returns the loss."""
        with tf.GradientTape() as tape:
            values = self.local_value_net(inputs, training=True)
            loss_value = tf.reduce_sum(tf.square(targets - values))
        grads = tape.gradient(loss_value, self.local_value_net.trainable_weights)
        clipped_grads, _ = tf.clip_by_global_norm(grads, 5.0)
        self.v_opt.apply_gradients(zip(clipped_grads, self.global_value_net.trainable_weights))
        return loss_value

    @tf.function
    def sample_action(self, state):
        """Sample one action for a single (unbatched) state from the local policy."""
        state = tf.expand_dims(state, axis=0)
        return self.local_policy_net(state)[0][0]

    @tf.function
    def get_value_prediction(self, state):
        """Local value estimate for a single (unbatched) state.

        NOTE(review): the value head squeezes all size-1 axes, so for this
        batch of one the result is presumably a scalar tensor.
        """
        state = tf.expand_dims(state, axis=0)
        return self.local_value_net(state)

    def run_n_steps(self, n):
        """Play up to `n` steps (fewer if the episode ends).

        Returns:
            (steps, global_step): the collected `step` records and the last
            value drawn from the shared global counter.
        """
        steps = []
        for _ in range(n):
            # Plain int so the env never receives a tf tensor.
            action = int(self.sample_action(self.state))
            next_frame, reward, done, _ = self.env.step(action)
            next_state = shift_frames(self.state, self.img_transformer.transform(next_frame))

            # Count every reward, including the one on the terminal step.
            self.total_reward += reward
            if done:
                print("Total reward:", self.total_reward, "Worker:", self.name)
                self.returns_list.append(self.total_reward)
                if len(self.returns_list) % 100 == 0:
                    print("*** Total average reward (last 100):", np.mean(self.returns_list[-100:]), "Collected so far:", len(self.returns_list))
                self.total_reward = 0.

            steps.append(step(self.state, action, reward, next_state, done))

            # Advance the counter shared by all workers.
            global_step = next(self.global_counter)

            if done:
                self.state = repeat_frame(self.img_transformer.transform(self.env.reset()))
                break
            self.state = next_state
        return steps, global_step

    def update(self, steps):
        """Compute n-step returns/advantages for `steps` and update the global nets.

        For a rollout s1..s3 whose last next_state is s4 (gamma = discount_factor):
            G(s3) = r3 + gamma * V(s4)
            G(s2) = r2 + gamma * r3 + gamma^2 * V(s4)
        If the rollout ended on a terminal step, V(s4) is replaced by 0.

        Returns:
            (policy_loss, value_loss)
        """
        ret = 0.0
        if not steps[-1].done:
            ret = self.get_value_prediction(steps[-1].next_state)  # bootstrap V(s4)

        states, actions, advantages, value_targets = [], [], [], []
        for transition in reversed(steps):
            ret = transition.reward + self.discount_factor * ret
            advantages.append(ret - self.get_value_prediction(transition.state))
            states.append(transition.state)
            actions.append(transition.action)
            value_targets.append(ret)

        # Update the global nets using gradients produced by the local nets.
        states = tf.convert_to_tensor(states, dtype=tf.uint8)
        value_targets = tf.convert_to_tensor(value_targets, dtype=tf.float32)
        actions = tf.convert_to_tensor(actions, dtype=tf.int32)
        advantages = tf.stack(advantages, axis=0)
        pnet_loss = self.p_train_step(states, actions, advantages)
        vnet_loss = self.v_train_step(states, value_targets)

        return pnet_loss, vnet_loss

<h2>Training</h2>

In [None]:
def Env():
    """Create a fresh gym environment for ENV_NAME (one per worker)."""
    return gym.make(ENV_NAME)

In [None]:
# Number of discrete actions the policy head outputs.
# NOTE(review): 'Ping-v0' does not look like a gym env id; possibly meant 'Pong-v0'.
# Confirm that env's action count before trusting the hard-coded 4.
if ENV_NAME in ['Ping-v0','Breakout-v0']:
    NUM_ACTIONS = 4 # env.action_space.n return a bigger number  (NOTE(review): unverified for Breakout-v0 — confirm)
else:
    # Query the env once for its action-space size, then release it.
    env = Env()
    NUM_ACTIONS = env.action_space.n
    env.close()


In [None]:
def running_avg(totalrewards, window=100):
    """Trailing mean of episode rewards over (at most) the last `window` episodes.

    Entry t averages totalrewards[max(0, t - window + 1) : t + 1], so early
    entries average over however many episodes exist so far.

    Args:
        totalrewards: 1-D sequence or array of per-episode rewards.
        window: number of episodes per average (default 100).

    Returns:
        float64 np.ndarray with the same length as `totalrewards`.

    Raises:
        ValueError: if window < 1.
    """
    if window < 1:
        raise ValueError("window must be >= 1, got {}".format(window))
    rewards = np.asarray(totalrewards, dtype=float)
    averages = np.empty(len(rewards))
    for t in range(len(rewards)):
        # window - 1 previous episodes plus the current one = exactly `window`.
        averages[t] = rewards[max(0, t - window + 1):t + 1].mean()
    return averages


In [None]:
# One worker thread per CPU reported by multiprocessing.cpu_count().
NUM_WORKERS = multiprocessing.cpu_count()

In [None]:
with tf.device('/cpu:0'):
    # Global (shared) networks; every worker applies its gradients to these.
    policy_net, value_net = create_networks()
    global_counter = itertools.count()  # global step counter shared by all workers
    returns_list = []  # finished-episode returns from all workers
    workers = [
        Worker(
            name='worker_{}'.format(worker_id),
            env=Env(),
            policy_net=policy_net,
            value_net=value_net,
            global_counter=global_counter,
            returns_list=returns_list,
            discount_factor=0.99,
            max_global_steps=MAX_GLOBAL_STEPS,
        )
        for worker_id in range(NUM_WORKERS)
    ]

    coord = tf.train.Coordinator()
    worker_threads = []
    for worker in workers:
        # Bind the worker now via target/args. A `lambda: worker.run(...)` closure
        # looks `worker` up only when the thread runs, so it could pick up a later
        # loop value and run the wrong worker (or the same one twice).
        t = threading.Thread(target=worker.run, args=(coord, STEPS_PER_UPDATE), name=worker.name)
        t.start()
        worker_threads.append(t)
    # Wait for all workers to finish; re-raises any exception a worker reported.
    coord.join(worker_threads, stop_grace_period_secs=300)

episode_returns = np.array(returns_list)
fig, ax = plt.subplots()
ax.plot(episode_returns, label='orig')
ax.plot(running_avg(episode_returns), label='running_avg')
ax.set(title='A3C {} episode returns'.format(ENV_NAME), xlabel='episode', ylabel='total reward')
ax.legend()
plt.show()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Total reward: 3.0 Worker: worker_1
Total reward: 2.0 Worker: worker_0
Total reward: 3.0 Worker: worker_1
Total reward: 2.0 Worker: worker_0
Total reward: 0.0 Worker: worker_1
Total reward: 0.0 Worker: worker_1
Total reward: 1.0 Worker: worker_0
Total reward: 2.0 Worker: worker_1
Total reward: 4.0 Worker: worker_0
Total reward: 2.0 Worker: worker_1
Total reward: 3.0 Worker: worker_0
Total reward: 3.0 Worker: worker_1
Total reward: 2.0 Worker: worker_0
Total reward: 2.0 Worker: worker_1
Total reward: 0.0 Worker: worker_0
Total reward: 2.0 Worker: worker_1
Total reward: 2.0 Worker: worker_0
Total reward: 2.0 Worker: worker_0
Total reward: 4.0 Worker: worker_1
Total reward: 0.0 Worker: worker_0
Total reward: 2.0 Worker: worker_1
Total reward: 3.0 Worker: worker_0
Total reward: 0.0 Worker: worker_0
Total reward: 5.0 Worker: worker_1
Total reward: 3.0 Worker: worker_0
Total reward: 4.0 Worker: worker_1
Total reward: 2.0 Worker: