In [None]:
import numpy as np
import scipy as sp
import scipy.ndimage
import copy
import seaborn as sns
import time
import matplotlib.pyplot as plt
from mpmath import mpf, mpc, mp, log

# S&B 4.9 - Gambler's Problem

For reference the update rule for value iteration is:

$$ V_{k+1}(s) = \max_{a} \sum_{s', r} p(s', r| s, a) \left[ r + \gamma \cdot V_k(s') \right]$$

We ran the simulation and found that:

+ For $p_h = 0.4$ (the value used in the code below and in the book's figure) we replicate the results in the book. If we wanted to replicate the policy graph on the curve we would need to account for numerical error and simply take the **minimum** argmax of the Q function. The argmax should in theory return many different possibilities, and the minimum of these would produce the policy in the book. However, as mentioned in the book there are multiple "optimality" contours per se in the value function.

    + Some of the contours focus on gambling the minimal amount that will get us to the next quartile (similar to what we see in the book).
    
    + Other contours focus on gambling **everything**. This strategy makes sense because it minimizes the number of coin flips we make. 
    
    + From the graphs of $Q(s,a)$ and $Q(a|s)$ we can see that these optimal strategies form a set of diamonds, with up and to the right edges of the diamonds representing "betting it all trajectories" and up and to the left edges representing "betting just enough to get to the next quartile" strategies.
    
    
+ For $p_h = 0.5$ the value function is basically $V(s) = s/100$ and when $p_h > 0.5$ as $p_h$ increases the value function gets closer and closer to a square (meaning the likelihood of winning gets closer and closer to 1 for low $s$ states).


+ As $p_h$ gets smaller and smaller the kinks in the value function get more and more extreme and defined. In the limit of extremely small $p_h$ it becomes essentially a step function. The step jumps occur when we have enough "coins" so that we reduce the integral number of flips needed to win. The heights of the steps correlate with $p_h^n$ where $n$ is the number of consecutive flips needed to win.

We'll leave our analysis at that.

In [None]:
# Decimal digits of precision used for every mpmath computation below.
mp.dps = 200

# Problem configuration: capital needed to win, probability of heads, and
# the convergence threshold compared against the log of the largest
# per-sweep change in the value function.
win_threshold = 100
p_heads = mpf(4) / 10
log_theta = -200

# State values V[s], recorded wager per state pi[s], and action values
# Q[s, a]; V and Q hold arbitrary-precision mpf entries, all starting at 0.
V = np.array([mpf(0) for _ in range(win_threshold + 1)])
pi = np.zeros(win_threshold + 1)
Q = np.array([[mpf(0) for _ in range(win_threshold + 1)]
              for _ in range(win_threshold + 1)])

def get_states_and_rewards(s):
    """
    Enumerate every wager available with capital `s` and its outcomes.

    Returns a tuple `(actions, reward, winning_states, losing_states)`:

    + `actions`        -- the integer wagers 1, ..., s
    + `reward`         -- per wager, mpf(1) if winning the flip reaches
                          `win_threshold` and mpf(0) otherwise
    + `winning_states` -- capital after a won flip, capped at `win_threshold`
    + `losing_states`  -- capital after a lost flip, floored at 0
    """
    actions = np.arange(1, s + 1, dtype=int)
    capital_if_won = s + actions
    capital_if_lost = s - actions

    reaches_goal = capital_if_won >= win_threshold
    reward = np.array([mpf(int(flag)) for flag in reaches_goal])

    # Clip both outcomes into the valid capital range [0, win_threshold].
    winning_states = np.minimum(capital_if_won, win_threshold)
    losing_states = np.maximum(capital_if_lost, 0)

    return actions, reward, winning_states, losing_states

counter = 0
t_i = time.time()

# Value iteration. Each pass sweeps the non-terminal states 1..win_threshold-1
# and overwrites V[s] in place with the best one-step lookahead value, while
# tracking the log of the largest change seen during the sweep.
while True:
    log_delta = -mp.inf
    for s in range(1, win_threshold):
        actions, reward, winning_states, losing_states = get_states_and_rewards(s)
        v = V[s]
        V[s] = np.max(p_heads * (reward + V[winning_states]) + (1 - p_heads) * (V[losing_states]))

        log_delta = max(log_delta, log(abs(V[s] - v)))

    counter += 1

    # Progress report every 10 sweeps, with the wall time of those sweeps.
    if counter % 10 == 0:
        t_f = time.time()
        delta_t = round(t_f - t_i, 2)
        print("Iteration #{} complete in {} s. Current Log Delta: {} ".format(counter, delta_t, log_delta))
        t_i = t_f

    # Converged: fill in Q(s, a) from the final V and record a greedy wager.
    if log_delta < log_theta:
        for s in range(1, win_threshold):
            actions, reward, winning_states, losing_states = get_states_and_rewards(s)
            Q_sa = p_heads * (reward + V[winning_states]) + (1 - p_heads) * (V[losing_states])

            for action, q_value in zip(actions, Q_sa):
                Q[s, action] = q_value

            # np.argmax returns a position inside Q_sa, and wagers start at 1,
            # so map the position back through `actions` to store the actual
            # wager. argmax returns the first maximum, i.e. the smallest
            # maximizing wager.
            pi[s] = actions[np.argmax(Q_sa)]

        print("Optimization completed after {} iterations".format(counter))
        break

In [None]:
fig = plt.figure(figsize=(20,10))

# Left panel: state values for capital 1..99.
ax0 = fig.add_subplot(121)
ax0.plot(V[1:100])
ax0.set_xlabel("Current Stash Amount", size=20)
plt.xticks(size=15)
ax0.set_ylabel("State Value", size=20)
plt.yticks(size=15)

# Right panel: recorded wager for capital 1..99.
ax1 = fig.add_subplot(122)
ax1.bar(x=np.arange(1,100), height=pi[1:100])
ax1.set_xlabel("Current Stash Amount", size=20)
plt.xticks(size=15)
ax1.set_ylabel("Wager Amount", size=20)
plt.yticks(size=15)

plt.tight_layout()

In [None]:
# Heatmap of the full action-value table, with states on the horizontal
# axis and actions on the vertical axis (hence the transpose).
fig, ax = plt.subplots(figsize=(15,10))

# Element-wise conversion of mpf entries to plain floats for plotting.
mp_to_float = np.vectorize(float)

sns.heatmap(mp_to_float(Q.T), ax=ax)
plt.title("Action Value Function - Q(s,a)", size=30)
plt.xlabel("s", size=20)
plt.ylabel("a", size=20)
plt.tight_layout()
plt.show()

In [None]:
fig = plt.figure(figsize=(20,20))

# Largest action value of each plotted state s = 0..24.
max_list = [np.max(mp_to_float(Q[s])) for s in np.arange(0, 25)]

# Height of the vertical guide lines. It is derived from the states that
# are actually plotted here rather than from whatever value `s` happened to
# hold from a previously executed cell, so the figure no longer depends on
# cell execution order.
q_max = max(max_list)

for s in np.arange(0, 25):
    # Q(s, a) as a function of the wager a, one curve per state.
    plt.plot(Q[s][0:25], linewidth=2)
    # Dashed horizontal line at this state's best action value.
    plt.hlines(max_list[s], 0, 25, linestyles='dashed')
    # Dashed vertical line at a = s.
    plt.vlines(s, 0, q_max, linestyles='dashed')

# Envelope of the per-state maxima, also drawn mirrored along the x axis.
plt.plot(max_list, color='black', linewidth=5)
plt.plot(np.arange(25, 0, -1), max_list, color='black', linewidth=5)

plt.legend(np.arange(0, 25))
plt.tight_layout()

In [None]:
fig = plt.figure(figsize=(20,20))

# Largest action value of each plotted state s = 0..50.
max_list = [np.max(mp_to_float(Q[s])) for s in np.arange(0, 51)]

# Height of the vertical guide lines. It is derived from the states that
# are actually plotted here rather than from whatever value `s` happened to
# hold from a previously executed cell, so the figure no longer depends on
# cell execution order.
q_max = max(max_list)

for s in np.arange(0, 51):
    # Q(s, a) as a function of the wager a, one curve per state.
    plt.plot(range(0,51), Q[s][0:51], linewidth=2)
    # Dashed horizontal line at this state's best action value.
    plt.hlines(max_list[s], 0, 51, linestyles='dashed')
    # Dashed vertical line at a = s.
    plt.vlines(s, 0, q_max, linestyles='dashed')

#plt.plot(max_list, color='black', linewidth=5)
#plt.plot(np.arange(50, 25, -1), max_list, color='black', linewidth=5)

plt.legend(np.arange(0, 51))
plt.tight_layout()

# S&B 5.12 - Racetrack

## I. Track Generation

The first thing is that we need to create a racetrack. The simplest representation for the racetrack is simply an $N \times N$ matrix in which an entry is 1 if it's outside the track and zero otherwise. We generate tracks as follows:

+ We initialize the track to an $N \times N$ grid where every square in the grid is on the track.
+ We take out rectangles from the upper left and lower right of the track. The sizes of the rectangles are randomly sampled, with the length of each dimension $L$ being distributed (approximately) as $L \sim RandInt[Floor(N/4), \ Floor(3N/4)]$. Taking out these rectangles creates the "right" turn at the end requested by the authors; it also creates a left turn for fun.
+ We then iterate through each cell on the grid, check if it's outside the track, and if it is we flip it to being on the track with probability equal to the percentage of its neighbors on the track. This (approximately) guarantees that only cells at the edge of the track get moved into the track.
+ This iteration is repeated some number of times so we get some interesting shapes due to the randomness of the sampling. We do smoothing after the final iteration to attempt to eliminate blocks that are technically "on the track" but are inaccessible due to being surrounded by blocks "off the track".

Here is the code -- if you run it you can see we get some nice tracks with semi-smooth curves.

In [None]:
N = 25                  # the track is an N x N grid
repeats = 4             # number of random erosion passes
filter_threshold = 0.2  # smoothed cells above this value are off the track
filter_size = 3         # side length of the smoothing window

# 0 marks a cell on the track, 1 a cell off the track.
base_track = np.zeros((N, N), dtype=float)

upper_left_rect = np.random.randint(int(N/4), int(3*N/4)), np.random.randint(N - int(3*N/4), N - int(N/4))
lower_right_rect = np.random.randint(int(N/4), int(3*N/4)), np.random.randint(int(N/4), int(3*N/4))

# Carve the two off-track rectangles:
#   i < upper_left_rect[0]      and  j > upper_left_rect[1]
#   i > N - lower_right_rect[0] and  j < lower_right_rect[1]
base_track[:upper_left_rect[0], upper_left_rect[1] + 1:] = 1
base_track[N - lower_right_rect[0] + 1:, :lower_right_rect[1]] = 1

# Random erosion: every off-track cell is flipped back onto the track with
# probability equal to the on-track fraction of its (edge-clipped) 3x3
# window. Updates are applied in place, so later cells in the same pass see
# earlier flips.
for repeat in range(repeats):
    for i in range(N):
        for j in range(N):
            if base_track[i, j] == 1:
                neighs = base_track[max(i - 1, 0):i + 2, max(j - 1, 0):j + 2]
                is_flipped = np.random.binomial(1, 1 - np.mean(neighs))

                if is_flipped:
                    base_track[i, j] = 0

# Smooth with a box filter and re-threshold. `uniform_filter` is called from
# `scipy.ndimage` directly because the `scipy.ndimage.filters` namespace is
# deprecated.
track = (scipy.ndimage.uniform_filter(base_track, filter_size) > filter_threshold)

fig, ax = plt.subplots(figsize=(15,10)) 

sns.heatmap(track.T, ax=ax)
plt.xlabel("X coordinate", size=20)
plt.ylabel("Y coordinate", size=20)
plt.tight_layout()
plt.show()

Note that the indices of the track array correspond to the x and y coordinates respectively. We only transpose when plotting so that the plot aligns with our intuition.

## II. Trajectory Sampling

First let's formulate the problem concretely. The state space is the set of possible x and y coordinates and x and y velocities. Let $X_N$ be the set of all coordinates in the grid, $X_{off}$ be the set of all off-track coordinates, $X_S$ the set of start coordinates, and $V$ be the set of all possible velocities. Then $X_N = [0, N-1] \times [0, N-1]$ where these are integer intervals, and the set of possible spatial states is the set difference $X_N - X_{off}$ since all off-track coordinates transition you to a start state. Also we have that $V = [0, v_{x, max}] \times [0, v_{y, max}]$ where $v_{i, max} = 5$ in our instance of the problem. Finally the actual set of possible states is:

$$ \mathscr{S} = \left( X_S \times \{ \vec{0} \} \right) \ \bigcup \ \left( X_N - X_{off} - X_S \right) \times \left( V -  \{ \vec{0} \} \right) $$

Let us denote a particular state in terms of the spatial and velocity coordinates, $\left( \vec{x}, \vec{v} \right) \in \mathscr{S}$. Then if $\vec{v} = (v_x, v_y)$ and $H$ is the Heaviside step function with the convention that $H(0) = 1$:

$$ \mathscr{A}(\vec{x}, \vec{v}) = \left\{ -H(v_x - 1), 0, 1 -  H(v_x - v_{x,max}) \right\} \times \left\{-H(v_y - 1),0, 1 - H(v_y - v_{y,max}) \right\},  \forall \left( \vec{x}, \vec{v} \right) \in \mathscr{S} $$

Note that here we rely on the convention that duplicate values in a set are not distinct. Finally, if $\vec{a} \in \mathscr{A}(\vec{x}, \vec{v})$ is the action vector then the environment acts as:

$$ 
\begin{equation}
  (\vec{X}', \vec{V}') =\begin{cases}
    (\vec{X} + \vec{V} + \vec{A}, \vec{V} + \vec{A}), & \text{with probability } 1 - \delta\\
    (\vec{X} + \vec{V}, \vec{V}), & \text{with probability } \delta
  \end{cases}
\end{equation}
$$

In our particular case $\delta=0.1$. Note that we are choosing to have the action $\vec{A}$ apply immediately on the current state, so that it determines the next state if successful. Note that this equation only applies exactly if the newly calculated state is on the track -- otherwise the new state is the starting state.

The task is to determine the optimal policy from each starting state. This means we start each episode always from the starting states and simulate our trajectories in this manner.

In [None]:
# Largest allowed velocity component along each axis, packed as [x, y].
vx_max = vy_max = 5
v_max = [vx_max, vy_max]

In [None]:
def get_start_states(track):
    """
    Return the on-track cells of the column y = 0 as an array of [x, 0] rows.

    A cell counts as on-track when its entry in `track` equals 0. The result
    is an empty array when no such cell exists.
    """
    start_cells = [np.array([x, 0]) for x in range(len(track)) if track[x, 0] == 0]
    return np.array(start_cells)
    
def get_end_states(track):
    """
    Return the on-track cells of the last row x = N - 1 as an array of
    [N - 1, y] rows, where N = len(track).

    A cell counts as on-track when its entry in `track` equals 0. The result
    is an empty array when no such cell exists.
    """
    last_x = len(track) - 1
    end_cells = [np.array([last_x, y]) for y in range(len(track)) if track[last_x, y] == 0]
    return np.array(end_cells)

def valid_actions(v_x, v_y, v_max):
    """
    List the acceleration tuples (a_x, a_y), each component in {-1, 0, 1},
    that keep both velocity components inside [0, v_max[axis]] and do not
    bring the velocity to (0, 0).
    """
    allowed = []
    for a_x in (-1, 0, 1):
        new_v_x = v_x + a_x
        if new_v_x < 0 or new_v_x > v_max[0]:
            continue
        for a_y in (-1, 0, 1):
            new_v_y = v_y + a_y
            if new_v_y < 0 or new_v_y > v_max[1]:
                continue
            if (new_v_x, new_v_y) == (0, 0):
                continue
            allowed.append((a_x, a_y))
    return allowed
    

class PolicyClass():
    """
    Epsilon-greedy policy for the racetrack.

    Only the deterministic (greedy) action of every state is stored;
    `sample` adds the exploration on top of it.
    """

    def __init__(self, track, epsilon, v_max=v_max):
        """
        We represent the policy as a dictionary indexed by
        (valid) coordinate tuples. Each value of the dictionary
        is a dictionary whose keys correspond to an allowed velocity
        state.

        Thus self.policy[(x, y)][(v_x, v_y)] returns the policy [a_x, a_y]
        for some state, with the caveat that start-line cells only hold the
        velocity (0, 0) while every other on-track cell holds all velocities
        except (0, 0).

        The initial policy accelerates by 1 along every axis that is not
        yet at its maximum velocity.
        """
        self.start_states = get_start_states(track).tolist()
        self.epsilon = epsilon
        N = len(track)
        self.policy = {}
        self.v_max = v_max

        for i in range(N):
            for j in range(N):
                if track[i, j] != 0:
                    # Off-track cells are not states.
                    continue

                if [i, j] in self.start_states:
                    self.policy[(i, j)] = {(0, 0): np.array([1, 1])}
                    continue

                self.policy[(i, j)] = {}
                for v_x in range(0, v_max[0] + 1):
                    for v_y in range(0, v_max[1] + 1):
                        if v_x == 0 and v_y == 0:
                            continue

                        a_x = 0 if v_x == v_max[0] else 1
                        a_y = 0 if v_y == v_max[1] else 1
                        self.policy[(i, j)][(v_x, v_y)] = np.array([a_x, a_y])

    def update_action(self, new_a_x, new_a_y, x, y, v_x, v_y):
        """
        Store [new_a_x, new_a_y] as the greedy action of state (x, y, v_x, v_y).

        Raises ValueError when an acceleration component is outside
        {-1, 0, 1}, when it would push a velocity component outside
        [0, self.v_max[axis]], or when a non-zero velocity is given for a
        start-line cell.
        """
        if new_a_x not in [-1, 0, 1] or new_a_y not in [-1, 0, 1]:
            raise ValueError("a_x and a_y must be one of `[-1, 0, 1]`")

        # Compare against the limits this policy was built with, not the
        # module-level default.
        if v_x == self.v_max[0] and new_a_x == 1:
            raise ValueError("a_x cannot be 1 when v_x is already at its maximum value")

        if v_y == self.v_max[1] and new_a_y == 1:
            raise ValueError("a_y cannot be 1 when v_y is already at its maximum value")

        if v_x == 0 and new_a_x == -1:
            raise ValueError("a_x cannot be -1 when v_x is already 0")

        if v_y == 0 and new_a_y == -1:
            raise ValueError("a_y cannot be -1 when v_y is already 0")

        if [x,y] in self.start_states and [v_x, v_y] != [0, 0]:
            raise ValueError("The velocity at a start location must be zero")

        self.policy[(x, y)][(v_x, v_y)] = np.array([new_a_x, new_a_y], dtype=int)

    def get_action(self, x, y, v_x, v_y):
        """Return the stored greedy action [a_x, a_y] of the given state."""
        return self.policy[(x, y)][(v_x, v_y)]

    def sample(self, x, y, v_x, v_y):
        """
        Note the policy is actually epsilon greedy, but we
        keep track of the deterministic part of it.

        With probability 1 - epsilon the stored greedy action is returned;
        otherwise an action is drawn uniformly from all actions allowed in
        the state (the greedy one included). The greedy action is therefore
        chosen with total probability 1 - epsilon + epsilon / num_actions and
        each other action with probability epsilon / num_actions.
        """
        if [x, y] not in self.start_states:
            actions = valid_actions(v_x, v_y, self.v_max)
        else:
            # Any starting velocity MUST leave the start line, just makes things simpler
            actions = [(i,j) for i in [0, 1] for j in [1]]

        num_actions = len(actions)

        # The exploratory branch below already returns the greedy action
        # with probability 1 / num_actions, so the greedy branch must only
        # be taken with probability 1 - epsilon.
        is_greedy = np.random.binomial(1, 1 - self.epsilon)

        if is_greedy:
            return self.get_action(x, y, v_x, v_y)

        action_index = np.random.randint(num_actions)
        a_x, a_y = actions[action_index]
        return np.array([a_x, a_y])
        
class QClass():
    """Tabular action-value function with per-entry visit counts."""

    def __init__(self, track, v_max=v_max, init=0.0):
        """
        The action-value function is stored as nested dictionaries:
        coordinate tuple -> velocity tuple -> action tuple -> value.

        Thus self.Q[(x, y)][(v_x, v_y)][(a_x, a_y)] returns the value
        Q(s=(x,y,v_x,v_y), a=(a_x, a_y)) for some state and action combination.

        self.counts mirrors that layout and holds the number of updates
        applied to each entry, so the MC average can be calculated
        incrementally. Every value starts at `init` and every count at 0.
        """
        self.start_states = get_start_states(track).tolist()
        N = len(track)
        self.Q = {}
        self.counts = {}
        self.v_max = v_max

        # Any starting velocity MUST leave the start line, just makes things simpler
        start_actions = [(0, 1), (1, 1)]

        # Every velocity except standing still, in (v_x, v_y) order.
        moving_velocities = [(v_x, v_y)
                             for v_x in range(0, v_max[0] + 1)
                             for v_y in range(0, v_max[1] + 1)
                             if not (v_x == 0 and v_y == 0)]

        for i in range(N):
            for j in range(N):
                if track[i, j] != 0:
                    continue

                cell = (i, j)
                if [i, j] in self.start_states:
                    self.Q[cell] = {(0,0): {a: init for a in start_actions}}
                    self.counts[cell] = {(0,0): {a: 0 for a in start_actions}}
                    continue

                self.Q[cell] = {}
                self.counts[cell] = {}
                for velocity in moving_velocities:
                    actions = valid_actions(velocity[0], velocity[1], v_max)
                    self.Q[cell][velocity] = {a: init for a in actions}
                    self.counts[cell][velocity] = {a: 0 for a in actions}

    def update(self, G, x, y, v_x, v_y, a_x, a_y):
        """Fold the return `G` into the running average of one (state, action) entry."""
        position, velocity, action = (x, y), (v_x, v_y), (a_x, a_y)
        visits = self.get_n(x, y, v_x, v_y, a_x, a_y)
        current = self.get_q(x, y, v_x, v_y, a_x, a_y)

        self.Q[position][velocity][action] = current + (1/(visits + 1))*(G - current)
        self.counts[position][velocity][action] = visits + 1

    def get_q_s(self, x, y, v_x, v_y):
        """Return the {action: value} dictionary of one state."""
        return self.Q[(x, y)][(v_x, v_y)]

    def get_q(self, x, y, v_x, v_y, a_x, a_y):
        """Return the value of one (state, action) entry."""
        return self.get_q_s(x, y, v_x, v_y)[(a_x, a_y)]

    def get_n(self, x, y, v_x, v_y, a_x, a_y):
        """Return the number of updates applied to one (state, action) entry."""
        return self.counts[(x, y)][(v_x, v_y)][(a_x, a_y)]

    def get_max_action(self, x, y, v_x, v_y):
        """
        Return, as an array, the action with the largest value in the given
        state. Ties go to the action that comes first in the state's
        dictionary.
        """
        best_action, best_value = None, -np.inf

        for action, value in self.get_q_s(x, y, v_x, v_y).items():
            if value > best_value:
                best_action, best_value = action, value

        return np.array(best_action)

def generate_trajectory(pi, track, delta):
    """
    Samples from pi to generate a simulated trajectory.
    Handles the logic for determining state transitions
    including going off track, environment noise and episode
    termination.

    Each step records (position, velocity, action) as copies taken before
    the move. With probability `delta` the sampled action is ignored and
    the velocity stays unchanged. Landing on an off-track cell, or leaving
    the grid anywhere but through the finish line, sends the car back to a
    random start cell with zero velocity; the episode then continues.

    The finish state is defined as crossing the finishing line
    WITHOUT going above or below it.

    Returns the list of recorded steps once the car finishes. The loop has
    no step limit.
    """
    def _random_start():
        # Pick a start cell uniformly at random; the car starts at rest.
        start_index = np.random.randint(len(pi.start_states))
        return np.array(copy.deepcopy(pi.start_states[start_index])), np.array([0, 0])

    X, V = _random_start()

    # The finish line is the last row of the grid; its extent along y is
    # taken from the first and last on-track cells of that row.
    end_states = get_end_states(track)
    end_x = end_states[0][0]
    end_y_range = (end_states[0][1], end_states[-1][1])

    trajectory = []

    while True:
        try:
            A = pi.sample(X[0], X[1], V[0], V[1])
        except Exception:
            # Dump the context of the failure, then let the error propagate.
            print("SAMPLING ERROR")
            print(trajectory)
            print(X, V)
            raise

        trajectory.append((copy.deepcopy(X), copy.deepcopy(V), copy.deepcopy(A)))

        # Environment noise: with probability delta the action has no effect.
        is_noise = np.random.binomial(1, delta)
        if not is_noise:
            V += A

        X += V

        try:
            is_on_track = not track[X[0], X[1]]
            finish_state = False
        except IndexError:
            # The car left the grid; it only finishes if it went past the
            # last row within the finish line's y range.
            is_on_track = False
            finish_state = X[0] > end_x and (end_y_range[0] <= X[1] <= end_y_range[1])

        if is_on_track:
            continue

        if finish_state:
            return trajectory

        # Off the track without finishing: restart from the start line.
        X, V = _random_start()
            
def update_q_and_pi(trajectory, Q, pi):
    """
    Walks the trajectory backwards, accumulating the return G,
    uses G to update Q, and then Q to update pi.

    Note that our task is undiscounted so we don't
    reference gamma. Also the reward is always -1
    for every step.

    `trajectory` is a sequence of (X, V, A) steps in the order they
    occurred; it is only read, never modified. Every step triggers an
    update of Q, followed by setting pi's action for that state to Q's
    current best action.
    """
    G = 0

    # reversed() iterates from the last step to the first without reversing
    # the caller's list in place.
    for X, V, A in reversed(trajectory):
        G = G - 1
        try:
            Q.update(G, X[0], X[1], V[0], V[1], A[0], A[1])
        except KeyError:
            print("Q UPDATE ERROR")
            print(trajectory)
            print(X, V, A)
            raise
        new_A = Q.get_max_action(X[0], X[1], V[0], V[1])
        pi.update_action(new_A[0], new_A[1], X[0], X[1], V[0], V[1])

def on_policy_mc_control(track, N, epsilon, delta, v_max=v_max, init=0.0, update_iter=1000):
    """
    Run on-policy Monte Carlo control for `N` episodes.

    Initialization is handled internally by each class: a PolicyClass is
    built with exploration rate `epsilon`, and a QClass with every action
    value set to `init`. Each episode is simulated with environment noise
    `delta` and then used to update Q and the policy. A progress line is
    printed after every `update_iter` episodes, and the total wall time is
    printed at the end.

    Returns the tuple (Q, pi).
    """
    pi = PolicyClass(track, epsilon, v_max=v_max)
    # Forward the caller's initial value instead of a hard-coded 0.0.
    Q = QClass(track, v_max=v_max, init=init)

    t_start = time.time()
    t_i = t_start

    for iteration in range(N):
        trajectory = generate_trajectory(pi, track, delta)
        update_q_and_pi(trajectory, Q, pi)

        # Report only after the episode has run, so the printed range
        # really is complete.
        if (iteration + 1) % update_iter == 0:
            t_f = time.time()
            print("Iterations {} through {} complete: {} secs".format(iteration + 1 - update_iter, 
                                                                      iteration + 1, 
                                                                      round((t_f - t_i), 2)))
            t_i = t_f

    # Measured from the very start, so episodes after the last progress
    # line are included as well.
    total_time = time.time() - t_start
    print("Total Running Time: {} mins".format(round(total_time/60, 2)))

    return Q, pi

In [None]:
# Train for one million episodes with exploration rate 0.1 and environment
# noise 0.1, printing progress every 50,000 episodes.
Q, pi = on_policy_mc_control(track, N=10**6, epsilon=0.1, delta=0.1, update_iter=5 * 10**4)

In [None]:
# FIXME -- Need better collision detection. Currently just checking
# if car lands on (not crosses) an off-track area. Also may be worth
# trying training with less simulations and having a termination 
# condition on the simulator just in case.
trajectory = generate_trajectory(pi, track, 0)

spatial_trajectory = [state[0].tolist() for state in trajectory]
velocities = [state[1].tolist() for state in trajectory]
accel = [state[2].tolist() for state in trajectory]

traj_steps = len(spatial_trajectory)
track_and_path = np.array(copy.deepcopy(track), dtype=float)

# Step index at which each cell is visited for the first time.
first_visit = {}
for step, (x, y) in enumerate(spatial_trajectory):
    first_visit.setdefault((x, y), step)

# Shade visited cells between 0.25 and 0.75 according to how far into the
# trajectory they are first reached, so the colour follows the order of
# travel rather than the order in which the grid happens to be scanned.
for (x, y), step in first_visit.items():
    track_and_path[x, y] = 0.25 + (0.75 - 0.25) / traj_steps * step

fig, ax = plt.subplots(figsize=(15,10)) 
sns.heatmap(track_and_path.T, ax=ax)

# For every visited cell draw the velocity recorded at the following step
# (blue) and the action chosen in the cell (green).
for (x, y), step in first_visit.items():
    if step + 1 >= traj_steps:
        # The last step has no following velocity on record.
        continue

    plt.arrow(x, y,
              velocities[step + 1][0],
              velocities[step + 1][1],
              width=0.1, head_width=0.5,
              color="blue", alpha=0.9)
    plt.arrow(x, y,
              accel[step][0],
              accel[step][1],
              width=0.02, head_width=0.2,
              color="mediumseagreen", alpha=0.9)

# Mark the start line in green ...
for x, y in get_start_states(track).tolist():
    plt.arrow(x, y, 1, 0,
              width=0.5, head_width=0,
              color="green", alpha=0.9)

# ... and the finish line, drawn one cell past the last row, in red.
for x, y in get_end_states(track).tolist():
    plt.arrow(x + 1, y, 0, 1,
              width=0.5, head_width=0,
              color="red", alpha=0.9)


plt.xlabel("X coordinate", size=20)
plt.ylabel("Y coordinate", size=20)
plt.tight_layout()
plt.show()