In [1]:
import gymnasium as gym # type: ignore
from gymnasium import spaces
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import make_interp_spline
from stable_baselines3 import SAC
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import BaseCallback

In [2]:
class MultiArmedBanditEnv(gym.Env):
    """Single-step environment for tuning the exploration factor of a UCB agent.

    Each call to ``step`` builds a fresh ``UCB`` agent with the exploration
    factor given as the action, lets it play ``horizon`` pulls of a Bernoulli
    multi-armed bandit, and returns the total payout as the reward. The episode
    always terminates after that single step.
    """

    def __init__(self, probs, horizon=1000):
        """
        Set up the bandit.

        Args:
            probs: Success probability of each Bernoulli arm.
            horizon: Number of pulls simulated inside every ``step`` call.

        Raises:
            ValueError: If ``probs`` contains no arms.
        """
        super().__init__()
        self.k = len(probs)
        if self.k == 0:
            raise ValueError("probs must contain at least one arm probability")
        self.horizon = horizon
        # Continuous action: the exploration factor alpha, bounded to [1, 100].
        self.action_space = spaces.Box(low=1, high=100, shape=(1,), dtype=np.float32)
        # The task is stateless, so the observation space has a single dummy state.
        self.observation_space = spaces.Discrete(1)
        self.probs = probs  # success probability of each arm
        self.best_idx = np.argmax(self.probs)  # index of the arm with the highest probability

    def reset(self, seed=None, options=None):
        """
        Reset the environment.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` where the observation is always ``0`` and
            ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        observation = 0  # the single dummy state
        info = {}
        return observation, info

    def step(self, action):
        """
        Run one full UCB experiment with the chosen exploration factor.

        Args:
            action: The exploration factor alpha, as a 1-element array or a scalar.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The
            observation is always ``0``, the reward is the total payout over
            ``horizon`` pulls, ``terminated`` is always ``True``, ``truncated``
            is always ``False``, and ``info["optimal_counts"]`` is the number
            of pulls that landed on the best arm.
        """
        # np.ravel lets a bare scalar work as well as the 1-element array
        # produced by the action space.
        alpha = np.ravel(action)[0]
        ucb_agent = UCB(self.k, alpha)  # fresh agent for every experiment
        total_rewards = 0  # cumulative payout
        optimal_counts = 0  # pulls that selected the best arm
        optimal_arm = self.best_idx

        for t in range(self.horizon):
            arm_index = ucb_agent.select_arm(t)
            # Payouts are drawn from NumPy's global RNG, so np.random.seed
            # (not the seed passed to reset) is what controls them.
            reward_temp = 1 if np.random.rand() < self.probs[arm_index] else 0
            if arm_index == optimal_arm:
                optimal_counts += 1
            total_rewards += reward_temp
            ucb_agent.update(arm_index, reward_temp)

        terminated = True  # one experiment is one complete episode
        info = {"optimal_counts": optimal_counts}
        return 0, float(total_rewards), terminated, False, info

    def render(self, mode='human'):
        """
        Render the environment (intentionally a no-op).
        """
        pass

    def close(self):
        """
        Close the environment (intentionally a no-op).
        """
        pass


In [3]:

class UCB:
    """Upper-confidence-bound arm selection with running per-arm statistics."""

    def __init__(self, K, Alpha):
        """
        Create the policy state.

        K: number of arms
        Alpha: exploration factor that scales the confidence bonus
        """
        self.Alpha = Alpha
        self.K = K
        self.Arm_average_reward = np.zeros(K)  # running mean reward of each arm
        self.Arm_counts = np.zeros(K)  # number of pulls of each arm

    def select_arm(self, t):
        """
        Pick the arm to pull at time step t.

        t: current time step
        Returns: index of the chosen arm
        """
        if t >= self.K:
            # Confidence bonus; the small constant keeps the division finite.
            exploration_bonus = np.sqrt(
                self.Alpha * np.log(t + 1) / (2 * self.Arm_counts + 1e-5)
            )
            scores = self.Arm_average_reward + exploration_bonus
            return np.argmax(scores)
        # During the first K steps, step t simply pulls arm t.
        return t

    def update(self, arm_index, reward_temp):
        """
        Fold a newly observed reward into the statistics of one arm.

        arm_index: index of the arm that was pulled
        reward_temp: reward obtained from that pull
        """
        self.Arm_counts[arm_index] += 1
        pulls = self.Arm_counts[arm_index]
        previous_mean = self.Arm_average_reward[arm_index]
        # Incremental mean: weight the old mean by (n - 1) / n and the new sample by 1 / n.
        self.Arm_average_reward[arm_index] = (
            ((pulls - 1) / pulls) * previous_mean + (1 / pulls) * reward_temp
        )

In [None]:
# Build the environment.
np.random.seed(1)  # fixes the arm probabilities drawn on the next line
probs = np.random.rand(10)
formatted_probs = [f"{prob:.4f}" for prob in probs]
print("伯努利多臂老虎机的概率为：", formatted_probs)
env = MultiArmedBanditEnv(probs)
check_env(env)

# Build the model.
# Seed SAC so that network initialisation and action sampling are repeatable.
# The value deliberately differs from the seed above: Stable-Baselines3 reseeds
# NumPy's global RNG with this number, and reusing 1 would replay the very
# draws that produced `probs` as the environment's first reward draws.
model = SAC("MlpPolicy", env, verbose=1, seed=0)
# Custom callback that records the alpha value and the reward of every training step.
class AlphaCallback(BaseCallback):
    """Collect the action (alpha) and reward seen at each training step.

    Attributes:
        alphas: One scalar per step, the first entry of the step's actions.
        rewards: One scalar per step, the first entry of the step's rewards.
    """

    def __init__(self, verbose=0):
        """
        Args:
            verbose: Verbosity level forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self.alphas = []
        self.rewards = []

    def _on_step(self) -> bool:
        """Record the current step's alpha and reward.

        Returns:
            Always ``True``, so training is never interrupted by this callback.
        """
        # The values arrive as arrays (presumably batched over environments --
        # TODO confirm); store the first scalar so the histories are flat
        # sequences rather than lists of 1-element arrays.
        # Only record when the 'actions' key is present in self.locals.
        if 'actions' in self.locals:
            self.alphas.append(np.ravel(self.locals['actions'])[0])
        # Only record when the 'rewards' key is present in self.locals.
        if 'rewards' in self.locals:
            self.rewards.append(np.ravel(self.locals['rewards'])[0])
        return True

# Instantiate the recording callback.
alpha_callback = AlphaCallback()

# Train the model; the callback records alpha and reward along the way.
model.learn(
    callback=alpha_callback,
    total_timesteps=10000,
    log_interval=4,
)

# Persist the trained model.
model.save("sac_multi_armed_bandit")

# Pull out the histories that the callback recorded during training.
alphas, total_rewards = alpha_callback.alphas, alpha_callback.rewards
# Helper that resamples a noisy series onto a coarser grid for plotting.
def smooth_curve(x, y, num_points=300):
    """Resample ``y(x)`` on an evenly spaced grid using an interpolating B-spline.

    NOTE(review): ``make_interp_spline`` builds an interpolating spline, so any
    visual smoothing comes from evaluating it on only ``num_points`` positions.

    Args:
        x: 1-D sequence of sample positions (must be strictly increasing).
        y: Sample values, one per entry of ``x``.
        num_points: Number of evenly spaced positions to evaluate.

    Returns:
        ``(x_new, y_smooth)``: the evaluation grid spanning ``min(x)`` to
        ``max(x)`` and the spline values on that grid.

    Raises:
        ValueError: If fewer than two samples are supplied.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if x.size < 2:
        raise ValueError("smooth_curve needs at least two samples")
    # A degree-k spline needs at least k + 1 samples, so fall back to a lower
    # degree for very short series instead of failing inside SciPy.
    degree = min(3, x.size - 1)
    x_new = np.linspace(x.min(), x.max(), num_points)
    spl = make_interp_spline(x, y, k=degree)  # B-spline fit
    y_smooth = spl(x_new)
    return x_new, y_smooth


# Plot each recorded series as a raw line with its spline-resampled curve on top.
# One entry per subplot: total reward first, alpha second.
panels = [
    {
        "series": total_rewards,
        "raw_label": 'Actual Total Reward',
        "raw_color": 'orange',
        "smooth_label": 'Smoothed Total Reward',
        "smooth_color": 'blue',
        "ylabel": 'Total Reward',
        "title": 'The Change of Total Reward with Training Times',
    },
    {
        "series": alphas,
        "raw_label": 'Actual Alpha',
        "raw_color": 'green',
        "smooth_label": 'Smoothed Alpha',
        "smooth_color": 'red',
        "ylabel": 'Alpha',
        "title": 'The Change of Alpha with Training Times',
    },
]

plt.figure(figsize=(12, 6))

for position, panel in enumerate(panels, start=1):
    series = panel["series"]
    plt.subplot(2, 1, position)
    plt.plot(series, label=panel["raw_label"], color=panel["raw_color"])
    x_smooth, y_smooth = smooth_curve(range(len(series)), series)
    plt.plot(x_smooth, y_smooth, label=panel["smooth_label"], color=panel["smooth_color"])
    plt.xlabel('Train Times')
    plt.ylabel(panel["ylabel"])
    plt.title(panel["title"])
    plt.legend()

plt.tight_layout()
plt.show()


伯努利多臂老虎机的概率为： ['0.4170', '0.7203', '0.0001', '0.3023', '0.1468', '0.0923', '0.1863', '0.3456', '0.3968', '0.5388']
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1        |
|    ep_rew_mean     | 399      |
| time/              |          |
|    episodes        | 4        |
|    fps             | 199      |
|    time_elapsed    | 0        |
|    total_timesteps | 4        |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1        |
|    ep_rew_mean     | 398      |
| time/              |          |
|    episodes        | 8        |
|    fps             | 161      |
|    time_elapsed    | 0        |
|    total_timesteps | 8        |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1        |
|    ep_rew_mean     | 400      |
| time/              |          |
|    episodes        | 12       |
|    fps             | 150      |
|    time_elapsed    | 0        |
|    total_timesteps | 12       |
--------------

In [176]:
smooth_count = 750  # number of trailing samples to average over
trailing_window = slice(-smooth_count, None)
smooth_alphas = alphas[trailing_window]  # last smooth_count alpha values
smooth_total_rewards = total_rewards[trailing_window]  # last smooth_count rewards
average_smooth_alphas = np.asarray(smooth_alphas).mean()  # mean alpha over the window
average_smooth_total_rewards = np.asarray(smooth_total_rewards).mean()  # mean reward over the window
print(
    "最后 %d 个数据点的平均 alpha 值为：" % smooth_count,
    average_smooth_alphas,
)
print(
    "最后 %d 个数据点的平均奖励为：" % smooth_count,
    average_smooth_total_rewards,
)

最后 750 个数据点的平均 alpha 值为： 10.471509
最后 750 个数据点的平均奖励为： 624.46936
