In [1]:
import time
import random
from typing import Dict
from collections import deque

In [2]:
class SlidingWindowRateLimiter:
    def __init__(self, window_size: int = 10, max_requests: int = 1):
        self.window_size = window_size
        self.max_requests = max_requests
        self.user_requests: Dict[str, deque] = {}

    def _cleanup_window(self, user_id: str, current_time: float) -> None:
        if user_id not in self.user_requests:
            return

        user_deque = self.user_requests[user_id]
        cutoff_time = current_time - self.window_size

        while user_deque and user_deque[0] < cutoff_time:
            user_deque.popleft()

        if not user_deque:
            del self.user_requests[user_id]

    def can_send_message(self, user_id: str) -> bool:
        current_time = time.time()
        self._cleanup_window(user_id, current_time)

        if user_id not in self.user_requests:
            return True

        return len(self.user_requests[user_id]) < self.max_requests

    def record_message(self, user_id: str) -> bool:
        if not self.can_send_message(user_id):
            return False

        current_time = time.time()

        if user_id not in self.user_requests:
            self.user_requests[user_id] = deque()

        self.user_requests[user_id].append(current_time)

        return True

    def time_until_next_allowed(self, user_id: str) -> float:
        current_time = time.time()
        self._cleanup_window(user_id, current_time)

        if len(self.user_requests[user_id]) < self.max_requests:
            return 0.0

        oldest_time = self.user_requests[user_id][0]
        time_passed = oldest_time + self.window_size - current_time
        return max(0.0, time_passed)

In [3]:
# Демонстрація роботи
def test_rate_limiter():
    # Створюємо rate limiter: вікно 10 секунд, 1 повідомлення
    limiter = SlidingWindowRateLimiter(window_size=10, max_requests=1)

    # Симулюємо потік повідомлень від користувачів (послідовні ID від 1 до 20)
    print("\\n=== Симуляція потоку повідомлень ===")
    for message_id in range(1, 11):
        # Симулюємо різних користувачів (ID від 1 до 5)
        user_id = message_id % 5 + 1

        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))

        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")

        # Невелика затримка між повідомленнями для реалістичності
        # Випадкова затримка від 0.1 до 1 секунди
        time.sleep(random.uniform(0.1, 1.0))

    # Чекаємо, поки вікно очиститься
    print("\\nОчікуємо 4 секунди...")
    time.sleep(4)

    print("\\n=== Нова серія повідомлень після очікування ===")
    for message_id in range(11, 21):
        user_id = message_id % 5 + 1
        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))
        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        # Випадкова затримка від 0.1 до 1 секунди
        time.sleep(random.uniform(0.1, 1.0))

In [4]:
test_rate_limiter()

\n=== Симуляція потоку повідомлень ===
Повідомлення  1 | Користувач 2 | ✓
Повідомлення  2 | Користувач 3 | ✓
Повідомлення  3 | Користувач 4 | ✓
Повідомлення  4 | Користувач 5 | ✓
Повідомлення  5 | Користувач 1 | ✓
Повідомлення  6 | Користувач 2 | × (очікування 7.0с)
Повідомлення  7 | Користувач 3 | × (очікування 6.3с)
Повідомлення  8 | Користувач 4 | × (очікування 6.9с)
Повідомлення  9 | Користувач 5 | × (очікування 6.9с)
Повідомлення 10 | Користувач 1 | × (очікування 7.5с)
\nОчікуємо 4 секунди...
\n=== Нова серія повідомлень після очікування ===
Повідомлення 11 | Користувач 2 | × (очікування 1.0с)
Повідомлення 12 | Користувач 3 | × (очікування 0.3с)
Повідомлення 13 | Користувач 4 | × (очікування 0.4с)
Повідомлення 14 | Користувач 5 | ✓
Повідомлення 15 | Користувач 1 | × (очікування 0.4с)
Повідомлення 16 | Користувач 2 | ✓
Повідомлення 17 | Користувач 3 | ✓
Повідомлення 18 | Користувач 4 | ✓
Повідомлення 19 | Користувач 5 | × (очікування 7.0с)
Повідомлення 20 | Користувач 1 | ✓
