# Problem

Let us look at a problem that is a simple finite Markov Decision Process. 
Consider the following 5 x 5 grid:

![Gridworld](imgs/Gridworld.png)

## Define the problem

- The cells of the grid correspond to the the states of the environment. 
- At each cell, 4 actions are possible: _north_, _south_, _east_, _west_ - which deterministically cause the agent to move 1 cell in the respective direction on the grid.
- Actions that would the agent off the grid leave its location unchanges, but also result in a reward of -1.
- From state A, any action yields a reward of +10 and takes the agent to A'.
- From state B, any action yields a reward of +5 and takes the agent to B'.
- All other actions result in a reward of 0.

Find the Optimal Value Function using Q-Learning.

In [None]:
@dataclass
class Grid:
    x_min: int
    x_max: int
    y_min: int
    y_max: int
    x_size: int
    y_size: int

@dataclass
class Coordinate:
    x: int
    y: int

@dataclass
class Direction:
    x: int
    y: int

In [None]:
def _calc_reward(loc: Coordinate, grid: Grid, A, B) -> int:
    """
    This function takes the location of a state and returns the reward for it.

    Args:
        loc: 

    """
    if loc == A: #Reward for position A
        reward = 10
    elif loc == B: #Reward for position B
        reward = 5
    elif loc.x < grid.x_min or loc.x > grid.x_max or loc.y < grid.y_min or loc.y > grid.y_max: #Negative reward outside grid
        reward = -1
    else: # Reward for all other states
        reward = 0
    return reward

In [None]:
def is_valid_loc(loc: Coordinate, grid: Grid) -> bool:
    if loc.x < grid.x_min or loc.x > grid.x_max or loc.y < grid.y_min or loc.y > grid.y_max:
        return False
    else:
        return True

In [None]:
def move(loc: Coordinate, action: int) -> (tuple, float):
    loc = Coordinate(loc[0], loc[1]) #Convert to Coordinate object for convenience
    A = Coordinate(1, 0) # Define special state A
    B = Coordinate(3, 0) # Define special state B
    Ap = Coordinate(1, 4) # Define special state A prime
    Bp = Coordinate(3, 2) # Define special state B prime
    reward = _calc_reward(loc, grid, A, B)
    if loc == A:
        proposed_loc = Ap
    elif loc == B:
        proposed_loc = Bp
    else:
        if action == 0:
            proposed_loc = Coordinate(loc.x, loc.y - 1)
        elif action == 1:
            proposed_loc = Coordinate(loc.x, loc.y + 1)
        elif action == 2:
            proposed_loc = Coordinate(loc.x + 1, loc.y)
        elif action == 3:
            proposed_loc = Coordinate(loc.x - 1, loc.y)

    if is_valid_loc(proposed_loc, grid):
        next_loc = proposed_loc
    else:
        next_loc = loc
    
    next_loc = tuple((next_loc.x, next_loc.y)) #Convert to tuple object for dict
    return next_loc, reward

In [None]:
def initialise_q(grid: Grid, actions: list) -> dict:
    q = {}
    for x in range(grid.x_size):
        for y in range(grid.y_size):
            q[(x, y)] = {}
            for action in actions:
                q[(x, y)][action] = 0.
    return q

In [None]:
def choose_action(eta: float, q: dict, state: tuple, actions: list) -> int:
    next_action_choices = ["random", "argmax"]
    choice = np.random.choice(next_action_choices, 1, p=[eta, 1-eta])
    if choice == "random":
        action = np.random.choice(actions, 1)[0]
    elif choice == "argmax":
        action = np.argmax(list(q[state].values()))
    return action

In [None]:
def get_xy_direction(arrow):
    if arrow == 0:
        direction = Direction(0, -1)
    elif arrow == 1:
        direction = Direction(0, 1)
    elif arrow == 2:
        direction = Direction(1, 0)
    elif arrow == 3:
        direction = Direction(-1, 0)
    return direction

In [None]:
def get_q_arr(q_star):
    q_star_arr = np.zeros((5, 5))
    states = list(q_star.keys())
    for state in states:
        directions = list(q_star[state].keys())
        for direction in directions:
            q_val = np.round(q_star[state][direction])
            q_star_arr[state[1]][state[0]] = q_val
    return q_star_arr

In [None]:
def q_learning(grid: Grid, actions: list, alpha: float, eta: float, gamma: float, episodes: int, N: int) -> dict:
    q = initialise_q(grid, actions) # Initialize q_table 
    states = list(q.keys())
    for i in range(episodes):
        state = random.choice(states) # Initialise state with random starting point
        for j in range(N):
            action = choose_action(eta, q, state, actions)
            next_state, reward = move(state, action)
            next_action = np.argmax(list(q[next_state].values()))
            q[state][action] = q[state][action] + alpha * (reward + gamma * (q[next_state][next_action]) - q[state][action])
            state = next_state
    return q

In [None]:
def plot_gridworld_results(q_star, q_star_arr):
    fig, axs = plt.subplots(1, 2)
    fig.set_size_inches(15, 7)
    fig.suptitle(fr"Gridworld states and directions, using Q-Learning")
    fig.supxlabel("x")
    fig.supylabel("y")
    axs[0].set_title(fr"$q_*$")
    axs[1].set_title(fr"$\pi_* directions$")
    sns.heatmap(q_star_arr, cmap='Spectral', annot=True, ax=axs[0])
    plt.imshow(q_star_arr, cmap='Spectral')
    states = list(q_star.keys())
    for state in states:
        arrows = np.argwhere(list(q_star[state].values()) == np.amax(list(q_star[state].values()))).flatten().tolist()
        for arrow in arrows:
            direction = get_xy_direction(arrow)
            plt.quiver(state[0], state[1], direction.x, -direction.y)
    plt.show()

In [None]:
eta = 0.1
alpha = 0.2
gamma = 0.9
episodes = 1000
N = 1000
actions = [0, 1, 2, 3] #Define actions: North, South, East, West
grid = Grid(x_min=0, x_max=4, y_min=0, y_max=4, x_size=5, y_size=5) #Define a 5 x 5 grid.

In [None]:
q_star = q_learning(grid, actions, alpha, eta, gamma, episodes, N)
q_star_arr = get_q_arr(q_star)
plot_gridworld_results(q_star, q_star_arr)