In [40]:
import tensorflow as tf
import tensorflow_probability as tfp
import pandas as pd
import numpy as np
import gym

# CartPole-v0 environment used by the cells below.
env = gym.make("CartPole-v0")

# Set the per-episode step limit explicitly.
# NOTE(review): this writes an underscore-prefixed attribute on the object
# returned by gym.make — presumably the time-limit wrapper's cap; confirm
# against the installed gym version.
env._max_episode_steps = 200

In [41]:
class Agent(tf.keras.Model):
    """Softmax policy network with helpers for collecting episodes from ``env``.

    The network is three 128-unit ReLU layers followed by a softmax over
    ``env.action_space.n`` actions. Besides the forward pass, the class can
    sample actions, evaluate log-probabilities of taken actions, compute
    discounted returns, and roll out batches of episodes.
    """

    def __init__(self,env):
        """Store ``env``, hyper-parameters, the Adam optimizer and the layers."""
        super(Agent, self).__init__()
        self.env=env
        self.MAX_TRAJ=200  # number of episodes gathered per get_full_experience() call
        self.space_size=env.observation_space.shape
        self.action_size=env.action_space.n
        self.learning_rate=0.005
        self.gamma=0.999  # discount factor used by get_return_G()

        self.optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

        self.layer1=tf.keras.layers.Dense(128,"relu",input_shape=self.space_size)
        self.layer2=tf.keras.layers.Dense(128,"relu")
        self.layer3=tf.keras.layers.Dense(128,"relu")
        self.layer4=tf.keras.layers.Dense(self.action_size,"softmax")

    def call(self,state):
        """Return action probabilities for a batch of states."""
        net=self.layer1(state)
        net=self.layer2(net)
        net=self.layer3(net)
        net=self.layer4(net)
        return net

    def action(self,state):
        """Sample one action for a single (un-batched) state array."""
        s=state.reshape(1,-1)  # add the batch dimension the network expects
        net=self.call(s)
        action=tfp.distributions.Categorical(probs=net).sample()
        return action.numpy()[0]

    def get_log_prob(self,state,action):
        """Return log pi(action | state) for each (state, action) pair.

        ``action`` may be given one-per-state either flat or as a column with
        a trailing axis of size 1; the result has the same shape as ``action``.

        A column of actions must not be handed to ``Categorical.log_prob``
        directly: the distribution's batch shape is ``(N,)``, so an ``(N, 1)``
        value broadcasts to an ``(N, N)`` matrix that pairs every action with
        every state. The trailing axis is therefore squeezed first and
        restored afterwards.
        """
        net=self.call(state)
        dist=tfp.distributions.Categorical(probs=net)

        action=tf.convert_to_tensor(action)
        is_column=action.shape.rank==2 and action.shape[-1]==1
        if is_column:
            action=tf.squeeze(action,axis=-1)

        log_prob=dist.log_prob(action)
        if is_column:
            log_prob=tf.expand_dims(log_prob,axis=-1)
        return log_prob

    def get_return_G(self,rewards):
        """Return the discounted return-to-go for every step of one episode.

        ``G[i] = [sum_k gamma**k * rewards[i+k]]``; each entry is wrapped in a
        one-element list so the stacked result is a column. An empty reward
        list yields an empty list.
        """
        G=[None]*len(rewards)
        running=0.0
        # Walk backwards so each return reuses the next one:
        # G[i] = r[i] + gamma * G[i+1]  (linear instead of quadratic time).
        for i in reversed(range(len(rewards))):
            running=rewards[i]+self.gamma*running
            G[i]=[running]
        return G

    def get_trajectory(self):
        """Roll out one episode with the current policy.

        Returns ``(states, actions, rewards)`` as Python lists; each action is
        wrapped in a one-element list. Uses ``self.env`` (the environment
        passed to the constructor) rather than a notebook-level global.
        """
        states=[]
        actions=[]
        rewards=[]

        done=False
        # NOTE(review): relies on reset() returning the observation and step()
        # returning a 4-tuple — confirm against the installed gym version.
        curr_state=self.env.reset()

        while not done:
            act=self.action(curr_state)
            next_state,reward,done,_=self.env.step(act)
            states.append(curr_state)
            actions.append([act])
            rewards.append(reward)
            curr_state=next_state
        return states,actions,rewards

    def get_full_experience(self):
        """Collect ``MAX_TRAJ`` episodes and concatenate their steps.

        Returns ``(states, actions, returns, mean_episode_reward)`` where the
        first three are NumPy arrays with one row per step and the last is the
        undiscounted reward sum averaged over the episodes.
        """
        all_states=[]
        all_actions=[]
        all_G=[]
        current_value=0
        for _ in range(self.MAX_TRAJ):
            states,actions,rewards=self.get_trajectory()
            all_states.extend(states)
            all_actions.extend(actions)
            all_G.extend(self.get_return_G(rewards))
            current_value+=sum(rewards)
        current_value/=self.MAX_TRAJ
        return np.array(all_states),np.array(all_actions),np.array(all_G),current_value



In [42]:
class Base_line(tf.keras.Model):
    """Baseline network: three 128-unit ReLU layers and a linear output head.

    The output has one unit per action (``env.action_space.n``). The head is
    linear rather than softmax: a softmax confines every output to [0, 1]
    with the row summing to 1, which cannot represent regression targets
    outside that range.
    """

    def __init__(self,env):
        """Store ``env``, its space sizes, an Adam optimizer and the layers."""
        super(Base_line , self).__init__()
        self.env=env
        self.space_size=env.observation_space.shape
        self.action_size=env.action_space.n
        self.base_line_optimizer=tf.keras.optimizers.Adam()

        self.layer1=tf.keras.layers.Dense(128,"relu",input_shape=self.space_size)
        self.layer2=tf.keras.layers.Dense(128,"relu")
        self.layer3=tf.keras.layers.Dense(128,"relu")
        # Linear activation so the outputs are unbounded real values.
        self.layer4=tf.keras.layers.Dense(self.action_size,activation=None)

    def call(self,s):
        """Return the baseline estimates for a batch of states ``s``."""
        net=self.layer1(s)
        net=self.layer2(net)
        net=self.layer3(net)
        net=self.layer4(net)
        return net

In [43]:
class Vanilla_Policy_Gradient(tf.keras.Model):
    """Policy-gradient trainer that pairs a policy network with a baseline.

    ``self.Agent`` samples episodes and is updated with the policy-gradient
    loss; ``self.Base_line`` is fitted with MSE to the discounted returns and
    its predictions are subtracted from the returns to form the advantage.
    """

    def __init__(self,env):
        """Create the policy and baseline networks for ``env``."""
        super(Vanilla_Policy_Gradient, self).__init__()
        self.env=env
        self.Agent=Agent(env)
        self.Base_line=Base_line(env)
        self.max_reward=200  # train() stops once the mean episode reward reaches this

        self.Base_line.compile(loss='mse',optimizer='adam')

    def loss(self,states,actions,Advantage):
        """Return the policy-gradient surrogate loss ``-mean(log_prob * advantage)``.

        ``actions`` and ``Advantage`` hold one entry per row of ``states``
        (flat or as a column). Both are flattened to shape ``(N,)`` before
        being combined so that each log-probability is multiplied only by the
        advantage of its own step; multiplying an ``(N,)``/``(N, N)`` tensor
        by an ``(N, 1)`` column would silently broadcast across steps.
        """
        flat_actions=tf.reshape(tf.convert_to_tensor(actions),[-1])
        log_prob=tf.reshape(self.Agent.get_log_prob(states,flat_actions),[-1])
        # The advantage arrives as a NumPy array that may be float64; match
        # the network's dtype so the element-wise product is well-defined.
        advantage=tf.cast(tf.reshape(Advantage,[-1]),log_prob.dtype)
        return -tf.reduce_mean(log_prob*advantage)

    def train(self):
        """Alternate data collection, baseline fitting and one policy step.

        Loops until the mean episode reward returned by
        ``Agent.get_full_experience()`` is at least ``self.max_reward``;
        there is no iteration cap.
        """
        max_value=self.max_reward
        curr_value=0

        counter=1
        while curr_value < max_value:
            states,actions,returns,curr_value=self.Agent.get_full_experience()

            print(f"At iteration {counter} value_function: {curr_value}")

            # Baseline value for each step: the output unit indexed by the
            # action that was taken, kept as a column to line up with returns.
            pred=self.Base_line.predict(states,verbose=0)
            rows=np.arange(len(pred))
            baseline=pred[rows,actions[:,0]].reshape(-1,1)
            Adv=returns-baseline

            # Standardise; the epsilon avoids a division by zero when every
            # advantage in the batch is identical.
            Adv=(Adv-Adv.mean())/(Adv.std()+1e-8)

            # The baseline is refitted after the advantage has been computed
            # from its previous predictions.
            self.Base_line.fit(states,returns,epochs=100,verbose=0)

            with tf.GradientTape() as t:
                J=self.loss(states,actions,Adv)

            grads=t.gradient(J,self.Agent.trainable_variables)
            self.Agent.optimizer.apply_gradients(zip(grads,self.Agent.trainable_variables))

            counter+=1
            


In [44]:
# Build the trainer (policy network + baseline network) for `env`.
# NOTE(review): this rebinds the name `Agent`, which up to this point referred
# to the Agent class. Vanilla_Policy_Gradient.__init__ looks up `Agent` when
# it runs, so re-executing this cell without first re-running the class
# definition cell will likely fail — consider a distinct variable name.
Agent=Vanilla_Policy_Gradient(env)

In [None]:
# Run training. The loop inside train() exits only once the mean episode
# reward reaches max_reward, so this cell may run for a long time.
Agent.train()