# CSCN8020 – Assignment 1 

**Course:** Reinforcement Learning Programming (CSCN8020)  
**Student:** Jahnavi Pakanati
**Student ID:** 9013742

This notebook contains:
- **Problem 1**: MDP design for a pick-and-place robot  



## Problem 1 – Pick-and-Place Robot (MDP Design)

We model the **pick-and-place** task as a **Markov Decision Process (MDP)**.

#### 1) States (S)
A state should capture the **minimum information** needed to choose good actions:
- Robot arm pose (discretized or continuous), e.g., `(x, y, z)` or `(joint1, joint2, joint3, ...)`
- Gripper: `open` / `closed`
- Object status: `on_table` / `in_gripper` / `at_target`
- (Optional) Velocities for smoothness: `vx, vy, vz`

##### 2) Actions (A)
Primitive, low-level actions that the policy can choose:
- Arm motion: `move_up`, `move_down`, `move_left`, `move_right` (optionally `move_forward`, `move_backward`)
- Gripper: `open_gripper`, `close_gripper`

##### 3) Rewards (R)
Shape the behavior to be **fast** and **smooth**:
- `+10` when the object is placed at the target location (`at_target`)
- `-1` per time step to discourage unnecessary movement
- `-5` penalty for dropping the object or collisions

> ***Reasoning:*** This reward encourages successful completion, penalizes wasted motion, and discourages unsafe behaviors. The state and action choices reflect the ***control levers*** the agent has and the **task status** it must track.


# Problem 2 — 2×2 Gridworld (Two Value-Iteration Sweeps)

**Setup**
- **States:** \(s_1=(0,0),\ s_2=(0,1),\ s_3=(1,0),\ s_4=(1,1)\)  
- **Actions:** up, down, left, right  
- **Transitions:** deterministic if valid; **invalid moves keep you in the same state** (\(s' = s\))  
- **Rewards (per state):** \(R(s_1)=5,\ R(s_2)=10,\ R(s_3)=1,\ R(s_4)=2\)  
- **Discount:** \(\gamma=0.9\)  
- **Update rule (Bellman optimality for this state-reward setting):**  
  \[
  V_{k+1}(s)\ \leftarrow\ R(s)\ +\ \gamma\ \max_{a\in A}\ V_k\!\big(s'(s,a)\big)
  \]

---

## Iteration 0 (Initialization)
All zeros:
| State | \(V_0(s)\) |
|---|---|
| \(s_1\) | 0 |
| \(s_2\) | 0 |
| \(s_3\) | 0 |
| \(s_4\) | 0 |

---

## Iteration 1
Neighbors were 0 in Iteration 0, so:
\[
V_1(s) = R(s) + \gamma\cdot 0 = R(s)
\]

| State | Calculation | \(V_1(s)\) |
|---|---|---|
| \(s_1\) | \(5 + 0.9\cdot 0\) | **5** |
| \(s_2\) | \(10 + 0.9\cdot 0\) | **10** |
| \(s_3\) | \(1 + 0.9\cdot 0\) | **1** |
| \(s_4\) | \(2 + 0.9\cdot 0\) | **2** |

---

## Iteration 2
Look one step ahead using \(V_1\). *(Invalid moves allow “stay”.)*

- **\(s_1\)**: up→\(s_1\) (5), left→\(s_1\) (5), right→\(s_2\) (10), down→\(s_3\) (1)  
  \[
  V_2(s_1)=5 + 0.9\cdot \max(5,5,10,1)=5+0.9\cdot 10=\mathbf{14}
  \]
- **\(s_2\)**: up→\(s_2\) (10), right→\(s_2\) (10), left→\(s_1\) (5), down→\(s_4\) (2)  
  \[
  V_2(s_2)=10 + 0.9\cdot \max(10,10,5,2)=10+0.9\cdot 10=\mathbf{19}
  \]
- **\(s_3\)**: left→\(s_3\) (1), down→\(s_3\) (1), up→\(s_1\) (5), right→\(s_4\) (2)  
  \[
  V_2(s_3)=1 + 0.9\cdot \max(1,1,5,2)=1+0.9\cdot 5=\mathbf{5.5}
  \]
- **\(s_4\)**: right→\(s_4\) (2), down→\(s_4\) (2), up→\(s_2\) (10), left→\(s_3\) (1)  
  \[
  V_2(s_4)=2 + 0.9\cdot \max(2,2,10,1)=2+0.9\cdot 10=\mathbf{11}
  \]

**Values after Iteration 2**

| State | \(V_2(s)\) |
|---|---|
| \(s_1\) | **14.0** |
| \(s_2\) | **19.0** |
| \(s_3\) | **5.5** |
| \(s_4\) | **11.0** |

---

## Greedy Policy after Iteration 2
Pick \(a=\arg\max_a \big(R(s)+\gamma V_2(s'(s,a))\big)\).

- \(s_1\): best next is \(s_2\) → **right**  
- \(s_2\): best is to **stay at \(s_2\)** (via an invalid move like up/right)  
- \(s_3\): best next is \(s_1\) → **up**  
- \(s_4\): best next is \(s_2\) → **up**

> **Note:** If exploiting invalid moves to “stay” is **not allowed**, the best *valid* action from \(s_2\) is **left to \(s_1\)** (since \(V_2(s_1)=14\) > \(V_2(s_4)=11\)).

---


In [1]:

# Problem 3: 5×5 Gridworld (Value Iteration + In-Place Variant)
# ================================
# Specs (from problem statement):
#   • Goal (terminal): s_goal = (4,4) with reward +10 (episode ends after entering goal)
#   • Grey states: {(2,2), (3,0), (0,4)} with reward −5
#   • All other states: −1
#   • Actions: up, down, left, right (deterministic). Invalid moves keep you in place.
#   • Deliverables: value iteration (standard + in-place), V*, π*, comparison (iterations/time)
#
# Notes:
#   • Reward is applied on ARRIVAL to the next state s′ (common “state reward” convention).
#   • Terminal is absorbing: after you reach GOAL, future rewards are 0 and the state does not change.

from pprint import pprint
from time import perf_counter

# ----- Environment definition -----
H, W = 5, 5
GOAL = (4, 4)
GREY = {(2, 2), (3, 0), (0, 4)}
ACTIONS = {
    "up":    (-1,  0),
    "down":  ( 1,  0),
    "left":  ( 0, -1),
    "right": ( 0,  1),
}
gamma = 0.9  # discount factor (can adjust if needed)

def in_bounds(r, c, H=H, W=W):
    return 0 <= r < H and 0 <= c < W

def is_terminal(s):
    return s == GOAL

def reward(s):
    """Reward for being in state s (applied on arrival)."""
    if s == GOAL:
        return +10
    if s in GREY:
        return -5
    return -1

def step(s, a):
    """Deterministic transition; invalid moves keep you in place.
       Returns (s_next, r), where r is reward on ARRIVAL to s_next.
       Terminal is absorbing: no further reward after reaching GOAL."""
    if is_terminal(s):
        return s, 0.0  # absorbing
    r, c = s
    dr, dc = ACTIONS[a]
    nr, nc = r + dr, c + dc
    if in_bounds(nr, nc):
        s2 = (nr, nc)
    else:
        s2 = s  # bump into wall -> stay
    return s2, reward(s2)

STATES = [(r, c) for r in range(H) for c in range(W)]

# ----- Value Iteration (Standard/Synchronous two-array) -----
def value_iteration_synchronous(theta=1e-8, max_iters=10000):
    """Standard VI with a copy. Stops when max change < theta."""
    V = {s: 0.0 for s in STATES}  # often V(goal)=0; reward is earned upon entering goal
    iters = 0
    while True:
        delta = 0.0
        V_new = V.copy()
        for s in STATES:
            if is_terminal(s):
                V_new[s] = 0.0  # terminal value (no future)
                continue
            best = float("-inf")
            for a in ACTIONS:
                s2, r = step(s, a)
                q = r + gamma * V[s2]
                if q > best:
                    best = q
            delta = max(delta, abs(best - V[s]))
            V_new[s] = best
        V = V_new
        iters += 1
        if delta < theta or iters >= max_iters:
            break
    return V, iters

# ----- Value Iteration (In-Place/Gauss–Seidel) -----
def value_iteration_inplace(theta=1e-8, max_iters=10000):
    """In-place VI: reuses updated values within the same sweep. Typically converges faster."""
    V = {s: 0.0 for s in STATES}
    iters = 0
    while True:
        delta = 0.0
        for s in STATES:
            if is_terminal(s):
                V[s] = 0.0
                continue
            best = float("-inf")
            for a in ACTIONS:
                s2, r = step(s, a)
                q = r + gamma * V[s2]  # reuse newly updated V[s2] when available
                if q > best:
                    best = q
            delta = max(delta, abs(best - V[s]))
            V[s] = best
        iters += 1
        if delta < theta or iters >= max_iters:
            break
    return V, iters

# ----- Greedy Policy from V -----
def greedy_policy(V):
    """π*(s) = argmax_a [ r(s→s′) + γ V(s′) ]"""
    pi = {}
    for s in STATES:
        if is_terminal(s):
            pi[s] = "•"  # terminal marker
            continue
        best_a, best_q = None, float("-inf")
        for a in ACTIONS:
            s2, r = step(s, a)
            q = r + gamma * V[s2]
            if q > best_q:
                best_q, best_a = q, a
        pi[s] = best_a
    return pi

# ----- Pretty printing helpers -----
def print_value_grid(V, title="Value Function"):
    print(f"\n{title}")
    for r in range(H):
        row = [f"{V[(r,c)]:7.3f}" for c in range(W)]
        print(" ".join(row))

def print_policy_grid(pi, title="Policy (greedy)"):
    arrows = {"up":"↑", "down":"↓", "left":"←", "right":"→", "•":"•"}
    print(f"\n{title}")
    for r in range(H):
        row = []
        for c in range(W):
            s = (r, c)
            if s in GREY and s != GOAL:
                row.append("X")  # mark grey cells
            else:
                row.append(arrows.get(pi[s], "?"))
        print(" ".join(row))

def l1_diff(Va, Vb):
    return sum(abs(Va[s] - Vb[s]) for s in STATES)

# ----- Run both methods and compare -----
t0 = perf_counter()
V_sync, it_sync = value_iteration_synchronous(theta=1e-8, max_iters=10000)
t1 = perf_counter()

V_inp, it_inp = value_iteration_inplace(theta=1e-8, max_iters=10000)
t2 = perf_counter()

pi_sync = greedy_policy(V_sync)
pi_inp  = greedy_policy(V_inp)

print_value_grid(V_sync, title=f"Standard VI: V* (converged in {it_sync} sweeps, {t1 - t0:.4f}s)")
print_policy_grid(pi_sync, title="Standard VI: π*")

print_value_grid(V_inp, title=f"In-Place VI: V* (converged in {it_inp} sweeps, {t2 - t1:.4f}s)")
print_policy_grid(pi_inp, title="In-Place VI: π*")

print("\n=== Convergence / Equality Check ===")
print(f"Standard VI sweeps: {it_sync}, time: {t1 - t0:.4f}s")
print(f"In-Place  VI sweeps: {it_inp}, time: {t2 - t1:.4f}s")
print(f"L1 difference between V* (should be ~0): {l1_diff(V_sync, V_inp):.8f}")

# Optional: assert near-equality of value functions
# (use a tolerance since floating point can differ slightly)
tol = 1e-6
if l1_diff(V_sync, V_inp) < tol:
    print("OK: V* from both methods match within tolerance.")
else:
    print("Warning: V* mismatch beyond tolerance.")



Standard VI: V* (converged in 9 sweeps, 0.0018s)
 -0.434   0.629   1.810   3.122   4.580
  0.629   1.810   3.122   4.580   6.200
  1.810   3.122   4.580   6.200   8.000
  3.122   4.580   6.200   8.000  10.000
  4.580   6.200   8.000  10.000   0.000

Standard VI: π*
↓ ↓ ↓ ↓ X
↓ ↓ → ↓ ↓
→ ↓ X ↓ ↓
X ↓ ↓ ↓ ↓
→ → → → •

In-Place VI: V* (converged in 9 sweeps, 0.0013s)
 -0.434   0.629   1.810   3.122   4.580
  0.629   1.810   3.122   4.580   6.200
  1.810   3.122   4.580   6.200   8.000
  3.122   4.580   6.200   8.000  10.000
  4.580   6.200   8.000  10.000   0.000

In-Place VI: π*
↓ ↓ ↓ ↓ X
↓ ↓ → ↓ ↓
→ ↓ X ↓ ↓
X ↓ ↓ ↓ ↓
→ → → → •

=== Convergence / Equality Check ===
Standard VI sweeps: 9, time: 0.0018s
In-Place  VI sweeps: 9, time: 0.0013s
L1 difference between V* (should be ~0): 0.00000000
OK: V* from both methods match within tolerance.


- 5×5 grid: goal gives **+10**, grey cells **−5**, all other steps **−1**.  
- Value Iteration repeatedly sets \(V(s)=\max_a\{\text{reward(next)}+\gamma V(\text{next})\}\), making squares nearer the goal worth more and penalizing grey/long routes.  
- The greedy policy then points along the **shortest safe path** to the goal.  
- **In-place** VI converges faster but ends with the **same \(V^*\) and \(\pi^*\)** as standard VI.


In [2]:

# Gridworld: same 5×5 environment as Problem 3
# ================================
# Goal: Estimate the value function using episodes generated by a fixed behavior policy (random),
#       while the target policy is greedy (off-policy MC control with importance sampling).
#
# Method: Sutton & Barto (MC Off-Policy Control, Weighted IS)
#   - Behavior policy b(a|s): uniform over actions (0.25 each)
#   - Target policy  π(a|s): greedy w.r.t. current Q(s,a) (deterministic)
#   - Backward pass per episode with cumulative weight W
#   - If a_t != π(s_t), break (since π(a_t|s_t)=0 → weight becomes 0)
#
# Outputs:
#   • Estimated V(s) (from Q) and greedy π(s)
#   • Basic stats on episodes/updates
#   • (Optional) comparison with Value Iteration from Problem 3 (same V* expected qualitatively)
# ================================

import random
from collections import defaultdict
from time import perf_counter

# ----- Environment definition (same as Problem 3) -----
H, W = 5, 5
GOAL = (4, 4)
GREY = {(2, 2), (3, 0), (0, 4)}
ACTIONS = ("up", "down", "left", "right")
DELTA = {
    "up":    (-1,  0),
    "down":  ( 1,  0),
    "left":  ( 0, -1),
    "right": ( 0,  1),
}
gamma = 0.9

def in_bounds(r, c):
    return 0 <= r < H and 0 <= c < W

def is_terminal(s):
    return s == GOAL

def reward(s):
    """Reward on arrival to s."""
    if s == GOAL:
        return +10
    if s in GREY:
        return -5
    return -1

def step(s, a):
    """Deterministic step; invalid moves keep you in place. Terminal is absorbing."""
    if is_terminal(s):
        return s, 0.0
    r, c = s
    dr, dc = DELTA[a]
    nr, nc = r + dr, c + dc
    s2 = (nr, nc) if in_bounds(nr, nc) else s
    return s2, reward(s2)

STATES = [(r, c) for r in range(H) for c in range(W)]

# ----- Behavior policy b(a|s): uniform random over 4 actions -----
def b_sample_action(s):
    return random.choice(ACTIONS)

def b_prob(a, s):
    return 1.0 / len(ACTIONS)  # = 0.25

# ----- Target policy π(a|s): greedy w.r.t. Q(s,a) -----
def greedy_action(Q, s):
    """Return argmax_a Q[s,a]. If ties, pick first; if unseen, default to 'up'."""
    if is_terminal(s):
        return None
    best_a, best_q = None, float("-inf")
    for a in ACTIONS:
        q = Q[(s, a)]
        if q > best_q:
            best_q, best_a = q, a
    return best_a

# ----- Generate one episode with behavior policy b -----
def generate_episode_b(max_steps=200):
    """Returns list of (s_t, a_t, r_{t+1}). Start from a random non-terminal state."""
    # Exploring starts: pick any non-terminal uniformly
    s = random.choice([s for s in STATES if not is_terminal(s)])
    episode = []
    steps = 0
    while not is_terminal(s) and steps < max_steps:
        a = b_sample_action(s)
        s2, r = step(s, a)
        episode.append((s, a, r))
        s = s2
        steps += 1
    return episode

# ----- Off-Policy MC Control with (Weighted) Importance Sampling -----
def offpolicy_mc_control(
    num_episodes=20000,
    max_steps_per_ep=200,
    seed=42,
    verbose_every=5000
):
    random.seed(seed)
    # Action-value estimates and cumulative weights
    Q = defaultdict(float)     # Q[(s,a)]
    C = defaultdict(float)     # C[(s,a)] = sum of weights

    updates = 0
    nonzero_weight_episodes = 0
    t_start = perf_counter()

    for ep in range(1, num_episodes + 1):
        episode = generate_episode_b(max_steps=max_steps_per_ep)
        # Compute return G backward
        G = 0.0
        W = 1.0

        # Backward pass
        for t in reversed(range(len(episode))):
            s_t, a_t, r_tp1 = episode[t]
            G = gamma * G + r_tp1

            # Current greedy action under target policy π
            a_star = greedy_action(Q, s_t)
            # If terminal, a_star is None; but we never store terminal steps as s_t (we stop before stepping from terminal)

            # If behavior deviates from target action at this time, break (π(a_t|s_t)=0)
            if a_star is not None and a_t != a_star:
                break

            # Weighted incremental update (Weighted Importance Sampling):
            C[(s_t, a_t)] += W
            # Q <- Q + (W/C) * (G - Q)
            Q[(s_t, a_t)] += (W / C[(s_t, a_t)]) * (G - Q[(s_t, a_t)])
            updates += 1

            # If a_star is None, we are effectively at terminal; no further weighting
            if a_star is None:
                break

            # Update weight: multiply by 1 / b(a_t|s_t) (since π(a_t|s_t)=1 when a_t == a_star)
            W *= 1.0 / b_prob(a_t, s_t)

        if W > 0:
            nonzero_weight_episodes += 1

        if verbose_every and ep % verbose_every == 0:
            print(f"[Episode {ep}/{num_episodes}] updates so far: {updates}")

    elapsed = perf_counter() - t_start
    return Q, updates, nonzero_weight_episodes, elapsed

# ----- Helpers to derive V and π from Q, and to pretty-print -----
def V_from_Q(Q):
    V = {}
    for s in STATES:
        if is_terminal(s):
            V[s] = 0.0
            continue
        V[s] = max(Q[(s, a)] for a in ACTIONS)
    return V

def pi_from_Q(Q):
    pi = {}
    for s in STATES:
        if is_terminal(s):
            pi[s] = "•"
        else:
            pi[s] = greedy_action(Q, s)
    return pi

def print_value_grid(V, title="Value Function (MC Off-Policy)"):
    print(f"\n{title}")
    for r in range(H):
        row = [f"{V[(r,c)]:7.3f}" for c in range(W)]
        print(" ".join(row))

def print_policy_grid(pi, title="Policy (greedy from Q)"):
    arrows = {"up":"↑", "down":"↓", "left":"←", "right":"→", "•":"•"}
    print(f"\n{title}")
    for r in range(H):
        row = []
        for c in range(W):
            s = (r, c)
            if s in GREY and s != GOAL:
                row.append("X")
            else:
                row.append(arrows.get(pi[s], "?"))
        print(" ".join(row))

# ===== Run Off-Policy MC Control =====
if __name__ == "__main__":
    Q, updates, nz_eps, elapsed = offpolicy_mc_control(
        num_episodes=20000,   # increase for smoother results (e.g., 50_000+)
        max_steps_per_ep=200,
        seed=7,
        verbose_every=5000
    )
    V = V_from_Q(Q)
    pi = pi_from_Q(Q)

    print_value_grid(V, "MC Off-Policy (IS): Estimated V(s)")
    print_policy_grid(pi, "MC Off-Policy (IS): Greedy π(s)")

    print("\n=== Stats ===")
    print(f"Episodes processed: 20000")
    print(f"Total weighted updates: {updates}")
    print(f"Episodes with non-zero IS contribution (tail matched π): {nz_eps}")
    print(f"Elapsed time: {elapsed:.3f}s")

    # (Optional) Quick qualitative note:
    print("\nNote: With a deterministic greedy target policy, only the tail of an episode that happens to follow the greedy action contributes (others break).")
    print("Increase episodes for smoother V(s), or switch to per-decision IS / epsilon-greedy target for more coverage if allowed.")


[Episode 5000/20000] updates so far: 1572
[Episode 10000/20000] updates so far: 5956
[Episode 15000/20000] updates so far: 10635
[Episode 20000/20000] updates so far: 15537

MC Off-Policy (IS): Estimated V(s)
 -1.360  -1.318  -1.851   3.067   4.084
 -1.212  -1.450  -1.327   4.542   6.166
 -1.791  -0.902   4.549   6.190   7.985
  3.082   4.546   6.172   7.990  10.000
  1.795   3.109   4.555   3.083   0.000

MC Off-Policy (IS): Greedy π(s)
↓ ↑ ← ↓ X
← → ← ↓ ↓
↑ → X ↓ ↓
X → → → ↓
→ → ↑ ← •

=== Stats ===
Episodes processed: 20000
Total weighted updates: 15537
Episodes with non-zero IS contribution (tail matched π): 20000
Elapsed time: 5.543s

Note: With a deterministic greedy target policy, only the tail of an episode that happens to follow the greedy action contributes (others break).
Increase episodes for smoother V(s), or switch to per-decision IS / epsilon-greedy target for more coverage if allowed.


- Generate episodes with a **random behavior policy**; the **target policy** is greedy.  
- For each episode, compute return \(G\) and weight steps by **importance ratios** \(\pi(a|s)/b(a|s)\); if an action isn’t the greedy one, the weight becomes 0 and we stop.  
- Apply the weighted updates to estimate \(Q(s,a)\), then set \(V(s)=\max_a Q(s,a)\) and the policy to **greedy from \(Q\)**.  
- This is **model-free** (no transition model needed); more episodes ⇒ smoother values, unlike DP which uses known dynamics.
