In [1]:
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, Optional, Iterable
import math
import random
import statistics
import itertools


In [2]:
class Environment:
    """
    Abstract Environment.
    - state: representation of the current situation (S_t)
    - action: decision taken by the agent (A_t)
    - reward: scalar feedback from env (R_{t+1})
    - transition function: P(s', r | s, a) -- implemented via step()
    """
    def reset(self) -> Any:
        """Start a new episode: returns initial state s0."""
        raise NotImplementedError

    def step(self, action: Any) -> Tuple[Any, float, bool, Dict]:
        """
        Take action in current state.
        Returns: (next_state, reward, done, info)
        - done indicates episode termination
        - this embodies the transition function and reward function
        """
        raise NotImplementedError

    def action_space(self) -> Iterable[Any]:
    
        """Available actions in the current state (or a fixed set)."""
        raise NotImplementedError


class Agent:
    """
    Abstract Agent mapping states to actions.
    - policy π(a|s) implemented by act(state)
    - may update internal knowledge by observe(...)
    """
    def act(self, state: Any) -> Any:
        raise NotImplementedError

    def observe(self, s: Any, a: Any, r: float, s_next: Any, done: bool):
        """Learning update after each transition."""
        pass



In [3]:
# ======================================
# Section 2: A tiny GridWorld Environment
# ======================================

@dataclass
class GridWorld(Environment):
    """
    GridWorld:
    - Grid of size H x W with walls, terminal states, rewards
    - Deterministic transitions (for simplicity)
    - Episode ends when terminal state is reached or max_steps exceeded
    """
    height: int = 4
    width: int = 4
    start: Tuple[int, int] = (0, 0)
    terminals: Dict[Tuple[int, int], float] = field(default_factory=lambda: {(3, 3): +1.0, (3, 0): -1.0})
    walls: set[Tuple[int, int]] = field(default_factory=set)
    step_reward: float = -0.04  # small penalty to encourage shortest path
    max_steps: int = 100


In [None]:
# Internal runtime state (not part of definition)
#       _state: Tuple[int, int] = field(init=False, default=(0, 0))
      _steps: int = field(init=False, default=0)

      def __post_init__(self):
        self._state = self.start
        self._steps = 0

      def reset(self) -> Tuple[int, int]:
        self._state = self.start
        self._steps = 0
        return self._state

    def in_bounds(self, pos: Tuple[int, int]) -> bool:
        r, c = pos
        return 0 <= r < self.height and 0 <= c < self.width

    def is_terminal(self, pos: Tuple[int, int]) -> bool:
        return pos in self.terminals

    def action_space(self) -> List[str]:
        return ["U", "D", "L", "R"]

    def _move(self, s: Tuple[int, int], a: str) -> Tuple[int, int]:
        r, c = s
        if a == "U":
            nxt = (r - 1, c)
        elif a == "D":
            nxt = (r + 1, c)
        elif a == "L":
            nxt = (r, c - 1)
        elif a == "R":
            nxt = (r, c + 1)
        else:
            raise ValueError("Unknown action")

        # Transition function T(s,a)->s' with constraints
        if not self.in_bounds(nxt) or nxt in self.walls:
            nxt = s  # bump into wall -> stay

        return nxt

    def step(self, action: str) -> Tuple[Tuple[int, int], float, bool, Dict]:
        assert action in self.action_space(), f"Invalid action {action}"
        if self.is_terminal(self._state):
            # If already terminal, keep returning terminal
            r = self.terminals[self._state]
            return self._state, r, True, {"reason": "already_terminal"}

        s = self._state
        s_next = self._move(s, action)
        self._steps += 1

        # Reward function R(s,a,s')
        r = self.step_reward
        done = False
        if self.is_terminal(s_next):
            r = self.terminals[s_next]
            done = True
        elif self._steps >= self.max_steps:
            done = True  # safety cap on episode length

        self._state = s_next
        return s_next, r, done, {}


# ===================================
# Section 3: A simple Q-learning Agent
# ===================================

@dataclass
class QLearningAgent(Agent):
    """
    Tabular Q-learning:
    Q(s,a) <- Q(s,a) + α [r + γ max_a' Q(s',a') - Q(s,a)]
    """
    actions: List[Any]
    alpha: float = 0.5          # learning rate
    gamma: float = 0.99         # discount factor γ (present value of future rewards)
    epsilon: float = 0.1        # ε-greedy exploration
    q: Dict[Any, Dict[Any, float]] = field(default_factory=dict)
    rng: random.Random = field(default_factory=random.Random)

    def _ensure_state(self, s: Any):
        if s not in self.q:
            self.q[s] = {a: 0.0 for a in self.actions}

    def act(self, state: Any) -> Any:
        self._ensure_state(state)
        # ε-greedy policy π(a|s)
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.actions)
        # greedy action with tie-breaking
        a_vals = self.q[state]
        max_q = max(a_vals.values())
        best = [a for a, v in a_vals.items() if v == max_q]
        return self.rng.choice(best)

    def observe(self, s: Any, a: Any, r: float, s_next: Any, done: bool):
        self._ensure_state(s)
        self._ensure_state(s_next)
        max_next = 0.0 if done else max(self.q[s_next].values())
        td_target = r + self.gamma * max_next
        td_error = td_target - self.q[s][a]
        self.q[s][a] += self.alpha * td_error


# =========================================
# Section 4: Running an Episode (loop driver)
# =========================================

def run_episode(env: Environment, agent: Agent, render: bool = False, max_steps: Optional[int] = None) -> float:
    """
    One full episode:
      s0 = env.reset()
      repeat:
        a = agent.act(s_t)
        s_{t+1}, r_{t+1}, done = env.step(a)
        agent.observe(...)
      until done
    Returns total discounted return G_0 = sum_t γ^t R_{t+1} (computed here for illustration).
    """
    s = env.reset()
    total_return = 0.0
    gamma = getattr(agent, "gamma", 1.0)  # discount from agent if available
    g_pow = 1.0
    steps = 0

    while True:
        a = agent.act(s)
        s_next, r, done, info = env.step(a)
        agent.observe(s, a, r, s_next, done)

        total_return += g_pow * r
        g_pow *= gamma
        s = s_next
        steps += 1

        if render:
            print(f"step={steps:02d} s={s} a={a} r={r:+.2f} done={done}")

        if done or (max_steps is not None and steps >= max_steps):
            break

    return total_return


# ======================================================
# Section 5: Multi-Armed Bandits (stateless RL subproblem)
# ======================================================

class BanditEnv:
    """
    Stationary K-armed bandit with Bernoulli rewards.
    No state transitions (single-state MDP), only actions (arms) and rewards.
    """

    def __init__(self, probs: List[float], rng: Optional[random.Random] = None):
        self.probs = probs
        self.k = len(probs)
        self.rng = rng or random.Random()

    def pull(self, arm: int) -> int:
        """Return 1 with probability p_arm, else 0."""
        return 1 if self.rng.random() < self.probs[arm] else 0


class EpsilonGreedyBanditAgent:
    """
    ε-greedy for bandits:
    - Estimates: Q[a] = average reward of arm a so far
    - With probability ε explore, else exploit best Q
    """

    def __init__(self, k: int, epsilon: float = 0.1, rng: Optional[random.Random] = None):
        self.k = k
        self.epsilon = epsilon
        self.counts = [0] * k
        self.values = [0.0] * k
        self.rng = rng or random.Random()

    def act(self) -> int:
        if self.rng.random() < self.epsilon:
            return self.rng.randrange(self.k)
        max_q = max(self.values)
        best = [i for i, v in enumerate(self.values) if v == max_q]
        return self.rng.choice(best)

    def observe(self, arm: int, reward: float):
        self.counts[arm] += 1
        # incremental mean update
        n = self.counts[arm]
        self.values[arm] += (reward - self.values[arm]) / n


class UCBBanditAgent:
    """
    UCB1 (Upper Confidence Bound):
      Select arm i maximizing: Q[i] + sqrt( (2 ln t) / N[i] )
      Encourages exploration based on uncertainty
    """
    def __init__(self, k: int):
        self.k = k
        self.counts = [0] * k
        self.values = [0.0] * k
        self.t = 0

    def act(self) -> int:
        self.t += 1
        # play each arm once initially
        for i in range(self.k):
            if self.counts[i] == 0:
                return i
        ucb_scores = [
            self.values[i] + math.sqrt(2.0 * math.log(self.t) / self.counts[i])
            for i in range(self.k)
        ]
        max_u = max(ucb_scores)
        best = [i for i, u in enumerate(ucb_scores) if u == max_u]
        return random.choice(best)

    def observe(self, arm: int, reward: float):
        self.counts[arm] += 1
        n = self.counts[arm]
        self.values[arm] += (reward - self.values[arm]) / n


class ThompsonBernoulliBanditAgent:
    """
    Thompson Sampling for Bernoulli bandits:
      Maintain Beta(α, β) posterior per arm; sample θ ~ Beta(α, β) and play argmax θ
    """
    def __init__(self, k: int, prior_alpha: float = 1.0, prior_beta: float = 1.0, rng: Optional[random.Random] = None):
        self.k = k
        self.alpha = [prior_alpha] * k
        self.beta = [prior_beta] * k
        self.rng = rng or random.Random()

    def act(self) -> int:
        samples = [random.betavariate(self.alpha[i], self.beta[i]) for i in range(self.k)]
        max_s = max(samples)
        best = [i for i, s in enumerate(samples) if s == max_s]
        return self.rng.choice(best)

    def observe(self, arm: int, reward: int):
        # reward must be 0/1 for Bernoulli
        if reward == 1:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1


def run_bandit(env: BanditEnv, agent, steps: int = 1000) -> Dict[str, Any]:
    """
    Simulate bandit interaction and collect stats.
    Returns dict with total reward, average reward, arm counts, estimated values.
    """
    total = 0
    rewards = []
    arm_counts = [0] * env.k

    for t in range(steps):
        arm = agent.act()
        r = env.pull(arm)
        total += r
        rewards.append(r)
        arm_counts[arm] += 1
        agent.observe(arm, r)

    return {
        "total_reward": total,
        "avg_reward": sum(rewards) / len(rewards),
        "arm_counts": arm_counts,
        "estimates": getattr(agent, "values", None),
        "alpha": getattr(agent, "alpha", None),
        "beta": getattr(agent, "beta", None),
    }


# =====================================================
# Section 6: Demo runners (safe to run as __main__)
# =====================================================

def demo_gridworld_q_learning(episodes: int = 500):
    print("=== GridWorld + Q-learning (episodic) ===")
    env = GridWorld(height=4, width=4, start=(0, 0),
                    terminals={(3, 3): +1.0, (3, 0): -1.0}, step_reward=-0.04)
    agent = QLearningAgent(actions=env.action_space(), alpha=0.5, gamma=0.99, epsilon=0.1)
    returns = []
    for ep in range(1, episodes + 1):
        G = run_episode(env, agent, render=False)
        returns.append(G)
        if ep % (episodes // 5) == 0:
            print(f"Episode {ep:4d} | running avg return (last 50): {statistics.mean(returns[-50:]):+.3f}")

    # Show learned Q-values for a few states
    print("\nSample Q-values:")
    for s in [(0,0), (0,1), (1,1), (2,2), (3,2)]:
        if s in agent.q:
            print(s, agent.q[s])

def demo_bandits():
    print("\n=== Bandits: ε-greedy vs UCB1 vs Thompson (Bernoulli) ===")
    true_ps = [0.1, 0.2, 0.5, 0.3]
    env = BanditEnv(true_ps)

    steps = 5000
    eg = EpsilonGreedyBanditAgent(k=len(true_ps), epsilon=0.1)
    ucb = UCBBanditAgent(k=len(true_ps))
    ts = ThompsonBernoulliBanditAgent(k=len(true_ps))

    res_eg = run_bandit(env, eg, steps)
    res_ucb = run_bandit(env, ucb, steps)
    res_ts = run_bandit(env, ts, steps)

    def pretty(name, res):
        print(f"{name:10s} | total={res['total_reward']:4d}  avg={res['avg_reward']:.3f}  pulls={res['arm_counts']}")

    pretty("epsilon-g", res_eg)
    pretty("ucb1",      res_ucb)
    pretty("thompson",  res_ts)

if __name__ == "__main__":
    # Run both demos
    demo_gridworld_q_learning(episodes=600)
    demo_bandits()



IndentationError: unexpected indent (2489153160.py, line 2)