In [13]:
import numpy as np


np.random.seed(42)

# Grid size and actions
grid_size = 5
num_states = grid_size * grid_size
actions = ['up', 'down', 'left', 'right']
num_actions = len(actions)
action_indices = {a: i for i, a in enumerate(actions)}

# Discount factor for future rewards
gamma = 0.95

# Convert row and column to single index and back
def to_1d(row, col):
    return row * grid_size + col

def to_2d(state):
    return divmod(state, grid_size)

# Special square definitions
blue = to_1d(0, 1)
green = to_1d(0, 3)
red = to_1d(4, 1)
yellow = to_1d(2, 3)

# Move logic
def step(state, action):
    """
    Returns the next state and reward after taking an action.
    Handles jump squares and edge penalties.
    """
    row, col = to_2d(state)

    # Special jumps(SP)
    if state == blue:
        return red, 5.0
    if state == green:
        target = yellow if np.random.rand() < 0.5 else red
        return target, 2.5

    # Attempt normal moves
    if action == 'up':
        next_row, next_col = max(row - 1, 0), col
    elif action == 'down':
        next_row, next_col = min(row + 1, grid_size - 1), col
    elif action == 'left':
        next_row, next_col = row, max(col - 1, 0)
    elif action == 'right':
        next_row, next_col = row, min(col + 1, grid_size - 1)
    else:
        raise ValueError(f"Invalid action: {action}")

    # Check attempt off grid
    if (next_row == row and next_col == col) and state not in (red, yellow):
        return state, -0.5
    else:
        next_state = to_1d(next_row, next_col)
        return next_state, 0.0  # normal move



In [14]:
from numpy.linalg import solve

# Transition and reward setup under a random policy
p_pi = np.zeros((num_states, num_states))
r_pi = np.zeros(num_states)

# Equal chance for each action
action_probs = [0.25] * num_actions

for s in range(num_states):
    if s == blue:
        # Blue square: always jumps to red with reward +5
        p_pi[s, red] = 1.0
        r_pi[s] = 5.0

    elif s == green:
        # Green square: 5050 jump to red or yellow, reward is 2.5
        p_pi[s, red] = 0.5
        p_pi[s, yellow] = 0.5
        r_pi[s] = 2.5

    else:
        # Regular square: move in all directions equally
        for action in actions:
            next_state, reward = step(s, action)
            p_pi[s, next_state] += 0.25
            r_pi[s] += 0.25 * reward



In [15]:
from numpy.linalg import solve

identity = np.eye(num_states)

# Solve the equation to get the value of each state
a = identity - gamma * p_pi
v_pi = solve(a, r_pi)



In [16]:
def iterative_policy_evaluation(theta=1e-6, gamma=gamma, max_iterations=1000):
    """
    Repeatedly updates the value function using a uniform random policy.
    Stops when the values stop changing much (less than theta).
    """
    v = np.zeros(num_states)

    for i in range(max_iterations):
        delta = 0
        new_v = np.zeros_like(v)

        for s in range(num_states):
            if s == blue:
                # Blue always goes to red and gives +5
                new_v[s] = 5 + gamma * v[red]

            elif s == green:
                # Green jumps to red or yellow randomly
                new_v[s] = 2.5 + gamma * 0.5 * (v[red] + v[yellow])

            else:
                expected_value = 0
                for action in actions:
                    next_state, reward = step(s, action)
                    expected_value += 0.25 * (reward + gamma * v[next_state])
                new_v[s] = expected_value

            delta = max(delta, abs(new_v[s] - v[s]))

        v = new_v

        if delta < theta:
            print(f"Converged after {i + 1} iterations.")
            break

    return v



In [17]:
def print_value_matrix(matrix, title="Value Function"):
    print(f"\n{title}:\n")
    for r in range(grid_size):
        row = matrix[r * grid_size:(r + 1) * grid_size]
        print(" ".join(f"{val:6.2f}" for val in row))

In [18]:
v_iter = iterative_policy_evaluation()

print("Value function from iterative policy evaluation:\n")
print_value_matrix(v_iter)



Converged after 149 iterations.
Value function from iterative policy evaluation:


Value Function:

  2.07   4.51   2.39   2.36   0.73
  1.11   1.79   1.34   1.00   0.30
  0.23   0.56   0.46   0.21  -0.23
 -0.41  -0.12  -0.15  -0.35  -0.71
 -0.89  -0.51  -0.63  -0.84  -1.18


In [19]:
#1 part 2.1

In [20]:
def value_iteration(theta=1e-6, gamma=gamma, max_iterations=1000):
    """
    Runs value iteration to find the best value function and policy.
    Stops when updates get really small.
    """
    v = np.zeros(num_states)

    for i in range(max_iterations):
        delta = 0
        new_v = np.zeros_like(v)

        for s in range(num_states):
            if s == blue:
                new_v[s] = 5 + gamma * v[red]

            elif s == green:
                new_v[s] = 2.5 + gamma * 0.5 * (v[red] + v[yellow])

            else:
                q_values = []
                for action in actions:
                    next_state, reward = step(s, action)
                    q = reward + gamma * v[next_state]
                    q_values.append(q)
                new_v[s] = max(q_values)

            delta = max(delta, abs(new_v[s] - v[s]))

        v = new_v

        if delta < theta:
            print(f"Value iteration converged after {i + 1} iterations.")
            break

    # Build a policy that picks the best action in each state
    policy = np.full((num_states,), '', dtype=object)

    for s in range(num_states):
        if s in (blue, green):
            policy[s] = 'special'
        else:
            best_action = None
            best_value = float('-inf')

            for action in actions:
                next_state, reward = step(s, action)
                q = reward + gamma * v[next_state]

                if q > best_value:
                    best_value = q
                    best_action = action

            policy[s] = best_action

    return v, policy



In [21]:
v_opt, policy_opt = value_iteration()

print("Optimal value function:\n")
print_value_matrix(v_opt)

print("\nOptimal policy per state:\n")
policy_grid = np.empty((grid_size, grid_size), dtype=object)

for row in range(grid_size):
    for col in range(grid_size):
        state = to_1d(row, col)
        a = policy_opt[state]
        if a == 'special':
            policy_grid[row, col] = '**'
        else:
            policy_grid[row, col] = a[:2].upper()

print(policy_grid)


Value iteration converged after 302 iterations.
Optimal value function:


Value Function:

 21.00  22.10  21.00  19.60  18.62
 19.95  21.00  19.95  18.95  18.00
 18.95  19.95  18.95  18.00  17.10
 18.00  18.95  18.00  17.10  16.25
 17.10  18.00  17.10  16.25  15.43

Optimal policy per state:

[['RI' '**' 'LE' '**' 'LE']
 ['UP' 'UP' 'UP' 'LE' 'LE']
 ['UP' 'UP' 'UP' 'UP' 'UP']
 ['UP' 'UP' 'UP' 'UP' 'UP']
 ['UP' 'UP' 'UP' 'UP' 'UP']]


In [22]:
#1 part 2.2

In [23]:
def policy_iteration(gamma=gamma, theta=1e-6, max_iterations=1000):
    """
    Finds the best value function and policy using policy iteration.
    It starts with a random policy and keeps improving it.
    """
    # Initialise policy randomly
    policy = np.random.choice(actions, size=num_states)
    policy[blue] = 'special'
    policy[green] = 'special'

    v = np.zeros(num_states)

    for i in range(max_iterations):
        #  Evaluate the current policy
        while True:
            delta = 0
            new_v = np.zeros_like(v)

            for s in range(num_states):
                if policy[s] == 'special':
                    if s == blue:
                        new_v[s] = 5 + gamma * v[red]
                    elif s == green:
                        new_v[s] = 2.5 + gamma * 0.5 * (v[red] + v[yellow])
                else:
                    a = policy[s]
                    next_state, reward = step(s, a)
                    new_v[s] = reward + gamma * v[next_state]

                delta = max(delta, abs(new_v[s] - v[s]))

            v = new_v
            if delta < theta:
                break

        # Improve the policy
        policy_stable = True

        for s in range(num_states):
            if s in (blue, green):
                continue

            old_action = policy[s]
            best_action = None
            best_value = float('-inf')

            for a in actions:
                next_state, reward = step(s, a)
                q = reward + gamma * v[next_state]

                if q > best_value:
                    best_value = q
                    best_action = a

            policy[s] = best_action

            if old_action != best_action:
                policy_stable = False

        if policy_stable:
            print(f"Policy iteration converged after {i + 1} iterations.")
            break

    return v, policy


In [24]:
np.random.seed(42)

v_pi, policy_pi = policy_iteration()

print("Value function from policy iteration:\n")
print_value_matrix(v_pi)

print("\nOptimal policy from policy iteration:\n")

policy_grid = np.empty((grid_size, grid_size), dtype=object)

for row in range(grid_size):
    for col in range(grid_size):
        state = to_1d(row, col)
        action = policy_pi[state]
        if action == 'special':
            policy_grid[row, col] = "**"
        else:
            policy_grid[row, col] = action[:2].upper()

print(policy_grid)


Policy iteration converged after 5 iterations.
Value function from policy iteration:


Value Function:

 21.00  22.10  21.00  19.60  18.62
 19.95  21.00  19.95  18.95  18.00
 18.95  19.95  18.95  18.00  17.10
 18.00  18.95  18.00  17.10  16.25
 17.10  18.00  17.10  16.25  15.43

Optimal policy from policy iteration:

[['RI' 'SP' 'LE' 'SP' 'LE']
 ['UP' 'UP' 'UP' 'LE' 'LE']
 ['UP' 'UP' 'UP' 'UP' 'UP']
 ['UP' 'UP' 'UP' 'UP' 'UP']
 ['UP' 'UP' 'UP' 'UP' 'UP']]


#Section 2

In [25]:
import numpy as np
import random


np.random.seed(42)
random.seed(42)
# Grid setup
grid_size = 5
gamma = 0.95
actions = ['up', 'down', 'left', 'right']


action_to_delta = {
    'up': (-1, 0),
    'down': (1, 0),
    'left': (0, -1),
    'right': (0, 1)
}

# Functions to switch between 2D grid and 1D state
def to_1d(row, col):
    return row * grid_size + col

def to_2d(state):
    return divmod(state, grid_size)

# Special squares that cause jumps
blue = to_1d(0, 1)
green = to_1d(0, 4)

# Jump targets
red = to_1d(3, 2)
yellow = to_1d(4, 4)

# The episode ends if agent lands here
terminal_states = {
    to_1d(2, 0),
    to_1d(2, 4),
    to_1d(4, 0)
}


In [26]:
def step(state, action):
    """
    Takes an action from the current state.
    Returns the new state, the reward, and whether the episode is over.
    """
    if state in terminal_states:
        return state, 0, True

    row, col = to_2d(state)
    dr, dc = action_to_delta[action]
    new_row, new_col = row + dr, col + dc

    # If move goes off the grid, stay in place and get a penalty
    if new_row < 0 or new_row >= grid_size or new_col < 0 or new_col >= grid_size:
        return state, -0.5, False

    next_state = to_1d(new_row, new_col)

    if state == blue:
        return red, 5.0, False

    # Handle green jump
    if state == green:
        jump_target = red if random.random() < 0.5 else yellow
        return jump_target, 2.5, False

    # Check if the move landed in a terminal state
    if next_state in terminal_states:
        return next_state, -0.2, True

    # Normal move with small penalty
    return next_state, -0.2, False


In [27]:
def mc_control_es(num_episodes=1000, gamma=gamma, alpha=0.1, max_steps=100):
    """
    Monte Carlo control with exploring starts.
    Returns Q-values and the learned greedy policy.
    """
    # Set up Q-values and an initial random policy
    q = {s: {a: 0.0 for a in actions} for s in range(grid_size * grid_size)}
    policy = {s: random.choice(actions) for s in range(grid_size * grid_size)}

    for s in terminal_states:
        policy[s] = None

    for _ in range(num_episodes):
        # Start from a random non terminal state and action
        while True:
            state = random.randint(0, grid_size * grid_size - 1)
            if state not in terminal_states:
                break
        action = random.choice(actions)

        # Run an episode
        episode = []
        done = False
        steps = 0

        while not done and steps < max_steps:
            next_state, reward, done = step(state, action)
            episode.append((state, action, reward))

            if done:
                break

            action = policy[next_state]
            state = next_state
            steps += 1

        # Update Q-values using first visit MC
        G = 0
        visited = set()

        for t in reversed(range(len(episode))):
            s, a, r = episode[t]
            G = gamma * G + r

            if (s, a) not in visited:
                visited.add((s, a))

                # Update Q using incremental MC estimate
                q[s][a] += alpha * (G - q[s][a])

                # Update policy to pick the best action
                best_action = max(q[s], key=q[s].get)
                policy[s] = best_action

    return q, policy


In [28]:
np.random.seed(42)

In [29]:
# Run MC control with exploring starts
q_es, policy_es = mc_control_es()

# Set up matrices for value and policy
v_matrix = np.zeros((grid_size, grid_size), dtype=np.float32)
policy_matrix = np.empty((grid_size, grid_size), dtype=object)

for r in range(grid_size):
    for c in range(grid_size):
        s = to_1d(r, c)

        if s in terminal_states:
            v_matrix[r, c] = 0.0
            policy_matrix[r, c] = "TT"

        elif s == blue:
            # Value from jumping to red with +5
            v_matrix[r, c] = 5.0 + gamma * max(q_es[red].values())
            policy_matrix[r, c] = "SP" # Special jump

        elif s == green:
            # Use value implied by special green transition
            red_val = max(q_es[red].values())
            yellow_val = max(q_es[yellow].values())
            v_matrix[r, c] = 2.5 + gamma * 0.5 * (red_val + yellow_val)
            policy_matrix[r, c] = "SP"

        else:
            # Use the best Q-value and the corresponding action
            v_matrix[r, c] = max(q_es[s].values())
            policy_matrix[r, c] = policy_es[s][:2].upper()

v_matrix = np.round(v_matrix, 2)

print("\n State-Value Function Matrix:")
print(v_matrix)

print("\n Policy Matrix:")
print(policy_matrix)



 State-Value Function Matrix:
[[ 1.19  4.71 -0.34 -0.47  2.12]
 [-0.11 -0.38 -0.28 -0.38 -0.18]
 [ 0.   -0.2  -0.4  -0.2   0.  ]
 [-0.17 -0.28 -0.31 -0.39 -0.2 ]
 [ 0.   -0.2  -0.37 -0.57 -0.49]]

 Policy Matrix:
[['RI' 'SP' 'DO' 'UP' 'SP']
 ['DO' 'DO' 'UP' 'DO' 'DO']
 ['TT' 'LE' 'LE' 'RI' 'TT']
 ['DO' 'RI' 'LE' 'RI' 'UP']
 ['TT' 'LE' 'LE' 'UP' 'DO']]


In [30]:
# Part 2.1.2 — Monte Carlo Control with epsilon-soft Policy


In [31]:
def mc_control_eps_soft(num_episodes=5000, gamma=gamma, alpha=0.1, epsilon=0.1, max_steps=100):
    """
    Monte Carlo control using an epsilon-soft policy.
    Returns the learned Q-values and greedy policy.
    """
    # Initialise Q-values for all state-action pairs
    q = {s: {a: 0.0 for a in actions} for s in range(grid_size * grid_size)}

    def get_action_probabilities(state):
        """
        Returns action probabilities using epsilon soft strategy.
        """
        values = q[state]
        best_action = max(values, key=values.get)
        probs = {a: epsilon / len(actions) for a in actions}
        probs[best_action] += 1.0 - epsilon
        return probs

    for _ in range(num_episodes):
        # Pick a random starting state that is not terminal
        while True:
            state = random.randint(0, grid_size * grid_size - 1)
            if state not in terminal_states:
                break

        episode = []
        done = False
        steps = 0

        while not done and steps < max_steps:
          # Choose an action based on epsilon-soft probabilities
            action_probs = get_action_probabilities(state)
            action = random.choices(list(action_probs.keys()), weights=list(action_probs.values()))[0]

            next_state, reward, done = step(state, action)
            episode.append((state, action, reward))
            state = next_state
            steps += 1


        G = 0
        visited = set()

        for t in reversed(range(len(episode))):
            s, a, r = episode[t]
            G = gamma * G + r

            if (s, a) not in visited:
                visited.add((s, a))
                q[s][a] += alpha * (G - q[s][a])

    # Final greedy policy from Q-values
    final_policy = {}
    for s in range(grid_size * grid_size):
        if s in terminal_states:
            final_policy[s] = None
        elif s == blue or s == green:
            final_policy[s] = 'special'
        else:
            final_policy[s] = max(q[s], key=q[s].get)

    return q, final_policy


In [32]:
# Run MC control with epsilon soft policy

q_eps, policy_eps = mc_control_eps_soft()

# Set up the value and policy grids
v_matrix = np.zeros((grid_size, grid_size), dtype=np.float32)
policy_matrix = np.empty((grid_size, grid_size), dtype=object)

for r in range(grid_size):
    for c in range(grid_size):
        s = to_1d(r, c)

        if s in terminal_states:
            v_matrix[r, c] = 0.0
            policy_matrix[r, c] = "TT"
        elif s == blue:
          # Value comes from jumping to red with reward
            v_matrix[r, c] = 5.0 + gamma * v_matrix[to_2d(red)]
            policy_matrix[r, c] = "SP"
        elif s == green:
          # Average value from jumping to red or yellow
            v_matrix[r, c] = 2.5 + gamma * 0.5 * (
                v_matrix[to_2d(red)] + v_matrix[to_2d(yellow)]
            )
            policy_matrix[r, c] = "SP"
        else:
          # Take the best Q-value and action
            v_matrix[r, c] = max(q_eps[s].values())
            policy_matrix[r, c] = policy_eps[s][:2].upper()

v_matrix = np.round(v_matrix, 2)

print("\n State-Value Function Matrix (epsilon-soft):")
print(v_matrix)

print("\n Policy Matrix (epsilon-soft):")
for row in policy_matrix:
    print(" ".join(f"{a:>4}" for a in row))




 State-Value Function Matrix (epsilon-soft):
[[ 3.82  5.    3.63  3.1   2.5 ]
 [ 3.14  3.78  3.11  2.51  1.64]
 [ 0.   -0.2   1.5   1.6   0.  ]
 [-0.2  -0.39 -0.58 -0.41 -0.2 ]
 [ 0.   -0.2  -0.43 -0.57 -0.39]]

 Policy Matrix (epsilon-soft):
  RI   SP   LE   LE   SP
  RI   UP   LE   UP   UP
  TT   LE   UP   LE   TT
  UP   LE   LE   RI   UP
  TT   LE   LE   RI   UP


In [33]:
# Part 2.2  Off-Policy Monte Carlo Control with Importance Sampling

In [34]:
def mc_off_policy_control(num_episodes=5000, gamma=gamma, max_steps=100):
    """
    Off-policy Monte Carlo control using importance sampling.
    Learns a greedy policy while generating episodes from a uniform behaviour policy.
    """
    # Set up Q-values and tracking weights
    q = {s: {a: 0.0 for a in actions} for s in range(grid_size * grid_size)}
    c = {s: {a: 0.0 for a in actions} for s in range(grid_size * grid_size)}

    # Start with a random target policy
    target_policy = {s: random.choice(actions) for s in range(grid_size * grid_size)}
    for s in terminal_states:
        target_policy[s] = None

    for _ in range(num_episodes):
        # Pick a random non terminal starting state
        while True:
            state = random.randint(0, grid_size * grid_size - 1)
            if state not in terminal_states:
                break

        # Generate an episode
        episode = []
        done = False
        steps = 0

        while not done and steps < max_steps:
            action = random.choice(actions)
            next_state, reward, done = step(state, action)
            episode.append((state, action, reward))
            state = next_state
            steps += 1

        # Work backwards through the episode using importance sampling
        G = 0
        W = 1

        for t in reversed(range(len(episode))):
            s, a, r = episode[t]
            G = gamma * G + r

            c[s][a] += W
            q[s][a] += (W / c[s][a]) * (G - q[s][a])

            # Greedy policy update
            best_action = max(q[s], key=q[s].get)
            target_policy[s] = best_action

            # Stop if action differs from the target policy
            if a != best_action:
                break

            # Update weight based on behaviour policy (1/0.25 for uniform)
            W *= 1 / 0.25

    return q, target_policy


In [35]:
 # Run off-policy MC control
q_off, policy_off = mc_off_policy_control()

# Create value and policy matrices
v_matrix = np.zeros((grid_size, grid_size), dtype=np.float32)
policy_matrix = np.empty((grid_size, grid_size), dtype=object)

for r in range(grid_size):
    for c in range(grid_size):
        s = to_1d(r, c)

        if s in terminal_states:
            v_matrix[r, c] = 0.0
            policy_matrix[r, c] = "TT"
        elif s == blue:
            v_matrix[r, c] = 5.0 + gamma * v_matrix[to_2d(red)]
            policy_matrix[r, c] = "SP"
        elif s == green:
            v_matrix[r, c] = 2.5 + gamma * 0.5 * (
                v_matrix[to_2d(red)] + v_matrix[to_2d(yellow)]
            )
            policy_matrix[r, c] = "SP"
        else:
            v_matrix[r, c] = max(q_off[s].values())
            policy_matrix[r, c] = policy_off[s][:2].upper()


v_matrix = np.round(v_matrix, 2)

print("\n State-Value Function Matrix (Off-Policy):")
print(v_matrix)

print("\n Policy Matrix (Off-Policy):")
for row in policy_matrix:
    print(" ".join(f"{a:>4}" for a in row))



 State-Value Function Matrix (Off-Policy):
[[0.   5.   0.   0.   2.5 ]
 [0.   4.55 0.   0.   0.  ]
 [0.   4.12 3.72 0.   0.  ]
 [0.   0.   3.33 2.96 0.  ]
 [0.   0.   0.   2.62 2.29]]

 Policy Matrix (Off-Policy):
  RI   SP   UP   RI   SP
  UP   UP   DO   UP   UP
  TT   UP   LE   UP   TT
  LE   UP   UP   LE   DO
  TT   UP   LE   UP   LE
