<a href="https://colab.research.google.com/github/aslestia/ACS_2025/blob/main/Week7_CVaR_PG_CliffWalking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Week 7 Practice 1 — CVaR Policy Gradient on CliffWalking

Tujuan:
- Membandingkan **REINFORCE (risk‑neutral)** vs **CVaR‑Policy‑Gradient (risk‑averse)**.
- Metrik evaluasi: mean return, VaR$_\alpha$, CVaR$_\alpha$, dan frekuensi jatuh (cliff).

> Catatan: Notebook ini menggunakan *environment* CliffWalking sederhana yang diimplementasikan manual agar dapat berjalan tanpa instalasi eksternal.


In [None]:

import math, random, statistics, numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, List, Dict
# Fix the state of NumPy's global RNG and of the stdlib `random` module so a
# fresh run of the notebook starts from the same random state every time.
np.random.seed(42)
random.seed(42)

# ====== Utility: VaR and CVaR (empirical) ======
def var_cvar(samples: np.ndarray, alpha: float=0.95):
    """Return the empirical lower-tail VaR and CVaR of ``samples``.

    Samples are treated as returns (larger is better). After sorting in
    ascending order, the VaR is the element at index
    ``floor(alpha * (n - 1))`` and the CVaR is the mean of every element up
    to and including that index. To look at the worst 5% of outcomes pass
    ``alpha=0.05``; for losses, flip the sign of the samples first.

    Args:
        samples: Array-like of numeric outcomes. It is flattened before use
            and never modified.
        alpha: Quantile level in ``[0, 1]``.

    Returns:
        Tuple ``(var, cvar)`` of NumPy scalars.

    Raises:
        ValueError: If ``samples`` is empty or ``alpha`` is outside
            ``[0, 1]`` (NaN included).
    """
    # Written as a negated chained comparison so that NaN is rejected too.
    if not 0.0 <= alpha <= 1.0:
        raise ValueError(f"alpha must lie in [0, 1], got {alpha!r}")

    # axis=None flattens and returns a sorted copy, leaving the input intact.
    ordered = np.sort(np.asarray(samples), axis=None)
    if ordered.size == 0:
        raise ValueError("var_cvar() needs at least one sample")

    q_idx = int(math.floor(alpha * (ordered.size - 1)))
    # Lower tail up to and including the VaR element; never empty because
    # 0 <= q_idx <= n - 1.
    tail = ordered[:q_idx + 1]
    return ordered[q_idx], tail.mean()

# ====== Simple CliffWalking Environment (gridworld) ======
# Grid (rows x cols); start at (rows-1,0), goal at (rows-1, cols-1)
# Cells between start and goal on bottom row are CLIFF (terminal with large negative reward)
class CliffWalking:
    """Hand-written cliff-walking gridworld with integer state ids.

    The agent starts in the bottom-left cell and the goal is the bottom-right
    cell. Every bottom-row cell strictly between them is the cliff. States
    are exposed as ``row * cols + col``; actions are 0=up, 1=right, 2=down,
    3=left. Any other action value leaves the position unchanged but still
    consumes a time step.
    """

    def __init__(self, rows=4, cols=12, step_reward=-1.0, cliff_reward=-100.0, goal_reward=0.0, max_steps=200):
        """Store the grid geometry and rewards, then reset to the start cell."""
        self.rows = rows
        self.cols = cols
        bottom_row = rows - 1
        self.start = (bottom_row, 0)
        self.goal = (bottom_row, cols - 1)
        self.step_reward = step_reward
        self.cliff_reward = cliff_reward
        self.goal_reward = goal_reward
        self.max_steps = max_steps
        self.reset()

    def reset(self):
        """Move the agent back to the start, zero the step counter, return the state id."""
        self.t = 0
        self.s = self.start
        return self.state_id(self.s)

    def state_id(self, pos: Tuple[int,int]):
        """Flatten a ``(row, col)`` position into a single integer index."""
        row, col = pos
        return row * self.cols + col

    @property
    def nS(self):
        """Number of grid cells (states)."""
        return self.rows * self.cols

    @property
    def nA(self):
        """Number of actions: up, right, down, left."""
        return 4

    def step(self, a: int):
        """Apply action ``a`` and return ``(state_id, reward, done, info)``.

        Moves that would leave the grid are clamped at the border.
        ``info["fell"]`` is True only when the move landed on a cliff cell.
        """
        row, col = self.s
        last_row = self.rows - 1
        last_col = self.cols - 1

        if a == 0:
            row = max(row - 1, 0)
        elif a == 1:
            col = min(col + 1, last_col)
        elif a == 2:
            row = min(row + 1, last_row)
        elif a == 3:
            col = max(col - 1, 0)

        self.t += 1

        # Cliff: terminal, and the stored position snaps back to the start.
        on_cliff = row == last_row and 0 < col < last_col
        if on_cliff:
            self.s = self.start
            return self.state_id(self.s), self.cliff_reward, True, {"fell": True}

        self.s = (row, col)
        state = self.state_id(self.s)

        # Goal: terminal with the goal reward.
        if self.s == self.goal:
            return state, self.goal_reward, True, {"fell": False}

        # Ordinary move: the episode only ends once the step budget is spent.
        timed_out = self.t >= self.max_steps
        return state, self.step_reward, timed_out, {"fell": False}



## Policy: Softmax tabular
Parameter $\theta \in \mathbb{R}^{|S|\times|A|}$;  
$\pi_\theta(a|s) = \mathrm{softmax}(\theta_{s, :})[a]$.


In [None]:

class SoftmaxPolicy:
    """Tabular softmax policy with one row of action preferences per state.

    ``theta`` has shape ``(nS, nA)`` and
    ``pi(a | s) = softmax(theta[s])[a]``.
    """

    def __init__(self, nS, nA, init_std=0.01):
        """Draw ``theta`` from a zero-mean normal scaled by ``init_std``.

        Uses NumPy's global RNG, so results depend on the current seed.
        """
        self.nS, self.nA = nS, nA
        self.theta = np.random.randn(nS, nA) * init_std

    def probs(self, s):
        """Return the action probabilities for state ``s`` as a length-``nA`` array."""
        # Subtracting the row maximum keeps np.exp from overflowing and does
        # not change the softmax value.
        z = self.theta[s] - self.theta[s].max()
        exp = np.exp(z)
        return exp/exp.sum()

    def sample(self, s):
        """Sample an action index for state ``s`` from the global NumPy RNG."""
        return np.random.choice(self.nA, p=self.probs(s))

    def grad_logp(self, s, a):
        """Return the gradient of ``log pi(a | s)`` with respect to ``theta``.

        The result has the same shape as ``theta``. Only row ``s`` is
        non-zero, where it equals ``onehot(a) - probs(s)``. ``theta`` itself
        is not modified.
        """
        out = np.zeros_like(self.theta)
        out[s] = -self.probs(s)
        out[s, a] += 1.0
        return out



## REINFORCE (Risk‑Neutral) and CVaR‑PG (Risk‑Averse)

- **REINFORCE** update: $\theta \leftarrow \theta + \beta\; \nabla_\theta \log \pi(a|s)\; G$  
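- **CVaR surrogate (Rockafellar–Uryasev)** with parameter $\eta$ (approx VaR):  
  $$L_\alpha(\theta,\eta) = \eta + \tfrac{1}{1-\alpha}\, \mathbb{E}\big[(G-\eta)^+\big]$$
  $$\nabla_\theta L = \tfrac{1}{1-\alpha} \mathbb{E}[\nabla\log\pi\; (G-\eta)^+]$$
  $$\nabla_\eta L = 1 - \tfrac{1}{1-\alpha} \Pr(G \ge \eta)$$
  Note: $(G-\eta)^+$ selects the *upper* tail, so $L_\alpha$ is the quantity to **minimise** when $G$ is a loss (cost). When $G$ is a return (larger is better), the risk‑averse objective is the mirrored lower‑tail form $\max_\eta \big\{\eta - \tfrac{1}{1-\alpha}\,\mathbb{E}\big[(\eta-G)^+\big]\big\}$, whose gradients put weight only on episodes with $G \le \eta$.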


In [None]:

def run_episode(env: CliffWalking, policy: SoftmaxPolicy, gamma=1.0):
    """Roll out one episode and return its trajectory and discounted return.

    Args:
        env: Environment; it is reset at the start of the call.
        policy: Policy whose ``sample(state)`` picks each action.
        gamma: Discount factor applied as ``gamma ** t`` to the reward at step ``t``.

    Returns:
        ``(states, actions, rewards, G, fell)``. The three lists hold one
        entry per step (``states`` holds the state the action was taken
        in), ``G`` is the discounted sum of all rewards in the episode, and
        ``fell`` is True if any step reported ``info["fell"]``.
    """
    visited, chosen, rewards = [], [], []
    fell = False
    current = env.reset()
    finished = False
    while not finished:
        action = policy.sample(current)
        following, reward, finished, info = env.step(action)
        visited.append(current)
        chosen.append(action)
        rewards.append(reward)
        fell = fell or bool(info.get("fell", False))
        current = following
    # Discounted return of the whole episode, measured from the first step.
    G = sum(reward * (gamma**t) for t, reward in enumerate(rewards))
    return visited, chosen, rewards, G, fell

def train_reinforce(episodes=2000, alpha=0.01, gamma=1.0):
    """Train a tabular softmax policy on CliffWalking with risk-neutral REINFORCE.

    Every step of an episode is weighted by that episode's total return
    ``G``. No baseline is used.

    Args:
        episodes: Number of training episodes.
        alpha: Step size of the policy update.
        gamma: Discount factor passed to ``run_episode``.

    Returns:
        ``(policy, returns, fell_hist)``: the trained policy, an array of
        per-episode returns, and an array of per-episode cliff-fall flags.
    """
    env = CliffWalking()
    pi = SoftmaxPolicy(env.nS, env.nA)
    returns = []
    fell_hist = []
    for ep in range(episodes):
        states, actions, _rewards, G, fell = run_episode(env, pi, gamma)
        returns.append(G)
        fell_hist.append(fell)
        # The scalar weight is shared by every step of the episode. theta is
        # updated one step at a time, so each log-prob gradient is evaluated
        # at the already-updated parameters.
        weight = alpha * G
        for s, a in zip(states, actions):
            pi.theta += weight * pi.grad_logp(s, a)
        report_due = (ep + 1) % 200 == 0
        if report_due:
            print(f"[REINFORCE] ep {ep+1}, mean G(last200)={np.mean(returns[-200:]):.2f}")
    return pi, np.array(returns), np.array(fell_hist)

def train_cvar_pg(episodes=2000, alpha_theta=0.01, alpha_eta=0.05, gamma=1.0, alpha=0.95):
    """Train a tabular softmax policy on CliffWalking with a risk-averse CVaR policy gradient.

    Episode returns ``G`` are "larger is better", so risk aversion means
    raising the mean of the worst ``1 - alpha`` fraction of returns. That
    lower-tail CVaR is written in Rockafellar-Uryasev form as

        max over eta of  eta - E[(eta - G)^+] / (1 - alpha)

    and both ``theta`` and ``eta`` follow a one-sample stochastic gradient
    *ascent* on it:

        d/d theta = E[grad log pi * min(G - eta, 0)] / (1 - alpha)
        d/d eta   = 1 - P(G <= eta) / (1 - alpha)

    Only episodes that land in the bad tail (``G <= eta``) move the policy,
    and they push probability away from the actions taken. ``eta`` tracks
    the lower ``(1 - alpha)`` quantile of the returns. Using the upper-tail
    term ``(G - eta)^+`` here instead would reinforce only the best episodes
    and ignore cliff falls, which is risk-seeking for returns.

    Args:
        episodes: Number of training episodes.
        alpha_theta: Step size for the policy parameters.
        alpha_eta: Step size for the VaR proxy ``eta``.
        gamma: Discount factor passed to ``run_episode``.
        alpha: CVaR confidence level in ``[0, 1)``; the tail mass is
            ``1 - alpha``.

    Returns:
        ``(policy, returns, fell_hist, etas)``: the trained policy, arrays of
        per-episode returns and cliff-fall flags, and the value of ``eta``
        at the start of each episode.

    Raises:
        ValueError: If ``alpha`` is outside ``[0, 1)``; ``alpha == 1`` would
            divide by zero.
    """
    if not 0.0 <= alpha < 1.0:
        raise ValueError(f"alpha must lie in [0, 1), got {alpha!r}")

    env = CliffWalking()
    pi = SoftmaxPolicy(env.nS, env.nA)
    eta = -10.0  # initial VaR proxy
    tail_scale = 1.0 / (1.0 - alpha)
    returns, fell_hist, etas = [], [], []
    for ep in range(episodes):
        states, actions, rewards, G, fell = run_episode(env, pi, gamma)
        returns.append(G); fell_hist.append(fell); etas.append(eta)

        # Policy step: non-positive weight, non-zero only for episodes at or
        # below the VaR proxy, so bad trajectories become less likely.
        shortfall = min(G - eta, 0.0)
        if shortfall != 0.0:
            weight = alpha_theta * tail_scale * shortfall
            # theta is updated step by step, the same way train_reinforce does it.
            for s, a in zip(states, actions):
                pi.theta += weight * pi.grad_logp(s, a)

        # eta step: gradient ascent on the CVaR objective. eta drifts up
        # slowly while G > eta and jumps down when an episode falls in the
        # tail, which balances at the lower (1 - alpha) quantile.
        in_tail = 1.0 if G <= eta else 0.0
        grad_eta = 1.0 - in_tail * tail_scale
        eta += alpha_eta * grad_eta

        if (ep+1)%200==0:
            print(f"[CVaR-PG] ep {ep+1}, eta={eta:.2f}, mean G(last200)={np.mean(returns[-200:]):.2f}")
    return pi, np.array(returns), np.array(fell_hist), np.array(etas)


In [None]:

# ====== Train both agents ======
# Each trainer builds its own environment and policy internally. Both draw
# from NumPy's global RNG (policy initialisation and action sampling), so the
# order of these two calls affects the numbers each agent sees.
# Both agents get the same budget: 1500 episodes and a policy step size of 0.02.
reinforce_pi, r_returns, r_fell = train_reinforce(episodes=1500, alpha=0.02)
cvar_pi, c_returns, c_fell, c_etas = train_cvar_pg(episodes=1500, alpha_theta=0.02, alpha_eta=0.1, alpha=0.95)

def summarize(name, returns, fell, alpha=0.95):
    """Print one line of training statistics for an agent.

    Reports the mean return, the empirical lower-tail VaR and CVaR at level
    ``1 - alpha`` (the worst ``1 - alpha`` fraction of episodes), and the
    fraction of episodes that fell off the cliff.

    Args:
        name: Label printed at the start of the line.
        returns: NumPy array of per-episode returns.
        fell: NumPy array of per-episode cliff-fall flags.
        alpha: Confidence level; the tail examined is ``1 - alpha``.
    """
    # Lower tail (poor returns): query the empirical (1 - alpha) quantile.
    tail_level = 1 - alpha
    var, cvar = var_cvar(returns, alpha=tail_level)
    mean = returns.mean()
    print(f"{name}: mean={mean:.2f}, lower-tail VaR_{1-alpha:.2f}={var:.2f}, CVaR_{1-alpha:.2f}={cvar:.2f}, fell_rate={fell.mean():.3f}")

# Blank line to separate the summary from the training progress output.
print()
# Same alpha for both agents so the two summary lines are directly comparable.
summarize("REINFORCE", r_returns, r_fell, alpha=0.95)
summarize("CVaR-PG", c_returns, c_fell, alpha=0.95)


In [None]:

# ====== Visualization: Returns histogram ======
# Overlay the per-episode training returns of both agents on one axis.
plt.figure(figsize=(7,4))
# alpha=0.6 keeps the bars translucent so overlapping regions stay visible.
plt.hist(r_returns, bins=40, alpha=0.6, label="REINFORCE")
plt.hist(c_returns, bins=40, alpha=0.6, label="CVaR-PG")
# Vertical markers at each agent's mean return (RN = risk-neutral REINFORCE).
plt.axvline(np.mean(r_returns), linestyle="--", label="Mean RN")
plt.axvline(np.mean(c_returns), linestyle="-.", label="Mean CVaR")
plt.title("Distribution of returns")
plt.xlabel("Episode return")
plt.ylabel("Count")
plt.legend()
plt.show()


In [None]:

# ====== Visualization: Eta trajectory (CVaR-PG) ======
# c_etas holds the value of eta recorded at the start of every training
# episode, i.e. before that episode's eta update was applied.
plt.figure(figsize=(7,3.5))
plt.plot(c_etas)
plt.title("Eta (VaR proxy) during CVaR-PG training")
plt.xlabel("Episode")
plt.ylabel("eta")
plt.show()
