# マルチエージェントシミュレーション

## 必要なライブラリをインポート

In [None]:
import math
import random

import os
import csv
import pandas as pd

from typing import List, Tuple, Dict
from __future__ import annotations

## 歩行者エージェントクラス

In [None]:
class Pedestrian:
    """ 歩行者エージェントを表現するクラス """

    def __init__(self, id: int, lane_min: int, lane_max: int, start_time: int) -> None:
        """
        コンストラクタ

        Parameters
        ----------
        id : int
            識別番号
        x : float
            x座標
        lane : int
            歩いているレーン番号
        lane_min : int
            レーン番号の最小値
        lane_max : int
            レーン番号の最大値
        walking_speed : float
            通常時の歩行速度（0.8～1.7 m/s）
        walking_time : int
            歩行時間 [s]
        start_time : int
            歩行者エージェントの発生時刻
        """
        self.id = id
        self.x = 0.0
        self.lane = random.randint(lane_min, lane_max)
        self.walking_speed = random.randint(8, 17) / 10
        self.walking_time = 0
        self.start_time = start_time

    def update_walking_time(self) -> None:
        """ 歩行時間を更新 """
        self.walking_time += 1

## 歩行者の位置を更新するための関数

In [None]:
def get_front_agent(pedestrians: List[Pedestrian], x: float, lane: int) -> Pedestrian | None:
    """
    x座標とレーンから前を歩く歩行者エージェントを取得

    Parameters
    ----------
    pedestrians : List[Pedestrian]
        歩行者エージェントリスト（x座標が大きい順）
    x : float
        x座標
    lane : int
        レーン番号

    Returns
    -------
    front_agent : Pedestrian | None
        前を歩く歩行者エージェント
    """
    front_agent = None
    for pedestrian in pedestrians:
        if (pedestrian.lane == lane and pedestrian.x > x):
            front_agent = pedestrian
        elif (pedestrian.x < x):
            break

    return front_agent

In [None]:
def get_behind_agent(pedestrians: List[Pedestrian], x: float, lane: int) -> Pedestrian | None:
    """
    x座標とレーンから後ろを歩く歩行者エージェントを取得

    Parameters
    ----------
    pedestrians : List[Pedestrian]
        歩行者エージェントリスト（x座標が大きい順）
    x : float
        x座標
    lane : int
        レーン番号

    Returns
    -------
    behind_agent : Pedestrian | None
        後ろを歩く歩行者エージェント
    """
    behind_agent = None
    for pedestrian in pedestrians:
        if (pedestrian.lane == lane and pedestrian.x < x):
            behind_agent = pedestrian
            break

    return behind_agent

In [None]:
def is_change_lane(pedestrians: List[Pedestrian], pedestrian: Pedestrian, lane_min: int, lane_max: int) -> bool:
    """
    レーンを変更するかどうかを判定

    Parameters
    ----------
    pedestrians : List[Pedestrian]
        歩行者エージェントリスト（x座標が大きい順）
    pedestrian : Pedestrian
        歩行者エージェント
    lane_min : int
        レーン番号の最小値
    lane_max : int
        レーン番号の最大値

    Returns
    -------
    is_change : bool, default False
        レーンを変更したかどうか
    """
    is_change = False

    # レーンが1つしかない場合
    if (lane_max == 1):
        return is_change

    # 変更先のレーン
    change_lane = -1
    if (pedestrian.lane == lane_min):
        change_lane = pedestrian.lane + 1
    elif (pedestrian.lane == lane_max):
        change_lane = pedestrian.lane - 1
    else:
        if (random.randint(0, 1) == 0):
            change_lane = pedestrian.lane + 1
        else:
            change_lane = pedestrian.lane - 1

    # 変更先レーンで前を歩く歩行者
    front_agent = get_front_agent(pedestrians, pedestrian.x, change_lane)
    # 変更先レーンで後ろを歩く歩行者
    behind_agent = get_behind_agent(pedestrians, pedestrian.x, change_lane)

    # x方向に進む最大距離
    x_max = math.sqrt(pedestrian.walking_speed ** 2 - 0.5 ** 2)

    # レーン変更先で前または後ろを歩く歩行者がいない場合
    if ((front_agent is None) or (behind_agent is None)):
        is_change = True
        pedestrian.lane = change_lane
        pedestrian.x += x_max

    # 前後に歩行者がいる場合
    else:
        # レーン変更の条件
        # ① レーン変更先で前を歩く歩行者（更新後）と後ろを歩く歩行者（更新前）の距離が2.5 m以上
        # ② レーン変更する予定の歩行者とレーン変更先の後ろを歩く歩行者との距離が0.5 m以上
        if ((front_agent.x - behind_agent.x >= 2.5) and (pedestrian.x - behind_agent.x >= 0.5)):
            is_change = True
            pedestrian.lane = change_lane

            # 衝突判定の閾値
            distance_threshold = 0.5
            # 前を歩く歩行者との距離
            distance_to_front = front_agent.x - (pedestrian.x + x_max)

            # レーン変更先の前を歩く人に追いつく場合
            if (distance_to_front < distance_threshold):
                pedestrian.x = front_agent.x - distance_threshold

            # レーン変更先の前を歩く人に追いつかない場合
            else:
                pedestrian.x += x_max

    return is_change

In [None]:
def update_position(pedestrians: List[Pedestrian], lane_min: int, lane_max: int) -> None:
    """
    歩行者の位置を更新

    Parameters
    ----------
    pedestrians : List[Pedestrian]
        歩行者エージェントリスト（x座標が大きい順）
    lane_min : int
        レーン番号の最小値
    lane_max : int
        レーン番号の最大値
    """
    for pedestrian in pedestrians:
        # 前を歩く歩行者
        front_agent = get_front_agent(pedestrians, pedestrian.x, pedestrian.lane)
        # 後ろを歩く歩行者
        behind_agent = get_behind_agent(pedestrians, pedestrian.x, pedestrian.lane)

        # 前を歩く歩行者がいない場合
        if (front_agent is None):
            pedestrian.x += pedestrian.walking_speed

        # 前を歩く歩行者がいる場合
        else:
            # 衝突判定の閾値
            distance_threshold = 0.5
            # 前を歩く歩行者との距離
            distance_to_front = abs(front_agent.x - (pedestrian.x + pedestrian.walking_speed))

            # 前を歩く人に追いつく場合
            if (distance_to_front < distance_threshold):
                # 追い抜き判定
                # 追い抜く場合は、is_change_lane()で処理を実行
                flag = is_change_lane(pedestrians, pedestrian, lane_min, lane_max)
                # 追い抜けない場合は、前の歩行者を追従
                if (flag == False):
                    pedestrian.x = front_agent.x - distance_threshold

            # 前を歩く人に追いつかない場合
            else:
                pedestrian.x += pedestrian.walking_speed

## マルチエージェントシミュレーション

In [None]:
# シミュレーションの終了時刻
end_time = 100
# 歩行者が歩くレーンの最小値
lane_min = 1
# 歩行者が歩くレーンの最大値
lane_max = random.randint(2, 6)
# 歩行者エージェントの総数
N = random.randint(1 * lane_max, 100 * lane_max)
# 各レーンのエージェント発生数
N_lane = int(N / lane_max)

# 混雑度
crowd_level = -1
density = N_lane / end_time
if (density <= 0.2):
    crowd_level = 1
elif (density <= 0.4):
    crowd_level = 2
elif (density <= 0.6):
    crowd_level = 3
elif (density <= 0.8):
    crowd_level = 4
elif (density <= 1.0):
    crowd_level = 5

# 歩行者エージェントの各レーンでの発生時刻リストのリスト
start_time_lists = [sorted(random.sample(range(end_time), N_lane)) for _ in range(lane_max)]

# 歩行者エージェントを格納するリスト（x座標が大きい順）
pedestrians = []

# 各レーンで発生した歩行者エージェント数
numbers = [0 for _ in range(lane_max)]

In [None]:
# シミュレーション
for current_time in range(end_time):
    # 歩行者エージェントを発生時刻に作成
    for i in range(lane_max):
        if (numbers[i] < N_lane):
            if current_time == start_time_lists[i][numbers[i]]:
                numbers[i] += 1
                new_agent = Pedestrian(id=sum(numbers), lane_min=lane_min, lane_max=lane_max, start_time=current_time)
                pedestrians.append(new_agent)

    # 歩行者エージェントの位置を更新
    update_position(pedestrians, lane_min, lane_max)

    # x座標が大きい順にソート（降順）
    pedestrians.sort(key=lambda p: p.x, reverse=True)

    # print(f"Current time: {current_time + 1}")
    for pedestrian in pedestrians:
        # 歩行時間を更新
        pedestrian.update_walking_time()
        # シミュレーション結果を表示（20秒ごと）
        # if ((current_time + 1) % 20 == 0):
        #     print(f"id={pedestrian.id}, x={pedestrian.x}, lane={pedestrian.lane}, speed={pedestrian.walking_speed}")

In [None]:
# 結果の表示
print(f"crowd_level_{crowd_level}")
for pedestrian in pedestrians:
    print(f"id={pedestrian.id}, x={pedestrian.x}, lane={pedestrian.lane}, speed={pedestrian.walking_speed}, time={pedestrian.walking_time}")

crowd_level_2
id=13, x=150.97442304278158, lane=2, speed=1.7, time=91
id=12, x=147.03973683071388, lane=1, speed=1.6, time=92
id=16, x=143.91986841535683, lane=2, speed=1.6, time=90
id=31, x=135.27442304278168, lane=2, speed=1.7, time=81
id=14, x=134.92842712474618, lane=4, speed=1.5, time=91
id=28, x=130.0504764575346, lane=1, speed=1.6, time=82
id=2, x=128.69999999999982, lane=5, speed=1.3, time=99
id=3, x=127.29999999999981, lane=3, speed=1.3, time=98
id=18, x=124.41533936612461, lane=2, speed=1.4, time=89
id=36, x=122.81073962682049, lane=2, speed=1.6, time=77
id=8, x=122.09999999999981, lane=5, speed=1.3, time=94
id=30, x=121.15685424949238, lane=6, speed=1.5, time=81
id=11, x=119.59999999999982, lane=1, speed=1.3, time=92
id=23, x=118.78136476990903, lane=4, speed=1.4, time=86
id=6, x=115.09087121146375, lane=5, speed=1.2, time=96
id=7, x=114.00000000000018, lane=3, speed=1.2, time=95
id=48, x=113.44884608556326, lane=1, speed=1.7, time=67
id=38, x=113.10508477383675, lane=2, spe

## CSVファイルに書き込み

In [None]:
# ファイル名
file_name = "dataset.csv"
# 既存のファイルがある場合は追記モードで開き、ない場合は新規作成
mode = "a" if os.path.exists(file_name) else "w"

with open(file_name, mode=mode, newline="") as file:
    fieldnames = ["distance", "width", "speed", "crowd_level", "time"]
    writer = csv.DictWriter(file, fieldnames=fieldnames)

    # ファイルが新規作成された場合はヘッダを書き込む
    if mode == "w":
        writer.writeheader()

    # シミュレーション結果をCSVファイルに書き込む
    for pedestrian in pedestrians:
        writer.writerow({
            "distance": pedestrian.x,
            "width": 0.5 * lane_max,
            "speed": pedestrian.walking_speed,
            "crowd_level": crowd_level,
            "time": pedestrian.walking_time
        })

## データ量の調整

In [None]:
dataset = pd.read_csv("dataset.csv")
dataset = dataset[:-10]  # 最後のX行を削除
print(dataset.shape)
dataset.to_csv("dataset.csv", index=False)

(134, 5)


データセット作成完了