In [2]:
import numpy as np

In [3]:
n = 5  # side length of the square grid
# Every cell of the grid as a (row, col) pair, enumerated row by row
states = [divmod(index, n) for index in range(n * n)]

# The four moves and the (row, col) offset each one applies
actions = ["U", "D", "L", "R"]
action_effects = dict(zip(actions, [(-1, 0), (1, 0), (0, -1), (0, 1)]))

# Cells that carry a payoff: -10 marks a hazard, +10 marks the goal
rewards = {
    **dict.fromkeys([(1, 0), (1, 2), (0, 2), (3, 1), (3, 3), (3, 4)], -10),
    (4, 4): 10,
}

In [4]:
#Visualize the rewards of each state

# Reward of every cell, in the same order as `states`; cells absent from `rewards` count as 0
rewards_list = [rewards[cell] if cell in rewards else 0 for cell in states]

def print_states_as_matrix(states, rows, cols, data_list):
    """Print one value per grid cell, laid out as a rows x cols matrix.

    Args:
        states: sequence of (row, col) positions.
        rows: number of matrix rows.
        cols: number of matrix columns.
        data_list: values to display; data_list[i] is placed at states[i].

    Cells that no entry of `states` points at are printed as "None".
    Each value is converted with str() and right-aligned to a minimum
    width of 4; longer values (e.g. state tuples) are printed unpadded.
    """
    grid = [[None] * cols for _ in range(rows)]
    for position, (row_idx, col_idx) in enumerate(states):
        grid[row_idx][col_idx] = data_list[position]

    # One output line per matrix row, cells separated by a single space
    for cells in grid:
        print(" ".join(str(value).rjust(4) for value in cells))

# Show the reward layout first, then the coordinates of every cell
for heading, cell_values in (("Rewards Matrix:", rewards_list), ("\nStates Matrix:", states)):
    print(heading)
    print_states_as_matrix(states, n, n, cell_values)

Rewards Matrix:
   0    0  -10    0    0
 -10    0  -10    0    0
   0    0    0    0    0
   0  -10    0  -10  -10
   0    0    0    0   10

States Matrix:
(0, 0) (0, 1) (0, 2) (0, 3) (0, 4)
(1, 0) (1, 1) (1, 2) (1, 3) (1, 4)
(2, 0) (2, 1) (2, 2) (2, 3) (2, 4)
(3, 0) (3, 1) (3, 2) (3, 3) (3, 4)
(4, 0) (4, 1) (4, 2) (4, 3) (4, 4)


In [5]:
# Transition function: P(s'|s, a)

def transition(state, action):
    """Sample the cell the agent ends up in after attempting `action` in `state`.

    Cells listed in `rewards` are terminal and are returned unchanged.
    Otherwise the attempted action is executed with probability 0.8; with
    probability 0.2 one of the other three actions, picked uniformly, is
    executed instead. A move that would leave the grid keeps the agent in
    place.

    Args:
        state: current (row, col) cell.
        action: one of the keys of `action_effects`.

    Returns:
        The sampled next (row, col) cell.
    """
    if state in rewards:
        return state

    # Decide which action is really executed (intended move vs. slip)
    if np.random.random() < 0.2:
        action = np.random.choice([other for other in actions if other != action])

    d_row, d_col = action_effects[action]
    candidate = (state[0] + d_row, state[1] + d_col)
    # Off-grid candidates are rejected: the agent stays where it was
    return candidate if candidate in states else state

In [6]:
# Simulate an episode
def simulate(policy=None, initial_state=(0,0), max_steps=100):
    """Roll out one episode and return the visited cells and the total reward.

    Every transition costs 1. When the episode ends in a cell listed in
    `rewards`, that cell's reward is added once, including when the cell
    is entered on the very last allowed step.

    Args:
        policy: optional dict mapping state -> action. States missing from
            the dict (or every state when `policy` is None) get a uniformly
            random action.
        initial_state: (row, col) cell the episode starts in.
        max_steps: maximum number of transitions to execute.

    Returns:
        (path, total_reward): `path` is the list of visited states starting
        with `initial_state`; `total_reward` is the accumulated reward.
    """
    state = initial_state
    path = [state]
    total_reward = 0

    for _ in range(max_steps):
        # Terminal cells end the episode
        if state in rewards:
            break

        # Only draw a random action when the policy has nothing to say
        if policy is not None and state in policy:
            action = policy[state]
        else:
            action = np.random.choice(actions)

        state = transition(state, action)
        path.append(state)
        total_reward -= 1  # Step cost

    # Collect the terminal payoff after the loop so it is not lost when the
    # terminal cell is reached exactly on the final step
    if state in rewards:
        total_reward += rewards[state]

    return path, total_reward

In [None]:
# Random policy: with no policy supplied, simulate() picks every action
# uniformly at random from `actions`
path, reward = simulate()
print(f"Path: {path}, Total Reward: {reward}")

Path: [(0, 0), (0, 0), (0, 1), (1, 1), (0, 1), (1, 1), (1, 0)], Total Reward: -16


In [None]:
# Custom policy (always go Right then Down)
# Only the listed states have a fixed action; for any other state simulate()
# falls back to a random action.
# NOTE: without slipping, this policy walks (0,0) -> (0,1) -> (1,1) -> (1,2),
# and (1,2) is a -10 hazard.
custom_policy = {
    (0, 0): "R", (0, 1): "D", (0, 2): "D",
    (1, 0): "R", (1, 1): "R", (1, 2): "D",
    (2, 0): "R", (2, 1): "R",
}
path, reward = simulate(policy=custom_policy)
print(f"Path: {path}, Total Reward: {reward}")

Path: [(0, 0), (0, 0), (0, 1), (1, 1), (1, 2)], Total Reward: -14


In [None]:
# Another custom policy, hand-tuned to be as good as I could make it manually.
# It assigns an action to every cell of the grid; the entries for cells listed
# in `rewards` are never consulted because simulate() stops there.
custom_policy = {
    (0, 0):"R", (0, 1):"D", (0, 2):"R", (0, 3):"D", (0, 4):"D",
    (1, 0):"R", (1, 1):"D", (1, 2):"R", (1, 3):"D", (1, 4):"D",
    (2, 0):"D", (2, 1):"R", (2, 2):"D", (2, 3):"L", (2, 4):"L",
    (3, 0):"D", (3, 1):"R", (3, 2):"D", (3, 3):"R", (3, 4):"R",
    (4, 0):"R", (4, 1):"R", (4, 2):"R", (4, 3):"R", (4, 4):"R",
}
path, reward = simulate(policy=custom_policy)
print(f"Path: {path}, Total Reward: {reward}")

Path: [(0, 0), (0, 0), (0, 0), (0, 1), (1, 1), (1, 0)], Total Reward: -15


In [7]:
 #--- Value Iteration (to compute optimal policy) ---
def get_transitions(state, action):
    """List the possible outcomes of attempting `action` in `state`.

    Args:
        state: current (row, col) cell.
        action: one of the entries of `actions`.

    Returns:
        A list of (next_state, probability) pairs. A cell listed in
        `rewards` maps to itself with probability 1.0. Otherwise the first
        pair is the intended move (0.8), followed by one pair per other
        action sharing the remaining 0.2 equally. Moves are clipped to the
        grid, and pairs that lead to the same cell are not merged.

    Note: a later cell of this notebook redefines `get_transitions` with
    three-element tuples; this version must be the active one when
    `value_iteration` runs.
    """
    if state in rewards:
        return [(state, 1.0)]

    def shifted(move):
        # Apply the move's offset and clamp both coordinates into [0, n-1]
        d_row, d_col = action_effects[move]
        return (max(0, min(n-1, state[0] + d_row)), max(0, min(n-1, state[1] + d_col)))

    slip_share = 0.2 / (len(actions) - 1)
    outcomes = [(shifted(action), 0.8)]
    outcomes.extend((shifted(other), slip_share) for other in actions if other != action)
    return outcomes

In [None]:
def value_iteration(gamma=1, theta=1e-6):
    """Compute state values and a greedy policy with in-place value iteration.

    Entering a cell listed in `rewards` yields that cell's reward and ends
    the episode (no bootstrapping from it); entering any other cell costs 1
    and continues with the discounted value of that cell.

    Args:
        gamma: discount factor applied to the value of non-terminal successors.
        theta: the sweeps stop once the largest value change in a sweep
            falls below this threshold.

    Returns:
        (V, policy): `V` maps every state to its value (cells listed in
        `rewards` hold their reward); `policy` maps every state to the best
        action, or None for cells listed in `rewards`.
    """
    def action_value(s, a, V):
        # One-step lookahead shared by the sweep and the policy extraction,
        # so both score actions identically. A terminal successor contributes
        # only its reward; adding gamma * V[s_prime] on top would count that
        # reward twice because V[s_prime] already equals it.
        total = 0
        for s_prime, prob in get_transitions(s, a):
            if s_prime in rewards:
                total += prob * rewards[s_prime]
            else:
                total += prob * (-1 + gamma * V[s_prime])  # step cost = -1
        return total

    V = {s: 0 for s in states}
    while True:
        delta = 0
        for s in states:
            if s in rewards:
                V[s] = rewards[s]
                continue
            v_old = V[s]
            V[s] = max(action_value(s, a, V) for a in actions)
            delta = max(delta, abs(v_old - V[s]))
        if delta < theta:
            break

    # Extract the greedy policy; ties go to the earliest action in `actions`
    policy = {}
    for s in states:
        if s in rewards:
            policy[s] = None  # No action in terminal states
        else:
            policy[s] = max(actions, key=lambda a: action_value(s, a, V))
    return V, policy

In [None]:
# --- Run Simulation with Optimal Policy ---
V, optimal_policy = value_iteration()

# Roll out five episodes from the top-left corner and report each one
for episode in range(5):
    path, reward = simulate(policy=optimal_policy, initial_state=(0, 0))
    print(
        f"Episode {episode + 1}:",
        f"  Path: {path}",
        f"  Total Reward: {reward}",
        f"{'-'*50}",
        sep="\n",
    )

# Side-by-side view: chosen action per cell (reward for cells in `rewards`)
# on the left, state values on the right
print("Optimal Policy Grid (Actions) vs. Value Function (V):")
print("-" * 50)
for row in range(n):
    row_states = [(row, col) for col in range(n)]

    action_cells = [
        f"{rewards[s]:>5}" if s in rewards else f"{optimal_policy[s]:>5}"
        for s in row_states
    ]
    print(" ".join(action_cells), end=" ")
    print("   |   ", end="")

    value_cells = [f"{V[s]:>7.2f}" for s in row_states]
    print(" ".join(value_cells), end=" ")
    print()  # finish the row

Episode 1:
  Path: [(0, 0), (0, 0), (0, 0), (1, 0)]
  Total Reward: -13
--------------------------------------------------
Episode 2:
  Path: [(0, 0), (0, 1), (1, 1), (2, 1), (2, 2), (3, 2), (4, 2), (4, 3), (4, 4)]
  Total Reward: 2
--------------------------------------------------
Episode 3:
  Path: [(0, 0), (0, 1), (1, 1), (2, 1), (2, 2), (3, 2), (4, 2), (4, 3), (4, 4)]
  Total Reward: 2
--------------------------------------------------
Episode 4:
  Path: [(0, 0), (0, 1), (1, 1), (2, 1), (2, 2), (3, 2), (4, 2), (4, 1), (4, 2), (3, 2), (4, 2), (4, 3), (4, 4)]
  Total Reward: -2
--------------------------------------------------
Episode 5:
  Path: [(0, 0), (0, 1), (1, 1), (2, 1), (2, 2), (3, 2), (4, 2), (4, 3), (4, 4)]
  Total Reward: 2
--------------------------------------------------
Optimal Policy Grid (Actions) vs. Value Function (V):
--------------------------------------------------
    R     D   -10     D     D    |     -6.85   -5.42  -10.00   -5.16   -5.86 
  -10     D   -10

################

In [None]:
import random
# Simulate environment (unknown dynamics)
def step(state, action):
    """Sample one environment step for the learning agent.

    Args:
        state: current (row, col) cell.
        action: action the agent attempts.

    Returns:
        (new_state, reward, done). For a cell listed in `rewards` the state
        is returned unchanged together with its reward and done=True.
        Otherwise the attempted action is executed with probability 0.8 and
        a uniformly chosen other action with probability 0.2; the move is
        clipped to the grid, the reward is the new cell's entry in `rewards`
        (or -1 when it has none), and done reports whether the new cell is
        listed in `rewards`.
    """
    if state in rewards:
        return state, rewards[state], True

    # Slip: swap the attempted action for one of the remaining ones
    if random.random() >= 0.8:
        action = random.choice([other for other in actions if other != action])

    d_row, d_col = action_effects[action]
    new_state = (
        max(0, min(n-1, state[0] + d_row)),
        max(0, min(n-1, state[1] + d_col)),
    )
    return new_state, rewards.get(new_state, -1), new_state in rewards

In [None]:
# Hyperparameters
alpha = 0.1      # learning rate
gamma = 1        # discount factor
epsilon = 0.2    # probability of taking a random (exploratory) action
episodes = 5000  # number of training episodes

# Q-table: one entry per (state, action) pair, all starting at 0
Q = {(s, a): 0 for s in states for a in actions}

for episode in range(episodes):
    # Each episode starts in a uniformly random cell and runs until step() reports done
    state = random.choice(states)
    done = False

    while not done:
        # epsilon-greedy action selection
        explore = random.uniform(0, 1) < epsilon
        action = random.choice(actions) if explore else max(actions, key=lambda a: Q[(state, a)])

        next_state, reward, done = step(state, action)

        # TD target: the reward alone at episode end, otherwise reward plus
        # the discounted best Q-value of the next state
        future = 0 if done else gamma * max(Q[(next_state, a)] for a in actions)
        Q[(state, action)] += alpha * (reward + future - Q[(state, action)])

        state = next_state


In [None]:

# Greedy policy w.r.t. the learned Q-table (None for cells listed in `rewards`)
policy = {
    s: None if s in rewards else max(actions, key=lambda a: Q[(s, a)])
    for s in states
}

# Grid of chosen actions; cells listed in `rewards` show their reward instead
print("Optimal Policy (Learned via Q-Learning):")
for r in range(n):
    labels = []
    for c in range(n):
        s = (r, c)
        labels.append(f"{rewards[s]:>5}" if s in rewards else f"{policy[s]:>5}")
    print(" ".join(labels) + " ")
print("-" *50)

# Grid of the Q-value of the chosen action in every cell
print("Optimal Policy (Learned via Q-Learning) and Q-values:")
for r in range(n):
    labels = []
    for c in range(n):
        s = (r, c)
        if s in rewards:
            labels.append(f"{rewards[s]:>7}")
            continue
        best_q = Q[(s, policy[s])]
        labels.append(f"{best_q:>7.1f}")
    print(" ".join(labels) + " ")

Optimal Policy (Learned via Q-Learning):
    R     D   -10     D     D 
  -10     D   -10     D     D 
    D     R     D     L     L 
    D   -10     D   -10   -10 
    R     R     R     R    10 
--------------------------------------------------
Optimal Policy (Learned via Q-Learning) and Q-values:
   -5.6    -4.3     -10    -4.1    -5.2 
    -10    -3.7     -10    -1.8    -3.9 
   -1.5     0.4     2.2     0.7    -2.0 
    1.1     -10     3.8     -10     -10 
    3.0     4.3     5.9     7.3      10 


In [None]:
# Environment transition with the same slip dynamics as step(), written as a standalone function
def known_transition(state, action):
    """Sample one transition of the grid world.

    Args:
        state: current (row, col) cell.
        action: action being attempted.

    Returns:
        (new_state, reward, done). A cell listed in `rewards` is returned
        unchanged with its reward and done=True. Otherwise the attempted
        action is carried out with probability 0.8, and one of the other
        actions (uniformly chosen) with probability 0.2. Coordinates are
        clipped to the grid; the reward is the new cell's entry in `rewards`
        or -1 if it has none; done is True when the new cell is in `rewards`.
    """
    if state in rewards:
        return state, rewards[state], True

    def clamp(coordinate):
        # Keep a row/column index inside the grid
        return max(0, min(n-1, coordinate))

    if random.random() < 0.8:
        executed = action
    else:
        executed = random.choice([a for a in actions if a != action])

    d_row, d_col = action_effects[executed]
    new_state = (clamp(state[0] + d_row), clamp(state[1] + d_col))
    return new_state, rewards.get(new_state, -1), new_state in rewards

In [None]:
# Q-Learning (same update rule as before, but transitions are sampled with known_transition)
# NOTE(review): Q is not re-created in this cell, so training continues from
# whatever the table already holds - confirm that this warm start is intended.
for episode in range(episodes):
    state = random.choice(states)  # random start cell
    done = False

    while not done:
        # epsilon-greedy: exploit unless the draw falls below epsilon
        if random.uniform(0, 1) >= epsilon:
            action = max(actions, key=lambda a: Q[(state, a)])
        else:
            action = random.choice(actions)

        next_state, reward, done = known_transition(state, action)

        # Bootstrap from the next state only while the episode is still running
        td_target = reward
        if not done:
            td_target += gamma * max(Q[(next_state, a)] for a in actions)
        Q[(state, action)] += alpha * (td_target - Q[(state, action)])

        state = next_state

In [None]:
# Greedy policy read off the Q-table; cells listed in `rewards` get no action
policy = {}
for s in states:
    policy[s] = None if s in rewards else max(actions, key=lambda a: Q[(s, a)])

# Action chosen in every cell (reward shown for cells listed in `rewards`)
print("Optimal Policy (Learned via Q-Learning with Known Dynamics):")
for r in range(n):
    row_text = [
        f"{rewards[s]:>5}" if s in rewards else f"{policy[s]:>5}"
        for s in ((r, c) for c in range(n))
    ]
    print(" ".join(row_text), end=" ")
    print()

print("-" *50)

# Q-value of the chosen action in every cell
print("Optimal Policy (Learned via Q-Learning) and Q-values:")
for r in range(n):
    row_text = []
    for c in range(n):
        s = (r, c)
        if s in rewards:
            row_text.append(f"{rewards[s]:>7}")
        else:
            best_q = Q[(s, policy[s])]
            row_text.append(f"{best_q:>7.1f}")
    print(" ".join(row_text), end=" ")
    print()

Optimal Policy (Learned via Q-Learning with Known Dynamics):
    R     D   -10     D     D 
  -10     D   -10     D     D 
    D     R     D     L     L 
    D   -10     D   -10   -10 
    R     R     R     R    10 
--------------------------------------------------
Optimal Policy (Learned via Q-Learning) and Q-values:
   -5.8    -4.9     -10    -4.5    -5.8 
    -10    -3.5     -10    -3.2    -4.0 
   -1.4    -2.4     0.3    -1.6    -3.6 
    0.0     -10     2.0     -10     -10 
    1.7     4.0     5.7     8.7      10 


################# Q-value Iteration

In [9]:
# Transition function (known dynamics)
def get_transitions(state, action):
    """List every outcome of attempting `action` in `state`, with its reward.

    Args:
        state: current (row, col) cell.
        action: one of the entries of `actions`.

    Returns:
        A list of (next_state, probability, reward) triples. A cell listed
        in `rewards` yields a single triple: itself, 1.0, and its reward.
        Otherwise the intended move comes first with probability 0.8 and the
        other actions share 0.2 equally. Moves are clipped to the grid and
        the reward is the destination's entry in `rewards`, or -1 if absent.

    Note: this definition replaces the earlier two-element version of
    `get_transitions` in this notebook.
    """
    if state in rewards:
        return [(state, 1.0, rewards[state])]

    others = [a for a in actions if a != action]
    weighted_moves = [(action, 0.8)] + [(a, 0.2 / len(others)) for a in others]

    outcomes = []
    for move, prob in weighted_moves:
        d_row, d_col = action_effects[move]
        target = (max(0, min(n-1, state[0] + d_row)), max(0, min(n-1, state[1] + d_col)))
        outcomes.append((target, prob, rewards.get(target, -1)))
    return outcomes

In [14]:

# Q-Value Iteration
def q_value_iteration(gamma=0.9, theta=1e-6):
    """Compute action values by repeated synchronous Bellman backups.

    Every sweep builds a complete new table from the previous one. An
    outcome that lands in a cell listed in `rewards` contributes only its
    reward; any other outcome adds the discounted best Q-value of the
    destination.

    Args:
        gamma: discount factor.
        theta: iteration stops once the largest change between two
            consecutive tables is below this threshold.

    Returns:
        Dict mapping (state, action) to its Q-value.
    """
    def backup(table, s, a):
        # Expected one-step return of (s, a), evaluated against `table`
        total = 0
        for s_prime, prob, r in get_transitions(s, a):
            if s_prime in rewards:
                total += prob * r
            else:
                total += prob * (r + gamma * max(table[(s_prime, a_prime)] for a_prime in actions))
        return total

    Q = {(s, a): 0 for s in states for a in actions}
    while True:
        new_Q = {(s, a): backup(Q, s, a) for (s, a) in Q}
        delta = max((abs(Q[key] - new_Q[key]) for key in Q), default=0)
        Q = new_Q
        if delta < theta:
            return Q

# Run Q-Value Iteration
Q = q_value_iteration()

# Greedy policy w.r.t. the computed Q-values (None for cells listed in `rewards`)
policy = {
    s: (None if s in rewards else max(actions, key=lambda a: Q[(s, a)]))
    for s in states
}


In [15]:
# Print the greedy action per cell (cells listed in `rewards` show their reward)
print("Optimal Policy (Q-Value Iteration):")
for r in range(n):
    for c in range(n):
        s = (r, c)
        if s in rewards:
            print(f"{rewards[s]:>5}", end=" ")
        else:
            print(f"{policy[s]:>5}", end=" ")
    print()
print("-" *50)
# Heading corrected: these values come from Q-value iteration, not Q-learning
print("Optimal Policy (Q-Value Iteration) and Q-values:")
for r in range(n):
    for c in range(n):
        s = (r, c)
        if s in rewards:  # Terminal state (no action)
            print(f"{rewards[s]:>7}", end=" ")
        else:
            # Q-value of the action the policy picks in this cell
            best_action = policy[s]
            best_q = Q[(s, best_action)]
            print(f"{best_q:>7.1f}", end=" ")
    print()  # Newline after each row

Optimal Policy (Q-Value Iteration):
    R     D   -10     D     D 
  -10     D   -10     D     D 
    D     R     D     L     L 
    D   -10     D   -10   -10 
    R     R     R     R    10 
--------------------------------------------------
Optimal Policy (Learned via Q-Learning) and Q-values:
   -6.3    -5.5     -10    -5.2    -5.4 
    -10    -4.4     -10    -4.1    -4.8 
   -2.9    -2.6    -0.8    -2.6    -4.0 
   -1.3     -10     1.6     -10     -10 
    0.8     2.4     5.3     8.0      10 
