In [89]:
# Import libraries

import os, sys, warnings
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
from IPython.display import Markdown as md

#sys.path.append('../Environment/')
#from Environment.environment import *
%run ../Environment/environment.ipynb

HBox(children=(VBox(children=(Button(description='Pos+', style=ButtonStyle()), Button(description='Pos-', styl…

In [90]:
def print_matrix(name, matrix):
    """Render a matrix as a LaTeX bmatrix wrapped in a Markdown object.

    Parameters
    ----------
    name : str
        Label placed in front of the equals sign.
    matrix : array_like
        Scalar, 1-D or 2-D numeric data. The input is promoted with
        ``np.atleast_2d``, so a 1-D vector is rendered as a single row.

    Returns
    -------
    IPython.display.Markdown
        Inline math of the form ``name = $[bmatrix]$`` in which every entry
        is formatted with three decimals.
    """
    matrix = np.atleast_2d(matrix)
    # Build the body one row at a time: entries inside a row are separated by
    # '&' and rows are separated by the LaTeX row break. Working per row keeps
    # the breaks tied to the number of columns, so non-square matrices are
    # laid out correctly (keying the break on the row count only works for
    # square matrices).
    rows = ['&'.join(' {:0.3f} '.format(v) for v in row) for row in matrix]
    s = '\\\\'.join(rows)
    return md(name + ' = $' + '\\begin{bmatrix}' + s + '\\end{bmatrix}$')

# Initialize Environment / Model Variables

In [91]:
# Restart the environment
env = EnvironmentState()

# Define the actions
ACTIONS = np.asarray([0, 1, 2, 3, 4])
NUM_ACTIONS = len(ACTIONS)

# Define the state space, defined to be the possible y positions of the agent.
# The grid has a nominal spacing of 0.05 and is shifted so it is centred on 0.
# The count is rounded rather than truncated: a quotient such as 0.15/0.05
# evaluates to 2.9999999999999996 in floating point, and int() would then
# silently drop the last grid point.
NUM_POS_STATES = int(round(env.REGION_HEIGHT/0.05)) + 1
POS_STATES = np.linspace(0, env.REGION_HEIGHT, NUM_POS_STATES) - env.REGION_HEIGHT/2

# Full aiming-angle grid (100 to 260 in steps of roughly 3).
# It is overridden just below, so it currently has no effect.
NUM_ANG_STATES = int(160/3) + 1
ANG_STATES = np.round(np.linspace(100, 260, NUM_ANG_STATES))

# Collapse the angle dimension to a single state.
NUM_ANG_STATES = 1
ANG_STATES = np.array([0])

# Initialize state-value function
Q = np.zeros((NUM_POS_STATES, NUM_ANG_STATES, NUM_ACTIONS))

# Q is indexed as Q[position state, angle state, action]
# Action 1 is wait action, 2 is pos up, 3 is pos down, 4 is aim up, 5 is aim down
# Each entry along the first axis corresponds to a pos state

# Run Sarsa Learning Algorithm

In [None]:
# Experiment configuration
NUM_OBS = 0          # Number of obstacles
NUM_STEPS = 10       # Number of steps in an episode
NUM_EPISODES = 100   # Number of episodes to run

# Learning hyper-parameters
alpha, eps, gamma = 0.02, 0.01, 0.90

# Per-episode statistics, one slot per episode, all starting at zero
hits, R_SUM, MSE = (np.zeros(NUM_EPISODES) for _ in range(3))

def action_from_epsilon_greedy(epsilon, Q_at_state, actions=None):
    """Sample one action from an epsilon-greedy policy for a single state.

    Parameters
    ----------
    epsilon : float
        Exploration probability. The greedy action is chosen with probability
        ``(1 - epsilon) + epsilon/n`` and every other action with
        ``epsilon/n``, where ``n`` is the number of actions.
    Q_at_state : array_like
        Action values of the current state, one entry per action. Singleton
        dimensions are squeezed away.
    actions : array_like, optional
        Action identifiers to sample from. Defaults to the module-level
        ``ACTIONS``.

    Returns
    -------
    The sampled element of ``actions``.
    """
    if actions is None:
        actions = ACTIONS
    q_values = np.atleast_1d(np.squeeze(Q_at_state))
    num_actions = len(actions)

    if len(np.unique(q_values)) <= 1:
        # All Q values are the same: no action is preferred, sample uniformly.
        pi = np.full(num_actions, 1/num_actions)
    else:
        # Every action gets the exploration share; the greedy action also gets
        # the remaining mass. np.argmax returns the first maximum, so ties for
        # the best value are resolved in favour of the lowest index.
        pi = np.full(num_actions, epsilon/num_actions)
        pi[np.argmax(q_values)] += 1 - epsilon
    return np.random.choice(actions, replace = True, p = pi)

def get_env_state(env):
    """Return the discrete (position, angle) state indices for ``env``.

    Each index is that of the entry in ``POS_STATES`` / ``ANG_STATES`` closest
    to the environment's ``_agent_position_y`` / ``_agent_aiming_angle``.
    """
    pos_distance = np.abs(POS_STATES - env._agent_position_y)
    ang_distance = np.abs(ANG_STATES - env._agent_aiming_angle)
    return np.argmin(pos_distance), np.argmin(ang_distance)

# Sarsa-learning model
for episode in range(NUM_EPISODES):
    # Try different levels of complexity for agent and target position
    env.initialize()
    env.centered_obstruction()

    # Observe the starting state of this episode BEFORE choosing the first
    # action; otherwise the state index is undefined on the first episode and
    # stale (left over from the previous episode) on every later one.
    sp, sa = get_env_state(env)

    # Choose action based on epsilon-greedy policy
    a = action_from_epsilon_greedy(eps, Q[sp, sa, :])

    R_vector = np.zeros((NUM_STEPS))
    R = 0
    for n in range(NUM_STEPS):
        # Find the current state
        sp, sa = get_env_state(env)

        # Take action
        env.take_action(a)

        # With the action taken, the agent is now in the future state s'. Find that state
        sp_prime, sa_prime = get_env_state(env)
        # choose a_prime using epsilon greedy policy
        a_prime = action_from_epsilon_greedy(eps, Q[sp_prime, sa_prime, :])

        # Collect the reward
        R = env.compute_reward()
        R_vector[n] = R

        # Update Q value (on-policy target: uses the action actually chosen for s')
        Q[sp, sa, a] = Q[sp, sa, a] + alpha*(R + gamma*Q[sp_prime, sa_prime, a_prime] - Q[sp, sa, a])

        # use as next action
        a = a_prime

    # At the end of the episode, determine if the algorithm hit the target
    hits[episode] = 1 if R == 1 else 0 # If agent is aiming at the target, R is 1.
    R_SUM[episode] = np.sum(R_vector)
    MSE[episode]  = np.var(R_vector) # For the unbiased case, the RSE is the variance

# print(hits)
print(np.sum(hits)/len(hits))

In [None]:
# Example graphs to make:
# Sum of rewards vs. Episodes
# % accuracy vs. Episodes
# MSE vs. Episodes
episodes = np.arange(1, NUM_EPISODES+1, 1)

# Running mean up to and including each episode: the cumulative sum divided by
# the number of episodes seen so far. (Summing only the first n entries while
# dividing by n+1 would leave the latest episode out of every average.)
R_Sum_Ave = np.cumsum(R_SUM) / episodes
accuracy = np.cumsum(hits) / episodes
MSE_Ave = np.cumsum(MSE) / episodes

# One panel per statistic, sharing the episode axis
fig, axes = plt.subplots(figsize = [12,18], nrows = 3, ncols = 1, sharex = True)
ax1, ax2, ax3 = axes

ax1.plot(episodes, R_Sum_Ave, 'r-')
ax1.set_ylabel('Average Sum of Rewards', fontsize = 14)

ax2.plot(episodes, accuracy, 'r-')
ax2.set_ylabel('Average Accuracy', fontsize = 14)

ax3.plot(episodes, MSE_Ave, 'r-')
ax3.set_ylabel('Mean Squared Error of Reward', fontsize = 14)
ax3.set_xlabel('Episodes', fontsize = 14)

plt.show(block = False)

In [None]:
# Write the data to a comma separated text file for comparison with another algorithm.
# Naming format:
# Q-Learning_mo_nr.txt
# mo refers to the number of obstacles used when the model learned
#   (e.g., 0o is 0 obstacles, 1o is 1 obstacle, etc.)
# nr will be filled with:
#   nr: No Randomized (neither target nor agent were randomized per episode)
#   tr: Target Randomized (only the target was randomized per episode)
#   ar: Agent Randomized (only the agent was randomized per episode)
#   br: Both Randomized (both the target and agent were randomized per episode)
# NOTE(review): this notebook runs Sarsa but the file is named "Q-Learning";
#   confirm the name so results from the other algorithm are not overwritten.

# Variables saved are:
#   Episodes
#   Hit/Miss for that episode
#   Sum of Rewards for that episode (SR)
#   Mean Square Error (Variance) for that episode (MSE)

# Write the file; the context manager closes it even if a write fails
with open('Q-Learning_0o_tr.txt', 'w') as f:
    # Write the headers
    f.write('Episode,Hit,SR,MSE' + '\n')

    # Write the data: one row per episode, using that episode's own values
    # (episode numbers are 1-based)
    for n in range(NUM_EPISODES):
        f.write(str(n + 1) + ',' + str(hits[n]) + ',' + str(R_SUM[n]) + ',' + str(MSE[n]) + '\n')