### Cart-Pole simulation and Reinforcement learning

In [1]:
from __future__ import division, print_function
from env import CartPole, Physics
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import lfilter

def initialize_mdp_data(num_states):
    """
    Taking number of states as parameter and initialize mdp data
    """
    # index for probability: (state now, state after, action)
    transition_counts = np.zeros((num_states, num_states, 2))
    transition_probs = np.ones((num_states, num_states, 2)) / num_states
    # Index zero is count of rewards being -1 , index 1 is count of total num state is reached
    reward_counts = np.zeros((num_states, 2))
    reward = np.zeros(num_states)
    value = np.random.rand(num_states) * 0.1

    return {
        'transition_counts': transition_counts,
        'transition_probs': transition_probs,
        'reward_counts': reward_counts,
        'reward': reward,
        'value': value,
        'num_states': num_states,
    }

##### Calculate best action according to current state
- brute force search: loop over all *states* and *action* 
- more elegant way to do this: **dot product V and P_sa**
- Analysis: V - (1 * num_states), P_sa[state] - (num_states * 2) -> endup having array with dimension (1,2), every entry being payoff if taking corresponding action

In [2]:
def choose_action(state, mdp_data):
    expect_value = mdp_data['value'] @ mdp_data['transition_probs'][state]
    if expect_value[0] == expect_value[1]:
        return np.random.randint(2) # randomly choose an action
    return np.argmax(expect_value)

##### Record transition counts and reward counts for update

In [3]:

def update_mdp_transition_counts_reward_counts(mdp_data, state, action, new_state, reward):
    """
    record reward and state transition counts
    """
    # record number of times reward -1 is seen
    if reward == -1:
        mdp_data['reward_counts'][new_state][0] += 1
    mdp_data['reward_counts'][new_state][1] += 1 # total time for reward
    mdp_data['transition_counts'][state][new_state][action] += 1
    return None

##### Actually update transition probability after one iteration
- recall update of P_sa need (#took action a in state s) as denominator and (#~got to state s_prime) as numerator
- Have P_sa[state][new_state][action] stored, so **sum over new_state** to get denominator(which is sum in axis=1)
- Need to perform update for all new_state => **leave index for new_state as slicing**
- NOTICE: **when slicing the array, must use comma instead of bracket notation**

##### Detailed explanation of difference between bracket and comma notation
- For example x = np.array([1,2], [3,4], [5,6])
x[:][1] = array([3,4]), means take all indices of x, **then take index 1 along the first axis of the result**, namely the **brackets apply in sequential order**
x[:, 1] = array([2,4,6]), means take all indices of x along the first axis, but only index 1 along the second, which is the proper way for matrix slicing
- x[1][1] and x[1, 1] are only equivalent because **indexing array with integer shifts all remaining axes towards the front of the shape**, so the first axis of x[1] is the second axis of x, but slicing don't make the shift
- ***Should always use commas instead of multiple indexing steps***

In [4]:
def update_mdp_transition_probs_reward(mdp_data):
    """
    Update transition probability after one iteration
    """
    transition_counts = mdp_data['transition_counts']
    num_states = mdp_data['num_states']
    # times took action in state - sum up new_state
    num_counts = transition_counts.sum(axis=1)
    # update P_sa = # get to new state / # take action in state
    for s in range(num_states):
        for action in range(2): # action set being [0,1]
            if num_counts[s][action]:
                # we have taken action in state s
                mdp_data['transition_probs'][s, :, action] = transition_counts[s, :, action] / num_counts[s, action]
    reward_counts = mdp_data['reward_counts']
    for i in range(num_states):
        sum_count = reward_counts[i, 1] # first index being times to reach reward
        if sum_count: # this state has reward assigned 
            mdp_data['reward'][i] = -reward_counts[i, 0] / sum_count
    return None

##### Value iteration for solving Bellman's equation
- brute force way: loop over all possible states and actions, obtain new entries one by one
- elegant way: value function **dot product** transition probs and **find max along action axis**
- Additionally for convergence criterion, instead of using all() for all entry, just **compare the max change with tolerance**

In [5]:
def update_mdp_value(mdp_data, tolerance, gamma):
    """
    Parameters
    ----------
    mdp_data: tuple of states and reward
    tolerance: threshold for convergence determination
    gamma: discount factor
    Returns
    -------
    whether value iteration converged in one iteration
    """
    V = mdp_data['value']
    P_sa = mdp_data['transition_probs']
    counter = 0 # record the number of iterations
    while True:
        counter += 1
        # store value function for synchronous update
        curr_update = mdp_data['reward'] + gamma * (V @ P_sa).max(axis=1) # before max, shape = (t,2)
        if max(abs(V - curr_update)) < tolerance:
            break
        V = curr_update
    mdp_data['value'] = V
    return counter == 1

In [8]:
def main(plot=True):
    # Seed the randomness of the simulation so this outputs the same thing each time
    seed = 0
    np.random.seed(seed)

    # Simulation parameters
    pause_time = 0.0001
    min_trial_length_to_start_display = 100
    display_started = min_trial_length_to_start_display == 0

    NUM_STATES = 163
    GAMMA = 0.995
    TOLERANCE = 0.01
    NO_LEARNING_THRESHOLD = 20

    # Time cycle of the simulation
    time = 0

    # These variables perform bookkeeping (how many cycles was the pole
    # balanced for before it fell). Useful for plotting learning curves.
    time_steps_to_failure = []
    num_failures = 0
    time_at_start_of_current_trial = 0

    # You should reach convergence well before this
    max_failures = 500

    # Initialize a cart pole
    cart_pole = CartPole(Physics())
    # starting simulation with state tuple being all zero
    x, x_dot, theta, theta_dot = 0.0, 0.0, 0.0, 0.0
    state_tuple = (x, x_dot, theta, theta_dot)

    state = cart_pole.get_state(state_tuple) # get discrete representation
    """Display Control"""
    if min_trial_length_to_start_display == 0 or display_started == 1:
        cart_pole.show_cart(state_tuple, pause_time)

    mdp_data = initialize_mdp_data(NUM_STATES)

    # convergence criterion: several iteration converge in one flop
    consecutive_no_learning_trials = 0
    while consecutive_no_learning_trials < NO_LEARNING_THRESHOLD:
        action = choose_action(state, mdp_data)
        # Get the next state by simulating the dynamics
        state_tuple = cart_pole.simulate(action, state_tuple)
        # increment simulation time
        time += 1
        # Get the state number corresponding to new state vector
        new_state = cart_pole.get_state(state_tuple)
        """Display Control"""
        if display_started == 1:
            cart_pole.show_cart(state_tuple, pause_time)

        # specify reward function - -1 for fail, 0 for other
        R = -1 if new_state == NUM_STATES - 1 else 0
        update_mdp_transition_counts_reward_counts(mdp_data, state, action, new_state, R)
        if new_state == NUM_STATES - 1:
            # update transition probs whenever fails
            update_mdp_transition_probs_reward(mdp_data)
            converged_in_one_iteration = update_mdp_value(mdp_data, TOLERANCE, GAMMA)
            if converged_in_one_iteration:
                consecutive_no_learning_trials += 1
            else:
                consecutive_no_learning_trials = 0 # reset

        # If fail, reinitialize the states
        if new_state == NUM_STATES - 1:
            num_failures += 1
            if num_failures >= max_failures:
                break
            print(f'[INFO] Failure number {num_failures}')
            time_steps_to_failure.append(time - time_at_start_of_current_trial)
            # time_steps_to_failure[num_failures] = time - time_at_start_of_current_trial
            time_at_start_of_current_trial = time

            if time_steps_to_failure[num_failures - 1] > min_trial_length_to_start_display:
                display_started = 1
            # Reinitialize state
            x = -1.1 + np.random.uniform() * 2.2
            x_dot, theta, theta_dot = 0.0, 0.0, 0.0
            state_tuple = (x, x_dot, theta, theta_dot)
            state = cart_pole.get_state(state_tuple)
        else: # update current state
            state = new_state
    # """Display Control"""
    # # after convergence do one more simulation for state showing
    # while True:
    #     try:
    #         state = cart_pole.get_state(state_tuple)
    #         action = choose_action(state, mdp_data)
    #         state_tuple = cart_pole.simulate(action, state_tuple)
    #         new_state = cart_pole.get_state(state_tuple)
    #         cart_pole.show_cart(state_tuple, pause_time)
    #         if new_state == NUM_STATES - 1:
    #             # failure for current trial
    #             break
    #     except KeyboardInterrupt:
    #         break
    # plt.show()

    if plot:
        # plot the learning curve (time balanced vs. trial)
        log_tstf = np.log(np.array(time_steps_to_failure))
        plt.plot(np.arange(len(time_steps_to_failure)), log_tstf, 'k')
        window = 30
        w = np.array([1/window for _ in range(window)])
        weights = lfilter(w, 1, log_tstf)
        x = np.arange(window//2, len(log_tstf) - window//2)
        plt.plot(x, weights[window:len(log_tstf)], 'r--')
        plt.xlabel('Num failures')
        plt.ylabel('Log of num steps to failure')
        plt.title('seed = {}'.format(seed))
    plt.show()
    return np.array(time_steps_to_failure)

##### Calling main() for simulation

In [None]:
main()