In [123]:
import numpy as np
import pandas as pd
import seaborn as sns
import random

# Value Iteration

In [124]:
# Initialize thevariables
cols = 4
rows = 3

gamma = 0.9
epsilon = 0.001

p_intended = 0.8
p_slip = 0.1

actions = {
    'up':    (0, -1),
    'down':  (0, 1),
    'left':  (-1, 0),
    'right': (1, 0)
}

walls = [(1,1)]

In [125]:
U = [[0.0 for x in range(cols)] for y in range(rows)]

In [126]:
R = [[-0.04 for x in range(cols)] for y in range(rows)]

In [127]:
# Initialize the Terminal State utilities
U[1][3] = -1
U[0][3] = 1

R[1][3] = -1
R[0][3] = 1
R[1][1] = 0

In [128]:
def in_grid(x, y):
    return 0 <= x < cols and 0 <= y < rows and (x, y) not in walls

In [129]:
def get_next_states(x,y,action):
  perp_action = []
  dx,dy = actions[action]

  ix,iy = dx+x, dy+y

  if not in_grid(ix,iy):
    ix,iy = x,y

  if action in ['up', 'down']:
    perp_actions = ['left', 'right']
  else:
    perp_actions = ['up', 'down']

  perp_states = []
  for perp_action in perp_actions:
    pdx, pdy = actions[perp_action]
    px, py = x + pdx, y + pdy
    if not in_grid(px, py):
        px, py = x, y
    perp_states.append((px, py))

  return [
        (p_intended, ix, iy),
        (p_slip, perp_states[0][0], perp_states[0][1]),
        (p_slip, perp_states[1][0], perp_states[1][1])
    ]

In [130]:
#Initialize Terminal States

terminal_states = [(3, 0), (3, 1)]

In [131]:
iteration = 0
start_state = (0, 0)
x, y = start_state
while True:
    iteration += 1
    delta = 0
    U_new = [[0 for _ in range(cols)] for _ in range(rows)]

    for y in range(rows):
        for x in range(cols):
            if (x, y) in terminal_states or (x, y) in walls:
                U_new[y][x] = R[y][x]
                continue

            max_utility = float('-inf')
            for action in actions:
                expected_utility = 0
                for (prob, nx, ny) in get_next_states(x, y, action):
                    expected_utility += prob * U[ny][nx]
                max_utility = max(max_utility, expected_utility)

            U_new[y][x] = R[y][x] + gamma * max_utility
            delta = max(delta, abs(U_new[y][x] - U[y][x]))

    U = U_new

    print(f"Iteration {iteration}:")
    for row in U:
        print(["{0:.3f}".format(v) for v in row])
    print()

    if delta < epsilon * (1 - gamma) / gamma:
        break

print("Converged after", iteration, "iterations.")

Iteration 1:
['-0.040', '-0.040', '0.680', '1.000']
['-0.040', '0.000', '-0.040', '-1.000']
['-0.040', '-0.040', '-0.040', '-0.040']

Iteration 2:
['-0.076', '0.442', '0.738', '1.000']
['-0.076', '0.000', '0.356', '-1.000']
['-0.076', '-0.076', '-0.076', '-0.076']

Iteration 3:
['0.265', '0.571', '0.778', '1.000']
['-0.108', '0.000', '0.433', '-1.000']
['-0.108', '-0.108', '0.203', '-0.108']

Iteration 4:
['0.385', '0.623', '0.789', '1.000']
['0.131', '0.000', '0.469', '-1.000']
['-0.138', '0.086', '0.252', '0.006']

Iteration 5:
['0.455', '0.640', '0.793', '1.000']
['0.261', '0.000', '0.480', '-1.000']
['0.050', '0.157', '0.306', '0.052']

Iteration 6:
['0.485', '0.646', '0.795', '1.000']
['0.335', '0.000', '0.484', '-1.000']
['0.166', '0.209', '0.325', '0.095']

Iteration 7:
['0.499', '0.648', '0.795', '1.000']
['0.370', '0.000', '0.486', '-1.000']
['0.235', '0.231', '0.336', '0.112']

Iteration 8:
['0.505', '0.649', '0.795', '1.000']
['0.386', '0.000', '0.486', '-1.000']
['0.268', '

In [132]:
policy = [['' for _ in range(cols)] for _ in range(rows)]

for y in range(rows):
    for x in range(cols):
        if (x, y) in terminal_states:
            policy[y][x] = 'T'
            continue

        best_action = None
        best_value = float('-inf')
        for action in actions:
            expected_utility = 0
            for (prob, nx, ny) in get_next_states(x, y, action):
                expected_utility += prob * U[ny][nx]
            if expected_utility > best_value:
                best_value = expected_utility
                best_action = action

        arrows = {'up': '↑', 'down': '↓', 'left': '←', 'right': '→'}
        policy[y][x] = arrows[best_action]

# Print policy
for row in policy:
    print(row)


['→', '→', '→', 'T']
['↑', '↑', '↑', 'T']
['↑', '→', '↑', '←']


In [133]:
def NextState(s,a):

  new_state = []
  if a in ['up', 'down']:
    perp_actions = ['left', 'right']
  else:
    perp_actions = ['up', 'down']

  r = random.random()
  r_direction = random.random()

  if r<p_intended:
    dx,dy = actions[a]
    px, py = s[1] + dx, s[0] + dy

    if not in_grid(px, py):
      px, py = s[1] , s[0]

  else:
    if r_direction<0.5:
      dx,dy = actions[perp_actions[0]]
      px, py = s[1] + dx, s[0] + dy

      if not in_grid(px, py):
        px, py = s[1] , s[0]

    else:
      dx,dy = actions[perp_actions[1]]
      px, py = s[1] + dx, s[0] + dy

      if not in_grid(px, py):
        px, py = s[1] , s[0]

  return [py,px]

In [134]:
s=[[2,0],[2,0],[1,2],[1,2],[0,2]]
a=['right','up','down','left','left']
results = []
state_action_pairs = list(zip(s, a))

for j in range(0,100,1):
  for state, action in state_action_pairs:
    res = NextState(state,action)
    results.append({
        "State": state,
        "Action": action,
        "Result": res
    })


In [135]:
from collections import defaultdict, Counter
counter = Counter()

# Nested dictionary: (State, Action) -> Counter of Results
counts = defaultdict(Counter)

for record in results:
    sa_key = (tuple(record['State']), record['Action'])
    result_key = tuple(record['Result'])
    counts[sa_key][result_key] += 1

# Display results
for sa, result_counter in counts.items():
    print(f"State: {sa[0]}, Action: {sa[1]}")
    for result, count in result_counter.items():
        print(f"  Result: {result} -> Count: {count}")

State: (2, 0), Action: right
  Result: (2, 1) -> Count: 84
  Result: (1, 0) -> Count: 9
  Result: (2, 0) -> Count: 7
State: (2, 0), Action: up
  Result: (1, 0) -> Count: 85
  Result: (2, 0) -> Count: 9
  Result: (2, 1) -> Count: 6
State: (1, 2), Action: down
  Result: (1, 3) -> Count: 11
  Result: (2, 2) -> Count: 79
  Result: (1, 2) -> Count: 10
State: (1, 2), Action: left
  Result: (2, 2) -> Count: 9
  Result: (1, 2) -> Count: 78
  Result: (0, 2) -> Count: 13
State: (0, 2), Action: left
  Result: (0, 1) -> Count: 77
  Result: (0, 2) -> Count: 16
  Result: (1, 2) -> Count: 7


In [136]:
from collections import defaultdict, Counter
import random

transition_counts = defaultdict(Counter)

num_trials = 5000
start_state = (0,2)

for _ in range(num_trials):
    state = start_state
    while state not in terminal_states:
        action = None
        best_val = float('-inf')
        for a in actions:
            exp_util = 0
            for (prob, nx, ny) in get_next_states(state[0], state[1], a):
                exp_util += prob * U[ny][nx]
            if exp_util > best_val:
                best_val = exp_util
                action = a

        next_state = tuple(NextState([state[1], state[0]], action))
        next_state = (next_state[1], next_state[0])

        transition_counts[(state, action)][next_state] += 1

        state = next_state

transition_probs = {}
for sa, counter in transition_counts.items():
    total = sum(counter.values())
    transition_probs[sa] = {s_next: count/total for s_next, count in counter.items()}

print("P((2,1) | (1,1), Up) =", transition_probs.get(((0,2),'up'), {}).get((0,1), 0))
print("P((1,3) | (1,2), Up) =", transition_probs.get(((0,1),'up'), {}).get((0,0), 0))
print("P((2,3) | (1,3), Right) =", transition_probs.get(((0,0),'right'), {}).get((1,0), 0))
print("P((3,3) | (2,3), Up) =", transition_probs.get(((1,0),'up'), {}).get((2,0), 0))
print("P((2,1) | (3,2), Right) =", transition_probs.get(((2,1),'right'), {}).get((1,2), 0))


P((2,1) | (1,1), Up) = 0.8020833333333334
P((1,3) | (1,2), Up) = 0.8007004138809296
P((2,3) | (1,3), Right) = 0.799928353931578
P((3,3) | (2,3), Up) = 0
P((2,1) | (3,2), Right) = 0


In [137]:
# -------------------------------
# ADP with GLIE Utility Estimation
# -------------------------------

# f(u,n) = 2 if n <= 5, else u
def f(u, n):
    return 2 if n <= 5 else u

# Initialize utilities for all states
U_glie = {(x,y): 0.0 for y in range(rows) for x in range(cols) if (x,y) not in walls}

converged = False
while not converged:
    delta = 0
    U_new = {}
    for (x,y) in U_glie:
        # Terminal states take immediate reward
        if (x,y) in terminal_states:
            U_new[(x,y)] = R[y][x]
            continue

        best_val = float('-inf')
        for a in actions:
            sa = ((x,y), a)
            if sa not in transition_probs:
                continue
            exp_util = sum(prob * U_glie[s_next] for s_next, prob in transition_probs[sa].items())
            n_sa = sum(transition_counts[sa].values())  # N(s,a)
            best_val = max(best_val, f(exp_util, n_sa))

        U_new[(x,y)] = R[y][x] + gamma * best_val
        delta = max(delta, abs(U_new[(x,y)] - U_glie[(x,y)]))
    U_glie = U_new

    if delta < epsilon:
        converged = True

# Print final utility table in Table 4 format
print("\nTable 4: Estimated Utilities (GLIE ADP)")
for y in reversed(range(rows)):  # print top row first
    row = []
    for x in range(cols):
        if (x,y) in walls:
            row.append("WALL")
        elif (x,y) in terminal_states:
            row.append(f"{R[y][x]:.2f}")
        else:
            row.append(f"{U_glie[(x,y)]:.3f}")
    print(row)



Table 4: Estimated Utilities (GLIE ADP)
['0.295', '0.258', '0.350', '0.201']
['0.397', 'WALL', '0.491', '-1.00']
['0.507', '0.647', '0.795', '1.00']


In [138]:
# -------------------------------------------------
# Compute Optimal Policy π* from Learned Utilities
# -------------------------------------------------
policy_opt = [['' for _ in range(cols)] for _ in range(rows)]

for y in range(rows):
    for x in range(cols):
        if (x, y) in terminal_states:
            policy_opt[y][x] = 'T'
            continue
        if (x, y) in walls:
            policy_opt[y][x] = 'WALL'
            continue

        best_action = None
        best_value = float('-inf')
        for action in actions:
            exp_util = 0
            for (prob, nx, ny) in get_next_states(x, y, action):
                exp_util += prob * U[ny][nx]
            if exp_util > best_value:
                best_value = exp_util
                best_action = action

        policy_opt[y][x] = best_action

# Print nicely
print("\nOptimal Policy (π*):")
for row in reversed(policy_opt):  # reverse so Y=3 is top
    print(row)



Optimal Policy (π*):
['up', 'right', 'up', 'left']
['up', 'WALL', 'up', 'T']
['right', 'right', 'right', 'T']
