In [4]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim

class GridWorld:
    """Deterministic 4x4 grid with a single rewarding goal cell.

    States are ``(row, col)`` tuples. Every episode starts in ``(0, 0)``
    and finishes once the agent lands on ``(3, 3)``.
    """

    # (row delta, col delta) for each action id: 0 = up, 1 = right, 2 = down, 3 = left.
    _MOVES = {0: (-1, 0), 1: (0, 1), 2: (1, 0), 3: (0, -1)}

    def __init__(self):
        self.rows, self.cols = 4, 4
        self.state = (0, 0)
        self.goal = (3, 3)
        self.actions = [0, 1, 2, 3]  # Up, Right, Down, Left

    def reset(self):
        """Place the agent back on ``(0, 0)`` and return that state."""
        self.state = (0, 0)
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(next_state, reward, done)``.

        A move that would leave the grid keeps the agent where it is, and an
        action id outside 0-3 does not move the agent at all. The reward is
        10 when the resulting cell is the goal (``done`` is True) and -1
        otherwise (``done`` is False).
        """
        row, col = self.state
        d_row, d_col = self._MOVES.get(action, (0, 0))

        # Clamp to the grid so walls behave like "bump and stay".
        row = min(max(row + d_row, 0), self.rows - 1)
        col = min(max(col + d_col, 0), self.cols - 1)
        self.state = (row, col)

        reached_goal = self.state == self.goal
        return self.state, (10 if reached_goal else -1), reached_goal

def run_reinforce_advantage(episodes=1000, gamma=0.99, lr=0.001, seed=None):
    """Train a REINFORCE agent with a learned value baseline on ``GridWorld``.

    A softmax policy network and a value network (both 2 -> 64 -> out MLPs fed
    with the raw ``(row, col)`` state) are trained with Adam. After training,
    the policy is rolled out greedily from the start state and the visited
    path plus a success/failure line are printed.

    Args:
        episodes: Number of training episodes.
        gamma: Discount factor used when accumulating returns.
        lr: Learning rate shared by the policy and value optimizers.
        seed: If given, passed to ``torch.manual_seed`` before the networks
            are created so weight init and action sampling are repeatable.

    Returns:
        None. Progress and results are reported through ``print``.
    """
    if seed is not None:
        torch.manual_seed(seed)

    env = GridWorld()

    # Policy Network
    policy = nn.Sequential(
        nn.Linear(2, 64),
        nn.ReLU(),
        nn.Linear(64, 4),
        nn.Softmax(dim=-1)
    )

    # Value Network (Critic)
    value_net = nn.Sequential(
        nn.Linear(2, 64),
        nn.ReLU(),
        nn.Linear(64, 1)
    )

    opt_p = optim.Adam(policy.parameters(), lr=lr)
    opt_v = optim.Adam(value_net.parameters(), lr=lr)

    print("-----------------------------------")
    print(f"Training REINFORCE (Advantage) for {episodes} episodes...")
    print("-----------------------------------")

    for episode in range(episodes):
        state = env.reset()
        traj = []
        done = False
        total_reward = 0

        # 1. Collect Trajectory
        while not done:
            state_t = torch.tensor(state, dtype=torch.float32)

            # Sample Action. No autograd graph is needed here because the
            # update phase recomputes the policy output for every stored state.
            with torch.no_grad():
                probs = policy(state_t)
            dist = torch.distributions.Categorical(probs)
            action = dist.sample().item()

            next_state, reward, done = env.step(action)
            traj.append((state, action, reward))
            state = next_state
            total_reward += reward

            # Cut the rollout once it holds 51 transitions so an untrained
            # policy cannot wander forever.
            if len(traj) > 50:
                break

        # 2. Update Networks
        # NOTE(review): one optimizer step is taken per transition, so steps
        # processed later in this loop are scored by a policy that has already
        # moved. Kept as-is to preserve the original training dynamics.
        G = 0
        for s, a, r in reversed(traj):
            G = r + gamma * G
            s_t = torch.tensor(s, dtype=torch.float32)

            # Estimate Value
            val = value_net(s_t)

            # Advantage = Return - Value (plain float, so no gradient flows
            # from the policy loss into the critic).
            advantage = G - val.item()

            # Policy Update (Maximize Advantage).
            # Categorical.log_prob clamps the probabilities before taking the
            # log, so a probability that underflows to 0 cannot produce -inf
            # and poison the weights with NaN gradients.
            action_dist = torch.distributions.Categorical(probs=policy(s_t))
            log_prob = action_dist.log_prob(torch.tensor(a))
            p_loss = -log_prob * advantage

            # Value Update (Minimize Prediction Error)
            v_loss = (val - G).pow(2)

            opt_p.zero_grad()
            p_loss.backward()
            opt_p.step()

            opt_v.zero_grad()
            v_loss.backward()
            opt_v.step()

        # Log Progress
        if (episode + 1) % 100 == 0:
            print(f"Episode {episode+1}: Total Reward = {total_reward}, Trajectory Length = {len(traj)}")

    print("\nTraining Finished.")

    # --- TESTING PHASE ---
    print("\n-----------------------------------")
    print("Testing Learned Policy (Greedy Mode)")
    print("-----------------------------------")

    state = env.reset()
    path = [state]
    done = False
    steps = 0

    # Cap the greedy rollout at 20 steps in case the policy loops.
    while not done and steps < 20:
        state_t = torch.tensor(state, dtype=torch.float32)
        with torch.no_grad():
            probs = policy(state_t)
            # Greedy Action
            action = torch.argmax(probs).item()

        state, _, done = env.step(action)
        path.append(state)
        steps += 1

    print("Final Path Taken:", path)
    # Compare against the environment's own goal instead of a literal cell.
    if path[-1] == env.goal:
        print("RESULT: SUCCESS - Goal Reached!")
    else:
        print("RESULT: FAILED - Did not reach goal.")

# Entry point: train the agent and print the greedy evaluation when this cell/script is run directly.
if __name__ == "__main__":
    run_reinforce_advantage()

-----------------------------------
Training REINFORCE (Advantage) for 1000 episodes...
-----------------------------------
Episode 100: Total Reward = 4, Trajectory Length = 7
Episode 200: Total Reward = 5, Trajectory Length = 6
Episode 300: Total Reward = 5, Trajectory Length = 6
Episode 400: Total Reward = 5, Trajectory Length = 6
Episode 500: Total Reward = 5, Trajectory Length = 6
Episode 600: Total Reward = 5, Trajectory Length = 6
Episode 700: Total Reward = 5, Trajectory Length = 6
Episode 800: Total Reward = 5, Trajectory Length = 6
Episode 900: Total Reward = 5, Trajectory Length = 6
Episode 1000: Total Reward = 4, Trajectory Length = 7

Training Finished.

-----------------------------------
Testing Learned Policy (Greedy Mode)
-----------------------------------
Final Path Taken: [(0, 0), (0, 1), (1, 1), (1, 2), (2, 2), (3, 2), (3, 3)]
RESULT: SUCCESS - Goal Reached!


In [3]:
# exp11_reinforce_advantage.py

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class GridWorld4x4:
    """Deterministic 4x4 grid world with integer states and a step limit.

    States are row-major indices ``0 .. 15``. Actions are ``0`` = up,
    ``1`` = right, ``2`` = down, ``3`` = left. Every step yields a reward of
    -1 unless it lands on the goal, which yields 0.

    Args:
        start_state: State index the agent is placed on by ``reset``.
        goal_state: State index that ends the episode when reached.
        max_steps: Number of steps after which the episode is ended even if
            the goal has not been reached.
    """

    def __init__(self, start_state=0, goal_state=15, max_steps=50):
        self.n_rows = 4
        self.n_cols = 4
        self.n_states = self.n_rows * self.n_cols
        self.n_actions = 4
        self.start_state = start_state
        self.goal_state = goal_state
        self.max_steps = max_steps
        # Initialise ``state`` and ``steps`` right away so that calling
        # ``step`` before ``reset`` does not fail with AttributeError.
        self.reset()

    def state_to_xy(self, s):
        """Convert a state index into its ``(row, col)`` pair."""
        return (s // self.n_cols, s % self.n_cols)

    def xy_to_state(self, r, c):
        """Convert a ``(row, col)`` pair into its row-major state index."""
        return r * self.n_cols + c

    def reset(self):
        """Start a new episode and return the start state."""
        self.state = self.start_state
        self.steps = 0
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(next_state, reward, done, info)``.

        Moves into a wall leave the agent in place; an action id outside 0-3
        does not move the agent but still counts as a step. ``done`` is True
        when the goal is reached or ``max_steps`` steps have been taken.
        ``info`` is always an empty dict.
        """
        r, c = self.state_to_xy(self.state)
        if action == 0:
            r = max(0, r - 1)
        elif action == 1:
            c = min(self.n_cols - 1, c + 1)
        elif action == 2:
            r = min(self.n_rows - 1, r + 1)
        elif action == 3:
            c = max(0, c - 1)
        ns = self.xy_to_state(r, c)
        self.state = ns
        self.steps += 1
        done = (ns == self.goal_state) or (self.steps >= self.max_steps)
        reward = 0 if ns == self.goal_state else -1
        return ns, reward, done, {}

class PolicyNet(nn.Module):
    """Small MLP that turns a state vector into action probabilities."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 64
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_dim),
        ]
        # The container stays named ``fc`` so parameter names remain
        # ``fc.0.*`` and ``fc.2.*``.
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return a softmax distribution over actions along the last dim."""
        logits = self.fc(x)
        return logits.softmax(dim=-1)

class ValueNet(nn.Module):
    """Small MLP that predicts one scalar value per state vector."""

    def __init__(self, state_dim):
        super().__init__()
        hidden_units = 64
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, 1),
        ]
        # The container stays named ``fc`` so parameter names remain
        # ``fc.0.*`` and ``fc.2.*``.
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the value estimate with the trailing size-1 dim removed."""
        estimate = self.fc(x)
        return torch.squeeze(estimate, dim=-1)

def one_hot_state(s, n_states):
    """Encode state index ``s`` as a one-hot float32 vector.

    The result has length ``n_states`` with a 1.0 at position ``s`` and 0.0
    everywhere else.
    """
    encoding = np.zeros((n_states,), dtype=np.float32)
    encoding[s] = 1.0
    return encoding

def reinforce_advantage(env, episodes=2000, gamma=1.0, lr=1e-3, seed=None):
    """Train a softmax policy with REINFORCE and a learned value baseline.

    One full episode is collected per iteration, after which the critic is
    regressed onto the observed returns-to-go and the actor takes a policy
    gradient step weighted by ``return - V(s)``.

    Args:
        env: Environment exposing ``n_states``, ``n_actions``, ``reset()``
            returning an integer state and ``step(a)`` returning
            ``(next_state, reward, done, info)``.
        episodes: Number of training episodes.
        gamma: Discount factor for the returns-to-go.
        lr: Adam learning rate used for both networks.
        seed: If given, passed to ``torch.manual_seed`` before the networks
            are built so weight init and action sampling are repeatable.

    Returns:
        ``numpy`` integer array of length ``env.n_states`` holding the greedy
        (arg-max probability) action for every state.
    """
    if seed is not None:
        torch.manual_seed(seed)

    device = torch.device("cpu")
    policy = PolicyNet(env.n_states, env.n_actions).to(device)
    value = ValueNet(env.n_states).to(device)
    p_opt = optim.Adam(policy.parameters(), lr=lr)
    v_opt = optim.Adam(value.parameters(), lr=lr)
    mse = nn.MSELoss()  # built once instead of once per episode

    for ep in range(episodes):
        s = env.reset()
        states = []
        actions = []
        rewards = []
        done = False
        while not done:
            x = torch.tensor(one_hot_state(s, env.n_states), dtype=torch.float32, device=device)
            # No graph is needed while sampling; the actor loss below
            # recomputes the probabilities for the whole batch of states.
            with torch.no_grad():
                probs = policy(x)
            dist = torch.distributions.Categorical(probs)
            a = dist.sample().item()
            ns, r, done, _ = env.step(a)
            states.append(x)
            actions.append(a)
            rewards.append(r)
            s = ns

        # Discounted returns-to-go, accumulated backwards in linear time and
        # then flipped into chronological order.
        G = 0.0
        returns = []
        for r in reversed(rewards):
            G = r + gamma * G
            returns.append(G)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32, device=device)
        states_t = torch.stack(states)
        actions_t = torch.tensor(actions, dtype=torch.int64, device=device)

        # One critic forward pass serves both the regression loss and the
        # baseline. Advantages are taken from these pre-update estimates so
        # the baseline has not yet been fitted to this episode's returns.
        values = value(states_t)
        advantages = returns - values.detach()

        # critic: fit V(s) to returns
        v_opt.zero_grad()
        loss_v = mse(values, returns)
        loss_v.backward()
        v_opt.step()

        # actor loss
        p_opt.zero_grad()
        probs_all = policy(states_t)
        log_probs = torch.log(probs_all + 1e-8)  # epsilon guards against log(0)
        step_idx = torch.arange(len(actions), device=device)
        selected_log_probs = log_probs[step_idx, actions_t]
        loss_p = -(selected_log_probs * advantages).mean()
        loss_p.backward()
        p_opt.step()

        if (ep + 1) % 200 == 0:
            # The logged number is the mean of the per-step returns-to-go of
            # this single episode, not an average over episodes.
            print(f"Episode {ep+1}, mean return {returns.mean().item():.2f}")

    # derive greedy policy
    policy_table = np.zeros(env.n_states, dtype=int)
    for s in range(env.n_states):
        x = torch.tensor(one_hot_state(s, env.n_states), dtype=torch.float32, device=device)
        with torch.no_grad():
            probs = policy(x)
        policy_table[s] = int(torch.argmax(probs).item())
    return policy_table

# Entry point: train on the default 4x4 grid for 1500 episodes and print the
# greedy action chosen in each cell, laid out as a 4x4 table.
if __name__ == "__main__":
    env = GridWorld4x4()
    pi = reinforce_advantage(env, episodes=1500)
    print("REINFORCE with advantage policy (0:U,1:R,2:D,3:L):")
    print(pi.reshape(4, 4))


Episode 200, mean return -10.50
Episode 400, mean return -2.50
Episode 600, mean return -3.00
Episode 800, mean return -2.50
Episode 1000, mean return -2.50
Episode 1200, mean return -2.50
Episode 1400, mean return -2.50
REINFORCE with advantage policy (0:U,1:R,2:D,3:L):
[[1 2 2 2]
 [1 2 2 2]
 [1 1 2 2]
 [1 1 1 2]]
