In [1]:
import numpy as np
from typing import Any
from pydantic import BaseModel, Field
import math
from enum import Enum


class CartPoleMini:
    """Cart Pole 极简版的自行实现"""

    def __init__(self, seed, max_steps=5000) -> None:
        pass

    def reset(self, seed) -> Any:
        pass

    def step(self, action) -> Any:
        pass


class State(BaseModel):
    x: float = Field(default=0, description="小车位置")
    x_dot: float = Field(default=0, description="小车速度")
    theta: float = Field(default=0, description="杆子角度")
    theta_dot: float = Field(default=0, description="杆子角速度")


class Feature(BaseModel):
    x: float = Field(default=0, description="小车位置")
    x2: float = Field(default=0, description="小车位置平方")
    x_dot: float = Field(default=0, description="小车速度")
    x_dot2: float = Field(default=0, description="小车速度平方")
    theta: float = Field(default=0, description="杆子角度")
    theta2: float = Field(default=0, description="杆子角度平方")
    theta_dot: float = Field(default=0, description="杆子角速度")
    theta_dot2: float = Field(default=0, description="杆子角速度平方")
    bias: float = Field(default=1.0, description="偏置项")


class Step(BaseModel):
    state: State
    reward: float = Field(default=1.0, description="奖励")
    done: bool = Field(default=False, description="是否结束，真正的任务完成/失败")
    terminated: bool = Field(
        default=False,
        description="是否终止，比如达到目标，或游戏失败，可能进行重置或下一局等",
    )
    truncated: bool = Field(
        default=False, description="是否截断，人为截断，比如达到最大步数，时间限制等"
    )
    info: dict = Field(default_factory=dict, description="额外信息")


class Action(str, Enum):
    LEFT = "left"
    RIGHT = "right"


class CartPoleConfig(BaseModel):
    state_dim: int = Field(default=len(State().model_dump()), description="状态维度")
    phi_dim: int = Field(
        default=len(Feature().model_dump()), description="手工特征维度"
    )
    n_actions: int = Field(default=len(Action), description="动作个数")
    alpha: float = Field(default=0.01, description="学习率")
    gamma: float = Field(default=0.99, description="折扣因子，未来奖励的当前价值")
    eps_start: float = Field(
        default=1.0,
        description="epsilon-贪婪策略的初始epsilon，初期高探索，快速了解环境和哪些动作好",
    )
    eps_end: float = Field(
        default=0.01,
        description="epsilon-贪婪策略的最终epsilon值，后期低探索，更多利用已学知识",
    )
    eps_steps: int = Field(
        default=5000,
        description="epsilon-贪婪策略的epsilon衰减步数，  从eps_start 线性衰减到 eps_end 需要多少步",
    )
    bias: float = Field(default=1.0, description="手工特征中的偏置项")


class LinearQNet:
    """
    Q(s,a) = w_a^T * phi(s)  （phi: 特征映射）
    这里使用简单的手工特征： [s, s^2, 1] 以提高表达力，但仍保持超小型。
    """

    def __init__(self, cfg: CartPoleConfig, seed=42) -> None:
        pass

    def phi(self, s) -> np.ndarray | Any:
        raise NotImplementedError

    def epsilon(self) -> float:
        raise NotImplementedError

    def q_values(self, s) -> np.ndarray | Any:
        raise NotImplementedError

    def act(self, s) -> Action:
        raise NotImplementedError

    def update_td0(self, s, a, r, s_, done):
        pass

In [2]:
class CartPoleMini(CartPoleMini):
    def __init__(self, seed: int, max_steps=5000) -> None:
        super().__init__(seed, max_steps)
        self.gravity = 9.8  # 重力加速度
        self.cart_mass = 1.0  # 小车质量
        self.pole_mass = 0.1  # 杆子质量
        self.total_mass = self.cart_mass + self.pole_mass  # 总质量
        self.pole_length = 0.5  # 杆子长度的一半
        self.pole_mass_length = (
            self.pole_mass * self.pole_length
        )  # 杆子质量与长度的乘积
        self.force_mag = 1  # 作用在小车上的力的大小
        self.tau = 10e-3  # 1000Hz 采样频率，可以理解为帧率的倒数

        self.theta_threshold_radians: float = (
            30 * 2 * math.pi / 360
        )  # 30度，杆子最大倾角，超过就算失败
        self.x_threshold: float = 5  # 小车最大移动距离，超过就算失败

        self.np_random = np.random.RandomState(seed)  # 环境随机数种子
        self.max_steps = max_steps  # 每个回合的最大步数
        self.state: State = State()  # 环境状态
        self.steps: int = 0  # 当前步数

    def reset(self, seed: int) -> State:
        """重置环境，返回初始状态"""
        if seed is not None:
            self.np_random.seed(seed)

        state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.state = State(
            x=state[0], x_dot=state[1], theta=state[2], theta_dot=state[3]
        )
        self.steps = 0

        return self.state.model_copy()

    def step(self, action: Action) -> Step:
        """每一步骤的环境交互"""
        x = self.state.x
        x_dot = self.state.x_dot
        theta = self.state.theta
        theta_dot = self.state.theta_dot

        force = self.force_mag if action is Action.RIGHT else -self.force_mag

        cos_theta = math.cos(self.state.theta)
        sin_theta = math.sin(self.state.theta)

        temp = (
            force + self.pole_mass_length * theta_dot**2 * sin_theta
        ) / self.total_mass
        theta_acc = (self.gravity * sin_theta - cos_theta * temp) / (
            self.pole_length
            * (4.0 / 3.0 - self.pole_mass * cos_theta**2 / self.total_mass)
        )

        x_acc = temp - self.pole_mass_length * theta_acc * cos_theta / self.total_mass

        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * x_acc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * theta_acc

        self.state = State(x=x, x_dot=x_dot, theta=theta, theta_dot=theta_dot)
        self.steps += 1

        terminated = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )
        truncated = self.steps >= self.max_steps
        done = terminated or truncated
        if terminated:
            reward = 0.0
        else:
            reward = 1.0

        return Step(
            state=self.state.model_copy(),
            reward=reward,
            done=done,
            terminated=terminated,
            truncated=truncated,
            info={},
        )


# 用经典阈值近似归一化尺度
X_MAX, X_DOT_MAX = 2.4, 3.0  # 3.0 是经验上限
THETA_MAX, THETA_DOT_MAX = 12 * 2 * np.pi / 360, 3.5


class LinearQNet(LinearQNet):
    def __init__(self, cfg: CartPoleConfig, seed=42) -> None:
        super().__init__(cfg, seed)
        self.cfg = cfg
        self.rng = np.random.RandomState(seed)
        self.step = 0

        # 先构造一个“占位状态”，用来推断 phi 维度
        _dummy = State()
        _phi = self._phi_vec(_dummy)  # <-- 统一使用 _phi_vec
        self.phi_dim = _phi.shape[0]  # <-- 自动推断维度
        self.W = self.rng.randn(cfg.n_actions, self.phi_dim) * 0.01

    # === 新增：统一产出 numpy 向量的函数（你可以把缩放/特征工程都放这里）===
    def _phi_vec(self, s: State) -> np.ndarray:
        # 经典阈值做归一化（可按需调整）
        X_MAX, X_DOT_MAX = 2.4, 3.0
        THETA_MAX, THETA_DOT_MAX = 12 * 2 * np.pi / 360, 3.5

        sx = np.clip(s.x / X_MAX, -1, 1)
        sxd = np.clip(s.x_dot / X_DOT_MAX, -1, 1)
        st = np.clip(s.theta / THETA_MAX, -1, 1)
        std = np.clip(s.theta_dot / THETA_DOT_MAX, -1, 1)

        # 与你采用的“8 维特征”保持一致（顺序固定！）
        return np.array(
            [
                sx,
                sxd,
                np.sin(s.theta),
                np.cos(s.theta),
                std,
                sx * sxd,
                st * std,
                self.cfg.bias,  # bias
            ],
            dtype=float,
        )

    # === 仍然保留你写的 Pydantic 特征（如果还用得上），但 q_values 不再依赖它的顺序 ===
    def phi(self, s: State) -> Feature:
        return Feature(
            x=s.x,
            x2=s.x**2,
            x_dot=s.x_dot,
            x_dot2=s.x_dot**2,
            theta=s.theta,
            theta2=s.theta**2,
            theta_dot=s.theta_dot,
            theta_dot2=s.theta_dot**2,
            bias=self.cfg.bias,
        )

    def epsilon(self) -> float:
        t = min(self.step, self.cfg.eps_steps)
        return (
            self.cfg.eps_start
            + t * (self.cfg.eps_end - self.cfg.eps_start) / self.cfg.eps_steps
        )

    def q_values(self, s: State) -> np.ndarray:
        phi_vec = self._phi_vec(s)  # <-- 统一使用同一份特征
        return self.W @ phi_vec

    def act(self, s: State) -> Action:
        self.step += 1
        if self.rng.rand() < self.epsilon():
            return self.rng.choice([Action.LEFT, Action.RIGHT])
        best_a = int(np.argmax(self.q_values(s)))
        return Action.RIGHT if best_a == 1 else Action.LEFT

    def update_td0(self, s: State, a: Action, r: float, s_: State, done: bool):
        phi_vec = self._phi_vec(s)  # <-- 同一份特征
        q_vals = self.q_values(s)
        q_sa = q_vals[1] if a == Action.RIGHT else q_vals[0]

        if done:
            target = r
        else:
            target = r + self.cfg.gamma * np.max(self.q_values(s_))

        td_err = target - q_sa
        a_idx = 1 if a == Action.RIGHT else 0
        self.W[a_idx] += self.cfg.alpha * td_err * phi_vec


from collections import deque


class Replay:
    def __init__(self, cap=50000, rng=None):
        self.buf = deque(maxlen=cap)
        self.rng = np.random.RandomState(0) if rng is None else rng

    def push(self, s, a, r, s_, done):
        self.buf.append((s, a, r, s_, done))

    def can_sample(self, bs):
        return len(self.buf) >= bs

    def sample(self, bs):
        idx = self.rng.choice(len(self.buf), size=bs, replace=False)
        batch = [self.buf[i] for i in idx]
        return batch

In [3]:
from typing import List


def train(
    cfg: CartPoleConfig = CartPoleConfig(), episodes: int = 300
) -> tuple[LinearQNet, List[float]]:
    env = CartPoleMini(seed=42, max_steps=5000)
    agent = LinearQNet(cfg, seed=42)

    # 初始化经验回放
    replay = Replay(cap=10000)  # 设置合适的容量
    batch_size = 32  # 批量大小

    n_episodes = episodes
    rewards: List[float] = []

    for episode in range(n_episodes):
        state = env.reset(seed=episode)
        total_reward = 0

        while True:
            action = agent.act(state)
            step_result = env.step(action)
            state_, reward, done = (
                step_result.state,
                step_result.reward,
                step_result.done,
            )

            # 存储经验到回放 buffer
            replay.push(state, action, reward, state_, done)

            # 从回放 buffer 中采样学习
            if replay.can_sample(batch_size):
                batch = replay.sample(batch_size)
                for s, a, r, s_, d in batch:
                    agent.update_td0(s, a, r, s_, d)  # 使用采样的经验更新

            state = state_
            total_reward += reward

            if done:
                break

        rewards.append(total_reward)
        if (episode + 1) % 10 == 0:
            avg_reward = np.mean(rewards[-10:])
            print(
                f"Episode {episode + 1}, Average Reward: {avg_reward:.2f}, Epsilon: {agent.epsilon():.3f}"
            )

    return agent, rewards

In [4]:
def evaluate(agent: LinearQNet, episodes: int = 20, deterministic=True) -> List[float]:
    rets: List[float] = []

    for ep in range(episodes):
        env = CartPoleMini(seed=ep, max_steps=5000)
        state = env.reset(seed=ep)
        total_reward = 0

        while True:
            if deterministic:
                q_vals = agent.q_values(state)
                best_a = np.argmax(q_vals)
                action = Action.RIGHT if best_a == 1 else Action.LEFT
            else:
                action = agent.act(state)

            step_result = env.step(action)
            state_, reward, done = (
                step_result.state,
                step_result.reward,
                step_result.done,
            )

            state = state_
            total_reward += reward

            if done:
                break

        rets.append(total_reward)

    return rets

In [6]:
env = CartPoleMini(seed=42, max_steps=5000)
cfg = CartPoleConfig(alpha=5e-3, gamma=0.99, eps_steps=2000, eps_end=0.05)
agent = LinearQNet(cfg, seed=0)

print("开始训练（无回放、无目标网络、在线 TD(0)、线性 Q）...")
agent, rewards = train(cfg, 500)

print("评估...")
results = evaluate(agent, 20, deterministic=True)
print(results)

开始训练（无回放、无目标网络、在线 TD(0)、线性 Q）...
Episode 10, Average Reward: 66.90, Epsilon: 0.677
Episode 20, Average Reward: 68.60, Epsilon: 0.347
Episode 30, Average Reward: 65.30, Epsilon: 0.050
Episode 40, Average Reward: 65.80, Epsilon: 0.050
Episode 50, Average Reward: 66.60, Epsilon: 0.050
Episode 60, Average Reward: 69.50, Epsilon: 0.050
Episode 70, Average Reward: 67.30, Epsilon: 0.050
Episode 80, Average Reward: 62.70, Epsilon: 0.050
Episode 90, Average Reward: 66.50, Epsilon: 0.050
Episode 100, Average Reward: 63.90, Epsilon: 0.050
Episode 110, Average Reward: 68.20, Epsilon: 0.050
Episode 120, Average Reward: 65.60, Epsilon: 0.050
Episode 130, Average Reward: 73.20, Epsilon: 0.050
Episode 140, Average Reward: 67.20, Epsilon: 0.050
Episode 150, Average Reward: 73.50, Epsilon: 0.050
Episode 160, Average Reward: 71.30, Epsilon: 0.050
Episode 170, Average Reward: 66.80, Epsilon: 0.050
Episode 180, Average Reward: 66.90, Epsilon: 0.050
Episode 190, Average Reward: 65.40, Epsilon: 0.050
Episode