In [1]:
import gym
import numpy as np

In [2]:
# Create the CartPole-v1 environment that every later cell shares.
env = gym.make('CartPole-v1')
# Take an initial observation; the training and rendering cells call
# env.reset() again themselves before they start stepping.
obs = env.reset()

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [3]:
## Policy Gradient

In [4]:
# Initialize the weights (W) and biases (b) of the two-layer policy network.
# n_input is the length of the observation vector; later cells reshape each
# observation to (1, n_input) before feeding it to the network.
n_input = 4
# Width of the single hidden layer.
n_hidden = 8
# One output unit per discrete action offered by the environment.
n_output = env.action_space.n
# Weights are drawn from a standard normal and scaled down by 0.1.
W1 = np.random.randn(n_input, n_hidden)*0.1
# Hidden biases start at one (the output biases below start at zero).
b1 = np.ones([n_hidden])
W2 = np.random.randn(n_hidden,n_output)*0.1
b2 = np.zeros([n_output])

In [5]:
def training_step(obs):
    """Sample an action for ``obs`` and return the gradients for that choice.

    Runs the two-layer policy network (affine -> ReLU -> affine -> softmax)
    using the module-level parameters ``W1``, ``b1``, ``W2`` and ``b2``,
    samples one action from the softmax distribution of the last row, and
    back-propagates the cross-entropy loss that treats the sampled action as
    the target class.

    Parameters
    ----------
    obs : numpy.ndarray
        2-D array with one observation per row. The training loop in this
        notebook always passes a single row. With several rows, the one action
        sampled from the last row is used as the target for every row.

    Returns
    -------
    grads : dict
        Gradients keyed by ``'W1'``, ``'b1'``, ``'W2'`` and ``'b2'``, each with
        the same shape as the corresponding parameter.
    action : numpy.ndarray
        Length-1 integer array holding the sampled action index.
    """
    N, _ = obs.shape

    # Forward pass.
    h = np.maximum(obs.dot(W1) + b1, 0)
    out_linear = h.dot(W2) + b2
    # Shift each row by its maximum before exponentiating. Softmax is
    # invariant to that shift, and it keeps np.exp from overflowing to inf
    # (which would turn the probabilities into NaN).
    exp_scores = np.exp(out_linear - np.max(out_linear, axis=1, keepdims=True))
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    action = np.random.choice(n_output, 1, p=probs[-1])

    # Backward pass: d(loss)/d(logits) for softmax cross-entropy is
    # (probs - one_hot(action)), averaged over the rows.
    dscores = probs.copy()
    dscores[np.arange(N), action] -= 1
    dscores /= N

    grads = {}
    grads['W2'] = h.T.dot(dscores)
    grads['b2'] = np.sum(dscores, axis=0)

    # Only hidden units that were active in the forward pass pass gradient.
    dh_ReLu = (h > 0) * dscores.dot(W2.T)
    grads['W1'] = obs.T.dot(dh_ReLu)
    grads['b1'] = np.sum(dh_ReLu, axis=0)
    return grads, action

In [6]:
def get_action(obs):
    """Sample an action index for a single observation.

    Runs the same forward pass as the training step (affine -> ReLU ->
    affine -> softmax) using the module-level parameters ``W1``, ``b1``,
    ``W2`` and ``b2``, then draws one action from the resulting distribution.

    Parameters
    ----------
    obs : numpy.ndarray
        One observation with ``n_input`` elements; it is reshaped to
        ``(1, n_input)`` before use.

    Returns
    -------
    int
        The sampled action index (a NumPy integer scalar).
    """
    obs = obs.reshape([1, n_input])
    h = np.maximum(obs.dot(W1) + b1, 0)
    out_linear = h.dot(W2) + b2
    # Shift by the row maximum so np.exp cannot overflow; the softmax value
    # is unchanged by the shift.
    exp_scores = np.exp(out_linear - np.max(out_linear, axis=1, keepdims=True))
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
    return np.random.choice(n_output, 1, p=probs[-1])[0]

In [7]:
def discount(r, gamma = 0.7):
    """Return the discounted cumulative reward for every time step.

    ``discounted[t] = r[t] + gamma * r[t+1] + gamma**2 * r[t+2] + ...``

    Parameters
    ----------
    r : array_like
        1-D sequence of per-step rewards.
    gamma : float, optional
        Discount factor applied per step.

    Returns
    -------
    numpy.ndarray
        Floating-point array with the same shape as ``r``. A floating input
        dtype is kept; any other dtype is promoted to float64 so that
        fractional returns are not truncated.
    """
    r = np.asarray(r)
    # np.zeros_like(r) would inherit an integer dtype and silently truncate
    # the fractional running sums, so pick a float dtype explicitly.
    out_dtype = r.dtype if np.issubdtype(r.dtype, np.floating) else np.float64
    discounted = np.zeros(r.shape, dtype=out_dtype)
    running_add = 0
    # Walk backwards so each step reuses the already-discounted future sum.
    for t in reversed(range(r.size)):
        running_add = running_add * gamma + r[t]
        discounted[t] = running_add
    return discounted

In [8]:
# Training hyperparameters.
n_iter = 10000             # number of parameter updates
n_game_per_iter = 100      # episodes played before each update
n_action_per_game = 200    # hard cap on steps per episode
learning_rate = 1e-3
for itern in range(n_iter):
    # Running sum of the reward-weighted gradients of every step played in
    # this iteration; divided by the step count below to get the mean.
    update_grads = {
        'W1': np.zeros_like(W1),
        'b1': np.zeros_like(b1),
        'W2': np.zeros_like(W2),
        'b2': np.zeros_like(b2),
    }
    n_steps_seen = 0
    mean_reward = 0
    for game in range(n_game_per_iter):
        obs = env.reset()
        current_rewards = []
        current_gradients = []
        total_reward = 0
        for step in range(n_action_per_game):
            obs = obs.reshape([1,n_input])
            grads, action = training_step(obs)
            obs, reward, done, info = env.step(action[0])
            current_rewards.append(reward)
            current_gradients.append(grads)
            total_reward += reward
            if done:
                break

        mean_reward += total_reward

        # Turn the raw rewards into normalized discounted returns.
        discounted_rewards = discount(np.array(current_rewards, dtype=float))
        discounted_rewards -= np.mean(discounted_rewards)
        reward_std = np.std(discounted_rewards)
        # A zero spread (e.g. a one-step episode) would give 0/0 = NaN and
        # poison the weights; the centered returns are already all zero then.
        if reward_std > 0:
            discounted_rewards /= reward_std

        # Weight each step's gradients by its return; every step contributes
        # exactly once to the running sum.
        for step_grads, step_return in zip(current_gradients, discounted_rewards):
            for name in update_grads:
                update_grads[name] += step_grads[name] * step_return
        n_steps_seen += len(current_gradients)

    for name in update_grads:
        update_grads[name] /= n_steps_seen

    # In-place updates so the functions reading these globals see new values.
    W1 -= learning_rate*update_grads['W1']
    b1 -= learning_rate*update_grads['b1']
    W2 -= learning_rate*update_grads['W2']
    b2 -= learning_rate*update_grads['b2']

    print('\riteration %d / %d: Mean Score %f'% (itern, n_iter, mean_reward/n_game_per_iter), end = "")

    

iteration 267 / 10000: Mean Score 23.680000

KeyboardInterrupt: 

In [None]:
# Render the env

# Play n_test episodes with the current policy, drawing every frame and
# printing each episode's total reward.
n_test = 10
try:
    for i in range(n_test):
        obs = env.reset()
        total_reward = 0
        done = False
        while not done:
            env.render()
            action = get_action(obs)
            obs, reward, done, info = env.step(action)
            total_reward += reward
        print("Game %d, Total Reward %f"%(i+1, total_reward))
finally:
    # Release the render window even if the cell is interrupted.
    env.close()
        