# The Cart Pole Problem
The goal of this project is to balance a pole on a cart which you can move back and forth.
There are two possible actions, add force to left and add force to right. Scoring takes place for how long you keep the pole in the air

I am going to attempt to implement (from scratch) a simple 1 fully connected hidden layer ANN with N nodes and 2 outputs

In [1]:
import tensorflow as tf
import numpy as np
import gym
import matplotlib.pyplot as plt
%matplotlib inline

### Step 1 
Define an "Agent" which has .run(), .update(), .reset() In this case the agent will be implemented in numpy for the practice

In [98]:
class agent():
    def __init__(self, e_size, h_size, a_size, lr=0.01, gamma=0.99):
        # Record Agent Parameters
        self.e_size = e_size
        self.h_size = h_size
        self.a_size = a_size
        
        self.lr = lr
        self.gamma = gamma
        
        # Initialize weights and biases
        self.hidden_weights = np.random.standard_normal((e_size, h_size))
        self.hidden_bias = np.zeros((1, h_size))
        
        self.action_weights = np.random.standard_normal((h_size, a_size))
        self.action_bias = np.zeros((1, a_size))
        
        # Initialize Replay history
        self.history = []
    
    def reset(self):
        self.history = []
        
    def activation(self, x):
        return x
    
    def activation_derivative(self, x):
        return 1
    
    def discountedReward(self, rewards):
        for i in xrange(rewards.shape[0]-2, -1, -1):
            rewards[i] = rewards[i] + self.gamma*rewards[i+1]
        return rewards
    
    def record(self, s, a, r):
        # Ensuring state s is a proper row vector
        if (len(s.shape)==1):
            s = s[None, :]
            
        self.history.append([s, a, r])
    
    def forward_pass(self, s):
        # Implement the forward pass returning the various intermediate results
        
        # Check the observation has the correct number of dimensions
        assert (len(s.shape) <= 2 and len(s.shape) != 0)
        # If it has one dimension turn into a row vector
        if (len(s.shape)==1):
            s = s[None,:]
            
        # If it has 2 dimensions ensure it is a vector not a matrix
        assert (s.shape[0] == 1 or s.shape[1] == 1)
        # Convert to row matrix if its a column
        if (s.shape[0] !=1):
            s = s.T
            
        # Input Checking complete. Run forward pass
        hidden_input = np.dot(s, self.hidden_weights) + self.hidden_bias
        hidden_output = self.activation(hidden_input)
        
        action_input = np.dot(hidden_output, self.action_weights) + self.action_bias
        action_output = self.activation(action_input)
        
        return [hidden_input, hidden_output, action_input, action_output]
    
    def update(self):
        # Function to use back propagation with the history to improve weights
        
        # Input checking
        assert len(self.history) > 0
        
        # Applying discounted reward to this particular replay
        history = np.array(self.history)
        history[:,2] = self.discountedReward(history[:,2])
        
        # Prepare for gradients
        HWChange = np.zeros(self.hidden_weights.shape)
        HBChange = np.zeros(self.hidden_bias.shape)
        
        AWChange = np.zeros(self.action_weights.shape)
        ABChange = np.zeros(self.action_bias.shape)
        for i in xrange(history.shape[0]):
            # For each step of history:
                # Apply forward pass
                # 
            state = history[i, 0]
            action = history[i, 1]
            reward = history[i, 2]
            
#             print ((state, action, reward))
            
            (HI, H, A, yhat) = self.forward_pass(state)
#             print(HI)
#             print(H)
#             print(A)
#             print("++++++++++++++++++++++")
#             print (yhat)
#             print (reward)
#             print (-1*(reward-yhat[0,action]))
#             print ("--------------")
            
            main_error_term = np.zeros((1, self.a_size))
            main_error_term[0, action] = -1*(reward-yhat[0,action])
            
            main_error_term = main_error_term*self.activation_derivative(A)
            
            hidden_error_term = np.dot(main_error_term, self.action_weights.T)
            hidden_error_term = hidden_error_term*self.activation_derivative(HI)
            
#             print("\n")
#             print (main_error_term.shape)
#             print (main_error_term)
#             print (hidden_error_term.shape)
#             print (hidden_error_term)
#             print ("_________________________")
#             print (np.dot(state.T, hidden_error_term))
#             print (self.hidden_weights)
#             print ("\n")
            
            AWChange += np.dot(H.T, main_error_term)
            ABChange += main_error_term*1
            
            HWChange += np.dot(state.T, hidden_error_term)
            HBChange += hidden_error_term*1
        
        self.hidden_weights -= self.lr*HWChange/history.shape[0]
        self.hidden_bias -= self.lr*HBChange/history.shape[0]
        
        self.action_weights -= self.lr*AWChange/history.shape[0]
        self.action_bias -= self.lr*ABChange/history.shape[0]
        
        # Delete history
        self.history = []
                
    def predict(self, s):
        # Return the Q values prdicted for input state S
        return self.forward_pass(s)[-1]
    
    def choose_action(self, s):
        # Use the prediction to make a decision
        return np.argmax(self.predict(s))
            
        

### Test Agent
Implement tests for the back-propagation

In [99]:
e_size = 2
h_size = 3
a_size = 2

init_h_weights = np.array([[0.1, 0.5, 0.2], [0.3, -0.1, 0.6]])
init_h_bias = np.zeros((1,3))
init_a_weights = np.array([[-0.1, 0.1], [0.4, 0.3], [0.2, 0]])
init_a_bias = np.zeros((1,2))

exp_h_weights = np.array([[0.146, 0.638, 0.2],[0.392, 0.176, 0.6]])
exp_h_bias = np.array([0.0092, 0.0276, 0])
exp_a_weights = np.array([[-0.1, 0.422],[0.4, 0.438],[0.2, 0.644]])
exp_a_bias = np.array([0, 0.092])

state = np.array([5, 10])
action = 1
reward = 10

test_agent = agent(e_size, h_size, a_size, lr=0.01, gamma=0.99)

test_agent.hidden_weights = init_h_weights
test_agent.hidden_bias = init_h_bias
test_agent.action_weights = init_a_weights
test_agent.action_bias = init_a_bias

test_agent.record(state, action, reward)

test_agent.update()

assert(np.allclose(test_agent.hidden_weights, exp_h_weights))
assert(np.allclose(test_agent.hidden_bias, exp_h_bias))
assert(np.allclose(test_agent.action_weights, exp_a_weights))
assert(np.allclose(test_agent.action_bias, exp_a_bias))

print("All Tests Passed!")


All Tests Passed!


### Step 2
Now that we have the agent we can start using it to play the game. This section we run through a bunch of games (restart when hit done) and after each game we run the update

In [100]:
def softmax(x):
    """Compute the softmax of vector x."""
    exps = np.exp(x)
    return exps / np.sum(exps)

In [103]:
# Training Parameter
MAX_GAMES = 50000 # This is like epochs
MAX_GAME_TIME = 999

LEARNING_RATE = 0.00001
GAMMA = 0.99
HIDDEN_NODES = 10

REPORT_INTERVAL = 1000 # How often to report the progress.

env = gym.make('CartPole-v0')
e_size = 4
a_size = 2
player = agent(e_size, HIDDEN_NODES, a_size, lr=LEARNING_RATE, gamma=GAMMA)

for i_episode in range(MAX_GAMES):
    state = env.reset()
    player.reset()
    running_reward = 0
    
    for t in range(MAX_GAME_TIME):
        
        Q_vals = player.predict(state)
        # Do a weighted sample to pick your action
        action_prob = softmax(Q_vals[0,:])
        action = np.random.choice(range(a_size), p=action_prob)
        
        new_state, reward, done, info = env.step(action)
        player.record(state, action, reward)
        running_reward += reward
        state = new_state
        if done:
            player.update()
            if (i_episode % REPORT_INTERVAL == 0 or i_episode == MAX_GAMES-1):
                print("Round: {0:7d}    Score: {1:7.2f}    Play-time: {2:5d}".format(i_episode, running_reward, t))
            break

[2017-11-22 22:46:42,150] Making new env: CartPole-v0


Round:       0    Score:   10.00    Play-time:     9
Round:    1000    Score:    9.00    Play-time:     8
Round:    2000    Score:   11.00    Play-time:    10
Round:    3000    Score:   10.00    Play-time:     9
Round:    4000    Score:   10.00    Play-time:     9
Round:    5000    Score:   10.00    Play-time:     9
Round:    6000    Score:   10.00    Play-time:     9
Round:    7000    Score:   11.00    Play-time:    10
Round:    8000    Score:   10.00    Play-time:     9
Round:    9000    Score:    9.00    Play-time:     8
Round:   10000    Score:   10.00    Play-time:     9
Round:   11000    Score:    9.00    Play-time:     8
Round:   12000    Score:   10.00    Play-time:     9
Round:   13000    Score:    9.00    Play-time:     8
Round:   14000    Score:   11.00    Play-time:    10
Round:   15000    Score:   10.00    Play-time:     9
Round:   16000    Score:   13.00    Play-time:    12
Round:   17000    Score:   11.00    Play-time:    10
Round:   18000    Score:    9.00    Play-time:

In [64]:
state = env.reset()
done = False
t = 0
while t<900:
    env.render()
    action = player.choose_action(state)
    state, reward, done, info = env.step(action)
    t += 1
    if done:
        print("Episode finished after {} timesteps".format(t+1))
        break
env.render(close=True)

Episode finished after 12 timesteps
