In [11]:
import os
import sys
from datetime import datetime

import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from gym import wrappers


In [24]:
class HiddenLayer():
    """Fully connected layer computing ``f(X @ W)`` or ``f(X @ W + b)``.

    Args:
        M1: Number of input features (rows of the weight matrix).
        M2: Number of output units (columns of the weight matrix).
        f: Activation applied to the affine output; defaults to ``tf.nn.tanh``.
        use_bias: When True, a zero-initialised bias of length ``M2`` is added
            before the activation.
    """

    def __init__(self, M1, M2, f=tf.nn.tanh, use_bias=True):
        # Fully connected weights, drawn from a standard normal distribution.
        self.W = tf.Variable(tf.random.normal(shape=(M1, M2)))
        self.use_bias = use_bias
        if use_bias:
            # Built with TensorFlow directly so the layer needs no names other
            # than `tf`; the values are the same float32 zeros as before.
            self.b = tf.Variable(tf.zeros([M2], dtype=tf.float32))

        self.f = f

    def forward(self, X):
        """Return the activated output of this layer for input tensor ``X``."""
        a = tf.matmul(X, self.W)
        if self.use_bias:
            a = a + self.b

        return self.f(a)
    
    

In [None]:
class PolicyModel():  # approximates pi(a|s)
    """Softmax policy network trained with an advantage-weighted log-likelihood.

    The graph is built in ``__init__`` from placeholders, so a session must be
    attached with ``set_session`` before any of the other methods are called.

    Args:
        D: Number of features in one observation.
        K: Number of discrete actions.
        hidden_layer_sizes: Iterable with the width of each hidden layer; an
            empty iterable gives a single softmax layer.
    """

    def __init__(self, D, K, hidden_layer_sizes):
        # K is the number of actions
        self.layers = []
        M1 = D
        for M2 in hidden_layer_sizes:
            self.layers.append(HiddenLayer(M1, M2))
            M1 = M2

        # final layer: softmax over the K actions, no bias
        self.layers.append(HiddenLayer(M1, K, tf.nn.softmax, use_bias=False))

        # inputs and targets
        self.X = tf.compat.v1.placeholder(tf.float32, shape=(None, D), name='X')
        # shape=(None,) is a rank-1 batch; a bare (None) would mean "unknown shape".
        self.actions = tf.compat.v1.placeholder(tf.int32, shape=(None,), name='actions')
        self.advantages = tf.compat.v1.placeholder(tf.float32, shape=(None,), name='advantages')

        # output
        Z = self.X
        for layer in self.layers:
            Z = layer.forward(Z)
        p_a_given_s = Z
        self.predict_op = p_a_given_s

        # Pick out the probability of the action actually taken for each row.
        # The sum must run over the action axis only; summing over everything
        # would collapse the batch into one scalar before the log.
        selected_probs = tf.math.log(
            tf.reduce_sum(p_a_given_s * tf.one_hot(self.actions, K), axis=1)
        )

        cost = -tf.reduce_sum(self.advantages * selected_probs)

        # 10e-2 == 0.1
        self.train_op = tf.compat.v1.train.AdagradOptimizer(10e-2).minimize(cost)

    def set_session(self, session):
        """Attach the TensorFlow session used by the other methods."""
        self.session = session

    def partial_fit(self, X, actions, advantages):
        """Run one optimiser step on a batch of (state, action, advantage)."""
        X = np.atleast_2d(X)
        actions = np.atleast_1d(actions)
        advantages = np.atleast_1d(advantages)

        self.session.run(
            self.train_op,
            feed_dict={
                self.X: X,
                self.actions: actions,
                self.advantages: advantages,
            })

    def predict(self, X):
        """Return the action probabilities for each row of ``X``."""
        X = np.atleast_2d(X)
        return self.session.run(self.predict_op, feed_dict={self.X: X})

    def sample_action(self, X):
        """Sample one action index from the policy for a single observation."""
        p = self.predict(X)[0]
        return np.random.choice(len(p), p=p)
        

In [None]:
class ValueModel:
    """Feed-forward network that regresses a scalar value for each state.

    The graph is built in ``__init__`` from placeholders, so a session must be
    attached with ``set_session`` before ``partial_fit`` or ``predict``.

    Args:
        D: Number of features in one observation.
        hidden_layer_sizes: Iterable with the width of each hidden layer.
    """

    def __init__(self, D, hidden_layer_sizes):
        self.layers = []

        M1 = D
        for M2 in hidden_layer_sizes:
            self.layers.append(HiddenLayer(M1, M2))
            M1 = M2
        # final layer: one linear output unit
        self.layers.append(HiddenLayer(M1, 1, lambda x: x))

        self.X = tf.compat.v1.placeholder(tf.float32, shape=(None, D), name='X')
        self.Y = tf.compat.v1.placeholder(tf.float32, shape=(None,), name='Y')

        Z = self.X
        for layer in self.layers:
            Z = layer.forward(Z)
        # Flatten the (N, 1) output to (N,) so it lines up with self.Y.
        y_hat = tf.reshape(Z, [-1])

        self.predict_op = y_hat

        # Sum of squared errors against the local prediction tensor.
        cost = tf.reduce_sum(tf.square(self.Y - y_hat))

        # 10e-5 == 1e-4
        self.train_op = tf.compat.v1.train.GradientDescentOptimizer(10e-5).minimize(cost)

    def set_session(self, session):
        """Attach the TensorFlow session used by the other methods."""
        self.session = session

    def partial_fit(self, X, Y):
        """Run one gradient-descent step on states ``X`` and targets ``Y``."""
        X = np.atleast_2d(X)
        Y = np.atleast_1d(Y)
        self.session.run(self.train_op, feed_dict={self.X: X, self.Y: Y})

    def predict(self, X):
        """Return the predicted value for each row of ``X``."""
        X = np.atleast_2d(X)
        return self.session.run(self.predict_op, feed_dict={self.X: X})


# Backward-compatible alias for the original lower-case class name.
valueModel = ValueModel
        
        

In [None]:
def play_one_mc(env, pmodel, vmodel, gamma):
    """Play one episode, then update both models from the whole trajectory.

    Args:
        env: Environment exposing ``reset()`` and a ``step(action)`` that
            returns ``(observation, reward, done, info)``.
        pmodel: Policy model exposing ``sample_action`` and
            ``partial_fit(states, actions, advantages)``.
        vmodel: Value model exposing ``predict`` and
            ``partial_fit(states, returns)``.
        gamma: Discount factor applied when accumulating returns.

    Returns:
        The sum of the rewards equal to 1 collected during the episode; the
        -200 terminal penalty is not counted.
    """
    observation = env.reset()
    done = False
    totalreward = 0
    iters = 0
    states = []
    actions = []
    rewards = []

    while not done and iters < 2000:
        action = pmodel.sample_action(observation)
        prev_observation = observation
        observation, reward, done, _ = env.step(action)

        if done:
            # Replace the reward of the step that ended the episode with a penalty.
            reward = -200

        states.append(prev_observation)
        actions.append(action)
        rewards.append(reward)

        if reward == 1:
            totalreward += reward
        iters += 1

    returns = []
    advantages = []
    G = 0
    # Walk the trajectory backwards, accumulating the discounted return.
    # NOTE(review): G is recorded *before* the step's own reward is folded in,
    # so each state's target excludes its immediate reward. This ordering is
    # kept from the original code - confirm it is intended.
    for s, r in zip(reversed(states), reversed(rewards)):
        returns.append(G)
        advantages.append(G - vmodel.predict(s)[0])
        G = r + gamma * G
    returns.reverse()
    advantages.reverse()

    pmodel.partial_fit(states, actions, advantages)
    vmodel.partial_fit(states, returns)

    return totalreward


# Backward-compatible alias for the original (misspelled) function name.
paly_one_mc = play_one_mc

In [None]:
def main():
    """Train the policy and value models on CartPole-v0 and plot the rewards.

    Builds both models, runs 1000 Monte Carlo episodes, prints a progress line
    every 100 episodes, prints summary statistics, and shows a plot of the
    total reward per episode.
    """
    env = gym.make('CartPole-v0')
    D = env.observation_space.shape[0]
    K = env.action_space.n
    pmodel = PolicyModel(D, K, [])
    vmodel = ValueModel(D, [10])
    # The graph-mode session API lives under tf.compat.v1, like the
    # placeholders the models are built from.
    init = tf.compat.v1.global_variables_initializer()
    session = tf.compat.v1.InteractiveSession()
    try:
        session.run(init)
        pmodel.set_session(session)
        vmodel.set_session(session)
        gamma = 0.99

        N = 1000
        totalrewards = np.empty(N)
        for n in range(N):
            totalreward = play_one_mc(env, pmodel, vmodel, gamma)
            totalrewards[n] = totalreward
            if n % 100 == 0:
                print("episode:", n, "total reward:", totalreward, "avg reward (last 100):", totalrewards[max(0, n - 100): (n + 1)].mean())
    finally:
        # Release the session's resources even if training raises.
        session.close()

    print("avg reward for last 100 episodes:", totalrewards[-100: ].mean())
    print("total steps:", totalrewards.sum())

    plt.plot(totalrewards)
    plt.title("Rewards")
    plt.show()




In [None]:

if __name__ == '__main__':
    # The models are built from tf.compat.v1 placeholders and run through a
    # session, so eager execution is switched off before main() builds them.
    tf.compat.v1.disable_eager_execution()
    main()