In [2]:
import numpy as np

from itertools import product

In [3]:
class Track(object):
    def __init__(self):
        """
            0 = off track
            1 = road - on track
            2 = start line
            3 = finish line
        """
        self.track = np.ones((5, 5))
        self.track[3:5, 3:5] = 0
        self.track[-1, :] *= 2
        self.track[:, -1] *= 3

    def get_next_position(self, racecar):
        """
            RaceCar racecar: RaceCar object
        """
        
        reward = -1
        
        new_x = racecar.x + racecar.velocity_x
        new_y = racecar.y + racecar.velocity_y
        
        final_x = new_x
        final_y = new_y
        
        # Compute all the unique boxes we hit on a line between the start and end points
        x_positions = np.linspace(racecar.x, new_x, num=20)
        y_positions = np.linspace(racecar.y, new_y, num=20)
        positions = zip(x_positions, y_positions)
        positions = [(np.floor(x), np.floor(y)) for x, y in positions]
        
        # Get unique discrete positions visited during this time step
        ordered_positions = []
        for pos in positions:
            if len(ordered_positions) == 0 or pos != ordered_positions[-1]:
                ordered_positions.append(pos)
                        
        # Check if the car crashes into the track at any of those time points
        #   or if it reached the finish line
        for pos_idx, pos in enumerate(ordered_positions):
            
            # ability to speed past the finish without penalty
            if self.is_terminal_state_from_coordinates(pos[0], pos[1]):
                reward = -1
                final_x, final_y = ordered_positions[pos_idx]
                break
            
            # check if the car crashes
            if self.is_out_of_bounds(pos):
                reward = -5
                crash_x, crash_y = pos
                final_x, final_y = ordered_positions[pos_idx - 1]
                racecar.velocity_x = 0
                racecar.velocity_y = 0
                break

        # If the car is not moving, the car must move at least 1 step
        if final_x == racecar.x and final_y == racecar.y:
            if self.is_out_of_bounds((final_x + 1, final_y)):
                final_y += 1
                racecar.velocity_y = 1
            elif self.is_out_of_bounds((final_x, final_y + 1)):
                final_x += 1
                racecar.velocity_x = 1
            else:
                random_choice = np.random.choice([0, 1])
                final_x += random_choice
                final_y += (1 - random_choice)
                racecar.velocity_x += random_choice
                racecar.velocity_y += (1 - random_choice)                    
        
        racecar.x = final_x
        racecar.y = final_y
        
        return reward

    def convert_cartesian_to_indexes(self, x, y):
        y_prime, x_prime = x, y
        x_prime = self.track.shape[0] - x_prime - 1
        return int(x_prime), int(y_prime)
    
    def convert_indexes_to_cartesian(self, x, y):
        y_prime, x_prime = x, y
        y_prime = self.track.shape[1] - y_prime - 1
        return int(x_prime), int(y_prime)
    
    def is_terminal_state(self, racecar):
        x, y = self.convert_cartesian_to_indexes(racecar.x, racecar.y)
        if self.track[x, y] == 3:
            return True
        return False
    
    def is_terminal_state_from_coordinates(self, x, y):
        x, y = self.convert_cartesian_to_indexes(x, y)
        if self.track[x, y] == 3:
            return True
        return False
    
    def is_out_of_bounds(self, position):
        x, y = position
        
        if x < 0 or x >= self.track.shape[1]:
            return True
        
        if y < 0 or y >= self.track.shape[0]:
            return True

        # y is reversed in our frame of reference
        x, y = self.convert_cartesian_to_indexes(x, y)

        if self.track[x, y] == 0:
            return True
        
        return False
    
    def get_random_start(self):
        # returns x and y coordinates of random start
        starts = np.argwhere(self.track == 2)
        random_start = np.random.randint(len(starts))
        start = starts[random_start]
        return self.convert_indexes_to_cartesian(*start)
    
    def get_states(self):
        return [self.convert_indexes_to_cartesian(x, y) for x, y in np.argwhere(self.track != 0)]
    
    def print_track(self, x, y):
        x, y = self.convert_cartesian_to_indexes(x, y)
        pt = np.copy(self.track)
        pt[x, y] = -1
        print(pt)
        
    

In [4]:
class RaceCar(object):
    def __init__(self):
        self.velocity_x = 0
        self.velocity_y = 0
        self.x = 0
        self.y = 0
        
        self.MAX_VELOCITY = 5
        self.MIN_VELOCITY = 0

    def get_episode(self, pi, track, actions, states, greedy=False, verbose=False):
        """
            actions: an index to action dictionary
            pi: numpy array of probabilities to take an action given the state
        
        """

        self.velocity_x = 0; self.velocity_y = 0
        self.x, self.y = track.get_random_start()
        
        rewards = []
        saved_actions = []
        visited_states = [((self.x, self.y), (self.velocity_x, self.velocity_y))]
        
        terminated = False
        while not terminated:
            state_idx = states[((self.x, self.y), (self.velocity_x, self.velocity_y))]

            # choose greedy action of action with probability pi
            if greedy:
                action_idx = np.where(pi[state_idx, :] == np.amax(pi[state_idx, :]))[0]
                action_idx = np.random.choice(action_idx)   
            else:
                action_idx = np.random.choice(len(actions), size=1, p=pi[state_idx, :])[0]    
            
            action = actions[action_idx]
            saved_actions.append(action)
            
            # Take the action
            self.velocity_x += action[0]
            self.velocity_y += action[1]
            self.velocity_x = min(max(self.velocity_x, self.MIN_VELOCITY), self.MAX_VELOCITY)
            self.velocity_y = min(max(self.velocity_y, self.MIN_VELOCITY), self.MAX_VELOCITY)

            # save the rewards and states
            reward = track.get_next_position(self)
            if len(visited_states) > 100:
                reward = -1000
                terminated = True
            else:
                terminated = track.is_terminal_state(self)
            
            rewards.append(reward)
            visited_states.append(((self.x, self.y), (self.velocity_x, self.velocity_y)))
            
            if verbose:
                track.print_track(self.x, self.y)
                print('Velocity is now: ', (self.velocity_x, self.velocity_y))
        
        return visited_states, saved_actions, rewards
        
    def get_states(self):
        return list(product(
                range(self.MIN_VELOCITY, self.MAX_VELOCITY + 1),
                range(self.MIN_VELOCITY, self.MAX_VELOCITY + 1)
            )
        )

In [28]:
class MonteCarlo(object):
    def __init__(self, actions, agent, environment):
        self.actions_list = actions
        self.agent = agent
        self.environment = environment
        
        self.actions_to_idx = {action: idx for idx, action in enumerate(self.actions_list)}
        self.idx_to_actions = {idx: action for idx, action in enumerate(self.actions_list)}

        self.states_list = list(product(environment.get_states(), agent.get_states()))
        self.states_to_idx = {state: idx for idx, state in enumerate(self.states_list)}

        self.initialize_random_policy()
        
    def initialize_random_policy(self):
        self.Q = np.random.random((len(self.states_to_idx), len(self.actions_to_idx)))
        self.Returns = {(s, a): [] for s, a in product(self.states_to_idx, self.actions_to_idx)}

        self.pi = np.random.random((len(self.states_to_idx), len(self.actions_to_idx)))
        self.pi = self.pi / np.sum(self.pi, axis=1)[:, None]
    
    def on_policy_learning(self, num_iterations, epsilon=.1, gamma=1):
        """
            epsilon: sets minimum probability threshold for policy pi
            gamma: discount factor in rewards
            
        """

        count = 0
        learning = True

        while learning:

            visited_states, actions_taken, rewards = self.agent.get_episode(
                self.pi, 
                self.environment, 
                self.idx_to_actions, 
                self.states_to_idx
            )

            has_visited_first_occurence = {}
            for idx, sa in enumerate(zip(visited_states, actions_taken)):
                s, a = sa
                if (s, a) not in has_visited_first_occurence:
                    self.Returns[(s, a)].append(sum(rewards[idx:]))
                    self.Q[self.states_to_idx[s], self.actions_to_idx[a]] = np.mean(self.Returns[(s, a)]) 
                    has_visited_first_occurence[(s, a)] = 0

            for s in visited_states:
                # We can take the greedy action, but it's probably better to break ties
                # a_star = np.argmax(Q[states_to_idx[s],:])
                action_idx = np.where(self.Q[self.states_to_idx[s],:] == np.amax(self.Q[self.states_to_idx[s],:]))[0]
                a_star = np.random.choice(action_idx)
                for action_idx, a in enumerate(self.actions_list):
                    if a_star == action_idx:
                        self.pi[self.states_to_idx[s], action_idx] = 1 - epsilon + epsilon / len(self.actions_list)
                    else:
                        self.pi[self.states_to_idx[s], action_idx] = epsilon / len(self.actions_list)

            count += 1

            if count >= num_iterations: learning = False
        
        return self.pi

In [29]:
car = RaceCar()
track = Track()

actions = list(product([-1, 0, 1], [-1, 0, 1]))
mc = MonteCarlo(actions, car, track)

# car.get_episode(pi, track, idx_to_actions, states_to_idx, greedy=True, verbose=True)

In [30]:
pi = mc.on_policy_learning(1000)

In [34]:
car.get_episode(pi, track, mc.idx_to_actions, mc.states_to_idx, greedy=True, verbose=True)

[[ 1.  1.  1.  1.  3.]
 [ 1.  1.  1.  1.  3.]
 [ 1.  1.  1.  1.  3.]
 [ 1. -1.  1.  0.  0.]
 [ 2.  2.  2.  0.  0.]]
('Velocity is now: ', (0, 1))
[[ 1.  1.  1.  1.  3.]
 [ 1.  1.  1.  1.  3.]
 [ 1.  1. -1.  1.  3.]
 [ 1.  1.  1.  0.  0.]
 [ 2.  2.  2.  0.  0.]]
('Velocity is now: ', (1, 1))
[[ 1.  1.  1.  1.  3.]
 [ 1.  1.  1.  1. -1.]
 [ 1.  1.  1.  1.  3.]
 [ 1.  1.  1.  0.  0.]
 [ 2.  2.  2.  0.  0.]]
('Velocity is now: ', (2, 1))


([((1, 0), (0, 0)), ((1, 1), (0, 1)), ((2, 2), (1, 1)), ((4.0, 3.0), (2, 1))],
 [(0, 1), (1, 0), (1, 0)],
 [-1, -1, -1])

In [33]:
mc.actions_to_idx

{(-1, -1): 0,
 (-1, 0): 1,
 (-1, 1): 2,
 (0, -1): 3,
 (0, 0): 4,
 (0, 1): 5,
 (1, -1): 6,
 (1, 0): 7,
 (1, 1): 8}