# Final Project- Formal Methods in Robotics and AI

### Erik Bloomquist, Adam Benali, Rodrigo Bazan

Markov Decision Process constrained using LTL. Agent (Jerry) must never reach any hazard states (traps or Tom) while maximizing the probability of reaching a goal state (cheese). The run ends when either a hazard is reached or a cheese is reached. An optimal policy is computed using value iteration and displayed using arrows, representing the chosen action in each state. Success probabilities from each initial state are displayed. Tom's initial state is random uses a uniformely random movement policy.

In [1]:
"""
Tom & Jerry temporal-logic-style gridworld using PyMDPtoolbox.

- Grid: 5x5 (25 cells)
- State: (Jerry_pos, Tom_pos) => 25*25 = 625 states
- Actions: N, S, E, W for Jerry
- Jerry's motion: 0.8 intended dir, 0.1 left, 0.1 right
- Tom's motion: random over N,S,E,W (uniform)
- Goal: Jerry reaches a cheese cell (absorbing success)
- Bad: Jerry hits a trap or same cell as Tom (absorbing failure)

We then solve a discounted MDP with PyMDPtoolboxâ€™s ValueIteration.
With only a terminal reward of 1 at goal and discount ~1, the value
function approximates the probability of safely reaching cheese.
"""

import numpy as np

# Try to import PyMDPtoolbox (module name is usually mdptoolbox)
try:
    from mdptoolbox.mdp import ValueIteration
except ImportError:
    from pymdptoolbox.mdp import ValueIteration  # fallback if installed under old name


# ----------------------------------------------------------------------
# 1. Grid and indexing utilities
# ----------------------------------------------------------------------

GRID_SIZE = 5
NUM_CELLS = GRID_SIZE * GRID_SIZE

def rc_to_pos(r, c):
    """Map (row, col) to single index 0..24."""
    return r * GRID_SIZE + c

def pos_to_rc(pos):
    """Map 0..24 to (row, col)."""
    return divmod(pos, GRID_SIZE)




In [2]:
# ----------------------------------------------------------------------
# 2. Environment layout: traps & cheese
# ----------------------------------------------------------------------
# You can change these to match your figure exactly.

# Example: traps at (row,col) = (0,0) and (3,1)
TRAP_CELLS = {
    rc_to_pos(0, 0),
    rc_to_pos(3, 1),
}

# Example: cheese at (row,col) = (2,1) and (2,3)
CHEESE_CELLS = {
    rc_to_pos(2, 1),
    rc_to_pos(2, 3),
}

is_trap = np.zeros(NUM_CELLS, dtype=bool)
is_cheese = np.zeros(NUM_CELLS, dtype=bool)
for p in TRAP_CELLS:
    is_trap[p] = True
for p in CHEESE_CELLS:
    is_cheese[p] = True

In [3]:
# ----------------------------------------------------------------------
# 3. Actions and motion dynamics
# ----------------------------------------------------------------------

NORTH, SOUTH, EAST, WEST = 0, 1, 2, 3
ACTIONS = [NORTH, SOUTH, EAST, WEST]
NUM_ACTIONS = len(ACTIONS)

ACTION_DELTAS = {
    NORTH: (-1, 0),
    SOUTH: ( 1, 0),
    EAST:  ( 0, 1),
    WEST:  ( 0,-1),
}

LEFT_OF = {
    NORTH: WEST,
    SOUTH: EAST,
    EAST:  NORTH,
    WEST:  SOUTH,
}
RIGHT_OF = {
    NORTH: EAST,
    SOUTH: WEST,
    EAST:  SOUTH,
    WEST:  NORTH,
}

def move_cell(pos, action):
    """Deterministic move for a single agent on the grid; if off-grid, stay."""
    r, c = pos_to_rc(pos)
    dr, dc = ACTION_DELTAS[action]
    nr, nc = r + dr, c + dc
    if 0 <= nr < GRID_SIZE and 0 <= nc < GRID_SIZE:
        return rc_to_pos(nr, nc)
    else:
        return pos  # bounce into wall


def jerry_transition_probs(jerry_pos, action):
    """
    Jerry's stochastic motion under chosen action.
    Returns dict next_pos -> probability.
    0.8 intended dir, 0.1 left, 0.1 right.
    """
    intended = action
    left = LEFT_OF[action]
    right = RIGHT_OF[action]
    moves = [intended, left, right]
    probs = [0.8, 0.1, 0.1]

    outcomes = {}
    for a, p in zip(moves, probs):
        nxt = move_cell(jerry_pos, a)
        outcomes[nxt] = outcomes.get(nxt, 0.0) + p
    return outcomes


def tom_transition_probs(tom_pos):
    """
    Tom's random policy: uniform over N,S,E,W.
    Returns dict next_pos -> probability.
    """
    outcomes = {}
    for a in ACTIONS:
        nxt = move_cell(tom_pos, a)
        outcomes[nxt] = outcomes.get(nxt, 0.0) + 1.0 / NUM_ACTIONS
    return outcomes


In [4]:
# ----------------------------------------------------------------------
# 4. Joint state space (Jerry_pos, Tom_pos)
# ----------------------------------------------------------------------

NUM_STATES = NUM_CELLS * NUM_CELLS

def encode_state(jerry_pos, tom_pos):
    """(jerry, tom) -> single state index 0..624."""
    return jerry_pos * NUM_CELLS + tom_pos

def decode_state(s):
    """state index -> (jerry_pos, tom_pos)."""
    jerry_pos = s // NUM_CELLS
    tom_pos = s % NUM_CELLS
    return jerry_pos, tom_pos


def is_bad_state(jerry_pos, tom_pos):
    """Jerry is in trap OR collides with Tom."""
    return is_trap[jerry_pos] or (jerry_pos == tom_pos)


def is_goal_state(jerry_pos, tom_pos):
    """Jerry is on any cheese cell; Tom can be anywhere."""
    return is_cheese[jerry_pos]



In [5]:
# ----------------------------------------------------------------------
# 5. Build transition matrices P for PyMDPtoolbox
# ----------------------------------------------------------------------

def build_transition_matrices():
    """
    Returns:
        P: numpy array of shape (A, S, S)
    where P[a, s, s_next] = Pr(s_next | s, a)
    """
    P = np.zeros((NUM_ACTIONS, NUM_STATES, NUM_STATES), dtype=float)

    # Base dynamics (before making goal/bad absorbing)
    for jerry in range(NUM_CELLS):
        for tom in range(NUM_CELLS):
            s = encode_state(jerry, tom)

            for a in ACTIONS:
                j_trans = jerry_transition_probs(jerry, a)
                t_trans = tom_transition_probs(tom)

                for j_next, pj in j_trans.items():
                    for t_next, pt in t_trans.items():
                        s_next = encode_state(j_next, t_next)
                        P[a, s, s_next] += pj * pt

    # Make goal and bad states absorbing
    for s in range(NUM_STATES):
        jerry, tom = decode_state(s)
        if is_goal_state(jerry, tom) or is_bad_state(jerry, tom):
            P[:, s, :] = 0.0
            for a in ACTIONS:
                P[a, s, s] = 1.0

    # Sanity check: each P[a] should be row-stochastic
    # (Optional: you can comment this out for speed)
    row_sums = P.sum(axis=2)
    assert np.allclose(row_sums, 1.0, atol=1e-8), "Transition matrices are not stochastic."

    return P


In [6]:
# ----------------------------------------------------------------------
# 6. Reward vector R for reachability
# ----------------------------------------------------------------------

def build_reward_vector():
    """
    R[s] = 1 if s is goal, else 0.
    ValueIteration will then try to maximize expected discounted reward.
    With only terminal rewards and discount close to 1, this approximates
    the probability of eventually reaching cheese.
    """
    R = np.zeros(NUM_STATES, dtype=float)
    for s in range(NUM_STATES):
        jerry, tom = decode_state(s)
        if is_goal_state(jerry, tom):
            R[s] = 1.0
        else:
            R[s] = 0.0
    return R

In [7]:
# ----------------------------------------------------------------------
# 7. Solve the MDP with PyMDPtoolbox
# ----------------------------------------------------------------------

def solve_mdp(P, R, discount=0.99, epsilon=1e-5, max_iter=10_000):
    """
    Run Value Iteration to approximate optimal safe-reachability policy.

    Args:
        P: transition matrices, shape (A, S, S)
        R: reward vector, shape (S,)
        discount: gamma in [0,1); close to 1 for long-horizon
        epsilon: convergence threshold
        max_iter: safety cap

    Returns:
        policy: array of shape (S,) with optimal action indices
        V: array of shape (S,) with value function
    """
    vi = ValueIteration(P, R, discount=discount, epsilon=epsilon, max_iter=max_iter)
    vi.run()
    return np.array(vi.policy), np.array(vi.V)

In [8]:
# ----------------------------------------------------------------------
# 8. Aggregating results for Jerry (optional but useful)
# ----------------------------------------------------------------------

def jerry_value_marginal_over_tom(V, tom_initial_dist=None):
    """
    Compute an aggregated "success value" for each Jerry cell,
    averaging over Tom's initial position distribution.

    Args:
        V: value function over joint states, shape (S,)
        tom_initial_dist: length-25 array giving Pr(Tom starts at cell k).
                          If None, use uniform over all 25 cells.

    Returns:
        jerry_values: array shape (25,) where jerry_values[j]
                      is the expected value when Jerry starts at cell j
                      and Tom's initial location is random.
    """
    if tom_initial_dist is None:
        tom_initial_dist = np.ones(NUM_CELLS) / NUM_CELLS
    tom_initial_dist = np.asarray(tom_initial_dist)
    assert tom_initial_dist.shape == (NUM_CELLS,)
    assert np.allclose(tom_initial_dist.sum(), 1.0)

    jerry_values = np.zeros(NUM_CELLS)
    for jerry in range(NUM_CELLS):
        val = 0.0
        for tom in range(NUM_CELLS):
            s = encode_state(jerry, tom)
            val += tom_initial_dist[tom] * V[s]
        jerry_values[jerry] = val
    return jerry_values


def jerry_value_grid(jerry_values):
    """Reshape 25-element vector into 5x5 array for visualization."""
    return jerry_values.reshape(GRID_SIZE, GRID_SIZE)

In [None]:
# ----------------------------------------------------------------------
# 9. Main experiment
# ----------------------------------------------------------------------

if __name__ == "__main__":
    # Build MDP
    P = build_transition_matrices()
    R = build_reward_vector()

    # Solve it
    policy, V = solve_mdp(P, R, discount=0.99)

    print("Optimal policy shape:", policy.shape)
    print("Value function shape:", V.shape)

    # Example: value from a specific initial state
    # Suppose Jerry starts at bottom-left (4,0), Tom at top-center (0,2)
    jerry_start = rc_to_pos(4, 0)
    tom_start = rc_to_pos(0, 2)
    s0 = encode_state(jerry_start, tom_start)
    print(f"Value at initial state (Jerry={jerry_start}, Tom={tom_start}): {V[s0]:.4f}")

    # Aggregate over random Tom initial position
    jerry_vals = jerry_value_marginal_over_tom(V)
    grid_vals = jerry_value_grid(jerry_vals)

    print("\nJerry's aggregated values over starting positions (row-major 5x5):")
    np.set_printoptions(precision=3, suppress=True)
    print(grid_vals)

    # If you want, you can also print the optimal action for Jerry at each joint state
    # or just for Jerry's position assuming random Tom.

In [None]:
import numpy as np

# Try to import PyMDPtoolbox (module name can vary)
try:
    from mdptoolbox.mdp import ValueIteration
except ImportError:
    from pymdptoolbox.mdp import ValueIteration  # some installs use this name


# ----------------------------------------------------------------------
# 1. Grid / indexing utilities
# ----------------------------------------------------------------------
GRID_SIZE = 5
NUM_CELLS = GRID_SIZE * GRID_SIZE

def rc_to_pos(r, c):
    return r * GRID_SIZE + c

def pos_to_rc(pos):
    return divmod(pos, GRID_SIZE)


# ----------------------------------------------------------------------
# 2. Environment layout: traps & cheese (EDIT THESE TO MATCH YOUR FIGURE)
# ----------------------------------------------------------------------
TRAP_CELLS = {
    rc_to_pos(0, 0),
    rc_to_pos(3, 1),
}
CHEESE_CELLS = {
    rc_to_pos(2, 1),
    rc_to_pos(2, 3),
}

is_trap = np.zeros(NUM_CELLS, dtype=bool)
is_cheese = np.zeros(NUM_CELLS, dtype=bool)
for p in TRAP_CELLS:
    is_trap[p] = True
for p in CHEESE_CELLS:
    is_cheese[p] = True


# ----------------------------------------------------------------------
# 3. Actions and motion dynamics
# ----------------------------------------------------------------------
NORTH, SOUTH, EAST, WEST = 0, 1, 2, 3
ACTIONS = [NORTH, SOUTH, EAST, WEST]
NUM_ACTIONS = len(ACTIONS)

ACTION_DELTAS = {
    NORTH: (-1, 0),
    SOUTH: ( 1, 0),
    EAST:  ( 0, 1),
    WEST:  ( 0,-1),
}

LEFT_OF = {
    NORTH: WEST,
    SOUTH: EAST,
    EAST:  NORTH,
    WEST:  SOUTH,
}
RIGHT_OF = {
    NORTH: EAST,
    SOUTH: WEST,
    EAST:  SOUTH,
    WEST:  NORTH,
}

def move_cell(pos, action):
    r, c = pos_to_rc(pos)
    dr, dc = ACTION_DELTAS[action]
    nr, nc = r + dr, c + dc
    if 0 <= nr < GRID_SIZE and 0 <= nc < GRID_SIZE:
        return rc_to_pos(nr, nc)
    else:
        return pos  # bounce into wall

def jerry_transition_probs(jerry_pos, action):
    intended = action
    left = LEFT_OF[action]
    right = RIGHT_OF[action]
    moves = [intended, left, right]
    probs = [0.8, 0.1, 0.1]

    outcomes = {}
    for a, p in zip(moves, probs):
        nxt = move_cell(jerry_pos, a)
        outcomes[nxt] = outcomes.get(nxt, 0.0) + p
    return outcomes

def tom_transition_probs(tom_pos):
    outcomes = {}
    for a in ACTIONS:
        nxt = move_cell(tom_pos, a)
        outcomes[nxt] = outcomes.get(nxt, 0.0) + 1.0 / NUM_ACTIONS
    return outcomes


# ----------------------------------------------------------------------
# 4. Joint state space: (Jerry_pos, Tom_pos)
# ----------------------------------------------------------------------
NUM_STATES = NUM_CELLS * NUM_CELLS

def encode_state(jerry_pos, tom_pos):
    return jerry_pos * NUM_CELLS + tom_pos

def decode_state(s):
    jerry_pos = s // NUM_CELLS
    tom_pos = s % NUM_CELLS
    return jerry_pos, tom_pos

def is_bad_state(jerry_pos, tom_pos):
    return is_trap[jerry_pos] or (jerry_pos == tom_pos)

def is_goal_state(jerry_pos, tom_pos):
    return is_cheese[jerry_pos]


# ----------------------------------------------------------------------
# 5. Build transition matrices P (as np.array) and then convert to list
# ----------------------------------------------------------------------
def build_transition_matrices():
    P = np.zeros((NUM_ACTIONS, NUM_STATES, NUM_STATES), dtype=np.float64)

    # base dynamics
    for jerry in range(NUM_CELLS):
        for tom in range(NUM_CELLS):
            s = encode_state(jerry, tom)

            for a in ACTIONS:
                j_trans = jerry_transition_probs(jerry, a)
                t_trans = tom_transition_probs(tom)

                for j_next, pj in j_trans.items():
                    for t_next, pt in t_trans.items():
                        s_next = encode_state(j_next, t_next)
                        P[a, s, s_next] += pj * pt

    # make goal and bad states absorbing
    for s in range(NUM_STATES):
        j, t = decode_state(s)
        if is_goal_state(j, t) or is_bad_state(j, t):
            P[:, s, :] = 0.0
            for a in ACTIONS:
                P[a, s, s] = 1.0

    # convert 3D array -> list of (S,S) matrices for PyMDPtoolbox
    P_list = [P[a] for a in range(NUM_ACTIONS)]
    return P_list


# ----------------------------------------------------------------------
# 6. Reward vector R
# ----------------------------------------------------------------------
def build_reward_vector():
    R = np.zeros(NUM_STATES, dtype=np.float64)
    for s in range(NUM_STATES):
        j, t = decode_state(s)
        if is_goal_state(j, t):
            R[s] = 1.0
        else:
            R[s] = 0.0
    return R


# ----------------------------------------------------------------------
# 7. Solve MDP with Value Iteration
# ----------------------------------------------------------------------
def solve_mdp(P_list, R, discount=0.99, epsilon=1e-5, max_iter=10_000):
    vi = ValueIteration(P_list, R, discount=discount, epsilon=epsilon, max_iter=max_iter)
    vi.run()
    return np.array(vi.policy), np.array(vi.V)


# ----------------------------------------------------------------------
# 8. Aggregate values for Jerry only (marginalizing Tom)
# ----------------------------------------------------------------------
def jerry_value_marginal_over_tom(V, tom_initial_dist=None):
    if tom_initial_dist is None:
        tom_initial_dist = np.ones(NUM_CELLS) / NUM_CELLS
    tom_initial_dist = np.asarray(tom_initial_dist)
    assert tom_initial_dist.shape == (NUM_CELLS,)
    assert np.allclose(tom_initial_dist.sum(), 1.0)

    jerry_values = np.zeros(NUM_CELLS)
    for j in range(NUM_CELLS):
        val = 0.0
        for t in range(NUM_CELLS):
            s = encode_state(j, t)
            val += tom_initial_dist[t] * V[s]
        jerry_values[j] = val
    return jerry_values.reshape(GRID_SIZE, GRID_SIZE)


# ----------------------------------------------------------------------
# 9. Main experiment
# ----------------------------------------------------------------------
if __name__ == "__main__":
    print("Building transition matrices...")
    P_list = build_transition_matrices()
    print("Done. Building rewards...")
    R = build_reward_vector()

    print("Solving MDP with Value Iteration...")
    policy, V = solve_mdp(P_list, R, discount=0.99)
    print("Done.")
    print("Policy shape:", policy.shape)
    print("Value function shape:", V.shape)

    # Example initial state
    jerry_start = rc_to_pos(4, 0)
    tom_start = rc_to_pos(0, 2)
    s0 = encode_state(jerry_start, tom_start)
    print(f"Value at initial state (Jerry={jerry_start}, Tom={tom_start}): {V[s0]:.4f}")

    # Aggregate over random Tom initial position
    grid_vals = jerry_value_marginal_over_tom(V)
    np.set_printoptions(precision=3, suppress=True)
    print("\nJerry's aggregated values over starting positions (5x5 grid):")
    print(grid_vals)


Building transition matrices...
Done. Building rewards...
Solving MDP with Value Iteration...
