In [1]:
import gym
import matplotlib as plt
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
import time

In [2]:
class simpleLayer(tf.keras.layers.Layer):
    """Dense layer optionally followed by dropout.

    Args:
        dense_units: Output width of the inner ``Dense`` layer.
        last_layer: When True, no dropout layer is created or applied.
        activation: Activation forwarded to the inner ``Dense`` layer. The
            default ``None`` keeps the layer linear, exactly as before.
    """

    def __init__(self, dense_units=7, last_layer=False, activation=None):
        super(simpleLayer, self).__init__()
        self.dense_units = dense_units
        self.activation = activation
        # axis=-1 normalises over the last (feature) axis. The layer is
        # created here but is not applied in call().
        self.batchnorm = tf.keras.layers.BatchNormalization(axis=-1)
        self.last = last_layer
        if not self.last:
            self.dropout = tf.keras.layers.Dropout(0.3)

    def build(self, input_shape):
        # `input_shape` includes the batch dimension, so it must not be
        # forwarded as Dense's `input_shape` kwarg; Dense infers its input
        # width the first time it is called.
        self.dense = tf.keras.layers.Dense(self.dense_units,
                                           activation=self.activation)

    def call(self, input, training=None):
        """Apply the dense projection, then dropout unless this is a last layer.

        Args:
            input: Tensor whose last axis is the feature axis.
            training: Keras training flag; forwarded to dropout so that it is
                only active while training.
        """
        out = self.dense(input)
        if not self.last:
            out = self.dropout(out, training=training)
        return out

In [3]:
import random
def shuffle(list1, list2):
    """Shuffle two sequences with the same random permutation.

    Elements at the same position stay paired. Pairing stops at the shorter
    of the two inputs, as with ``zip``.

    Args:
        list1: First sequence.
        list2: Second sequence.

    Returns:
        Two tuples ``(a, b)`` holding the shuffled elements of ``list1`` and
        ``list2``. Both are empty tuples when there is nothing to shuffle.
    """
    # zip() is a one-shot iterator on Python 3; np.random.shuffle needs a
    # mutable sequence, so materialise the pairs first.
    pairs = list(zip(list1, list2))
    if not pairs:
        # zip(*[]) yields nothing, so unpacking into two names would fail.
        return (), ()
    np.random.shuffle(pairs)
    a, b = zip(*pairs)
    return a, b

In [4]:
class actorNet(tf.keras.Model):
    """Policy network: three ``simpleLayer`` blocks and a 2-way softmax head."""

    def __init__(self):
        super(actorNet, self).__init__()
        self.layer1 = simpleLayer()
        self.layer2 = simpleLayer()
        self.layer3 = simpleLayer(last_layer=True)
        self.dense = tf.keras.layers.Dense(2, activation=self.myActivation)

    def call(self, input, training=None):
        """Return action probabilities for ``input`` (features on the last axis).

        ``training`` is accepted so the model can be invoked as
        ``self(x, training=True)``; nested layers read the flag from the Keras
        call context.
        """
        x = self.layer1(input)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.dense(x)
        return x

    def myActivation(self, x):
        """Softmax over the last axis."""
        return tf.keras.activations.softmax(x, axis=-1)

    def ppoLoss(self, y_true, y_pred, advantage, old_prediction):
        """Clipped-ratio policy loss with a small entropy-style bonus.

        Args:
            y_true: One-hot encoding of the actions that were taken.
            y_pred: Action probabilities from the current policy.
            advantage: Advantage per sample; must broadcast against the
                per-sample probability ratio.
            old_prediction: Action probabilities recorded when the actions
                were taken.

        Returns:
            Scalar float32 tensor to be minimised.
        """
        ENTROPY_LOSS = 5e-3
        LOSS_CLIPPING = 0.2
        # Bring everything to float32 *before* combining: numpy inputs are
        # float64 by default and TensorFlow does not mix dtypes implicitly.
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Recorded data are constants with respect to the current policy.
        advantage = tf.stop_gradient(tf.cast(advantage, tf.float32))
        old_prediction = tf.stop_gradient(tf.cast(old_prediction, tf.float32))

        prob = tf.reduce_sum(y_true * y_pred, axis=-1)
        old_prob = tf.reduce_sum(y_true * old_prediction, axis=-1)
        r = prob / (old_prob + 1e-10)
        clipped_r = tf.clip_by_value(r, 1 - LOSS_CLIPPING, 1 + LOSS_CLIPPING)
        surrogate = tf.minimum(r * advantage, clipped_r * advantage)
        entropy_bonus = ENTROPY_LOSS * -(prob * tf.math.log(prob + 1e-10))
        return -tf.reduce_mean(surrogate + entropy_bonus)

    def learn(self, observe, action, advantage, old_prediction, epochs, batch, shuffle):
        """Run ``epochs`` passes of mini-batch SGD over the given rollout.

        Args:
            observe: Observations, one per sample along axis 0.
            action: One-hot actions, one per sample along axis 0.
            advantage: One advantage value per sample.
            old_prediction: Recorded action probabilities, one row per sample.
            epochs: Number of full passes over the data.
            batch: Mini-batch size.
            shuffle: When truthy, samples are visited in a fresh random order
                on every epoch.
        """
        # The optimizer is created on first use and then reused; the attribute
        # keeps its existing spelling so external references still resolve.
        if getattr(self, "optimaizer", None) is None:
            self.optimaizer = tf.keras.optimizers.SGD(learning_rate=0.01)

        observe = np.asarray(observe, dtype=np.float32)
        n_samples = len(observe)
        if n_samples == 0:
            return
        # Flatten per-sample axes so that every array is indexed by sample on
        # axis 0 and the loss operands line up one-to-one instead of
        # broadcasting against each other.
        observe = observe.reshape(n_samples, -1)
        action = np.asarray(action, dtype=np.float32).reshape(n_samples, -1)
        old_prediction = np.asarray(old_prediction, dtype=np.float32).reshape(n_samples, -1)
        advantage = np.asarray(advantage, dtype=np.float32).reshape(n_samples)
        batch = max(1, int(batch))

        for epoch in range(epochs):
            print("Start of epoch %d" % (epoch,))
            # The `shuffle` argument hides the module-level shuffle() helper,
            # so the permutation is drawn here directly.
            order = np.random.permutation(n_samples) if shuffle else np.arange(n_samples)
            epoch_losses = []
            for start in range(0, n_samples, batch):
                idx = order[start:start + batch]
                with tf.GradientTape() as tape:
                    pred_action = self(observe[idx], training=True)
                    loss = self.ppoLoss(action[idx], pred_action,
                                        advantage[idx], old_prediction[idx])
                grads = tape.gradient(loss, self.trainable_weights)
                self.optimaizer.apply_gradients(zip(grads, self.trainable_weights))
                epoch_losses.append(float(loss))
            # %f rather than %d: the loss is a float and %d would truncate it.
            print("the model loss is :%f" % (sum(epoch_losses) / len(epoch_losses)))


In [5]:
class criticNet(tf.keras.Model):
    """Value network: three ``simpleLayer`` blocks and a single linear output."""

    def __init__(self):
        super(criticNet, self).__init__()
        self.layer1 = simpleLayer()
        self.layer2 = simpleLayer()
        self.layer3 = simpleLayer(last_layer=True)
        self.dense = tf.keras.layers.Dense(1, activation=self.myActivation)

    def call(self, input, training=None):
        """Return the value estimate for ``input`` (features on the last axis).

        ``training`` is accepted so the model can be invoked as
        ``self(x, training=True)``; nested layers read the flag from the Keras
        call context.
        """
        x = self.layer1(input)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.dense(x)
        return x

    def myActivation(self, x):
        """Identity activation."""
        return tf.keras.activations.linear(x)

    def learn(self, observe, reward, epochs, batch, shuffle):
        """Fit the value estimate to ``reward`` with mini-batch SGD on MSE.

        Args:
            observe: Observations, one per sample along axis 0.
            reward: Regression target, one value per sample.
            epochs: Number of full passes over the data.
            batch: Mini-batch size.
            shuffle: When truthy, samples are visited in a fresh random order
                on every epoch.
        """
        # The optimizer is created on first use and then reused; the attribute
        # keeps its existing spelling so external references still resolve.
        if getattr(self, "optimaizer", None) is None:
            self.optimaizer = tf.keras.optimizers.SGD(learning_rate=0.01)

        observe = np.asarray(observe, dtype=np.float32)
        n_samples = len(observe)
        if n_samples == 0:
            return
        # Index every array by sample on axis 0 so predictions and targets
        # have matching (batch, 1) shapes.
        observe = observe.reshape(n_samples, -1)
        reward = np.asarray(reward, dtype=np.float32).reshape(n_samples, -1)
        batch = max(1, int(batch))

        for i in range(epochs):
            # The `shuffle` argument hides the module-level shuffle() helper,
            # so the permutation is drawn here directly.
            order = np.random.permutation(n_samples) if shuffle else np.arange(n_samples)
            epoch_losses = []
            for start in range(0, n_samples, batch):
                idx = order[start:start + batch]
                with tf.GradientTape() as tape:
                    pred_reward = self(observe[idx], training=True)
                    loss = tf.reduce_mean(tf.square(reward[idx] - pred_reward))
                grads = tape.gradient(loss, self.trainable_weights)
                self.optimaizer.apply_gradients(zip(grads, self.trainable_weights))
                epoch_losses.append(float(loss))
            # %f rather than %d: the loss is a float and %d would truncate it.
            print("the model loss is :%f" % (sum(epoch_losses) / len(epoch_losses)))
                

In [6]:
class Gamerecord():
    """Rollout buffer that keeps discounted returns up to date step by step.

    Attributes are plain lists that callers append to directly, except for
    rewards, which go through ``add_reward`` so ``reward_total`` stays in sync.
    """

    def __init__(self):
        self.reset()

    def add_reward(self, reward, done, gamma=0.9):
        """Record one step's reward and fold it into the episode's returns.

        Args:
            reward: Reward received for the step just taken.
            done: Truthy when this step ended the current episode.
            gamma: Discount factor applied per step of distance.
        """
        self.reward_steps.append(reward)
        self.reward_total.append(reward)
        newest = len(self.reward_total) - 1

        # Walk backwards over the earlier steps of the *current* episode and
        # add this reward discounted by its distance in steps.
        power = 1
        for i in range(newest - 1, self.first_epoch_index - 1, -1):
            self.reward_total[i] += (gamma ** power) * reward
            power += 1

        # Only after the terminal reward has been propagated does the next
        # episode start, at the slot following this one. Plain truthiness is
        # used so numpy booleans are handled as well.
        if done:
            self.first_epoch_index = newest + 1

    def reset(self):
        """Empty every list and restart episode bookkeeping at index 0."""
        self.observations = []
        self.actions = []
        self.actionPredictions = []
        self.reward_steps = []
        self.reward_total = []
        # Index in reward_total where the current episode begins.
        self.first_epoch_index = 0
            

In [None]:
# Number of transitions Agent.get_experiance collects before it returns.
BUFFER_SIZE = 256
# Agent.learn keeps iterating until this many episodes have ended.
EPISODES = 10
# Passed as `epochs` to actorNet.learn and criticNet.learn.
EPOCHS = 10
# Passed as `batch` to actorNet.learn and criticNet.learn.
BATCH_SIZE = 256

class Agent():
    """Collects CartPole rollouts and alternates actor and critic updates."""

    def __init__(self):
        self.actionNet = actorNet()
        self.criticalNet = criticNet()
        self.env = gym.make('CartPole-v0')
        self.episode = 0
        self.observation = self._reset_observation()
        self.records = Gamerecord()
        self.gradient_steps = 0

    def _reset_observation(self):
        """Reset the environment and return only the initial observation."""
        result = self.env.reset()
        # Newer gym releases return (observation, info); older ones return
        # the observation alone.
        if isinstance(result, tuple):
            return result[0]
        return result

    def reset_env(self):
        """Count a finished episode and start a new one."""
        self.episode += 1
        self.observation = self._reset_observation()

    def choose_action(self):
        """Sample an action from the policy for the current observation.

        Returns:
            ``(action, action_matrix, p)``: the sampled action index, its
            one-hot encoding, and the policy output with a leading batch axis
            of size 1.
        """
        state = np.asarray(self.observation, dtype=np.float32).reshape(1, -1)
        # A direct call avoids the per-invocation overhead of predict() for a
        # single observation; outside of training it runs in inference mode.
        p = np.asarray(self.actionNet(state))

        probs = np.nan_to_num(p[0].astype(np.float64))
        total = probs.sum()
        if total > 0:
            # np.random.choice requires probabilities that sum to 1.
            probs = probs / total
        else:
            probs = np.full(probs.shape, 1.0 / probs.size)

        action = np.random.choice(probs.size, p=probs)
        action_matrix = np.zeros(probs.size)
        action_matrix[action] = 1
        return action, action_matrix, p

    def get_experiance(self):
        """Step the environment until the record buffer holds BUFFER_SIZE steps.

        Returns:
            ``(obs, action, pred, reward)`` as numpy arrays with shapes
            ``(N, 1, obs_dim)``, ``(N, n_actions)``, ``(N, 1, n_actions)`` and
            ``(N, 1, 1)``; ``reward`` holds the discounted returns tracked by
            the record buffer.
        """
        while len(self.records.observations) < BUFFER_SIZE:
            action, action_matrix, predicted_action = self.choose_action()

            step_result = self.env.step(action)
            if len(step_result) == 5:
                # Newer gym API: separate terminated / truncated flags.
                observation, reward, terminated, truncated, _ = step_result
                done = bool(terminated or truncated)
            else:
                observation, reward, done, _ = step_result
                done = bool(done)

            self.records.observations.append(self.observation)
            self.records.actions.append(action_matrix)
            self.records.actionPredictions.append(predicted_action)
            self.records.add_reward(reward, done)

            self.observation = observation

            if done:
                self.reset_env()

        n_steps = len(self.records.observations)
        obs = np.array(self.records.observations).reshape(n_steps, 1, -1)
        action = np.array(self.records.actions)
        pred = np.array(self.records.actionPredictions).reshape(n_steps, 1, -1)
        reward = np.array(self.records.reward_total).reshape(n_steps, 1, 1)
        return obs, action, pred, reward

    def learn(self):
        """Alternate rollout collection and network updates until EPISODES end."""
        while self.episode < EPISODES:
            obs, action, pred, reward = self.get_experiance()
            obs, action, pred, reward = obs[:BUFFER_SIZE], action[:BUFFER_SIZE], pred[:BUFFER_SIZE], reward[:BUFFER_SIZE]
            old_prediction = pred

            # Give the value estimates the same shape as the returns so the
            # subtraction is element-wise rather than broadcast.
            pred_values = np.asarray(self.criticalNet.predict(obs)).reshape(reward.shape)
            advantage = reward - pred_values

            self.actionNet.learn(obs, action, advantage, old_prediction, EPOCHS, BATCH_SIZE, shuffle=False)
            self.criticalNet.learn(obs, reward, EPOCHS, BATCH_SIZE, shuffle=False)

            self.gradient_steps += 1

            # Start the next iteration from an empty buffer. Without this the
            # buffer stays full, no further environment steps are taken, and
            # this loop can never observe another finished episode.
            self.records = Gamerecord()
    


In [None]:
# Entry point: build the agent and run its training loop.
if __name__ == '__main__':
    ag = Agent()
    ag.learn()