<a href="https://colab.research.google.com/github/Pran9177r/info7375_Self_Improving_Ai/blob/main/week3_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

> IMPORT REQUIRED LIB.

In [1]:
import numpy as np
import random
from collections import defaultdict
import math


In [2]:
class GridworldFigure4_1:

    def __init__(self, shape=(4, 4), gamma=0.9):
        self.shape = shape
        self.H, self.W = shape
        self.states = [(r, c) for r in range(self.H) for c in range(self.W)]
        self.terminal_states = [(0, 0), (self.H - 1, self.W - 1)]
        self.actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        self.action_symbols = ['↑', '↓', '←', '→']
        self.num_actions = len(self.actions)
        self.gamma = gamma
        self.reward = -1.0

    def is_terminal(self, state):
        return state in self.terminal_states

    def start_state(self):
        non_terminals = [s for s in self.states if not self.is_terminal(s)]
        return random.choice(non_terminals)

    def _get_next_state(self, r, c, dr_a, dc_a):

        next_r = max(0, min(r + dr_a, self.H - 1))
        next_c = max(0, min(c + dc_a, self.W - 1))
        return (next_r, next_c)

    def step(self, state, action_idx):

        if self.is_terminal(state): return state, 0.0

        r, c = state
        dr_a, dc_a = self.actions[action_idx]

        intended_state = self._get_next_state(r, c, dr_a, dc_a)

        if random.random() < 0.8:
            next_state = intended_state
        else: # 0.2 probability of staying put
            next_state = state

        return next_state, self.reward

    def get_transition_prob(self, state, action_idx):

        if self.is_terminal(state): return [(1.0, state, 0.0)]

        r, c = state
        dr_a, dc_a = self.actions[action_idx]

        intended_state = self._get_next_state(r, c, dr_a, dc_a)

        final_transitions = defaultdict(float)

        final_transitions[intended_state] += 0.8

        final_transitions[state] += 0.2

        return [(p, s, self.reward) for s, p in final_transitions.items()]





In [3]:

def greedy_policy(Q, state, env):
    q_values = [Q[(state, a)] for a in range(env.num_actions)]
    best_q = np.max(q_values)
    return [a for a, q in enumerate(q_values) if abs(q - best_q) < 1e-6]

def choose_action(Q, state, env, epsilon, policy_type='epsilon-greedy'):
    if random.random() < epsilon and policy_type == 'epsilon-greedy':
        return random.choice(range(env.num_actions))
    else:
        best_actions = greedy_policy(Q, state, env)
        return random.choice(best_actions)

def generate_episode(env, Q, epsilon):
    episode = []; state = env.start_state()
    while not env.is_terminal(state):
        action_idx = choose_action(Q, state, env, epsilon, policy_type='epsilon-greedy')
        next_state, reward = env.step(state, action_idx)
        episode.append((state, action_idx, reward)); state = next_state
    return episode


def dp_control(env, theta=1e-6):
    V = {s: 0.0 for s in env.states}
    while True:
        delta = 0; V_old = V.copy()
        for s in env.states:
            if env.is_terminal(s): continue
            q_values = []
            for a_idx in range(env.num_actions):
                q_sa = sum(prob * (reward + env.gamma * V_old.get(next_s, 0.0))
                           for prob, next_s, reward in env.get_transition_prob(s, a_idx))
                q_values.append(q_sa)
            V[s] = max(q_values); delta = max(delta, abs(V[s] - V_old[s]))
        if delta < theta: break
    pi = {};
    for s in env.states:
        if env.is_terminal(s): pi[s] = 'T'; continue
        q_values = [sum(prob * (reward + env.gamma * V.get(next_s, 0.0))
                        for prob, next_s, reward in env.get_transition_prob(s, a_idx))
                    for a_idx in range(env.num_actions)]
        best_actions = [env.action_symbols[a] for a, q in enumerate(q_values) if q == max(q_values)]
        pi[s] = ' '.join(best_actions)
    return V, pi


def mc_control(env, num_episodes, epsilon, off_policy_type=None):
    Q = defaultdict(float); C = defaultdict(float)
    for _ in range(num_episodes):
        episode = generate_episode(env, Q, epsilon); G = 0; W = 1.0
        for t in reversed(range(len(episode))):
            S, A, R = episode[t]; G = R + env.gamma * G; s_a = (S, A)
            if off_policy_type is None: # ON-POLICY
                if s_a not in [(episode[i][0], episode[i][1]) for i in range(t)]:
                    C[s_a] += 1; Q[s_a] += (G - Q[s_a]) / C[s_a]
            else: # OFF-POLICY (IS)
                best_actions = greedy_policy(Q, S, env); pi_prob = 1.0 / len(best_actions) if A in best_actions else 0.0
                b_prob = epsilon / env.num_actions + (1 - epsilon) * (1.0 / env.num_actions if pi_prob == 0.0 else pi_prob)
                if pi_prob == 0.0: break
                C[s_a] += W
                if off_policy_type == 'weighted': Q[s_a] += W / C[s_a] * (G - Q[s_a])
                else: Q[s_a] += W * (G - Q[s_a]) / C[s_a]
                W = W * (pi_prob / b_prob)
    pi = {};
    for s in env.states:
        if env.is_terminal(s): pi[s] = 'T'; continue
        best_actions = greedy_policy(Q, s, env); pi[s] = ' '.join(env.action_symbols[a] for a in best_actions)
    return Q, pi


def td_control(env, num_episodes, alpha, epsilon, control_type='sarsa'):
    Q = defaultdict(float)
    for _ in range(num_episodes):
        S = env.start_state(); A = choose_action(Q, S, env, epsilon, policy_type='epsilon-greedy')
        while not env.is_terminal(S):
            S_prime, R = env.step(S, A)
            if control_type == 'sarsa': # ON-POLICY
                A_prime = choose_action(Q, S_prime, env, epsilon, policy_type='epsilon-greedy')
                q_next = Q[(S_prime, A_prime)] if not env.is_terminal(S_prime) else 0.0
                target = R + env.gamma * q_next
            else: # OFF-POLICY (Q-Learning)
                if env.is_terminal(S_prime): target = R
                else: max_q_next = max([Q[(S_prime, a)] for a in range(env.num_actions)]); target = R + env.gamma * max_q_next
                A_prime = choose_action(Q, S_prime, env, epsilon, policy_type='epsilon-greedy')
            Q[(S, A)] += alpha * (target - Q[(S, A)]); S = S_prime; A = A_prime
    pi = {};
    for s in env.states:
        if env.is_terminal(s): pi[s] = 'T'; continue
        best_actions = greedy_policy(Q, s, env); pi[s] = ' '.join(env.action_symbols[a] for a in best_actions)
    return Q, pi



def visualize_policy(env, pi, title):
    grid = np.full(env.shape, '', dtype=object)
    for r in range(env.H):
        for c in range(env.W):
            state = (r, c)
            if env.is_terminal(state): grid[r, c] = 'T'
            else: grid[r, c] = pi.get(state, '?').split()[0]
    print(f"\nPolicy: {title}")
    print("-" * 25)
    for row in grid: print("|" + "|".join(f"{item:^5s}" for item in row) + "|")
    print("-" * 25)

def value_iteration_step(env, V_k):

    V_k_plus_1 = V_k.copy()
    pi_k_plus_1 = {}
    for s in env.states:
        if env.is_terminal(s): pi_k_plus_1[s] = 'T'; continue
        q_values = []
        for a_idx in range(env.num_actions):
            q_sa = sum(prob * (reward + env.gamma * V_k.get(next_s, 0.0))
                       for prob, next_s, reward in env.get_transition_prob(s, a_idx))
            q_values.append(q_sa)
        V_k_plus_1[s] = max(q_values)
        best_q = V_k_plus_1[s]
        best_actions = [env.action_symbols[a] for a, q in enumerate(q_values) if abs(q - best_q) < 1e-6]
        pi_k_plus_1[s] = ' '.join(best_actions)
    return V_k_plus_1, pi_k_plus_1

def print_figure_4_1_replication(env, steps_to_show):
    V = {s: 0.0 for s in env.states}


    def format_v(v_dict, k):
        grid = np.full(env.shape, 0.0, dtype=float)
        for (r, c), val in v_dict.items():
            if not env.is_terminal((r, c)): grid[r, c] = val

        print(f"\n$v_k$ for k={k}:")

        if k == 0:
            print(np.around(grid, 0))
        elif k == 1:
            print(np.around(grid, 1))
        elif k <= 3:

            print(np.around(grid, 1))
        else:

            print(np.around(grid, 2))

    def format_pi(pi_dict, k):
        grid = np.full(env.shape, '', dtype=object)
        for r in range(env.H):
            for c in range(env.W):
                state = (r, c)
                if env.is_terminal(state): grid[r, c] = 'T'
                else:
                    actions = pi_dict.get(state, '↑ ↓ ← →').split()
                    grid[r, c] = "".join(actions) if len(actions) <= 1 else 'All'

        print(f"Greedy Policy w.r.t. $v_k$ for k={k}:")
        print(grid)


    V_k, pi_k = V.copy(), {s: '↑ ↓ ← →' for s in env.states if not env.is_terminal(s)}
    pi_k[(0, 0)] = 'T'; pi_k[(3, 3)] = 'T'
    format_v(V_k, 0); format_pi(pi_k, 0); print("-------------------------")


    for k in range(1, 11):
        V_next, pi_next = value_iteration_step(env, V)
        V = V_next

        if k in steps_to_show or k == 10:
            format_v(V, k)
            format_pi(pi_next, k)
            print("-------------------------")

> START MAIN CODE.


In [4]:


if __name__ == '__main__':

    env = GridworldFigure4_1(gamma=0.9)


    NUM_EPISODES = 100000
    ALPHA = 0.1
    EPSILON = 0.1

    print("--- 1. DP CONTROL: Figure 4.1 Replication (Value Iteration) ---")


    print_figure_4_1_replication(env, steps_to_show=[1, 2, 3])

    print("\n\n--- 2. RL CONTROL ALGORITHM RESULTS (Policy Visualizations) ---")


    V_dp_final, pi_dp_final = dp_control(env, theta=1e-6)
    visualize_policy(env, pi_dp_final, "1. DP Control (Optimal Policy)")


    _, pi_mc_on = mc_control(env, NUM_EPISODES, EPSILON, off_policy_type=None)
    visualize_policy(env, pi_mc_on, "2. MC On-Policy Control")


    _, pi_mc_off_unw = mc_control(env, NUM_EPISODES, EPSILON, off_policy_type='unweighted')
    visualize_policy(env, pi_mc_off_unw, "3. MC Off-Policy (Unweighted IS)")


    _, pi_mc_off_w = mc_control(env, NUM_EPISODES, EPSILON, off_policy_type='weighted')
    visualize_policy(env, pi_mc_off_w, "4. MC Off-Policy (Weighted IS)")


    _, pi_sarsa = td_control(env, NUM_EPISODES, ALPHA, EPSILON, control_type='sarsa')
    visualize_policy(env, pi_sarsa, "5. TD(0) On-Policy Control (SARSA)")


    _, pi_qlearn = td_control(env, NUM_EPISODES, ALPHA, EPSILON, control_type='q_learning')
    visualize_policy(env, pi_qlearn, "6. TD(0) Off-Policy Control (Q-Learning)")

--- 1. DP CONTROL: Figure 4.1 Replication (Value Iteration) ---

$v_k$ for k=0:
[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
Greedy Policy w.r.t. $v_k$ for k=0:
[['T' 'All' 'All' 'All']
 ['All' 'All' 'All' 'All']
 ['All' 'All' 'All' 'All']
 ['All' 'All' 'All' 'T']]
-------------------------

$v_k$ for k=1:
[[ 0. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1.  0.]]
Greedy Policy w.r.t. $v_k$ for k=1:
[['T' 'All' 'All' 'All']
 ['All' 'All' 'All' 'All']
 ['All' 'All' 'All' 'All']
 ['All' 'All' 'All' 'T']]
-------------------------

$v_k$ for k=2:
[[ 0.  -1.2 -1.9 -1.9]
 [-1.2 -1.9 -1.9 -1.9]
 [-1.9 -1.9 -1.9 -1.2]
 [-1.9 -1.9 -1.2  0. ]]
Greedy Policy w.r.t. $v_k$ for k=2:
[['T' '←' 'All' 'All']
 ['↑' 'All' 'All' 'All']
 ['All' 'All' 'All' '↓']
 ['All' 'All' '→' 'T']]
-------------------------

$v_k$ for k=3:
[[ 0.  -1.2 -2.2 -2.7]
 [-1.2 -2.2 -2.7 -2.2]
 [-2.2 -2.7 -2.2 -1.2]
 [-2.7 -2.2 -1.2  0. ]]
Greedy Policy w.r.t. $v_k$ for k=3:
[['T' '←' '←' 'All'