In [1]:
import numpy as np
import random

In [2]:
# terrain = \
# '''
#  \                                              /
#   \      /wwwww\                               /
#    \    /       \            _____            /
#     \__/         \          /     \          /
#                   \        /       \        /
#                    \      /         \      /
#                     \    /           \    /
#                      \__/             \__/
# '''
terrain = \
'''
\             /
 \           /
  \__/www\__/
'''
chars = ['\\', '-', '_', '/', 'w']
grid = terrain.split('\n')[1:-1]
max_length = max([len(r) for r in grid])
states = []
for a in range(max_length):
    for b in range(len(grid)):
        if grid[b][a] != ' ':
            states.append(grid[b][a])
            break

# AVERAGING_RATE = .1
DISCOUNT = .9
LEARNING_RATE = .01

In [None]:
action_momentum_dict = {
    'right': 1,
    'left': -1,
    'neutral': 0
}
state_momentum_dict = {
    '\\': 1,
    '/': -1,
    '_': 0,
    'w': 0,
    'x': 0
}

# Q = Weights * Feature Vector
def phi(states, state, momentum, action_digit):

    phi_vector = []

    # Momentum
    phi_vector.append(momentum/50)

    # Direction (One Hot Encoding) (OHE)
    if momentum > 0:
        phi_vector.extend([0, 0, 1])
    elif momentum < 0:
        phi_vector.extend([1, 0, 0])
    else:
        phi_vector.extend([0, 1, 0])

    # Action (OHE)
    if action_digit == 1:
        phi_vector.extend([0, 0, 1])
    elif action_digit == -1:
        phi_vector.extend([1, 0, 0])
    else:
        phi_vector.extend([0, 1, 0])

    w_states = []
    for i, s in enumerate(states):
        if s == 'w':
            w_states.append(i)
    num_positive_slopes = 0
    num_negative_slopes = 0
    num_neutral_slopes = 0
    if state in w_states:
        phi_vector.append(0) # Distance
        phi_vector.extend([0, 0, 0]) # Slope Types
        pass
    else:
        if state > w_states[-1]:
            phi_vector.append((state - w_states[-1])/50) # Distance
            for i in range(w_states[-1], state+1):
                if states[i] == '\\':
                    num_positive_slopes += 1
                elif states[i] == '/':
                    num_negative_slopes += 1
                else:
                    num_neutral_slopes += 1
        else:
            phi_vector.append((w_states[0] - state)/50) # Distance
            for i in range(state, w_states[0]+1):
                if states[i] == '\\':
                    num_positive_slopes += 1
                elif states[i] == '/':
                    num_negative_slopes += 1
                else:
                    num_neutral_slopes += 1
        total_slopes = num_positive_slopes + num_negative_slopes + num_neutral_slopes
        phi_vector.extend([num_positive_slopes/total_slopes, num_negative_slopes/total_slopes, num_neutral_slopes/total_slopes]) # Slope Types

    tile_encoding = [0] * 20
    tile_encoding[round((state/len(states)) * (len(tile_encoding) - 1))] = (num_positive_slopes - num_negative_slopes)/50
    phi_vector.extend(tile_encoding) # Tile Encoding, Distance to Goal

    return np.array(phi_vector)


def epsilon_greedy_policy(q_weights, states, state, momentum, epsilon=0.1):

    if random.random() < epsilon:
        return random.choice(['right', 'left', 'neutral'])
    else:
        best_q = float('-inf')
        best_action = []
        for a in ['right', 'left', 'neutral']:
            q_val = np.dot(q_weights, phi(states, state, momentum, action_momentum_dict[a]))
            if best_q < q_val:
                best_q = q_val
                best_action = [a]
            elif best_q == q_val:
                best_action.append(a)

        return random.choice(best_action)


def semi_gradient_SARSA(states, lr=LEARNING_RATE, d=DISCOUNT, episode_count=500):

    weights_len = len(phi(states, 0, 0, 0))
    q_weights = np.random.uniform(-1, 1, size=weights_len)

    starting_states = []
    for i, s in enumerate(states):
        if s != 'w' and s != 'x':
            starting_states.append(i)

    while episode_count > 0:
        episode_count -= 1
        
        # Initialize S
        s = random.choice(starting_states)
        m = 0
        a = epsilon_greedy_policy(q_weights, states, s, m)
        r = 0
        avg_r_est = 0 # Not a true average reward, only an est. at best
        fin_ep = False

        while not fin_ep:
            # Take action A, observe R, S'
            m_prime = m + action_momentum_dict[a] + state_momentum_dict[states[s]]
            
            if m_prime > 0:
                s_prime = min(len(states)-1, s+1)
            elif m_prime < 0:
                s_prime = max(0, s-1)
            else:
                s_prime = s

            m_prime += state_momentum_dict[states[s_prime]]

            if states[s_prime] == 'w':
                if m_prime == 0:
                    r = 10
                    fin_ep = True
                else:
                    r = -1
            elif s == s_prime:
                r = -5
            else:
                r = -1

                
            # Choose A' as a function q(S', ., w) using epsilon greedy
            a_prime = epsilon_greedy_policy(q_weights, states, s, m)
            
            delta = r - avg_r_est \
                + np.dot(q_weights, phi(states, s_prime, m_prime, action_momentum_dict[a_prime])) \
                - np.dot(q_weights, phi(states, s, m, action_momentum_dict[a]))
            avg_r_est += b * delta
            q_weights += lr * delta * phi(states, s, m, action_momentum_dict[a])

            # Update S, A with S', A' accordingly
            s = s_prime
            m = m_prime
            a = a_prime
    
    return q_weights

In [4]:
q_weights = semi_gradient_SARSA(states, episode_count=500)

In [5]:
q_weights

array([-1.95405285e-01, -2.80669307e+02, -2.76743069e+02, -2.81232547e+02,
       -2.79721964e+02, -2.79704218e+02, -2.79552507e+02])

In [6]:
for a in ['right', 'left', 'neutral']:
    q_val = np.dot(q_weights, phi(states, 0, 0, action_momentum_dict[a]))
    print(f"Action: {a}, Q-value: {q_val}")

Action: right, Q-value: -556.2955764094316
Action: left, Q-value: -556.4650333931065
Action: neutral, Q-value: -556.4472874821809


In [7]:
def greedy_policy(q_weights, states, state, momentum):

    return epsilon_greedy_policy(q_weights, states, state, momentum)

def test_weights(states, starting_state, q_weights):
        
    # Initialize S
    s = starting_state
    m = 0
    action_history = []
    a = greedy_policy(q_weights, states, s, m)
    
    while True:
        print(a)

        action_history.append(a)

        # Take action A, observe R, S'
        m_prime = m + action_momentum_dict[a] + state_momentum_dict[states[s]]
        
        if m_prime > 0:
            s_prime = min(len(states)-1, s+1)
        elif m_prime < 0:
            s_prime = max(0, s-1)
        else:
            s_prime = s

        m_prime += state_momentum_dict[states[s_prime]]

        if states[s_prime] == 'w':
            if m_prime == 0:
                break
            
        # Choose A' as a function q(S', ., w) using greedy
        a_prime = greedy_policy(q_weights, states, s, m)
                
        # Update S, A with S', A' accordingly
        s = s_prime
        m = m_prime
        a = a_prime

In [8]:
print(terrain)
test_weights(states, 0, q_weights)


\             /
 \           /
  \__/www\__/

right
right
right
right
right
right
right
right
right
right
right
neutral
right
right
right
right
right
right
right
right
right
right
right
left
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
neutral
right
right
neutral
right
right
neutral
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
right
neutral
right
left
right
right
right
right
