In [None]:
from bdi_test.sf import load_graph
import random

# 定义OD对
od_pairs = [
    (14, 4), (14, 5), (15, 6), (15, 8),  # 更多的OD对可以根据需求添加
    (22, 9), (22, 10), (23, 11), (23, 16)
]
G, pos_coord, pos_xy,sioux_falls_df = load_graph()

In [None]:
import networkx as nx


def find_all_paths(graph, origin, destination, k=None):
    """找到从origin到destination的所有可行路径，或前k条最短路径"""
    if k is not None:
        # 使用K条最短路径
        return list(nx.shortest_simple_paths(graph, origin, destination, weight='length'))[:k]
    else:
        # 找到所有简单路径
        return list(nx.all_simple_paths(graph, origin, destination))

def precompute_path_combinations(graph, od_pairs, k=None):
    """预先计算指定OD对的所有可行路径组合"""
    path_combinations = {}

    for origin, destination in od_pairs:
        paths = find_all_paths(graph, origin, destination, k)
        path_combinations[(origin, destination)] = paths

    return path_combinations

# 将 MultiDiGraph 转换为 DiGraph
G_simple = nx.DiGraph(G)

# 继续使用 G_simple 进行路径计算
path_combinations = precompute_path_combinations(G_simple, od_pairs)

In [None]:
path_combinations[(14, 4)]

In [None]:
exploration_rate = 0.01  # 探索行为的概率
memory_level = 0.8  # ψ: 旅行者的记忆水平，控制权重衰减
class BDI_Agent:
    next_id = 0
    def __init__(self, learning_rate,initial_aspiration, paths, origin, destination):
        self.id = BDI_Agent.next_id
        BDI_Agent.next_id += 1
        self.beliefs = {path: 1 / len(paths) for path in paths}  # 初始化为均匀概率分布
        self.intention = random.choice(list(self.beliefs.keys()))  # 初始化随机选择一条路径
        self.travel_history = {p: [] for p in paths}  # 记录每条路径的旅行时间历史
        self.learning_rate = learning_rate  # 学习速率，控制信念更新的速度
        self.aspiration = initial_aspiration  # 初始化全局 Aspiration
        self.origin = origin  # 分配的起点
        self.destination = destination  # 分配的终点
        self.current_location = origin  # 当前的位置，开始时在起点

    def calculate_stimulus(self, PT, current_path):
        """根据全局 Aspiration 和 PT 计算刺激值"""
        diff = self.aspiration - PT  # 使用全局 Aspiration 计算差异

        if diff >= 0:
            # 正面反馈：当前路径表现优于预期，计算最大可能的收益
            max_benefit = max([self.aspiration - self.calculate_PT(p) for p in paths if p != current_path])
            stimulus = diff / max(max_benefit, 1e-10)  # 防止除以零
        else:
            # 负面反馈：当前路径表现不如预期，计算最小可能的损失
            min_loss = min([self.aspiration - self.calculate_PT(p) for p in paths if p != current_path])
            stimulus = diff / max(abs(min_loss), 1e-10)  # 防止除以零

        return stimulus

    def calculate_PT(self, path):
        """计算指定路径的感知旅行时间（PT）"""
        times = self.travel_history[path]
        if times:
            # 使用记忆衰减权重计算感知旅行时间
            weights = [memory_level ** (len(times) - 1 - j) for j in range(len(times))]
            PT = sum(t * w for t, w in zip(times, weights)) / sum(weights)
        else:
            # 如果没有历史数据，使用路径的自由流时间作为PT
            PT = paths[path]['free_flow_time']
        return PT

    def update_aspiration(self, PT):
        """基于最近的 PT 动态更新全局 Aspiration"""
        self.aspiration = self.aspiration + self.learning_rate * (PT - self.aspiration)

    def update_probabilities(self, stimulus):
        for path in self.beliefs:
            if path == self.intention:
                if stimulus >= 0:
                    self.beliefs[path] += (1 - self.beliefs[path]) * self.learning_rate * stimulus
                else:
                    self.beliefs[path] -= self.beliefs[path] * self.learning_rate * abs(stimulus)
            else:
                self.beliefs[path] -= self.learning_rate * abs(stimulus) * self.beliefs[path]

            # 避免信念值为负
            self.beliefs[path] = max(0, self.beliefs[path])

        # 确保信念总和为正
        total_belief = sum(self.beliefs.values())
        if total_belief == 0:
            # 如果总和为零，重新分配信念值
            self.beliefs = {path: 1 / len(paths) for path in self.beliefs}
            total_belief = 1

        # 确保概率总和为1
        for path in self.beliefs:
            self.beliefs[path] /= total_belief

    def choose_path(self):
        # 引入探索行为
        if random.random() < exploration_rate:
            self.intention = random.choice(list(self.beliefs.keys()))
        else:
            self.intention = random.choices(list(self.beliefs.keys()), weights=list(self.beliefs.values()))[0]
        return self.intention
    
    def calculate_total_travel_time(self, graph, path_edges):
        """计算一条路径的总旅行时间"""
        total_travel_time = 0
        for edge in path_edges:
            u, v, k = edge
            if graph.has_edge(u, v, k):
                flow = graph[u][v][k].get('flow', 0)
                capacity = graph[u][v][k].get('capacity', 1)  # 避免除零错误
                free_flow_time = graph[u][v][k].get('free_flow_time', 0)  # 默认值为0
                travel_time = free_flow_time * (1 + 0.15 * (flow / capacity) ** 4)
                total_travel_time += travel_time
            else:
                print(f"Warning: Edge {edge} not found in the graph.")
                total_travel_time += float('inf')  # 使用无穷大表示不可达路径
        return total_travel_time

    def move_to_destination(self, graph):
        """执行一次OD旅行"""
        chosen_path = self.choose_path()
        path_edges = self.paths[chosen_path]
        total_travel_time = self.calculate_total_travel_time(graph, path_edges)

        # 更新当前路径的旅行历史
        self.travel_history[chosen_path].append(total_travel_time)
        
        # 更新信念和aspiration
        PT = self.calculate_PT(chosen_path)
        stimulus = self.calculate_stimulus(PT, chosen_path)
        self.update_probabilities(stimulus)
        self.update_aspiration(PT)

        # 移动到目的地
        self.current_location = self.destination
        
        # 回到起点，为下一次旅行做准备
        self.reset_to_origin()

        return chosen_path, total_travel_time

    
    def reset_to_origin(self):
        """回到起点"""
        self.current_location = self.origin

In [None]:
# 初始化路径和Agent
paths = list(sioux_falls_df['edge'])

# 初始化多个Agent
agents = [BDI_Agent(learning_rate=0.1, initial_aspiration=10, paths=paths, 
                    origin=origin, destination=destination) 
          for origin, destination in od_pairs]

# 模拟多个Episode
for episode in range(10):
    for agent in agents:
        chosen_path, total_travel_time = agent.move_to_destination(G)
        
        # 更新路径上的流量数据
        for edge in paths[chosen_path]:
            u, v, k = edge
            G[u][v][k]['flow'] = G[u][v][k].get('flow', 0) + 1


In [None]:
def simulate_agents_movement(agents, graph):
    # 1. 总结 Agent 的路径选择
    path_flows = {path: 0 for path in graph.edges(keys=True)}
    agent_choices = []

    for agent in agents:
        chosen_path = agent.choose_path()
        path_edges = agent.paths[chosen_path]
        agent_choices.append((agent, chosen_path, path_edges))

        # 统计每条路径上的流量
        for edge in path_edges:
            path_flows[edge] += 1

    # 2. 计算 BPR 函数并更新路径的旅行时间
    for edge, flow in path_flows.items():
        u, v, k = edge
        capacity = graph[u][v][k].get('capacity', 1)
        free_flow_time = graph[u][v][k].get('free_flow_time', 0)
        
        # 使用BPR函数计算实际通行时间
        travel_time = free_flow_time * (1 + 0.15 * (flow / capacity) ** 4)
        graph[u][v][k]['travel_time'] = travel_time

    # 3. 计算每个 Agent 的行走时间，并更新其信念和aspiration
    for agent, chosen_path, path_edges in agent_choices:
        total_travel_time = agent.calculate_total_travel_time(graph, path_edges)
        
        # 更新Agent的旅行历史
        agent.travel_history[chosen_path].append(total_travel_time)
        
        # 计算PT和刺激值，更新信念和aspiration
        PT = agent.calculate_PT(chosen_path)
        stimulus = agent.calculate_stimulus(PT, chosen_path)
        agent.update_probabilities(stimulus)
        agent.update_aspiration(PT)

        # Agent 完成旅行，准备下一次旅行
        agent.reset_to_origin()

    return path_flows  # 返回每条路径的流量
