In [6]:
import numpy as np
from datetime import timedelta, datetime
import random
import time

In [7]:
# Параметры задачи
TOTAL_BUSES = 8
ROUTE_DURATION_MINUTES = 70  # 1 час 10 минут
WORK_HOURS = 21  # Часы работы (с 6:00 до 3:00)
PEAK_HOURS = [(7, 9), (17, 19)]  # Часы пик
PEAK_LOAD = 0.7  # 70% автобусов в час пик
OFF_PEAK_LOAD = 0.3  # 30% вне пика

DRIVER_TYPE_A_HOURS = 8  # Максимальная смена типа A
DRIVER_TYPE_B_HOURS = 24  # Максимальная смена типа B

np.random.seed(42)  # Для воспроизводимости

# ======== ПОДХОД 1: Решение "в лоб" с NumPy ========

def brute_force_schedule():
    """
    Решение задачи распределения автобусов и водителей с помощью NumPy.
    """
    # Общее число часов (6:00 до 3:00 следующего дня)
    hours = np.arange(6, 6 + WORK_HOURS) % 24  # В 24-часовом формате
    buses_schedule = np.zeros_like(hours)  # Автобусы на каждый час

    # Распределяем нагрузку
    for i, hour in enumerate(hours):
        is_peak = any(start <= hour < end for start, end in PEAK_HOURS)
        load = PEAK_LOAD if is_peak else OFF_PEAK_LOAD
        buses_schedule[i] = np.ceil(TOTAL_BUSES * load)

    # Считаем минимальное число водителей
    total_routes_per_bus = (WORK_HOURS * 60) // ROUTE_DURATION_MINUTES
    drivers_type_a = np.ceil(buses_schedule.sum() / (DRIVER_TYPE_A_HOURS * total_routes_per_bus))
    drivers_type_b = np.ceil(buses_schedule.sum() / (DRIVER_TYPE_B_HOURS * total_routes_per_bus))

    return buses_schedule, drivers_type_a, drivers_type_b

# ======== ПОДХОД 2: Генетический алгоритм ========

def fitness(schedule):
    """
    Целевая функция для оценки расписания.
    Минимизируем количество водителей и максимизируем маршруты.
    """
    drivers_type_a = np.ceil(schedule.sum() / DRIVER_TYPE_A_HOURS)
    drivers_type_b = np.ceil(schedule.sum() / DRIVER_TYPE_B_HOURS)
    total_drivers = drivers_type_a + drivers_type_b

    # Чем меньше водителей, тем лучше
    return 1 / total_drivers

def mutate(schedule):
    """
    Мутация расписания: небольшие изменения в расписании.
    """
    index = random.randint(0, len(schedule) - 1)
    schedule[index] = max(1, schedule[index] + random.choice([-1, 1]))  # Изменяем число автобусов
    return schedule

def crossover(parent1, parent2):
    """
    Кроссовер (скрещивание) двух родительских расписаний.
    """
    point = random.randint(1, len(parent1) - 1)
    child1 = np.concatenate((parent1[:point], parent2[point:]))
    child2 = np.concatenate((parent2[:point], parent1[point:]))
    return child1, child2

def genetic_algorithm(num_generations=100, population_size=50):
    """
    Решение задачи с помощью генетического алгоритма.
    """
    # Инициализируем популяцию случайных расписаний
    population = [np.random.randint(1, TOTAL_BUSES + 1, WORK_HOURS) for _ in range(population_size)]

    for generation in range(num_generations):
        # Оцениваем каждое расписание
        fitness_scores = [fitness(schedule) for schedule in population]

        # Селекция: выбираем лучших
        sorted_population = [x for _, x in sorted(zip(fitness_scores, population), key=lambda pair: pair[0], reverse=True)]

        population = sorted_population[:population_size // 2]

        # Кроссовер: создаём новое поколение
        children = []
        for i in range(len(population) // 2):
            child1, child2 = crossover(population[i], population[len(population) - i - 1])
            children.extend([child1, child2])

        # Мутация
        children = [mutate(child) for child in children]

        # Обновляем популяцию
        population.extend(children)

    # Возвращаем лучшее расписание
    best_schedule = max(population, key=fitness)
    return best_schedule, fitness(best_schedule)

# ======== СРАВНЕНИЕ ПОДХОДОВ ========

def compare_approaches():
    # Замеряем время выполнения подхода "в лоб"
    start = time.time()
    brute_schedule, drivers_a, drivers_b = brute_force_schedule()
    brute_time = time.time() - start

    # Замеряем время выполнения генетического алгоритма
    start = time.time()
    genetic_schedule, genetic_fitness = genetic_algorithm()
    genetic_time = time.time() - start

    # Выводим результаты
    print("Подход 'в лоб':")
    print(f"Расписание: {brute_schedule}")
    print(f"Водителей типа A: {drivers_a}, типа B: {drivers_b}")
    print(f"Время выполнения: {brute_time:.4f} секунд\n")

    print("Генетический алгоритм:")
    print(f"Лучшее расписание: {genetic_schedule}")
    print(f"Фитнесс расписания: {genetic_fitness:.4f}")
    print(f"Время выполнения: {genetic_time:.4f} секунд")

# ======== ЗАПУСК ПРОГРАММЫ ========


compare_approaches()


In [7]:
import random
import numpy as np

class Driver:
    def __init__(self, name: str, shift_duration: int, break_duration: int):
        self.name = name
        self.shift_duration = shift_duration
        self.break_duration = break_duration
        self.time_worked = 0
        self.on_break = False
        self.break_time_remaining = 0

    def work(self):
        if self.on_break:
            self.break_time_remaining -= 1
            if self.break_time_remaining <= 0:
                self.on_break = False
            return False

        self.time_worked += 1
        if self.time_worked >= self.shift_duration:
            self.start_break()
        return True

    def start_break(self):
        self.on_break = True
        self.break_time_remaining = self.break_duration
        self.time_worked = 0


class Bus:
    def __init__(self, capacity: int = 20, driver: Driver = None):
        self.capacity = capacity
        self.currently_onboard = 0
        self.driver = driver

    def load_passengers(self, bus_stop):
        if self.driver and not self.driver.work():
            print(f"Driver {self.driver.name} is on break. Bus cannot load passengers.")
            return 0

        available_seats = self.capacity - self.currently_onboard
        boarding_passengers = bus_stop.board_passengers(available_seats=available_seats)
        self.currently_onboard += boarding_passengers
        return boarding_passengers

    def unload_all_passengers(self):
        exiting_passengers = self.currently_onboard
        self.currently_onboard = 0
        return exiting_passengers

class BusStop:
    def __init__(self, passengers_waiting: int):
        self.passengers_waiting = passengers_waiting

    def board_passengers(self, available_seats: int) -> int:
        board_count = min(self.passengers_waiting, available_seats)
        self.passengers_waiting -= board_count
        return board_count

    def add_passengers(self, peak: bool):
        chance = random.random()
        if peak:
            if chance < 0.7:  # Higher probability during peak
                self.passengers_waiting += random.randint(3, 7)
        else:
            if chance < 0.3:  # Lower probability off-peak
                self.passengers_waiting += random.randint(1, 3)

class Route:
    def __init__(self, bus_stops_num: int, route_duration: int, peak: bool, upper_limit: int):
        self.num_stops = bus_stops_num
        self.time_between_stops = route_duration // bus_stops_num  # Calculate average time between stops
        self.bus_stops = [
            BusStop(random.randint(0, int(upper_limit * (0.7 if peak else 0.3))))
            for _ in range(bus_stops_num)
        ]

class Simulation:
    def __init__(
        self,
        bus_num: int,
        route_duration: int,
        bus_stops_num: int,
        start_time: np.datetime64,
        end_time: np.datetime64,
        tag: str,
        driver_shift_duration: int,
        driver_break_duration: int,
    ):
        self.tag = tag
        self.tick = 0
        self.route = Route(bus_stops_num, route_duration, tag in {"monday", "tuesday", "wednesday", "thursday", "friday"}, 100)
        self.time_between_stops = self.route.time_between_stops
        self.end_tick = ((end_time - start_time).astype("timedelta64[m]").item().seconds // self.time_between_stops)
        self.drivers = [
            Driver(name=f"Driver {i}", shift_duration=driver_shift_duration, break_duration=driver_break_duration)
            for i in range(bus_num)
        ]
        self.buses = [Bus(driver=self.drivers[i]) for i in range(bus_num)]
        self.bus_positions = [0 for _ in range(bus_num)]  # Index of the stop for each bus
        self.bus_intervals = route_duration // bus_num  # Time interval between buses starting
        self.start_time = start_time

        # Schedule tracker
        self.schedule = {i: [] for i in range(self.route.num_stops)}
        self.bus_start_ticks = [i * self.bus_intervals for i in range(bus_num)]  # Staggered start times

    def play_simulation(self):
        total_passengers_dropped = 0
        while self.tick < self.end_tick:
            # Passengers arrive at stops based on the peak flag
            current_time_in_seconds = int((self.start_time + np.timedelta64(self.tick * self.time_between_stops, 'm')).astype("timedelta64[s]").item().seconds % 86400)
            current_hour = current_time_in_seconds // 3600
            is_peak = (7 <= current_hour < 9) or (17 <= current_hour < 19)

            for stop in self.route.bus_stops:
                stop.add_passengers(peak=is_peak if self.tag not in {"saturday", "sunday"} else random.random() < 0.5)

            for i, bus in enumerate(self.buses):
                # Skip buses that haven't started yet
                if self.tick < self.bus_start_ticks[i]:
                    continue

                # Check if the bus is at a stop
                stop_index = self.bus_positions[i]

                # Record the schedule
                current_time = self.start_time + np.timedelta64(self.tick * self.time_between_stops, 'm')
                self.schedule[stop_index].append(str(current_time))

                # Unload all passengers at the last stop
                if stop_index == len(self.route.bus_stops) - 1:
                    total_passengers_dropped += bus.unload_all_passengers()

                # Load passengers at the current stop
                boarding = bus.load_passengers(self.route.bus_stops[stop_index])
                print(f"Tick {self.tick}: Bus {i} at Stop {stop_index} loaded {boarding} passengers.")

                # Move to the next stop
                self.bus_positions[i] = (stop_index + 1) % self.route.num_stops

            # Increment simulation time
            self.tick += 1

        # Check if all passengers were dropped off
        remaining_passengers = sum(stop.passengers_waiting for stop in self.route.bus_stops)
        print(f"Simulation finished. Total passengers dropped: {total_passengers_dropped}. Remaining passengers: {remaining_passengers}.")

    def print_schedule(self):
        print("\nBus Schedule:")
        for stop, times in self.schedule.items():
            print(f"Stop {stop}: {', '.join(times)}")


# Run simulation example
start_time = np.datetime64('2024-12-14T06:00')
end_time = np.datetime64('2024-12-15T03:00')
sim = Simulation(bus_num=8, route_duration=70, bus_stops_num=7, start_time=start_time, end_time=end_time, tag="monday", driver_shift_duration=180, driver_break_duration=30)
sim.play_simulation()
sim.print_schedule()


In [9]:


BUS_NUM = 8
TOTAL_BUSES = 8
ROUTE_DURATION_MINUTES = SHIFT = end_time - start_time.astype('timedelta64[h]')
SHIFT
BUS_STOPS_NUM = 7
#Буду считать что между всеми остановками равное время езды
TIME_ON_THE_WAY = np.timedelta64(ROUTE_DURATION_MINUTES//BUS_STOPS_NUM, 'm')

#начало работы в 06:00 конец в 03:00 
start_time = np.datetime64(6,'h')
end_time = np.timedelta64(27, 'h')
SHIFT = end_time - start_time.astype('timedelta64[h]')

time_table = np.zeros((BUS_NUM,round(ROUTE_DURATION_MINUTES/BUS_STOPS_NUM)))

total_elements = BUS_NUM * round(ROUTE_DURATION_MINUTES/BUS_STOPS_NUM)

# Генерация расписания
schedule = start_time + np.arange(total_elements) * TIME_ON_THE_WAY

# Преобразуем в матрицу n x m
schedule_matrix = schedule.reshape(BUS_NUM, round(ROUTE_DURATION_MINUTES/BUS_STOPS_NUM))

schedule_matrix

In [None]:
import random
import numpy as np

def simulate_with_parameters(bus_num, route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
    sim = Simulation(
        bus_num=bus_num,
        route_duration=route_duration,
        bus_stops_num=bus_stops_num,
        start_time=start_time,
        end_time=end_time,
        tag="monday",  # Используем рабочий день для пиковых нагрузок
        driver_shift_duration=driver_shift_duration,
        driver_break_duration=driver_break_duration,
    )
    sim.play_simulation()
    
    # Анализ времени ожидания
    wait_times = [stop.passengers_waiting for stop in sim.route.bus_stops]
    wait_times.sort()
    
    # Доверительный интервал (например, 95% случаев ожидания)
    percentile_95 = np.percentile(wait_times, 95)
    return percentile_95 <= 2  # Условие: ожидание <= 2 автобусов

# Brute Force Algorithm
def brute_force_minimize_buses(route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
    for bus_num in range(1, 21):  # Проверяем от 1 до 20 автобусов
        if simulate_with_parameters(bus_num, route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
            return bus_num
    return None  # Если не нашли подходящее число автобусов

# Genetic Algorithm
def genetic_algorithm_minimize_buses(route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration, population_size=10, generations=50):
    # Генерация начальной популяции
    population = [random.randint(1, 20) for _ in range(population_size)]
    
    for generation in range(generations):
        # Оценка фитнеса (чем меньше автобусов, тем лучше, но только если критерий выполнен)
        fitness = []
        for bus_num in population:
            if simulate_with_parameters(bus_num, route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
                fitness.append((bus_num, 1 / bus_num))  # Обратное число автобусов как фитнес
            else:
                fitness.append((bus_num, 0))  # Неприемлемые решения
        
        # Сортировка по фитнесу
        fitness.sort(key=lambda x: x[1], reverse=True)
        
        # Отбор лучших (топ-50%)
        top_half = [f[0] for f in fitness[:population_size // 2]]
        
        # Скрещивание и мутация
        new_population = []
        while len(new_population) < population_size:
            parent1, parent2 = random.sample(top_half, 2)
            child = (parent1 + parent2) // 2  # Среднее значение как ребенок
            if random.random() < 0.1:  # Мутация с вероятностью 10%
                child += random.choice([-1, 1])
                child = max(1, min(child, 20))  # Ограничиваем число автобусов
            new_population.append(child)
        
        population = new_population

    # Лучший результат из последней популяции
    for bus_num in sorted(population):
        if simulate_with_parameters(bus_num, route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
            return bus_num
    return None  # Если не нашли подходящее число автобусов

# Параметры маршрута
route_duration = 70
bus_stops_num = 7
start_time = np.datetime64('2024-12-14T06:00')
end_time = np.datetime64('2024-12-15T03:00')
driver_shift_duration = 180
driver_break_duration = 30

# Запуск алгоритмов
brute_force_result = brute_force_minimize_buses(route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration)
print(f"Brute Force Result: Minimum buses = {brute_force_result}")

ga_result = genetic_algorithm_minimize_buses(route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration)
print(f"Genetic Algorithm Result: Minimum buses = {ga_result}")



In [None]:
def genetic_algorithm_minimize_buses(route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration, population_size=10, generations=50):
    start_time_execution = time.time()
    max_time = 300  # 5 минут на выполнение алгоритма

    # Генерация начальной популяции
    population = [random.randint(1, 20) for _ in range(population_size)]
    
    for generation in range(generations):
        if time.time() - start_time_execution > max_time:
            print("Genetic algorithm terminated early due to timeout.")
            break

        # Оценка фитнеса
        fitness = []
        for bus_num in population:
            if simulate_with_parameters(bus_num, route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
                fitness.append((bus_num, 1 / bus_num))
            else:
                fitness.append((bus_num, 0))
        
        fitness.sort(key=lambda x: x[1], reverse=True)
        top_half = [f[0] for f in fitness[:population_size // 2]]

        new_population = []
        while len(new_population) < population_size:
            parent1, parent2 = random.sample(top_half, 2)
            child = (parent1 + parent2) // 2
            if random.random() < 0.1:
                child += random.choice([-1, 1])
                child = max(1, min(child, 20))
            new_population.append(child)

        population = new_population

    for bus_num in sorted(population):
        if simulate_with_parameters(bus_num, route_duration, bus_stops_num, start_time, end_time, driver_shift_duration, driver_break_duration):
            return bus_num
    return None
