In [53]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

# --- Параметры для модели NaSch c БАТС
# 1 cell = 7.5 метра

# Параметры дороги
N_CELLS = 10000  # Количество ячеек на кольцевой дороге
V_MAX = 5        # Максимальная скорость (клеток/шаг)

# Параметры симуляции
T_STABILIZE = 2000 # Шагов для стабилизации системы
T_AVERAGE = 100    # Шагов для усреднения результатов (Te)

# Параметры модели
P_SLOW_DEFAULT = 0.5 # Вероятность случайного торможения для АТС


# --- Параметры для модели S-NFS

N_CELLS_SNFS = 100       # Количество ячеек в S-NFS
V_MAX_SNFS_1 = 1         # Макс. скорость 1
V_MAX_SNFS_3 = 3         # Макс. скорость 2
P_PARAM_SNFS = 0.1       # Вероятность НЕ тормозить случайно
T_STABILIZE_SNFS = 50    # Время стабилизации для S-NFS
T_AVERAGE_SNFS = 50      # Время усреднения для S-NFS

In [17]:
def run_NaSch_sim_vectorized(
    density,
    sim_time,
    n_cells,
    v_max,
    p_slow,
    rhdv_ratio=1.0,
    history_needed=False
):
    """
    Args:
        density (float): Плотность автомобилей [0; 1].
        sim_time (int): Общее кол-во шагов симуляции.
        n_cells (int): Длина дороги в ячейках.
        v_max (int): Максимальная скорость.
        p_slow (float): Вероятность внезапного торможения.
        rhdv_ratio (float): Доля водителей-людей (R_HDV):
        1.0 - все люди, 0.0 - все беспилотники.

    Returns:
        tuple: (
            средний поток (flow),
            средняя скорость (avg_velocity),
            история позиций (для x-t диаграмм),
            история скоростей (для x-t диаграмм)
        )
    """
    # Инициализация
    n_vehicles = int(density * n_cells)
    if n_vehicles == 0:
        if history_needed:
            return 0, 0, np.array([]), np.array([])
        return 0, 0

    # Размещаем машины случайным образом. Сразу отсортируем для удобства.
    positions = np.sort(np.random.choice(n_cells, n_vehicles, replace=False))
    velocities = np.random.randint(0, v_max + 1, n_vehicles)

    n_hdv = int(n_vehicles * rhdv_ratio)
    is_human_driver = np.zeros(n_vehicles, dtype=bool)
    is_human_driver[:n_hdv] = True
    np.random.shuffle(is_human_driver)

    if history_needed:
        positions_history = np.zeros((sim_time, n_vehicles), dtype=int)
        velocities_history = np.zeros((sim_time, n_vehicles), dtype=int)

    velocity_data = []

    # Основной цикл симуляции
    for t in range(sim_time):
        if history_needed:
            positions_history[t] = positions
            velocities_history[t] = velocities

        # Векторизованное применение NaSch,
        # Находим расстояние до следующей машины для ВСЕХ машин сразу:

        # np.roll сдвигает массив так, что на месте i-той машины
        # оказывается позиция (i+1)-ой. Необходимо для кольцевой дороги.
        gaps = (np.roll(positions, -1) - positions - 1 + n_cells) % n_cells

        velocities = np.minimum(velocities + 1, v_max)  # Ускорение


        velocities = np.minimum(velocities, gaps)  # Торможение из-за помехи

        # Случайное торможение (только для водителей-людей)
        # Булевая маска-массив примения торможения
        slowdown_mask = (is_human_driver) & (np.random.rand(n_vehicles) < p_slow)
        # Уменьшаем там где маска True
        velocities[slowdown_mask] = np.maximum(0, velocities[slowdown_mask] - 1)


        positions = (positions + velocities) % n_cells  # Движение

        # После движения машины могут перегнать друг друга,
        # нарушив порядок в массиве. Сортируем их заново, т. к. это
        # необходимо для верного расчёта gaps на будущей итерации, и
        # не забываем пересортировать тип водителя!
        order = np.argsort(positions)
        positions = positions[order]
        velocities = velocities[order]
        is_human_driver = is_human_driver[order]

        if t >= T_STABILIZE:
            velocity_data.extend(velocities)

    # Расчёт результатов
    if not velocity_data:
        avg_velocity = np.mean(velocities) if n_vehicles > 0 else 0
    else:
        avg_velocity = np.mean(velocity_data)

    flow = density * avg_velocity

    if history_needed:
        return flow, avg_velocity, positions_history, velocities_history
    else:
        return flow, avg_velocity

In [None]:
# Фундаментальная диаграмма

print("Строим фундаментальную диаграмму...")

densities = np.arange(0.01, 1.01, 0.02)
flows = []

# tqdm для отслеживания
for d in tqdm(densities):
    flow, _ = run_NaSch_sim_vectorized(
        density=d,
        sim_time=T_STABILIZE + T_AVERAGE,
        n_cells=N_CELLS,
        v_max=V_MAX,
        p_slow=P_SLOW_DEFAULT,
        history_needed=False
    )
    flows.append(flow)

# Рисуем график
plt.figure(figsize=(10, 6))
plt.plot(
    densities,
    flows,
    marker='o',
    linestyle='-',
    markersize=4,
    color='darkblue',
    label=f'pslow = {P_SLOW_DEFAULT}'
)
plt.title('Фундаментальная диаграмма (модель NaSch)')
plt.xlabel('Density [Vehicle/cell]')
plt.ylabel('Flow [Vehicle/time step]')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Пробные x-t диаграммы разной плотности

def plot_xt_diagram(density, title):
    print(f"Моделируем для плотности {density}...")

    local_n_cells = 100
    local_sim_time = 150

    _, _, positions_history, velocities_history = run_NaSch_sim_vectorized(
        density=density,
        sim_time=local_sim_time,
        n_cells=local_n_cells,
        v_max=V_MAX,
        p_slow=P_SLOW_DEFAULT,
        history_needed=True
    )

    plt.figure(figsize=(10, 8))

    # Создаем сетку для отображения
    time_steps = np.arange(local_sim_time)
    for t in time_steps:
        # Для каждой машины в данный момент времени t своя точка
        plt.scatter(
            positions_history[t],
            np.full_like(positions_history[t], t),
            c=velocities_history[t],
            cmap='winter',
            vmin=0,
            vmax=V_MAX,
            s=5)

    plt.title(title)
    plt.xlabel('Позиция на дороге (ед. cells)')
    plt.ylabel('Время (с.)')
    plt.xlim(0, local_n_cells)
    plt.ylim(0, local_sim_time)

    cbar = plt.colorbar()
    cbar.set_label('Скорость')

    plt.show()

# Строим диаграммы для двух значений плотности: свободный поток и затор
plot_xt_diagram(
    density=0.03,
    title='x-t диаграмма при density = 0.03'
)
plot_xt_diagram(
    density=0.1,
    title='x-t диаграмма при density = 0.1'
)

In [None]:
# Сравнительный график по случайному торможению

print("Строим фундаметнальные диаграммы для разных p_slow...")

plt.figure(figsize=(12, 7))
densities = np.arange(0.01, 1.01, 0.02)
p_slow_values = [0.2, 0.4, 0.6, 0.8]
cmap = plt.cm.get_cmap('RdYlGn_r')
p_min, p_max = min(p_slow_values), max(p_slow_values)


for p_val in p_slow_values:
    flows = []
    print(f"  Расчёт для p_slow = {p_val}...")
    color = cmap((p_val - p_min) / (p_max - p_min))

    for d in tqdm(densities):
        flow, _ = run_NaSch_sim_vectorized(
            density=d,
            sim_time=T_STABILIZE + T_AVERAGE,
            n_cells=N_CELLS,
            v_max=V_MAX,
            p_slow=p_val,
            history_needed=False
        )
        flows.append(flow)

    plt.plot(
        densities,
        flows,
        linewidth=3,
        color=color,

        label=f'pslow = {p_val}'
    )

plt.title('Влияние вероятности случайного торможения на поток')
plt.xlabel('Density [Vehicle/cell]')
plt.ylabel('Flow [Vechicle/time_step]')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Серия фундаментальных диаграмм с разной долей БАТС

print("Строим диаграммы, варьируя долю БАТС...")

plt.figure(figsize=(12, 7))
densities = np.arange(0.01, 1.01, 0.02)
rhdv_ratios = [1.0, 0.8, 0.6, 0.4, 0.2, 0.0]
cmap = plt.cm.get_cmap('RdYlGn_r')
r_min, r_max = min(rhdv_ratios), max(rhdv_ratios)

for ratio in rhdv_ratios:
    flows = []
    print(f"  Расчёт для R_HDV = {ratio}...")
    color = cmap((ratio - r_min) / (r_max - r_min))

    for d in tqdm(densities):
        flow, _ = run_NaSch_sim_vectorized(
            density=d,
            sim_time=T_STABILIZE + T_AVERAGE,
            n_cells=N_CELLS,
            v_max=V_MAX,
            p_slow=P_SLOW_DEFAULT, # p_slow для людей фиксирован
            rhdv_ratio=ratio,
            history_needed=False
        )
        flows.append(flow)
    plt.plot(
        densities,
        flows,
        color = color,
        linewidth=2,
        label=f'R_HDV = {ratio:.1f}')

plt.title('Влияние доли БАТС на транспортный поток')
plt.xlabel('Density [Vehicle/cell]')
plt.ylabel('Flow [Vechicle/time_step]')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Сравнение x-t диаграмм

def plot_xt_diagram_rhdv(density, rhdv_ratio, title):
    print(f"Моделируем для плотности {density}, R_HDV = {rhdv_ratio}...")

    local_n_cells = 250
    local_sim_time = 400

    _, _, positions_history, velocities_history = run_NaSch_sim_vectorized(
        density=density,
        sim_time=local_sim_time,
        n_cells=local_n_cells,
        v_max=V_MAX,
        p_slow=P_SLOW_DEFAULT,
        rhdv_ratio=rhdv_ratio,
        history_needed=True
    )

    plt.figure(figsize=(12, 8))
    time_steps = np.arange(local_sim_time)
    for t in time_steps:
        plt.scatter(
            positions_history[t],
            np.full_like(positions_history[t], t),
            c=velocities_history[t],
            cmap='RdYlGn',
            vmin=0,
            vmax=V_MAX,
            s=5
        )

    plt.title(title)
    plt.xlabel('Позиция на дороге (ед. cells)')
    plt.ylabel('Время (с.)')
    plt.xlim(0, local_n_cells)
    plt.ylim(0, local_sim_time)
    cbar = plt.colorbar()
    cbar.set_label('Скорость')
    plt.show()

# БАТС отсутствуют
plot_xt_diagram_rhdv(
    density=0.4,
    rhdv_ratio=1.0,
    title='x-t диаграмма, density = 0.4, R_HDV = 1.0'
)

# БАТС - 50%
plot_xt_diagram_rhdv(
    density=0.4,
    rhdv_ratio=0.5,
    title='x-t диаграмма, density = 0.4, R_HDV = 0.5')

In [51]:
def run_snfs_simulation(density, sim_time, n_cells, v_max, p, q, r):
    # Инициализация
    n_vehicles = int(density * n_cells)
    if n_vehicles == 0:
        return 0, 0


    if v_max == 1 and density > 0:  # Равномерное распр для ветвей при v_max=1
        gap = n_cells / n_vehicles
        positions = np.floor(np.arange(n_vehicles) * gap).astype(int)
    else:  # Случайное распр для v_max=3
        positions = np.sort(np.random.choice(n_cells, n_vehicles, replace=False))

    velocities = np.zeros(n_vehicles, dtype=int)
    velocity_data = []

    for t in range(sim_time):
        new_velocities = velocities.copy()  # Для обновления разом

        for i in range(n_vehicles):
            v_new = min(velocities[i] + 1, v_max)  # Ускорение

            # Упреждение, на сколько машин вперед смотреть (S)
            s_lookahead = 2 if np.random.rand() < r else 1
            target_car_index = (i + s_lookahead) % n_vehicles  # Ищем S-ое авто
            gap_to_s_car = (positions[target_car_index] - positions[i] + n_cells) % n_cells

            # Медленный старт (торможение из-за машины впереди) с вероятностью q
            if np.random.rand() < q:
                v_new = min(v_new, gap_to_s_car - s_lookahead)

            # Случайное торможение с вероятностью 1-p
            if np.random.rand() < p:
                v_new = max(0, v_new - 1)

            # Избежание столкновений
            gap_to_next_car = (positions[(i + 1) % n_vehicles] - positions[i] - 1 + n_cells) % n_cells
            v_new = min(v_new, gap_to_next_car)

            new_velocities[i] = v_new

        velocities = new_velocities

        positions = (positions + velocities) % n_cells  # Движение


        order = np.argsort(positions)  # Для верной следующей итерации
        positions = positions[order]
        velocities = velocities[order]

        if t >= T_STABILIZE_SNFS:
            velocity_data.extend(velocities)


    if not velocity_data:  # Расчёт результатов
        avg_velocity = np.mean(velocities) if n_vehicles > 0 else 0
    else:
        avg_velocity = np.mean(velocity_data)

    flow = density * avg_velocity

    return flow, avg_velocity

In [None]:
# Диаграммы S-NFS для большой максимальной скорости

print("Строим диаграммы S-NFS для V_max = 3...")

# Создаем сетку 3x3
fig, axes = plt.subplots(3, 3, figsize=(15, 15), sharex=True, sharey=True)
fig.suptitle('Диаграммы S-NFS при Vmax = 3', fontsize=16)

q_params = [0.0, 0.5, 1.0]
r_params = [0.0, 0.5, 1.0]
densities = np.linspace(0, 1, 50)

for i, r in enumerate(r_params):
    for j, q in enumerate(q_params):
        ax = axes[i, j]

        print(f"  Расчет для q={q}, r={r}...")

        for d in tqdm(densities):
            # Запускаем симуляцию неоднократно для каждой точки плотности
            # Наносим все результаты, чтобы увидеть разброс
            for _ in range(10):
                flow, _ = run_snfs_simulation(
                    density=d,
                    sim_time=T_STABILIZE_SNFS + T_AVERAGE_SNFS,
                    n_cells=N_CELLS_SNFS,
                    v_max=V_MAX_SNFS_3,
                    p=P_PARAM_SNFS,
                    q=q,
                    r=r
                )
                # Рисуем каждую точку отдельно
                ax.plot(
                    d,
                    flow,
                    '.',
                    markersize=3,
                    color='darkblue')

        ax.set_title(f'q = {q}, r = {r}')
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1.2)
        ax.grid(True)
        if j == 2: ax.set_xlabel('Density')
        if i == 0: ax.set_ylabel('Flow')

plt.tight_layout(rect=[0, 0.03, 1, 0.96])
plt.show()

In [None]:
# Диаграммы S-NFS для малой максимальной скорости

print("Строим диаграммы S-NFS для V_max = 1...")

fig, axes = plt.subplots(3, 3, figsize=(15, 15))
fig.suptitle('Диаграммы S-NFS при Vmax = 1 (наша модель)', fontsize=16)

q_params = [0.0, 0.5, 1.0]
r_params = [0.0, 0.5, 1.0]
densities = np.linspace(0, 1, 50)

for i, q in enumerate(q_params):
    for j, r in enumerate(r_params):
        ax = axes[i, j]

        print(f"  Расчет для q={q}, r={r}...")
        # Здесь не усредняем, чтобы увидеть разброс и ветви
        for d in tqdm(densities):
            for _ in range(10): # 10 прогонов для каждой плотности
                flow, _ = run_snfs_simulation(
                    density=d,
                    sim_time=T_STABILIZE_SNFS + T_AVERAGE_SNFS,
                    n_cells=N_CELLS_SNFS,
                    v_max=V_MAX_SNFS_1,
                    p=P_PARAM_SNFS,
                    q=q,
                    r=r
                )
                ax.plot(
                    d,
                    flow,
                    '.',
                    markersize=3,
                    color='darkgreen')

        ax.set_title(f'q = {q}, r = {r}')
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 0.7)
        ax.grid(True)
        if i == 2: ax.set_xlabel('Density')
        if j == 0: ax.set_ylabel('Flow')

plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

In [64]:
# Ячейка 11: Создание анимированной GIF-ки

import os
import imageio # Библиотека для создания GIF

# --- Шаг 1: Настройка параметров для анимации ---
# Нам нужна небольшая дорога, чтобы пробка была хорошо видна
ANIM_N_CELLS = 100
# Плотность, при которой легко образуются заторы
ANIM_DENSITY = 0.15
# Количество кадров в нашей гифке
ANIM_FRAMES = 200
# Максимальная скорость
ANIM_V_MAX = 5
# Вероятность торможения
ANIM_P_SLOW = 0.3

# Создадим папку для временного хранения кадров
if not os.path.exists('gif_frames'):
    os.makedirs('gif_frames')

# --- Шаг 2: Функция для отрисовки одного кадра ---
def draw_traffic_circle(positions, velocities, frame_num):
    """Рисует текущее состояние трафика на кольцевой дороге."""
    fig, ax = plt.subplots(figsize=(8, 8))

    # Убираем оси для красоты
    ax.set_aspect('equal')
    ax.axis('off')

    # Рисуем "дорогу" - серый круг
    road = plt.Circle((0, 0), 1, color='lightgray', fill=False, linewidth=15)
    ax.add_artist(road)

    # Рассчитываем углы для каждой машины
    # Позиция от 0 до N_CELLS-1 -> угол от 0 до 2*PI
    angles = (positions / ANIM_N_CELLS) * 2 * np.pi

    # Координаты на круге
    x = np.cos(angles)
    y = np.sin(angles)

    # Рисуем машины
    # Цвет зависит от скорости, используем красивую цветовую карту 'viridis' или 'plasma'
    scatter = ax.scatter(x, y, c=velocities, cmap='plasma', vmin=0, vmax=ANIM_V_MAX, s=100, zorder=10)

    # Добавляем номер кадра
    ax.text(0, 0, f't = {frame_num}', ha='center', va='center', fontsize=20)

    # Устанавливаем границы, чтобы круг не обрезался
    ax.set_xlim(-1.2, 1.2)
    ax.set_ylim(-1.2, 1.2)

    # Сохраняем кадр
    filepath = f'gif_frames/frame_{frame_num:04d}.png'
    plt.savefig(filepath, dpi=90) # dpi поменьше, чтобы файлы были не очень большими
    plt.close(fig)
    return filepath

# --- Шаг 3: Запуск симуляции и сохранение кадров ---
print("Запускаем симуляцию для создания GIF...")

# Получаем всю историю движения
_, _, positions_history, velocities_history = run_nasch_simulation_vectorized(
    density=ANIM_DENSITY,
    sim_time=ANIM_FRAMES,
    n_cells=ANIM_N_CELLS,
    v_max=ANIM_V_MAX,
    p_slow=ANIM_P_SLOW,
    history_needed=True
)

print("Генерируем кадры...")
frame_files = []
for t in tqdm(range(ANIM_FRAMES)):
    filepath = draw_traffic_circle(positions_history[t], velocities_history[t], t)
    frame_files.append(filepath)

# --- Шаг 4: Сборка GIF ---
print("Собираем GIF-анимацию...")
with imageio.get_writer('traffic_animation.gif', mode='I', fps=20) as writer:
    for filename in frame_files:
        image = imageio.imread(filename)
        writer.append_data(image)

print("\nГотово! Гифка 'traffic_animation.gif' сохранена в текущей папке.")

# (Опционально) Очистка временных файлов
import shutil
shutil.rmtree('gif_frames')
print("Временные файлы кадров удалены.")

Запускаем симуляцию для создания GIF...
Генерируем кадры...


  0%|          | 0/200 [00:00<?, ?it/s]

Собираем GIF-анимацию...


  image = imageio.imread(filename)



Готово! Гифка 'traffic_animation.gif' сохранена в текущей папке.
Временные файлы кадров удалены.
