In [1]:
import random
from typing import NamedTuple
import numpy as np

class State(NamedTuple):
    position: tuple[int, int]
    velocity: tuple[int, int]

class Action(NamedTuple):
    dx: int
    dy: int

def line_intersection(a1, a2, b1, b2):
    """ 
    Returns the point of intersection of the lines passing through a2,a1 and b2,b1.
    a1: [x, y] a point on the first line
    a2: [x, y] another point on the first line
    b1: [x, y] a point on the second line
    b2: [x, y] another point on the second line
    """
    s = np.vstack([a1,a2,b1,b2])       
    h = np.hstack((s, np.ones((4, 1))))
    l1 = np.cross(h[0], h[1])          
    l2 = np.cross(h[2], h[3])          
    x, y, z = np.cross(l1, l2)         
    if z == 0:                         
        return (float('inf'), float('inf'))
    return (x/z, y/z)


class TrackEnv:
    def __init__(self, track_file, min_velocity=0, max_velocity=4, no_change_prob=0.1):
        self.track_file = track_file

        with open(track_file, 'r') as f:
            self.track = f.read().splitlines()

        self.track = np.array([list(row) for row in self.track])

        self.width = len(self.track[0])
        self.height = len(self.track)

        self.start_line = np.where(self.track == 'S')
        self.start_line = sorted(list(zip(self.start_line[1], self.start_line[0])))

        self.finish_line = np.where(self.track == 'F')
        self.finish_line = sorted(list(zip(self.finish_line[1], self.finish_line[0])))

        finish_line_A = self.finish_line[0]
        finish_line_B = self.finish_line[-1]

        if finish_line_A[0] == finish_line_B[0]: # finish line is vertical
            self.finish_line_vec = ((finish_line_A[0], finish_line_A[1] - 0.5), (finish_line_B[0], finish_line_B[1] + 0.5))
        else: # finish line is horizontal
            self.finish_line_vec = ((finish_line_A[0] - 0.5, finish_line_A[1]), (finish_line_B[0] + 0.5, finish_line_B[1]))

        self.min_velocity = min_velocity
        self.max_velocity = max_velocity

        self.velocities = [(dx, dy) for dx in range(min_velocity, max_velocity + 1) for dy in range(min_velocity, max_velocity + 1) if (dx != 0 or dy != 0)]
        self.zero_velocity = (0, 0)
        self.actions = [Action(dx, dy) for dx in range(-1, 2) for dy in range(-1, 2)]

        self.no_change_prob = no_change_prob

    def out_of_bounds(self, x, y):
        return x < 0 or x >= self.width or y < 0 or y >= self.height or self.track[y][x] == 'X' or self.track[y][x] == 'F'
    
    def out_of_bounds_vec(self, start, velocity):
        end = (start[0] + velocity[0], start[1] - velocity[1])
        return self.out_of_bounds(end[0], end[1])
    
    def finish_line_crossed(self, start, velocity):
        end = (start[0] + velocity[0], start[1] - velocity[1])
        intersection =  line_intersection(self.finish_line_vec[0], self.finish_line_vec[1], start, end)

        finish_line_start = self.finish_line_vec[0]
        finish_line_end = self.finish_line_vec[1]

        if intersection == (float('inf'), float('inf')):
            return False
        
        if intersection[0] < min(start[0], end[0]) or intersection[0] > max(start[0], end[0]):
            return False
        
        if intersection[1] < min(start[1], end[1]) or intersection[1] > max(start[1], end[1]):
            return False
        
        if intersection[0] < min(finish_line_start[0], finish_line_end[0]) or intersection[0] > max(finish_line_start[0], finish_line_end[0]):
            return False
        
        if intersection[1] < min(finish_line_start[1], finish_line_end[1]) or intersection[1] > max(finish_line_start[1], finish_line_end[1]):
            return False
        
        return True
    
    def step(self, state: State, action: Action) -> tuple[State, bool]:
        """
        state: tuple of position and velocity ((x_pos, y_pos), (x_vel, y_vel))
        action: tuple of x and y acceleration (dx, dy)

        returns: new state, new velocity, terminal
        terminal is True if the car crossed the finish line
        """
        if random.random() < self.no_change_prob:
            action = Action(0, 0)

        position, velocity = state
        new_velocity = (velocity[0] + action[0], velocity[1] + action[1])
        new_position = (position[0] + new_velocity[0], position[1] - new_velocity[1])

        terminal = self.finish_line_crossed(position, new_velocity)

        if terminal:
            return State(new_position, new_velocity), terminal

        # If the car went out of bounds, put it on a random place on the start line
        if self.out_of_bounds_vec(position, new_velocity):
            new_position = self.random_start_position()
            new_velocity = (0, 0)

        return State(new_position, new_velocity), terminal

    
    def is_valid_action(self, velocity, action):
        new_velocity = (velocity[0] + action[0], velocity[1] + action[1])

        return (new_velocity[0] >= self.min_velocity and
                new_velocity[0] <= self.max_velocity and
                new_velocity[1] >= self.min_velocity and
                new_velocity[1] <= self.max_velocity and
                (new_velocity[0] != 0 or new_velocity[1] != 0))
    

    def is_starting_position(self, position):
        return position in self.start_line
    
    def possible_actions(self, state: State):
        actions = [action for action in self.actions if self.is_valid_action(state.velocity, action)]
        return actions
    
    def non_terminal_positions(self):
        return [(x, y) for x in range(self.width) for y in range(self.height) if self.track[y][x] != 'X' and self.track[y][x] != 'F']
    
    def non_terminal_states(self):
        positions = self.non_terminal_positions()

        states = [State(position, velocity) for position in positions for velocity in self.velocities]

        states += [State(position, self.zero_velocity) for position in self.start_line]

        return states
    
    def random_start_position(self):
        return random.choice(self.start_line)
    
    def random_start_state(self):
        return State(self.random_start_position(), self.zero_velocity)
    
    def start_states(self):
        return [State(position, self.zero_velocity) for position in self.start_line]
    
    def print_car_position(self, position):
        track = self.track.copy()
        track[position[1]][position[0]] = 'C'
        print('\n'.join([''.join(row) for row in track]))

    def print_state(self, state):
        position, velocity = state
        self.print_car_position(position)
        print(f'Velocity: {velocity}')
    
    def __repr__(self):
        return '\n'.join([''.join(row) for row in self.track])
    
    def __getitem__(self, key):
        return self.track[key]
        

### Exploring starts

In [2]:
def generate_episode(env, policy, max_steps=300, start_state=None, start_action=None, const_reward = -1):
    if start_state is not None:
        state = start_state
    else:
        state = env.random_start_state()
    episode = []

    if start_action is not None:
        action = start_action
    else:
        action = policy[state]

    for i in range(max_steps):
        next_state, terminal = env.step(state, action)
        episode.append((state, action, const_reward))
        
        if terminal:
            break
        state = next_state

        action = policy[state]
        
    return episode, terminal

In [3]:
track = TrackEnv('./tracks/track.txt')

In [4]:
from tqdm import tqdm


n_iterations = 100000

states = track.non_terminal_states()
# initialization

policy = {state: random.choice(track.possible_actions(state)) for state in states}
Q = {state: {action: 0 for action in track.possible_actions(state)} for state in states}
N = {state: {action: 0 for action in track.possible_actions(state)} for state in states}

for i in tqdm(range(n_iterations)):
    start_state = random.choice(states)
    start_action = random.choice(track.possible_actions(start_state))
    episode, terminal = generate_episode(track, policy, start_state=start_state, start_action=start_action, max_steps=1000)

    if terminal:
        episode_return = 0
    else:
        episode_return = -1000

    for i, (state, action, reward) in list(enumerate(episode))[::-1]:
        if (state, action) not in [(s, a) for s, a, _ in episode[:i]]:
            episode_return += reward
            N[state][action] += 1
            Q[state][action] += (episode_return - Q[state][action]) / N[state][action]
            old_policy = policy[state]
            policy[state] = max(Q[state], key=Q[state].get)

100%|██████████| 100000/100000 [02:20<00:00, 711.46it/s]


In [5]:
state = track.random_start_state()
track.print_state(state)
track.no_change_prob = 0.1

for i in range(1000):
    track.print_state(state)
    action = policy[state]
    print(action)
    state, terminal = track.step(state, action)
    if terminal:
        print(f"Finished in {i} steps")
        break

XXXX..............F
XX................F
XX................F
X.................F
..................F
..................F
..........XXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XXXX.....XXXXXXXXXX
XXXXSSSCSXXXXXXXXXX
Velocity: (0, 0)
XXXX..............F
XX................F
XX................F
X.................F
..................F
..................F
..........XXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XXXX.....XXXXXXXXXX
XXXXSSSCSXXXXXXXXXX
Velocity: (0, 0)
Action(dx=1, dy=1)
XXXX..............F
XX................F
XX................F
X......

### On-policy first-visit MC control (for $\varepsilon$-soft policies)

In [33]:
class Policy:
    def __init__(self, env: TrackEnv):
        self.env = env
        
    def probabilities(self, state):
        pass
    
    def stochastic_action(self, state):
        pass
    
    def deterministic_action(self, state):
        pass

    def probability(self, state, action):
        pass
    
    def improve(self, state, optimal_action):
        pass


class EpislonGreedyPolicy(Policy):
    def __init__(self, env: TrackEnv, epsilon=0.5, greedy_policy=None):
        self.env = env
        self.epislon = epsilon
        self.states = env.non_terminal_states()
        self.actions = {state: env.possible_actions(state) for state in self.states}

        self._probabilities = {state: {action: self.epislon/len(self.actions[state]) for action in self.actions[state]} for state in self.states}

        if greedy_policy is not None:
            self.deterministic_actions = {state: greedy_policy.deterministic_action(state) for state in self.states}
        else:
            self.deterministic_actions = {state: random.choice(self.actions[state]) for state in self.states}
        
        for state in self.states:
            self._probabilities[state][self.deterministic_actions[state]] += 1 - self.epislon

    def probabilities(self, state):
        return self._probabilities[state]
    
    def stochastic_action(self, state):
        return random.choices(self.actions[state], weights=[self._probabilities[state][action] for action in self.actions[state]])[0]
    
    def deterministic_action(self, state):
        return self.deterministic_actions[state]
    
    def probability(self, state, action):
        return self._probabilities[state][action]
    
    def improve(self, state, optimal_action):
        self.deterministic_actions[state] = optimal_action
        self._probabilities[state] = {action: self.epislon/len(self.actions[state]) for action in self.actions[state]}
        self._probabilities[state][optimal_action] += 1 - self.epislon


class GreedyPolicy(Policy):
    def __init__(self, env: TrackEnv):
        self.env = env
        self.states = env.non_terminal_states()
        self.actions = {state: env.possible_actions(state) for state in self.states}
        self.deterministic_actions = {state: random.choice(self.actions[state]) for state in self.states}
    
    def probabilities(self, state):
        return {action: 1 for action in self.actions[state]}
    
    def stochastic_action(self, state):
        return self.deterministic_actions[state]
    
    def deterministic_action(self, state):
        return self.deterministic_actions[state]
    
    def probability(self, state, action):
        if action == self.deterministic_actions[state]:
            return 1
        return 0
    
    def improve(self, state, optimal_action):
        self.deterministic_actions[state] = optimal_action

In [26]:
def generate_episode(env, policy: EpislonGreedyPolicy, max_steps=300, start_state=None, start_action=None, const_reward = -1):
    if start_state is not None:
        state = start_state
    else:
        state = env.random_start_state()
    episode = []

    if start_action is not None:
        action = start_action
    else:
        action = policy.stochastic_action(state)

    for i in range(max_steps):
        next_state, terminal = env.step(state, action)
        episode.append((state, action, const_reward))
        
        if terminal:
            break
        state = next_state

        action = policy.stochastic_action(state)
        
    return episode, terminal

In [8]:
epsilon = 0.2
n_iterations = 1000
track = TrackEnv('./tracks/track.txt')

policy = EpislonGreedyPolicy(track, epsilon=epsilon)
Q = {state: {action: 0 for action in track.possible_actions(state)} for state in states}
N = {state: {action: 0 for action in track.possible_actions(state)} for state in states}


for i in tqdm(range(n_iterations)):
    episode, terminal = generate_episode(track, policy, max_steps=1000)

    if terminal:
        episode_return = 0
    else:
        episode_return = -1000

    for i, (state, action, reward) in list(enumerate(episode))[::-1]:
        if (state, action) not in [(s, a) for s, a, _ in episode[:i]]:
            episode_return += reward
            
            N[state][action] += 1
            Q[state][action] += (episode_return - Q[state][action]) / N[state][action]
            
            optimal_action = max(Q[state], key=Q[state].get)
            policy.improve(state, optimal_action)

100%|██████████| 1000/1000 [00:15<00:00, 63.51it/s]


In [20]:
state = track.random_start_state()
track.print_state(state)
track.no_change_prob = 0.0

for i in range(1000):
    track.print_state(state)
    action = policy.deterministic_action(state)
    print(action)
    state, terminal = track.step(state, action)
    if terminal:
        print(f"Finished in {i} steps")
        break

XXXX..............F
XX................F
XX................F
X.................F
..................F
..................F
..........XXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XXXX.....XXXXXXXXXX
XXXXCSSSSXXXXXXXXXX
Velocity: (0, 0)
XXXX..............F
XX................F
XX................F
X.................F
..................F
..................F
..........XXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XXXX.....XXXXXXXXXX
XXXXCSSSSXXXXXXXXXX
Velocity: (0, 0)
Action(dx=1, dy=1)
XXXX..............F
XX................F
XX................F
X......

### Off-policy MC control

In [None]:
policy = GreedyPolicy(track)
Q = {state: {action: 0 for action in track.possible_actions(state)} for state in states}
N = {state: {action: 0 for action in track.possible_actions(state)} for state in states}
C = {state: {action: 0 for action in track.possible_actions(state)} for state in states}

In [None]:
epsilon = 0.2
n_iterations = 1000
track = TrackEnv('./tracks/track.txt')

for i in tqdm(range(n_iterations)):
    behavior_policy = EpislonGreedyPolicy(track, epsilon=epsilon, greedy_policy=policy)
    episode, terminal = generate_episode(track, behavior_policy, max_steps=50)

    episode_return = 0

    W = 1

    for i, (state, action, reward) in list(enumerate(episode))[::-1]:
        episode_return += reward
        C[state][action] += W
        Q[state][action] += W / C[state][action] * (episode_return - Q[state][action])
        
        optimal_action = max(Q[state], key=Q[state].get)
        policy.improve(state, optimal_action)

        if action != optimal_action:
            break

        W *= 1 / behavior_policy.probability(state, action)

100%|██████████| 1000/1000 [00:45<00:00, 22.20it/s]


In [219]:
state = track.random_start_state()
track.print_state(state)
track.no_change_prob = 0

for i in range(1000):
    track.print_state(state)
    action = policy.deterministic_action(state)
    print(action)
    state, terminal = track.step(state, action)
    if terminal:
        print(f"Finished in {i} steps")
        break

XXXX..............F
XX................F
XX................F
X.................F
..................F
..................F
..........XXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XXXX.....XXXXXXXXXX
XXXXCSSSSXXXXXXXXXX
Velocity: (0, 0)
XXXX..............F
XX................F
XX................F
X.................F
..................F
..................F
..........XXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
.........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
X........XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XX.......XXXXXXXXXX
XXXX.....XXXXXXXXXX
XXXXCSSSSXXXXXXXXXX
Velocity: (0, 0)
Action(dx=0, dy=1)
XXXX..............F
XX................F
XX................F
X......