# Лабораторная работа: Моделирование «Билетные кассы на автовокзале»

Цель: смоделировать работу 4 касс с отдельными очередями, учетом технологических перерывов и выбором пассажира «приблизительно наименьшей» очереди. Реализован дискретно-событийный движок без внешних библиотек.

Ключевые элементы:
- Пассажиры приезжают по пуассоновскому потоку (экспоненциальные межприходы)
- Время обслуживания — экспоненциальное
- 4 кассы: циклы работы по 60 мин (45 мин работа + 15 мин перерыв), за `T_announce` минут до перерыва — объявление и возможная миграция пассажиров
- Выбор очереди — «приблизительно минимальная» (погрешность по длине ±delta)

Выходные показатели:
- Среднее время ожидания в очереди
- Среднее время пребывания в системе
- Средняя длина очереди по каждой кассе (временная средняя)
- Кол-во пассажиров, не успевших обслужиться до перерыва и ждавших его окончания
- Общее число обслуженных пассажиров
- Кол-во ушедших из системы из-за превышения лимита ожидания (необязательно)


In [None]:
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Dict, Tuple, Any
import heapq
import random
import math

# -------------------------
# Глобальные параметры
# -------------------------
SEED = 42
random.seed(SEED)

MINUTES = 1.0
HOUR = 60.0 * MINUTES

PARAMS = {
    # Поток прибытия пассажиров (интенсивность лямбда, пассажиров в минуту)
    "arrival_rate": 1.0,  # λ
    # Скорость обслуживания одной кассой (μ, пассажиров в минуту)
    "service_rate_per_booth": 0.35,  # μ
    # Кол-во касс
    "num_booths": 4,
    # Цикл работы кассы: 45 мин работа, 15 мин перерыв
    "work_minutes": 45.0,
    "break_minutes": 15.0,
    # За сколько минут до перерыва оповещать
    "announce_minutes": 5.0,
    # Длительность моделирования (в минутах)
    "simulation_duration": 8 * HOUR,
    # Параметры выбора «приблизительно минимальной» очереди
    "approx_delta": 1,       # допустимая погрешность по длине очереди (шт.)
    "epsilon_random": 0.05,  # шанс выбрать не оптимальную очередь
    # Во время анонса пассажиры мигрируют с вероятностью:
    "announce_migrate_prob": 1.0,
    # Кара за "announce" для выбора (искусственно увеличиваем воспринимаемую длину)
    "announce_penalty": 2,
    # Отказ из-за ожидания (None отключает)
    "max_wait_time": None,  # в минутах; например, 60.0
}


In [None]:
@dataclass
class Passenger:
    id: int
    arrival_time: float
    chosen_booth_id: Optional[int] = None
    service_start_time: Optional[float] = None
    service_end_time: Optional[float] = None
    abandoned: bool = False  # ушёл из-за лимита ожидания
    waited_over_break: bool = False  # застал перерыв и дождался

@dataclass(order=True)
class Event:
    time: float
    priority: int
    kind: str
    payload: Any = field(compare=False, default=None)

class DES:
    def __init__(self):
        self.t: float = 0.0
        self.queue: List[Event] = []
        self.event_counter: int = 0

    def schedule(self, time: float, kind: str, payload: Any = None, priority: int = 0):
        self.event_counter += 1
        evt = Event(time=time, priority=priority, kind=kind, payload=payload)
        heapq.heappush(self.queue, evt)

    def run(self, until: float, on_event: Callable[[Event, float], None]):
        while self.queue and self.queue[0].time <= until:
            evt = heapq.heappop(self.queue)
            self.t = evt.time
            on_event(evt, self.t)
        # Продвинуть время до until
        self.t = until


In [None]:
class TicketBooth:
    def __init__(self, booth_id: int, params: Dict[str, Any]):
        self.booth_id = booth_id
        self.params = params
        self.queue: List[Passenger] = []
        self.is_on_break: bool = False
        self.is_in_announce: bool = False
        self.cycle_start_time: float = 0.0  # начало текущего цикла работы
        self.currently_serving: Optional[Passenger] = None
        self.next_cycle_break_start: float = 0.0
        self.next_cycle_break_end: float = 0.0
        self.next_announce_time: float = 0.0

    def perceived_queue_length(self) -> int:
        # Во время анонса добавляем штраф к воспринимаемой длине
        penalty = self.params["announce_penalty"] if self.is_in_announce else 0
        # Если касса в перерыве, считаем очередь как очень длинную
        if self.is_on_break:
            return 10**9
        return len(self.queue) + penalty

    def plan_cycle(self, now: float) -> Tuple[float, float, float]:
        work = self.params["work_minutes"]
        brk = self.params["break_minutes"]
        announce = self.params["announce_minutes"]
        # Цикл начинается в момент now, планируем ближайшие времена
        start = now
        break_start = start + work
        break_end = break_start + brk
        announce_time = max(start, break_start - announce)
        self.cycle_start_time = start
        self.next_cycle_break_start = break_start
        self.next_cycle_break_end = break_end
        self.next_announce_time = announce_time
        return announce_time, break_start, break_end

    def on_announce(self):
        self.is_in_announce = True

    def on_break_start(self):
        self.is_on_break = True
        self.is_in_announce = False

    def on_break_end(self, new_cycle_start: float):
        self.is_on_break = False
        self.is_in_announce = False
        self.cycle_start_time = new_cycle_start

class Metrics:
    def __init__(self, num_booths: int):
        self.num_booths = num_booths
        self.total_wait_time: float = 0.0
        self.total_system_time: float = 0.0
        self.num_completed: int = 0
        self.num_abandoned: int = 0
        self.num_waited_over_break: int = 0
        # Для временной средней длины очередей: интеграл длины по времени
        self.queue_area: List[float] = [0.0 for _ in range(num_booths)]
        self.last_queue_time: float = 0.0

    def accumulate_queue_lengths(self, booths: List[TicketBooth], now: float):
        dt = now - self.last_queue_time
        if dt < 0:
            return
        for i, booth in enumerate(booths):
            self.queue_area[i] += len(booth.queue) * dt
        self.last_queue_time = now

    def record_service_completion(self, p: Passenger):
        if p.service_start_time is not None:
            self.total_wait_time += (p.service_start_time - p.arrival_time)
        if p.service_end_time is not None:
            self.total_system_time += (p.service_end_time - p.arrival_time)
        self.num_completed += 1

    def record_abandon(self):
        self.num_abandoned += 1

    def record_waited_over_break(self):
        self.num_waited_over_break += 1


In [None]:
def expovariate(rate: float) -> float:
    # rate = 0 -> бесконечность, но в параметрах не используется
    u = random.random()
    return -math.log(1.0 - u) / rate

class BusStationModel:
    def __init__(self, params: Dict[str, Any]):
        self.params = params
        self.des = DES()
        self.booths: List[TicketBooth] = [TicketBooth(i, params) for i in range(params["num_booths"])]
        self.metrics = Metrics(params["num_booths"])
        self.now = 0.0
        self.next_passenger_id = 0

    # -------------------------
    # Инициализация
    # -------------------------
    def initialize(self):
        self.now = 0.0
        self.metrics.last_queue_time = 0.0
        # Планируем циклы для касс
        for booth in self.booths:
            announce_time, break_start, break_end = booth.plan_cycle(self.now)
            self.des.schedule(announce_time, kind="announce", payload={"booth": booth.booth_id})
            self.des.schedule(break_start, kind="break_start", payload={"booth": booth.booth_id})
            self.des.schedule(break_end, kind="break_end", payload={"booth": booth.booth_id})
        # Планируем первое прибытие пассажира
        ia = expovariate(self.params["arrival_rate"]) if self.params["arrival_rate"] > 0 else float("inf")
        self.des.schedule(self.now + ia, kind="arrival")

    # -------------------------
    # Выбор очереди (приблизительно минимальная)
    # -------------------------
    def choose_booth_for_passenger(self) -> int:
        delta = self.params["approx_delta"]
        epsilon = self.params["epsilon_random"]
        # Оцениваем воспринимаемую длину очереди
        perceived = [(booth.perceived_queue_length(), booth.booth_id) for booth in self.booths]
        perceived.sort()
        # лучший диапазон: все очереди чья длина <= min_len + delta
        min_len = perceived[0][0]
        candidates = [bid for (l, bid) in perceived if l <= min_len + delta]
        # с небольшой вероятностью возьмём любую кассу (модель нерациональности)
        if random.random() < epsilon:
            return random.choice([b.booth_id for b in self.booths])
        # иначе случайно среди лучших
        return random.choice(candidates)

    # -------------------------
    # Обработка событий
    # -------------------------
    def handle_event(self, evt: Event, now: float):
        self.metrics.accumulate_queue_lengths(self.booths, now)
        self.now = now
        kind = evt.kind
        payload = evt.payload or {}

        if kind == "arrival":
            self.on_arrival()
        elif kind == "service_end":
            self.on_service_end(payload["booth"], payload["passenger"]) 
        elif kind == "announce":
            self.on_announce(payload["booth"]) 
        elif kind == "break_start":
            self.on_break_start(payload["booth"]) 
        elif kind == "break_end":
            self.on_break_end(payload["booth"]) 
        else:
            pass

    def on_arrival(self):
        # Создаём пассажира
        p = Passenger(id=self.next_passenger_id, arrival_time=self.now)
        self.next_passenger_id += 1

        # Выбор кассы
        booth_id = self.choose_booth_for_passenger()
        p.chosen_booth_id = booth_id
        booth = self.booths[booth_id]

        # Проверка на отказ по лимиту ожидания (опционально)
        max_wait = self.params["max_wait_time"]
        if max_wait is not None and booth.is_on_break:
            # если касса на перерыве, ожидание как минимум до конца перерыва
            time_until_end = booth.next_cycle_break_end - self.now
            if time_until_end > max_wait:
                p.abandoned = True
                self.metrics.record_abandon()
                # не ставим в очередь
            else:
                booth.queue.append(p)
        else:
            booth.queue.append(p)

        # Если касса свободна и работает — запускаем обслуживание
        if not booth.is_on_break and booth.currently_serving is None and booth.queue:
            self.start_service(booth)

        # Планируем следующее прибытие
        ia = expovariate(self.params["arrival_rate"]) if self.params["arrival_rate"] > 0 else float("inf")
        next_t = self.now + ia
        if next_t <= self.params["simulation_duration"]:
            self.des.schedule(next_t, kind="arrival")

    def start_service(self, booth: TicketBooth):
        if booth.is_on_break or not booth.queue:
            return
        p = booth.queue.pop(0)
        booth.currently_serving = p
        p.service_start_time = self.now
        # Время обслуживания экспоненциальное
        service_time = expovariate(self.params["service_rate_per_booth"]) if self.params["service_rate_per_booth"] > 0 else float("inf")
        end_time = self.now + service_time
        # Если обслуживание пересекает перерыв, оно прерывается в момент break_start и продолжится после break_end
        if end_time > booth.next_cycle_break_start and not booth.is_on_break:
            # Считаем, что обслуживание прерывается на перерыв: остаток перенесём после перерыва
            time_before_break = booth.next_cycle_break_start - self.now
            remaining = service_time - max(0.0, time_before_break)
            # фиксируем, что человек будет ждать перерыв
            p.waited_over_break = True
            self.metrics.record_waited_over_break()
            # Планируем завершение после перерыва
            end_time = booth.next_cycle_break_end + remaining
        self.des.schedule(end_time, kind="service_end", payload={"booth": booth.booth_id, "passenger": p})

    def on_service_end(self, booth_id: int, p: Passenger):
        booth = self.booths[booth_id]
        # Завершение обслуживания
        booth.currently_serving = None
        p.service_end_time = self.now
        self.metrics.record_service_completion(p)
        # Запуск следующего, если возможно
        if not booth.is_on_break and booth.queue:
            self.start_service(booth)

    def on_announce(self, booth_id: int):
        booth = self.booths[booth_id]
        booth.on_announce()
        # Миграция пассажиров с вероятностью
        migrate_prob = self.params["announce_migrate_prob"]
        if migrate_prob <= 0 or not booth.queue:
            return
        # Пассажиры, не обслуживаемые (т.е. вся очередь) — могут мигрировать
        remaining_queue = []
        for p in booth.queue:
            if random.random() < migrate_prob:
                # выбрать новую кассу по тому же правилу
                new_booth_id = self.choose_booth_for_passenger()
                if new_booth_id != booth_id:
                    self.booths[new_booth_id].queue.append(p)
                    continue
            remaining_queue.append(p)
        booth.queue = remaining_queue
        # Если касса свободна и работает — запустить обслуживание, т.к. очередь изменилась
        if not booth.is_on_break and booth.currently_serving is None and booth.queue:
            self.start_service(booth)

    def on_break_start(self, booth_id: int):
        booth = self.booths[booth_id]
        booth.on_break_start()
        # Если кто-то обслуживался — это обслуживание прервётся в start_service логике при планировании
        # Остановить обслуживание: текущее завершение уже перенесено в будущий event, мы не отменяем событий
        # Новых запусков до конца перерыва не будет

    def on_break_end(self, booth_id: int):
        booth = self.booths[booth_id]
        booth.on_break_end(new_cycle_start=self.now)
        # Спланировать новый цикл
        announce_time, break_start, break_end = booth.plan_cycle(self.now)
        self.des.schedule(announce_time, kind="announce", payload={"booth": booth.booth_id})
        self.des.schedule(break_start, kind="break_start", payload={"booth": booth.booth_id})
        self.des.schedule(break_end, kind="break_end", payload={"booth": booth.booth_id})
        # Продолжить обслуживание, если очередь есть
        if booth.queue and booth.currently_serving is None:
            self.start_service(booth)

    # -------------------------
    # Запуск моделирования
    # -------------------------
    def run(self):
        self.initialize()
        self.des.run(until=self.params["simulation_duration"], on_event=self.handle_event)
        # Финальный апдейт площади очередей до конца моделирования
        self.metrics.accumulate_queue_lengths(self.booths, self.params["simulation_duration"])


In [None]:
def compute_results(model: BusStationModel):
    m = model.metrics
    sim_time = model.params["simulation_duration"]
    avg_wait = (m.total_wait_time / m.num_completed) if m.num_completed > 0 else 0.0
    avg_system = (m.total_system_time / m.num_completed) if m.num_completed > 0 else 0.0
    avg_queue_lengths = [area / sim_time for area in m.queue_area]
    return {
        "avg_wait_time": avg_wait,
        "avg_system_time": avg_system,
        "avg_queue_lengths": avg_queue_lengths,
        "num_waited_over_break": m.num_waited_over_break,
        "num_completed": m.num_completed,
        "num_abandoned": m.num_abandoned,
    }

# Демонстрационный прогон
model = BusStationModel(PARAMS)
model.run()
results = compute_results(model)
results
