<a href="https://colab.research.google.com/github/Bosy-Ayman/DSAI-402-RL/blob/main/Quiz_C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [43]:
"""
Nile Quest - MDP Exam Template
==============================
This file defines a 6x6 gridworld MDP environment and the required reinforcement learning
functions for the lab exam. You are expected to implement the TODOs in this file
without using external RL libraries.

Rules:
- Do not use classes.
- You may use numpy only.
- Complete the TODO function(s) as described.
"""

import numpy as np


# -----------------------------
# Environment Setup
# -----------------------------

def make_board():
    """
    Create the 6x6 Nile Quest grid.

    Returns:
        np.ndarray: 6x6 array of characters representing the environment.
                    Legend:
                        S - Start
                        G - Goal
                        H - Hazard (wind)
                        Q - Quicksand (absorbing)
                        W - Water vortex (double-step)
                        R - Rock wall (impassable)
                        . - Empty terrain
    """
    return np.array([
        list("S....G"),
        list(".R..H."),
        list(".Q...."),
        list("..W.R."),
        list("..H..."),
        list("......")
    ])


# Define global constants
ACTIONS = ['U', 'R', 'D', 'L']
ACTION_DELTA = {
    'U': (-1, 0),
    'R': (0, 1),
    'D': (1, 0),
    'L': (0, -1)
}


# -----------------------------
# Helper Functions
# -----------------------------

def in_bounds(state, shape):
    """
    Check whether a state lies inside the grid boundaries.

    Args:
        state (tuple): (row, column)
        shape (tuple): (n_rows, n_cols)

    Returns:
        bool: True if inside grid, False otherwise.
    """
    r, c = state
    nrows, ncols = shape
    return 0 <= r < nrows and 0 <= c < ncols


def move(state, action, board):
    """
    Compute next position from a given state and action.

    Args:
        state (tuple): Current position (row, column)
        action (str): One of ['U', 'R', 'D', 'L']
        board (np.ndarray): Grid environment

    Returns:
        tuple: Next position (row, column) after applying movement.
    """
    dr, dc = ACTION_DELTA[action]
    r, c = state
    nr, nc = r + dr, c + dc
    if not in_bounds((nr, nc), board.shape) or board[nr, nc] == 'R':
        return (r, c)  # stay in place if blocked
    return (nr, nc)


# -----------------------------
# Transition Function
# -----------------------------

def get_transition_probs(state, action, board):
    """
    Get transition probabilities for a given state-action pair.

    Args:
        state (tuple): Current position (row, column)
        action (str): Action taken
        board (np.ndarray): Grid environment

    Returns:
        list[tuple]: List of (probability, next_state) pairs.
    """
    tile = board[state]
    probs = []

    if tile == 'Q':
        # Quicksand is absorbing, but with 2% chance of escape
        probs.append((0.98, state))
        for a in ACTIONS:
            ns = move(state, a, board)
            probs.append((0.02 / len(ACTIONS), ns))
        return probs

    if tile == 'H':
        # Hazard: slips left/right 15% each, intended direction 70%
        slip_actions = {
            'U': ['L', 'R'],
            'D': ['R', 'L'],
            'L': ['D', 'U'],
            'R': ['U', 'D']
        }
        probs.append((0.7, move(state, action, board)))
        probs.append((0.15, move(state, slip_actions[action][0], board)))
        probs.append((0.15, move(state, slip_actions[action][1], board)))
        return probs

    if tile == 'W':
        # Water: 50% chance to double-step, else normal step
        ns1 = move(state, action, board)
        ns2 = move(ns1, action, board)
        probs.append((0.5, ns1))
        probs.append((0.5, ns2))
        return probs

    # Default (S, ., G)
    probs.append((1.0, move(state, action, board)))
    return probs


# -----------------------------
# Reward Function
# -----------------------------

def get_reward(state, next_state, board):
    """
    Compute reward for a transition.

    Args:
        state (tuple): Current state
        next_state (tuple): Next state
        board (np.ndarray): Grid

    Returns:
        float: Reward value
    """
    if board[next_state] == 'G':
        return 1.0
    if board[next_state] == 'Q':
        return -0.2
    return -0.04  # living cost


# -----------------------------
# Policy Evaluation
# -----------------------------

def policy_evaluation(policy, V, board, gamma=0.95, theta=1e-4):
    """
    Evaluate a given policy using iterative policy evaluation.

    Args:
        policy (dict): Mapping from state -> action.
        V (dict): Initial value estimates.
        board (np.ndarray): Grid environment.
        gamma (float): Discount factor.
        theta (float): Convergence threshold.

    Returns:
        dict: Updated value function after convergence.
    """
    while True:
        delta = 0
        for r in range(board.shape[0]):
            for c in range(board.shape[1]):
                s = (r, c)
                if board[s] == 'G':
                    continue
                v = V[s]
                a = policy[s]
                new_v = 0
                for p, ns in get_transition_probs(s, a, board):
                    new_v += p * (get_reward(s, ns, board) + gamma * V[ns])
                V[s] = new_v
                delta = max(delta, abs(v - new_v))
        if delta < theta:
            break
    return V


# -----------------------------
# Policy Improvement
# -----------------------------

def policy_improvement(V, board, gamma=0.95):
    """
    Improve the policy by acting greedily with respect to V.

    Args:
        V (dict): Current value function.
        board (np.ndarray): Grid environment.
        gamma (float): Discount factor.

    Returns:
        tuple: (new_policy, policy_stable)
    """
    policy = {}
    stable = True
    for r in range(board.shape[0]):
        for c in range(board.shape[1]):
            s = (r, c)
            if board[s] == 'G':
                continue
            old_action = policy.get(s, None)
            q_values = []
            for a in ACTIONS:
                q = 0
                for p, ns in get_transition_probs(s, a, board):
                    q += p * (get_reward(s, ns, board) + gamma * V[ns])
                q_values.append(q)
            best_a = ACTIONS[np.argmax(q_values)]
            policy[s] = best_a
            if old_action != best_a:
                stable = False
    return policy, stable


# -----------------------------
# Policy Iteration
# -----------------------------

def policy_iteration(board, gamma=0.95):
    """
    Perform policy iteration to find optimal policy and value function.

    Args:
        board (np.ndarray): Grid.
        gamma (float): Discount factor.

    Returns:
        tuple: (optimal_policy, optimal_value_function)
    """
    policy = {(r, c): np.random.choice(ACTIONS) for r in range(board.shape[0]) for c in range(board.shape[1])}
    V = {(r, c): 0 for r in range(board.shape[0]) for c in range(board.shape[1])}

    while True:
        V = policy_evaluation(policy, V, board, gamma)
        policy, stable = policy_improvement(V, board, gamma)
        if stable:
            break
    return policy, V


# -----------------------------
# Value Iteration (TODO)
# -----------------------------

def value_iteration(board, gamma=0.95, theta=1e-4):
    """
    Perform value iteration to compute the optimal value function and policy.

    Args:
        board (np.ndarray): Grid environment.
        gamma (float): Discount factor.
        theta (float): Convergence threshold.

    Returns:
        tuple: (optimal_value_function, optimal_policy)

    TODO:
        Implement this function by:
        1. Initializing all V(s) = 0.
        2. Iteratively updating V(s) using the Bellman optimality equation:
               V(s) <- max_a Σ [ P(s'|s,a) * (R(s,a,s') + γ * V(s')) ]
        3. Repeat until the change in V(s) < θ.
        4. Derive the optimal policy π*(s) from the final V(s).
    """
    V = {(r, c): 0 for r in range(board.shape[0]) for c in range(board.shape[1])}
    while True:
        delta = 0
        for r in range(board.shape[0]):
            for c in range(board.shape[1]):
                s = (r, c)
                if board[s] == 'G':
                    continue
                v = V[s]
                q_values = []
                q = 0
                for a in ACTIONS:
                    for p, ns in get_transition_probs(s, a, board):
                        q += p * (get_reward(s, ns, board) + gamma * V[ns])
                    q_values.append(q)

                V[s] = max(q_values)
                delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break

        # value optmization
        policy = {}
        for r in range(board.shape[0]):
            for c in range(board.shape[1]):
                s = (r, c)
                if board[s] == 'G':
                    continue
                q_values = []
                for a in ACTIONS:
                    q = 0
                    for p, ns in get_transition_probs(s, a, board):
                        q += p * (get_reward(s, ns, board) + gamma * V[ns])
                    q_values.append(q)
                best_a = ACTIONS[np.argmax(q_values)]
                policy[s] = best_a

    return V,policy

In [44]:
# -----------------------------
# Simulation Functions
# -----------------------------

def simulate_episode(policy, board, start=(0, 0), gamma=0.95, max_steps=100):
    """
    Simulate one episode following the given policy.

    Args:
        policy (dict): Deterministic policy mapping state -> action.
        board (np.ndarray): Environment grid.
        start (tuple): Starting state coordinates.
        gamma (float): Discount factor.
        max_steps (int): Maximum episode length.

    Returns:
        tuple: (total_reward, steps)
               total_reward (float): Discounted cumulative return.
               steps (int): Number of steps taken.
    """
    s = start
    total_reward, step = 0, 0

    for t in range(max_steps):
        if board[s] == 'G':
            break
        a = policy.get(s, np.random.choice(ACTIONS))
        transitions = get_transition_probs(s, a, board)
        probs, next_states = zip(*transitions)
        s_next = next_states[np.random.choice(len(next_states), p=probs)]
        r = get_reward(s, s_next, board)
        total_reward += (gamma ** t) * r
        s = s_next
        step += 1
        if board[s] == 'G':
            break
    return total_reward, step


def run_experiments(policy, board, episodes=50):
    """
    Run multiple simulations and report average results.

    Args:
        policy (dict): Policy to evaluate.
        board (np.ndarray): Environment grid.
        episodes (int): Number of episodes to simulate.

    Returns:
        tuple: (avg_reward, avg_steps)
    """
    rewards, steps = [], []
    for _ in range(episodes):
        r, s = simulate_episode(policy, board)
        rewards.append(r)
        steps.append(s)
    return np.mean(rewards), np.mean(steps)

In [45]:
# Run the full MDP simulation once value_iteration() is implemented

board = make_board()
gamma = 0.95
V, policy = value_iteration(board, gamma)  # <-- you must have implement this

avg_return, avg_steps = run_experiments(policy, board, episodes=100)
print(f"\nAverage discounted return: {avg_return:.4f}")
print(f"Average steps to goal: {avg_steps:.2f}")


Average discounted return: -0.7953
Average steps to goal: 100.00


---