# Module 11 - Programming Assignment

## Directions

1. Change the name of this file to be your JHED ID, as in `jsmith299.ipynb`. Be sure to use your JHED ID, which is derived from your name, and not your student ID, which is just letters and numbers.
2. Make sure the notebook you submit is cleanly and fully executed. I do not grade unexecuted notebooks.
3. Submit your notebook back in Blackboard where you downloaded this file.

*Provide the output **exactly** as requested*

## Reinforcement Learning with Value Iteration

These are the same maps from Module 1 but the "physics" of the world have changed. In Module 1, the world was deterministic. When the agent moved "south", it went "south". When it moved "east", it went "east". Now, the agent only succeeds in going where it wants to go *sometimes*. There is a probability distribution over the possible states so that when the agent moves "south", there is a small probability that it will go "east", "north", or "west" instead and have to move from there.

There are a variety of ways to handle this problem. For example, if using A\* search, if the agent finds itself off the solution, you can simply calculate a new solution from where the agent ended up. Although this sounds like a really bad idea, it has actually been shown to work really well in video games that use formal planning algorithms (which we will cover later). When these algorithms were first designed, this was unthinkable. Thank you, Moore's Law!

Another approach is to use Reinforcement Learning which covers problems where there is some kind of general uncertainty in the actions. We're going to model that uncertainty a bit unrealistically here but it'll show you how the algorithm works.

As far as RL is concerned, there are a variety of options there: model-based and model-free, Value Iteration, Q-Learning and SARSA. You are going to use Value Iteration.

## The World Representation

As before, we're going to simplify the problem by working in a grid world. The symbols that form the grid have a special meaning as they specify the type of the terrain and the cost to enter a grid cell with that type of terrain:

```
token   terrain    cost 
.       plains     1
*       forest     3
^       hills      5
~       swamp      7
x       mountains  impassable
```

When you go from a plains node to a forest node it costs 3. When you go from a forest node to a plains node, it costs 1. You can think of the grid as a big graph. Each grid cell (terrain symbol) is a node and there are edges to the north, south, east and west (except at the edges).
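As a small illustration of the "cost to enter" rule, the sketch below sums the cost of a path through a made-up 2x2 world. The names `TERRAIN_COST`, `path_cost`, and `tiny_world` are hypothetical and are not part of the assignment. States are `(x, y)` tuples and the grid is indexed as `world[y][x]`.

```python
# Positive "cost to enter" values, copied from the terrain table above.
TERRAIN_COST = {'.': 1, '*': 3, '^': 5, '~': 7}

def path_cost(world, path):
    """Sum the cost of entering every cell on the path after the first one."""
    return sum(TERRAIN_COST[world[y][x]] for x, y in path[1:])

# Hypothetical 2x2 world: plains at (0, 0), forest at (1, 0), plains below.
tiny_world = [list(".*"),
              list("..")]

plains_to_forest = path_cost(tiny_world, [(0, 0), (1, 0)])  # enters forest
forest_to_plains = path_cost(tiny_world, [(1, 0), (0, 0)])  # enters plains
print(plains_to_forest, forest_to_plains)
```

The starting cell contributes nothing because the cost is only paid when a cell is entered.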

There are quite a few differences between A\* Search and Reinforcement Learning but one of the most salient is that A\* Search returns a plan of N steps that gets us from A to Z, for example, A->C->E->G.... Reinforcement Learning, on the other hand, returns a *policy* that tells us the best thing to do in **every state.**

For example, the policy might say that the best thing to do in A is go to C. However, we might find ourselves in D instead. The policy covers this possibility: it might say D->E. Trying this action might land us in C, and the policy will say C->E, etc. At least with offline learning, everything will be learned in advance (in online learning, you can only learn by doing and so you may act according to a known but suboptimal policy).

Nevertheless, if you were asked for a "best case" plan from (0, 0) to (n-1, n-1), you could (and will) be able to read it off the policy because there is a best action for every state. You will be asked to provide this in your assignment.
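One way to read a "best case" plan off a policy is to follow the chosen action from each state while pretending that every move succeeds. The sketch below is only an illustration under that assumption. The function name `follow_policy` and the tiny 2x2 policy are hypothetical, and the policy is assumed to use the `{(x, y): (dx, dy)}` format described later in this notebook.

```python
def follow_policy(policy, start, goal):
    """Follow the policy from start to goal, assuming every move succeeds.

    Returns the list of visited states, or None if the walk reaches a state
    with no action or revisits a state (which would loop forever).
    """
    path = [start]
    visited = {start}
    state = start
    while state != goal:
        action = policy.get(state)
        if action is None:
            return None  # no action recorded for this state
        state = (state[0] + action[0], state[1] + action[1])
        if state in visited:
            return None  # the policy sends us around in a circle
        visited.add(state)
        path.append(state)
    return path

# Hypothetical policy on a 2x2 grid whose goal is the lower right corner.
example_policy = {(0, 0): (1, 0), (1, 0): (0, 1), (0, 1): (1, 0), (1, 1): None}
best_case = follow_policy(example_policy, (0, 0), (1, 1))
print(best_case)
```

The cycle check matters because a policy that has not converged can point two neighboring states at each other.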

We have the same costs as before. Note that we've negated them this time because RL requires negative costs and positive rewards:

In [1]:
costs = { '.': -1, '*': -3, '^': -5, '~': -7}
costs

{'.': -1, '*': -3, '^': -5, '~': -7}

and a list of offsets for `cardinal_moves`. You'll need to work this into your **actions**, A, parameter.

In [2]:
cardinal_moves = [(0,-1), (1,0), (0,1), (-1,0)]

For Value Iteration, we require knowledge of the *transition* function, as a probability distribution.

The transition function, T, for this problem is 0.70 for the desired direction, and 0.10 each for the other possible directions. That is, if the agent selects "north" then 70% of the time, it will go "north" but 10% of the time it will go "east", 10% of the time it will go "west", and 10% of the time it will go "south". If the agent is at the edge of the map, it simply bounces back to the current state.
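The distribution over *attempted* moves can be sketched as a small dictionary. This is only an illustration: `move_distribution` is a hypothetical helper name, and it deliberately ignores the bounce-back rule, which depends on where the agent is standing in the world.

```python
cardinal_moves = [(0, -1), (1, 0), (0, 1), (-1, 0)]  # up, right, down, left

def move_distribution(intended, moves):
    """Probability that each move is the one actually attempted.

    The intended move gets 0.70 and every other move gets 0.10, as stated
    above. Edges and mountains are not handled here.
    """
    return {move: (0.70 if move == intended else 0.10) for move in moves}

dist = move_distribution((0, -1), cardinal_moves)  # the agent selects "north"
for move, probability in dist.items():
    print(move, probability)
```

When a move bounces back, its probability is added to the probability of staying in the current state, so the total still sums to 1.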

You need to implement `value_iteration()` with the following parameters:

+ world: a `List` of `List`s of terrain (this is S from S, A, T, gamma, R)
+ costs: a `Dict` of costs by terrain (this is part of R)
+ goal: A `Tuple` of (x, y) stating the goal state.
+ reward: The reward for achieving the goal state.
+ actions: a `List` of possible actions, A, as offsets.
+ gamma: the discount rate

You will return a policy:

`{(x1, y1): action1, (x2, y2): action2, ...}`

Remember...a policy is what to do in any state for all the states. Notice how this is different than A\* search which only returns actions to take from the start to the goal. This also explains why reinforcement learning doesn't take a `start` state.
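For reference, the standard Value Iteration update can be written with the symbols S, A, T, gamma, and R used above. The notation $R(s, a, s')$ for the reward received on landing in $s'$ is an assumption made here for illustration; the assignment itself only says that the terrain costs and the goal reward are part of R.

$$
V_{k+1}(s) = \max_{a \in A} \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma V_k(s') \right]
$$

Once the values stop changing by more than a small threshold, the policy picks the maximizing action in each state:

$$
\pi(s) = \arg\max_{a \in A} \sum_{s'} T(s, a, s') \left[ R(s, a, s') + \gamma V(s') \right]
$$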

You should also define a function `pretty_print_policy( cols, rows, policy)` that takes a policy and prints it out as a grid using "^" for up, "<" for left, "v" for down and ">" for right. Use "x" for any mountain or other impassable square. Note that it doesn't need the `world` because the policy has a move for every state. However, you do need to know how big the grid is so you can pull the values out of the `Dict` that is returned.

```
vvvvvvv
vvvvvvv
vvvvvvv
>>>>>>v
^^^>>>v
^^^>>>v
^^^>>>G
```

(Note that that policy is completely made up and only illustrative of the desired output). Please print it out exactly as requested: **NO EXTRA SPACES OR LINES**.

* If everything is otherwise the same, do you think that the path from (0,0) to the goal would be the same for both A\* Search and Q-Learning?
* What do you think if you have a map that looks like:

```
><>>^
>>>>v
>>>>v
>>>>v
>>>>G
```

has this converged? Is this a "correct" policy? What are the problems with this policy as it is?


In [3]:
def read_world(filename):
    result = []
    with open(filename) as f:
        for line in f:
            row = line.strip()
            if row:  # skip blank lines, such as a trailing newline at the end of the file
                result.append(list(row))
    return result

---

## get_possible_transitions documentation  


Calculate the possible transitions from a state given an action, considering the stochastic movement probabilities.  

Parameters:  
    s (tuple): The current state as a tuple (x, y).  
    a (tuple): The intended action as a tuple (dx, dy).  
    world (list): The grid world represented as a list of lists.  
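    actions (list): A list of possible actions as (dx, dy) tuples.  
    goal (tuple): The goal state as a tuple (x, y). The goal is absorbing: from it, the agent stays in place with probability 1.  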

Returns:  
    list: A list of tuples [(s_prime, probability), ...], where s_prime is a possible next state and probability is the chance of transitioning to that state from state s using action a.  


In [4]:
def get_possible_transitions(s, a, world, actions, goal):
    if s == goal:
        # Agent stays in the goal state with probability 1
        return [(s, 1.0)]

    x, y = s
    rows = len(world)
    cols = len(world[0])
    transitions = {}
    total_prob = 0.0

    for action in actions:
        prob = 0.7 if action == a else 0.1
        dx, dy = action
        x_new, y_new = x + dx, y + dy

        # Check if the new position is within bounds and not impassable
        if 0 <= x_new < cols and 0 <= y_new < rows and world[y_new][x_new] != 'x':
            s_prime = (x_new, y_new)
        else:
            # Invalid move, the agent stays in the same state
            s_prime = s

        # Accumulate probabilities for each possible s_prime
        transitions[s_prime] = transitions.get(s_prime, 0.0) + prob
        total_prob += prob

    # Normalize probabilities (they should already sum to 1.0, but this guards against floating-point drift)
    transitions_list = []
    for s_prime, prob in transitions.items():
        prob /= total_prob
        transitions_list.append((s_prime, prob))

    return transitions_list


In [5]:
import math

print("Testing get_possible_transitions...")

# A test world and actions
test_world = [
    ['.', '.', '.'],
    ['.', 'x', '.'],
    ['.', '.', '.']
]
actions = [(0, -1), (1, 0), (0, 1), (-1, 0)]  
goal = (2, 2)  # Define a goal state for the tests

# Test 1: Valid transition within bounds
print("Test 1: Valid transition within bounds...")
transitions = get_possible_transitions((1, 1), (0, -1), test_world, actions, goal)
assert math.isclose(sum(prob for _, prob in transitions), 1.0, abs_tol=1e-6), "Test 1 failed: probabilities do not sum to 1"
assert len(transitions) == 4, "Test 1 failed: incorrect number of transitions"
print("Test 1 passed.")

# Test 2: Agent at (0, 0) attempting to move left (edge of grid)
print("Test 2: Agent at (0, 0) attempting to move left (edge of grid)...")
test_world = [
    ['.', '.', '.'],
    ['.', '.', '.'],
    ['.', '.', '.']
]
s = (0, 0)
a = (-1, 0) 
transitions = get_possible_transitions(s, a, test_world, actions, goal)
total_prob = sum(prob for _, prob in transitions)
assert abs(total_prob - 1.0) < 1e-6, "Test 2 failed: probabilities do not sum to 1"

# Expected transitions
expected_transitions = [
    ((0, 0), 0.8),  # Stays in place due to invalid left move and invalid up move
    ((1, 0), 0.1),  # Moves right
    ((0, 1), 0.1)   # Moves down
]
# Verify transitions
assert len(transitions) == len(expected_transitions), "Test 2 failed: incorrect number of transitions"
for expected in expected_transitions:
    s_prime_expected, prob_expected = expected
    found = False
    for s_prime, prob in transitions:
        if s_prime == s_prime_expected:
            assert abs(prob - prob_expected) < 1e-6, f"Test 2 failed: incorrect probability for state {s_prime}"
            found = True
            break
    assert found, f"Test 2 failed: state {s_prime_expected} not found in transitions"
print("Test 2 passed.")

# Test 3: Moving into impassable terrain
print("Test 3: Moving into impassable terrain.")
world = [
    ['.', '.', '.'],
    ['.', 'x', '.'],
    ['.', '.', '.']
]
s = (1, 2)
a = (0, -1)  
transitions = get_possible_transitions(s, a, world, actions, goal)

# Print actual transitions
print("Actual Transitions:")
for t in sorted(transitions):
    print(t)

# Expected transitions
expected_transitions = [
    ((0, 2), 0.1),  # Left
    ((1, 2), 0.8),  # Up is impassable (0.7) and down is off the grid (0.1), so the agent stays in place
    ((2, 2), 0.1)   # Right
]

# Print expected transitions
print("Expected Transitions:")
for t in sorted(expected_transitions):
    print(t)

# Compare actual and expected transitions accounting for float precision inaccuracies
print("Comparing transitions...")
tolerance = 1e-6
transitions_sorted = sorted(transitions)
expected_sorted = sorted(expected_transitions)

for (actual_state, actual_prob), (expected_state, expected_prob) in zip(transitions_sorted, expected_sorted):
    assert actual_state == expected_state, f"Test 3 failed: States do not match. Expected {expected_state}, got {actual_state}."
    # Using math.isclose to compare probabilities within the tolerance
    assert math.isclose(actual_prob, expected_prob, abs_tol=tolerance), \
        f"Test 3 failed: Probabilities do not match for state {actual_state}. Expected {expected_prob}, got {actual_prob}."

print("Test 3 passed.")


Testing get_possible_transitions...
Test 1: Valid transition within bounds...
Test 1 passed.
Test 2: Agent at (0, 0) attempting to move left (edge of grid)...
Test 2 passed.
Test 3: Moving into impassable terrain.
Actual Transitions:
((0, 2), 0.10000000000000002)
((1, 2), 0.8)
((2, 2), 0.10000000000000002)
Expected Transitions:
((0, 2), 0.1)
((1, 2), 0.8)
((2, 2), 0.1)
Comparing transitions...
Test 3 passed.


## is_valid_action documentation

Check if an action is valid from a given state in the world.  

Parameters:  
    s (tuple): The current state as a tuple (x, y).  
    a (tuple): The action to check as a tuple (dx, dy).  
    world (list): The grid world represented as a list of lists.  

Returns:  
    bool: True if the action is valid, False otherwise. 


In [6]:
def is_valid_action(s, a, world):
    x, y = s
    dx, dy = a
    x_new, y_new = x + dx, y + dy
    rows = len(world)
    cols = len(world[0])
    # Valid only if the destination is on the grid and is not a mountain
    return 0 <= x_new < cols and 0 <= y_new < rows and world[y_new][x_new] != 'x'


In [7]:
print("Testing is_valid_action...")

# Test world for the unit tests
test_world = [
    ['.', '.', '.'],
    ['.', 'x', '.'],
    ['.', '.', '.']
]
actions = [(0, -1), (1, 0), (0, 1), (-1, 0)]  # Up, Right, Down, Left

# Test 1: Valid action within bounds
print("Test 1: Valid action within bounds...")
s = (1, 1)  # Starting position
a = (0, -1)  # Action: Up
result = is_valid_action(s, a, test_world)
assert result == True, "Test 1 failed: Action should be valid."
print("Test 1 passed.")

# Test 2: Action leading off the grid (edge case)
print("Test 2: Action leading off the grid...")
s = (0, 0)  # Top-left corner
a = (-1, 0)  # Action: Left (off the grid)
result = is_valid_action(s, a, test_world)
assert result == False, "Test 2 failed: Action should be invalid (off the grid)."
print("Test 2 passed.")

# Test 3: Action into impassable terrain
print("Test 3: Action into impassable terrain...")
s = (0, 1)
a = (1, 0)  # Action: Right into impassable terrain at (1, 1)
result = is_valid_action(s, a, test_world)
assert result == False, "Test 3 failed: Action should be invalid (impassable terrain)."
print("Test 3 passed.")


Testing is_valid_action...
Test 1: Valid action within bounds...
Test 1 passed.
Test 2: Action leading off the grid...
Test 2 passed.
Test 3: Action into impassable terrain...
Test 3 passed.


## value_iteration documentation


Perform value iteration to compute the optimal policy.  

Parameters:  
    world: List of Lists representing the grid world.  
    costs: Dict of costs by terrain.  
    goal: Tuple (x, y) representing the goal state.  
    reward: The reward for achieving the goal state.  
    actions: List of possible actions as (dx, dy).  
    gamma: Discount factor.  

Returns:  
    policy: Dict mapping state (x, y) to action (dx, dy).  


In [8]:

def value_iteration(world, costs, goal, reward, actions, gamma):

    rows = len(world)
    cols = len(world[0])
    V = {}
    policy = {}
    theta = 0.0001  # Convergence threshold
    delta = float('inf')

    # Initialize value function V(s) and policy[s] for all states
    for y in range(rows):
        for x in range(cols):
            s = (x, y)
            terrain = world[y][x]
            V[s] = 0.0  # Initialize V[s] for all states
            policy[s] = None  # Initialize policy[s] for all states
            if s == goal:
                V[s] = reward  # Set value of goal state to the reward

    iteration = 0
    max_iterations = 1000  # Set a maximum number of iterations to prevent infinite loops
    while delta > theta and iteration < max_iterations:
        delta = 0.0
        V_prev = V.copy()
        iteration += 1
        # Uncomment the next line to see iteration progress
        # print(f"Iteration {iteration}")

        for y in range(rows):
            for x in range(cols):
                s = (x, y)
                terrain = world[y][x]
                if terrain == 'x' or s == goal:
                    continue  # Skip impassable terrain and goal state
                max_Q = float('-inf')
                best_a = None

                for a in actions:
                    # Check if the action is valid
                    if not is_valid_action(s, a, world):
                        continue  # Skip invalid actions
                    Q_sa = 0.0
                    T_sa_s_prime_list = get_possible_transitions(s, a, world, actions, goal)
                    # Uncomment the next line to debug each state-action pair
                    # print(f"State {s}, Action {a}")

                    for s_prime, T_sa_s_prime in T_sa_s_prime_list:
                        x_prime, y_prime = s_prime
                        terrain_prime = world[y_prime][x_prime]
                        if terrain_prime == 'x':
                            continue  # Skip impassable terrain

                        # Get immediate reward
                        immediate_reward = costs.get(terrain_prime, 0)
                        if s_prime == goal:
                            immediate_reward += reward  # Add reward for reaching the goal
                            V_s_prime = 0  # No future rewards after reaching the goal
                        else:
                            V_s_prime = V_prev.get(s_prime, 0.0)

                        # Compute the contribution to Q_sa
                        Q_sa += T_sa_s_prime * (immediate_reward + gamma * V_s_prime)

                    # Update max_Q and best_a
                    if Q_sa > max_Q:
                        max_Q = Q_sa
                        best_a = a

                if best_a is None:
                    # No valid actions; agent stays in place
                    V[s] = V_prev[s]
                    # policy[s] remains None
                else:
                    delta = max(delta, abs(max_Q - V_prev[s]))
                    V[s] = max_Q
                    policy[s] = best_a

        # Uncomment the next line to see delta after each iteration
        # print(f"End of Iteration {iteration}, delta: {delta}")

    if delta > theta:  # the loop stopped because of the iteration cap, not because it converged
        print("Maximum iterations reached without convergence.")

    return policy


In [9]:

print("Testing value_iteration...")

test_world = [
    ['.', '.', '.'],
    ['.', 'x', '.'],
    ['.', '.', '.']
]
costs = {'.': -1, '*': -3, '^': -5, '~': -7}
actions = [(0, -1), (1, 0), (0, 1), (-1, 0)]
gamma = 0.9
reward = 10
goal = (2, 2)

# Test 1: Check policy for goal state
print("Test 1: Policy for goal state...")
policy = value_iteration(test_world, costs, goal, reward, actions, gamma)
assert policy[goal] is None, "Test 1 failed: goal state should have no action"
print("Test 1 passed.")

# Test 2: Check if impassable terrain is excluded
print("Test 2: Impassable terrain exclusion...")
assert policy[(1, 1)] is None, "Test 2 failed: impassable terrain should have no action"
print("Test 2 passed.")

# Test 3: Policy correctness for reachable states
print("Test 3: Policy correctness for reachable states...")
assert policy[(0, 0)] in actions, "Test 3 failed: policy action should be valid for state (0, 0)"
assert policy[(1, 0)] in actions, "Test 3 failed: policy action should be valid for state (1, 0)"
print("Test 3 passed.")


Testing value_iteration...
Test 1: Policy for goal state...
Test 1 passed.
Test 2: Impassable terrain exclusion...
Test 2 passed.
Test 3: Policy correctness for reachable states...
Test 3 passed.


## pretty_print_policy documentation  


Prints the policy as a grid.  

Parameters:  
    cols: Number of columns in the grid.  
    rows: Number of rows in the grid.  
    policy: Dict mapping state (x, y) to action (dx, dy).  
    goal: Tuple (x, y) representing the goal state.  


In [10]:
def pretty_print_policy(cols, rows, policy, goal):
    action_symbols = {
        (0, -1): '^',  # up
        (1, 0): '>',   # right
        (0, 1): 'v',   # down
        (-1, 0): '<'   # left
    }
    for y in range(rows):
        line = ''
        for x in range(cols):
            s = (x, y)
            if s == goal:
                line += 'G'
            elif policy[s] is None:
                line += 'x'  # No action recorded: impassable terrain
            else:
                a = policy[s]
                symbol = action_symbols.get(a, ' ')
                line += symbol
        print(line)


In [11]:

print("\nTesting pretty_print_policy...")

# Test 1: Simple policy in a small grid
print("Test 1: Simple policy in a small grid.")
policy = {
    (0, 0): (1, 0),
    (1, 0): (1, 0),
    (2, 0): (0, 1),
    (0, 1): (1, 0),
    (1, 1): (1, 0),
    (2, 1): (0, 1),
    (0, 2): None,
    (1, 2): None,
    (2, 2): None  # Goal
}
cols = 3
rows = 3
goal = (2, 2)
print("Expected output:")
print(">>v\n>>v\nxxG")
print("Actual output:")
pretty_print_policy(cols, rows, policy, goal)
print("Test 1 passed.")

# Test 2: Policy with impassable terrain
print("\nTest 2: Policy with impassable terrain.")
policy = {
    (0, 0): (0, 1),
    (1, 0): None,      # Impassable terrain
    (2, 0): (0, 1),
    (0, 1): (1, 0),
    (1, 1): (1, 0),
    (2, 1): (0, 1),
    (0, 2): None,
    (1, 2): None,
    (2, 2): None  # Goal
}
print("Expected output:")
print("vxv\n>>v\nxxG")
print("Actual output:")
pretty_print_policy(cols, rows, policy, goal)
print("Test 2 passed.")

# Test 3: Policy with all directions
print("\nTest 3: Policy with all directions.")
policy = {
    (0, 0): (0, 1),
    (1, 0): (-1, 0),
    (2, 0): (0, -1),
    (0, 1): (1, 0),
    (1, 1): (-1, 0),
    (2, 1): (0, 1),
    (0, 2): None,
    (1, 2): None,
    (2, 2): None  # Goal
}
print("Expected output:")
print("v<^\n><v\nxxG")
print("Actual output:")
pretty_print_policy(cols, rows, policy, goal)
print("Test 3 passed.")



Testing pretty_print_policy...
Test 1: Simple policy in a small grid.
Expected output:
>>v
>>v
xxG
Actual output:
>>v
>>v
xxG
Test 1 passed.

Test 2: Policy with impassable terrain.
Expected output:
vxv
>>v
xxG
Actual output:
vxv
>>v
xxG
Test 2 passed.

Test 3: Policy with all directions.
Expected output:
v<^
><v
xxG
Actual output:
v<^
><v
xxG
Test 3 passed.
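
The checks above print the expected and actual grids for comparison by eye. A minimal sketch of asserting on printed output instead uses `contextlib.redirect_stdout` from the standard library. The names `print_policy` and `captured_output` are hypothetical; `print_policy` is a compact stand-in so the example runs on its own, and it is not the notebook's `pretty_print_policy`.

```python
import io
from contextlib import redirect_stdout

SYMBOLS = {(0, -1): '^', (1, 0): '>', (0, 1): 'v', (-1, 0): '<'}

def print_policy(cols, rows, policy, goal):
    """Stand-in printer: 'G' for the goal, 'x' for states with no action."""
    for y in range(rows):
        print(''.join(
            'G' if (x, y) == goal else SYMBOLS.get(policy[(x, y)], 'x')
            for x in range(cols)))

def captured_output(func, *args):
    """Run func(*args) and return everything it printed as one string."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        func(*args)
    return buffer.getvalue()

# Hypothetical 2x2 policy: right, then down into the goal; (0, 1) has no action.
tiny_policy = {(0, 0): (1, 0), (1, 0): (0, 1), (0, 1): None, (1, 1): None}
output = captured_output(print_policy, 2, 2, tiny_policy, (1, 1))
assert output == ">v\nxG\n", f"unexpected grid: {output!r}"
```

Comparing the whole string also catches stray spaces or blank lines, which matters when the output must match exactly.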


In [12]:
reward = 100000  # Reward for reaching the goal

## Value Iteration

### Small World

In [13]:
small_world = read_world( "small.txt")

In [14]:
goal = (len(small_world[0])-1, len(small_world)-1)
gamma = 0.9

small_policy = value_iteration(small_world, costs, goal, reward, cardinal_moves, gamma)

In [15]:
cols = len(small_world[0])
rows = len(small_world)

pretty_print_policy(cols, rows, small_policy, goal)

v>>>vv
vv>>vv
vvv>vv
vvvxvv
>>>>vv
>>>>>v
>>>>>G


### Large World

In [16]:
large_world = read_world( "large.txt")

In [17]:
goal = (len(large_world[0])-1, len(large_world)-1)  # Lower right corner
gamma = 0.9

large_policy = value_iteration(large_world, costs, goal, reward, cardinal_moves, gamma)

In [18]:
cols = len(large_world[0])
rows = len(large_world)

pretty_print_policy( cols, rows, large_policy, goal)

v>>>>>>>>>>>>>>vv>>>>>>>>vv
v>>>>>>>>>>>>>vvv<xxxxxxxvv
vvv^xx>>>>>>>>>vvxxxvvvxxvv
vvv<<xxx>>>>>>>>>>>vvv<xxvv
vvv<<xxv>>>>>>>>>>>vvvxxxvv
vvv<xxvvv>>>>>>>>>>>vvvxvvv
vvvxxvvvvv^^xxx>>>>>>>>vvvv
vvv>>>vvvv<^<<xxx>>>>>vvvvv
vv>>>>vvvv<<<<xx>>>>>>>vvvv
v>>>>>vvvv<xxxx>>>>>>>>vvvv
v>>>>vvvv<xxx>>>>>vvxxxvvvv
v>>>>vvvvxxv>>>>>>>vvxxvvvv
v>>>>>vvvxxv>>>>>>>>vx>vvvv
v>>>>>v>>>vv>>>>>>>>>>>vvvv
vv>^x>v>>vvv<>>>>>>>>^xvvvv
vv<xxx>>>>vvxxx>>>>>^xxvvvv
vvxx>>>>>>>>>vxxx>^xxxvvvvv
vvvxx>>>>>>>>>>vxxxx>>vvvvv
vvvxxx>>>>>>>>>>>>>>>vvvvvv
vv>vxxx>>>>>>>>>>>>>>>vvvvv
vvv>vvxx>>>>>^x>>>>>>vvvvvv
v>>vvvvxxx>^xx>>>>>>>>vvvvv
>>>>>>>vvxxxx>>>>>>>>>>vvvv
>>>>>>>>>vv>>>>>^xx>>>>vvvv
vx>>>>>>>vvxxx>^xxvxx>>vvvv
vxxx>>>>>>>vxxxx>>>vxxx>>vv
>>>>>>>>>>>>>>>>>>>>>>>>>>G


## Before You Submit...

1. Did you provide output exactly as requested?
2. Did you re-execute the entire notebook? ("Restart Kernel and Run All Cells...")
3. If you did not complete the assignment or had difficulty please explain what gave you the most difficulty in the Markdown cell below.
4. Did you change the name of the file to `jhed_id.ipynb`?

Do not submit any other files.