## On-Policy Bisimulation metric


In [1]:
import numpy as np
from collections import defaultdict

ACTION_SPACE = ['up', 'down', 'left', 'right']
ACTION2ACTION = { action[0]: action for action in ACTION_SPACE }
ACTION2IDX = {'u': 0, 'd': 1, 'l': 2, 'r': 3}
REWARD_FUNC = {'G': 10, '.': -1, 'T': 0} # goal, empty, terminal

def read_grid_world(file_path):
    with open(file_path, 'r') as file:
        grid = [line.split() for line in file]
    return np.array(grid)

def get_neighbors(grid, row, col):
    neighbors = {
        'up': (row - 1, col),
        'down': (row + 1, col),
        'left': (row, col - 1),
        'right': (row, col + 1)
    }
    valid_neighbors = {}
    for action, (r, c) in neighbors.items():
        if 0 <= r < grid.shape[0] and 0 <= c < grid.shape[1] and grid[r, c] != 'x':
            valid_neighbors[action] = (r, c)
        else:
            valid_neighbors[action] = (row, col)  # stay in the same state if out of bounds or wall
    return valid_neighbors

def calculate_rewards_and_transitions(grid):
    states = {}
    rewards = {}
    transitions = {}
    for row in range(grid.shape[0]):
        for col in range(grid.shape[1]):
            state = (row, col)
            if grid[row, col] == 'G':
                # NOTE: Terminal state in the goal has a reward of 0
                rewards[state] = {action: REWARD_FUNC['T'] for action in ACTION_SPACE}
                transitions[state] = {action: {state: 1.0} for action in ACTION_SPACE}
            elif grid[row, col] == '.':
                rewards[state] = {}
                transitions[state] = {}
                neighbors = get_neighbors(grid, row, col)
                for action, (r, c) in neighbors.items():
                    if grid[r, c] == 'G':
                        rewards[state][action] = REWARD_FUNC['G']
                    else:
                        rewards[state][action] = REWARD_FUNC['.']
                    transitions[state][action] = {(r, c): 1.0}
    return rewards, transitions

In [2]:
file_path = 'custom_envs/grid_world.txt'
grid = read_grid_world(file_path)
print(grid)

rewards, transitions = calculate_rewards_and_transitions(grid)
rewards

[['.' '.' 'G']
 ['.' '.' '.']
 ['x' 'x' 'x']
 ['.' '.' 'G']
 ['.' '.' '.']]


{(0, 0): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (0, 1): {'up': -1, 'down': -1, 'left': -1, 'right': 10},
 (0, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (1, 0): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (1, 1): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (1, 2): {'up': 10, 'down': -1, 'left': -1, 'right': -1},
 (3, 0): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (3, 1): {'up': -1, 'down': -1, 'left': -1, 'right': 10},
 (3, 2): {'up': 0, 'down': 0, 'left': 0, 'right': 0},
 (4, 0): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (4, 1): {'up': -1, 'down': -1, 'left': -1, 'right': -1},
 (4, 2): {'up': 10, 'down': -1, 'left': -1, 'right': -1}}

In [10]:
def value_iteration(grid, gamma=0.9, epsilon=1e-4):
    # Define parameters
    rows, cols = grid.shape
    gamma = 0.9
    # reward_goal = 10
    epsilon = 1e-4

    # Initialize value function
    V = np.zeros((rows, cols))

    # Define the transition probabilities and rewards
    def get_next_state(r, c, action):
        if action == 'up':
            return max(r - 1, 0), c
        elif action == 'down':
            return min(r + 1, rows - 1), c
        elif action == 'left':
            return r, max(c - 1, 0)
        elif action == 'right':
            return r, min(c + 1, cols - 1)
        
    def reward(r, c):
        if grid[r][c] == 'G':
            return REWARD_FUNC['G']
        else:
            return REWARD_FUNC['.']

    def is_terminal(r, c):
        return grid[r][c] == 'G' or grid[r][c] == 'x'

    # Value iteration
    while True:
        delta = 0
        new_V = np.copy(V)
        for r in range(rows):
            for c in range(cols):
                if not is_terminal(r, c):
                    max_value = float('-inf')
                    for action in ACTION_SPACE:
                        next_r, next_c = get_next_state(r, c, action)
                        if grid[next_r][next_c] != 'x':
                            value = reward(next_r, next_c) + gamma * V[next_r][next_c]
                            if value > max_value:
                                max_value = value
                    new_V[r][c] = max_value
                    delta = max(delta, abs(new_V[r][c] - V[r][c]))
        V = new_V
        if delta < epsilon:
            break

    # Extract policy
    policy = np.full((rows, cols), ' ')
    for r in range(rows):
        for c in range(cols):
            if not is_terminal(r, c):
                max_value = float('-inf')
                best_action = None
                for action in ACTION_SPACE:
                    next_r, next_c = get_next_state(r, c, action)
                    if grid[next_r][next_c] != 'x':
                        value = reward(next_r, next_c) + gamma * V[next_r][next_c]
                        if value > max_value:
                            max_value = value
                            best_action = action
                policy[r][c] = best_action[0]  # Just take the first letter of the action

    return V, policy

# Define the grid world environment
file_path = 'custom_envs/grid_world.txt'
grid = read_grid_world(file_path)
print(grid)

# Run value iteration
V, policy = value_iteration(grid)

# Display the results
print("Optimal Value Function:")
print(V)
print("\nOptimal Policy:")
print(policy)

[['.' '.' 'G']
 ['.' '.' '.']
 ['x' 'x' 'x']
 ['.' '.' 'G']
 ['.' '.' '.']]
Optimal Value Function:
[[ 8.  10.   0. ]
 [ 6.2  8.  10. ]
 [ 0.   0.   0. ]
 [ 8.  10.   0. ]
 [ 6.2  8.  10. ]]

Optimal Policy:
[['r' 'r' ' ']
 ['u' 'u' 'u']
 [' ' ' ' ' ']
 ['r' 'r' ' ']
 ['u' 'u' 'u']]


In [12]:
transitions

{(0, 0): {'up': {(0, 0): 1.0},
  'down': {(1, 0): 1.0},
  'left': {(0, 0): 1.0},
  'right': {(0, 1): 1.0}},
 (0, 1): {'up': {(0, 1): 1.0},
  'down': {(1, 1): 1.0},
  'left': {(0, 0): 1.0},
  'right': {(0, 2): 1.0}},
 (0, 2): {'up': {(0, 2): 1.0},
  'down': {(0, 2): 1.0},
  'left': {(0, 2): 1.0},
  'right': {(0, 2): 1.0}},
 (1, 0): {'up': {(0, 0): 1.0},
  'down': {(1, 0): 1.0},
  'left': {(1, 0): 1.0},
  'right': {(1, 1): 1.0}},
 (1, 1): {'up': {(0, 1): 1.0},
  'down': {(1, 1): 1.0},
  'left': {(1, 0): 1.0},
  'right': {(1, 2): 1.0}},
 (1, 2): {'up': {(0, 2): 1.0},
  'down': {(1, 2): 1.0},
  'left': {(1, 1): 1.0},
  'right': {(1, 2): 1.0}},
 (3, 0): {'up': {(3, 0): 1.0},
  'down': {(4, 0): 1.0},
  'left': {(3, 0): 1.0},
  'right': {(3, 1): 1.0}},
 (3, 1): {'up': {(3, 1): 1.0},
  'down': {(4, 1): 1.0},
  'left': {(3, 0): 1.0},
  'right': {(3, 2): 1.0}},
 (3, 2): {'up': {(3, 2): 1.0},
  'down': {(3, 2): 1.0},
  'left': {(3, 2): 1.0},
  'right': {(3, 2): 1.0}},
 (4, 0): {'up': {(3, 0): 1.0

In [None]:
def compute_on_policy_bisimulation_metric(states, policy, transition_prob, rewards, gamma, epsilon=1e-5):
    """
    Compute the on-policy bisimulation metric for a given MDP and policy.

    Parameters:
    - states: List of states each state is a tuple (x,y).
    - policy: Function π(a | s). It is a matrix of size similar to the grid
    - transition_prob: Function P(s' | s, a).
    - rewards: Function R(s, a).
    - gamma: Discount factor.
    - epsilon: Convergence threshold.

    Returns:
    - d: On-policy bisimulation metric.
    """
    n_states = len(states)
    d = np.zeros((n_states, n_states))

    def distance_update(d):
        new_d = np.zeros_like(d)
        for i, s in enumerate(states):
            for j, t in enumerate(states):
                if i != j:
                    action_s = ACTION2ACTION[policy[s]]
                    action_t = ACTION2ACTION[policy[t]]
                    reward_diff = abs(rewards[s][action_s] - rewards[t][action_t])
                    transition_diff = gamma * np.sum([
                        transition_prob(s_next, s, policy(s)) * transition_prob(t_next, t, policy(t)) * d[states.index(s_next)][states.index(t_next)]
                        for s_next in states for t_next in states
                    ])
                    new_d[i][j] = reward_diff + transition_diff
        return new_d

    while True:
        new_d = distance_update(d)
        if np.max(np.abs(new_d - d)) < epsilon:
            break
        d = new_d

    return d