In [2]:
import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import plotly.express as px
import plotly.graph_objs as go
import numpy as np
import pandas as pd
from dash.exceptions import PreventUpdate

In [3]:
# Multi-armed bandit (MAB) simulation class
class MAB:
    """Epsilon-greedy bandit simulation over precomputed per-node reward traces.

    The combined reward is
    ``lambda_load * load_reward + (1 - lambda_load) * latency_reward`` and is
    indexed as ``[node, time_step]``: node statistics are means over axis 1,
    and the simulation reads ``combine_reward[chosen_node, t]``.
    """

    def __init__(self, config, reward_data_manager, load_reward_method, data_type, algorithm_type='epsilon_greedy'):
        """Load the reward traces and precompute the per-node ranking.

        Args:
            config: Mapping providing 'lambda_load', 'top_k', 'N' and 'T_test'.
            reward_data_manager: Mapping providing the six reward arrays
                ('iid_'/'ar1_' prefixed 'load_reward_0', 'load_reward_1' and
                'latency_reward_1').
            load_reward_method: 'reward_0' or 'reward_1'.
            data_type: 'iid' or 'ar1'.
            algorithm_type: Stored on the instance; not consulted by any
                method of this class.

        Raises:
            KeyError: If a required key is missing from ``config`` or
                ``reward_data_manager``.
            ValueError: If ``(data_type, load_reward_method)`` is not one of
                the four supported combinations.
        """
        self.config = config
        self.reward_data_manager = reward_data_manager
        self.load_reward_method = load_reward_method  # 'reward_0' or 'reward_1'
        self.data_type = data_type  # 'iid' or 'ar1'
        self.algorithm_type = algorithm_type  # 'epsilon_greedy' or 'adaptive_epsilon_greedy'

        # Parameters
        self.lambda_load = self.config['lambda_load']  # weight of the load reward
        self.top_k = self.config['top_k']  # k used for top-k accuracy
        self.N = self.config['N']  # number of nodes
        self.T_test = self.config['T_test']  # number of simulated time steps

        # Reward traces
        self.iid_load_reward_0 = reward_data_manager['iid_load_reward_0']
        self.iid_load_reward_1 = reward_data_manager['iid_load_reward_1']
        self.iid_latency_reward_1 = reward_data_manager['iid_latency_reward_1']
        self.ar1_load_reward_0 = reward_data_manager['ar1_load_reward_0']
        self.ar1_load_reward_1 = reward_data_manager['ar1_load_reward_1']
        self.ar1_latency_reward_1 = reward_data_manager['ar1_latency_reward_1']

        # (data_type, load_reward_method) -> (load reward, latency reward)
        self.reward_mapping = {
            ('iid', 'reward_0'): (self.iid_load_reward_0, self.iid_latency_reward_1),
            ('iid', 'reward_1'): (self.iid_load_reward_1, self.iid_latency_reward_1),
            ('ar1', 'reward_0'): (self.ar1_load_reward_0, self.ar1_latency_reward_1),
            ('ar1', 'reward_1'): (self.ar1_load_reward_1, self.ar1_latency_reward_1),
        }

        try:
            self.load_reward, self.latency_reward = self.reward_mapping[(self.data_type, self.load_reward_method)]
        except KeyError as err:
            raise ValueError(
                f'Invalid load_reward_method: {self.load_reward_method}, data_type: {self.data_type}'
            ) from err

        # Combined reward and per-node statistics
        self.combine_reward = self.lambda_load * self.load_reward + (1 - self.lambda_load) * self.latency_reward
        self.combine_reward_mean = np.mean(self.combine_reward, axis=1)
        self.combine_reward_optimal_node = np.argmax(self.combine_reward_mean)
        self.combine_reward_optimal_mean = np.max(self.combine_reward_mean)
        # Node indices ordered from highest to lowest mean combined reward.
        self.combine_reward_sorted_mean = np.argsort(self.combine_reward_mean)[::-1]

    def calculate_top_k_accuracy(self, time_counts, k_list):
        """Return the fraction of steps whose chosen node is among the k best.

        Args:
            time_counts: Sequence holding the node chosen at each time step.
            k_list: Iterable of k values to evaluate.

        Returns:
            Dict mapping each k to a float accuracy. When ``time_counts`` is
            empty every accuracy is 0.0 instead of dividing by zero.
        """
        total_steps = len(time_counts)
        if total_steps == 0:
            return {k: 0.0 for k in k_list}

        chosen = np.asarray(time_counts)
        top_k_accuracy = {}
        for k in k_list:
            best_nodes = self.combine_reward_sorted_mean[:k]  # the k best nodes
            hits = int(np.count_nonzero(np.isin(chosen, best_nodes)))
            top_k_accuracy[k] = hits / total_steps
        return top_k_accuracy

    def run_epsilon_greedy(self, epsilon, rng=None):
        """Simulate ``T_test`` steps of epsilon-greedy node selection.

        Args:
            epsilon: Exploration probability; a uniformly random node is
                chosen whenever a uniform draw falls below it.
            rng: Optional ``numpy.random.Generator`` for reproducible runs.
                When omitted, the global ``np.random`` state is used.

        Returns:
            Tuple ``(time_counts, nodes_counts, single_step_regret,
            cumulative_regret, top_k_accuracy)`` where ``time_counts`` holds
            the chosen node per step, ``nodes_counts`` the number of pulls
            per node, the regrets are measured against the best node's mean
            combined reward, and ``top_k_accuracy`` is a dict keyed by
            ``self.top_k``.
        """
        if rng is None:
            draw_uniform, draw_node = np.random.rand, np.random.randint
        else:
            draw_uniform, draw_node = rng.random, rng.integers

        estimated_means = np.zeros(self.N)
        time_counts = np.zeros(self.T_test)
        nodes_counts = np.zeros(self.N)
        single_step_regret = np.zeros(self.T_test)

        for t in range(self.T_test):
            # Explore with probability epsilon, otherwise exploit the
            # current best estimate.
            if draw_uniform() < epsilon:
                chosen_node = draw_node(self.N)
            else:
                chosen_node = np.argmax(estimated_means)

            time_counts[t] = chosen_node
            nodes_counts[chosen_node] += 1
            reward = self.combine_reward[chosen_node, t]
            # Incremental update of the running mean for the chosen node.
            estimated_means[chosen_node] += (reward - estimated_means[chosen_node]) / nodes_counts[chosen_node]
            single_step_regret[t] = self.combine_reward_optimal_mean - reward

        cumulative_regret = np.cumsum(single_step_regret)
        top_k_accuracy = self.calculate_top_k_accuracy(time_counts, [self.top_k])
        return time_counts, nodes_counts, single_step_regret, cumulative_regret, top_k_accuracy

# Initialize the Dash application
app = dash.Dash(__name__)

# Simulated configuration for the bandit experiment
config = {
    'lambda_load': 0.5,  # weight of the load reward in the combined reward
    'top_k': 3,          # k used for top-k accuracy
    'N': 10,             # number of nodes
    'T_test': 1000,      # number of simulated time steps
}

# Placeholder reward traces drawn uniformly from [0, 1). The (N, T_test)
# shape is taken from `config` so the arrays always match the dimensions the
# MAB instance is told to simulate.
reward_data_manager = {
    key: np.random.rand(config['N'], config['T_test'])
    for key in (
        'iid_load_reward_0',
        'iid_load_reward_1',
        'iid_latency_reward_1',
        'ar1_load_reward_0',
        'ar1_load_reward_1',
        'ar1_latency_reward_1',
    )
}

mab_instance = MAB(config, reward_data_manager, 'reward_0', 'iid')

# Page layout: title, algorithm selector, epsilon slider and the regret graph
app.layout = html.Div([
    html.H1("MAB Algorithm Visualization"),
    dcc.Dropdown(
        id='algorithm-type',
        options=[
            {'label': 'Epsilon-Greedy', 'value': 'epsilon_greedy'}
        ],
        value='epsilon_greedy',
        style={'width': '50%'}
    ),
    dcc.Slider(
        id='epsilon-slider',
        min=0.01,
        max=1.0,
        step=0.01,
        value=0.1,
        # Mark positions are built from integer tenths rather than
        # np.arange, which yields keys such as 0.30000000000000004.
        # Whole-number positions are keyed as ints (0, 1) instead of 0.0/1.0.
        # NOTE(review): float keys like 1.0 reportedly fail to render as
        # marks in some Dash versions - confirm against the version in use.
        marks={
            (tenth // 10 if tenth % 10 == 0 else tenth / 10): f'{tenth / 10:.2f}'
            for tenth in range(11)
        }
    ),
    dcc.Graph(id='algorithm-graph'),
])

# Callback: re-run the simulation whenever the epsilon slider changes
@app.callback(
    Output('algorithm-graph', 'figure'),
    [Input('epsilon-slider', 'value')]
)
def update_graph(epsilon):
    """Run epsilon-greedy with the slider value and plot cumulative regret.

    Args:
        epsilon: Exploration probability taken from the 'epsilon-slider'
            component.

    Returns:
        A plotly ``go.Figure`` with one line trace of cumulative regret
        against time step.

    Raises:
        PreventUpdate: If ``epsilon`` is None, so the current figure is kept
            instead of failing in the comparison and the ``:.2f`` format.
    """
    if epsilon is None:
        raise PreventUpdate

    # Only the cumulative regret is plotted; the other results are discarded.
    _, _, _, cumulative_regret, _ = mab_instance.run_epsilon_greedy(epsilon)

    # Cumulative regret curve
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=np.arange(len(cumulative_regret)),
        y=cumulative_regret,
        mode='lines',
        name='Cumulative Regret'
    ))

    fig.update_layout(
        title=f'Epsilon-Greedy Algorithm with ε={epsilon:.2f}',
        xaxis_title='Time Step',
        yaxis_title='Cumulative Regret'
    )

    return fig

# Start the Dash development server when this file is the entry point.
# NOTE: if the default address is already in use, the error output recorded
# at the end of this file suggests passing a different port to run_server.
if __name__ == '__main__':
    _server_options = {'debug': True}
    app.run_server(**_server_options)


OSError: Address 'http://127.0.0.1:8050' already in use.
    Try passing a different port to run_server.