In [1]:
import random
from typing import Dict
import time
from collections import deque

In [3]:
class SlidingWindowRateLimiter:
    def __init__(self, window_size: int = 10, max_requests: int = 1):
        """
        Ініціалізація Rate Limiter
        """
        self.window_size = window_size
        self.max_requests = max_requests
        # Словник для зберігання історії: ключ - user_id, значення - черга timestamp'ів
        self.user_history: Dict[str, deque] = {}

    def _cleanup_window(self, user_id: str, current_time: float):
        """
        Очищає застарілі запити з вікна для конкретного користувача.
        Видаляє запис про користувача, якщо його історія порожня.
        """
        if user_id not in self.user_history:
            return

        history = self.user_history[user_id]
        # Поріг часу: все, що старіше за (поточний час - розмір вікна) видаляємо
        expire_time = current_time - self.window_size

        # Видаляємо старі записи з початку черги
        while history and history[0] <= expire_time:
            history.popleft()

        # Якщо історія користувача порожня, видаляємо ключ згідно критеріїв оцінки завдання
        if not history:
            del self.user_history[user_id]

    def can_send_message(self, user_id: str) -> bool:
        """
        Перевіряє, чи може користувач відправити повідомлення в даний момент.
        """
        current_time = time.time()
        self._cleanup_window(user_id, current_time)

        # Якщо користувача немає в базі, він може писати
        if user_id not in self.user_history:
            return True

        # Перевіряємо кількість актуальних повідомлень у вікні
        return len(self.user_history[user_id]) < self.max_requests

    def record_message(self, user_id: str) -> bool:
        """
        Реєструє нове повідомлення, якщо ліміт дозволяє.
        Повертає True, якщо повідомлення записано, False - якщо ліміт перевищено.
        """
        if self.can_send_message(user_id):
            # Якщо користувача ще немає (або був видалений cleanup'ом), створюємо нову чергу
            if user_id not in self.user_history:
                self.user_history[user_id] = deque()

            # Додаємо поточний час у кінець черги
            self.user_history[user_id].append(time.time())
            return True

        return False

    def time_until_next_allowed(self, user_id: str) -> float:
        """
        Розраховує час очікування до можливості відправлення наступного повідомлення.
        """
        current_time = time.time()
        self._cleanup_window(user_id, current_time)

        # Якщо ліміт не перевищено, чекати не потрібно
        if user_id not in self.user_history or len(self.user_history[user_id]) < self.max_requests:
            return 0.0

        # Час очікування = час найстарішого повідомлення у вікні + розмір вікна - поточний час
        oldest_message_time = self.user_history[user_id][0]
        wait_time = (oldest_message_time + self.window_size) - current_time

        return max(0.0, wait_time)

In [4]:
# Демонстрація роботи
def test_rate_limiter():
    # Створюємо rate limiter: вікно 10 секунд, 1 повідомлення
    limiter = SlidingWindowRateLimiter(window_size=10, max_requests=1)

    # Симулюємо потік повідомлень від користувачів (послідовні ID від 1 до 20)
    print("\n=== Симуляція потоку повідомлень ===")
    for message_id in range(1, 11):
        # Симулюємо різних користувачів (ID від 1 до 5)
        user_id = message_id % 5 + 1

        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))

        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")

        # Невелика затримка між повідомленнями для реалістичності
        # Випадкова затримка від 0.1 до 1 секунди
        time.sleep(random.uniform(0.1, 1.0))

    # Чекаємо, поки вікно частково очиститься
    print("\nОчікуємо 4 секунди...")
    time.sleep(4)

    print("\n=== Нова серія повідомлень після очікування ===")
    for message_id in range(11, 21):
        user_id = message_id % 5 + 1
        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))
        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        # Випадкова затримка від 0.1 до 1 секунди
        time.sleep(random.uniform(0.1, 1.0))

if __name__ == "__main__":
    test_rate_limiter()


=== Симуляція потоку повідомлень ===
Повідомлення  1 | Користувач 2 | ✓
Повідомлення  2 | Користувач 3 | ✓
Повідомлення  3 | Користувач 4 | ✓
Повідомлення  4 | Користувач 5 | ✓
Повідомлення  5 | Користувач 1 | ✓
Повідомлення  6 | Користувач 2 | × (очікування 7.3с)
Повідомлення  7 | Користувач 3 | × (очікування 6.9с)
Повідомлення  8 | Користувач 4 | × (очікування 7.2с)
Повідомлення  9 | Користувач 5 | × (очікування 7.2с)
Повідомлення 10 | Користувач 1 | × (очікування 7.3с)

Очікуємо 4 секунди...

=== Нова серія повідомлень після очікування ===
Повідомлення 11 | Користувач 2 | × (очікування 0.6с)
Повідомлення 12 | Користувач 3 | × (очікування 0.5с)
Повідомлення 13 | Користувач 4 | × (очікування 0.5с)
Повідомлення 14 | Користувач 5 | × (очікування 0.1с)
Повідомлення 15 | Користувач 1 | × (очікування 0.6с)
Повідомлення 16 | Користувач 2 | ✓
Повідомлення 17 | Користувач 3 | ✓
Повідомлення 18 | Користувач 4 | ✓
Повідомлення 19 | Користувач 5 | ✓
Повідомлення 20 | Користувач 1 | ✓
