# Завдання 1. Реалізація Rate Limiter з використанням алгоритму Sliding Window для обмеження частоти повідомлень у чаті

In [56]:
import random
from typing import Dict
import time
from collections import deque

In [57]:


class SlidingWindowRateLimiter:
    def __init__(self, window_size: int = 10, max_requests: int = 1):
        self.window_size = window_size
        self.max_requests = max_requests
        self.history: Dict[str, deque] = {}

    def _cleanup_window(self, user_id: str, current_time: float) -> None:
        if user_id not in self.history:
            self.history[user_id] = deque()

        window = self.history[user_id]
        while window and current_time - window[0] > self.window_size:
            window.popleft()

    def can_send_message(self, user_id: str) -> bool:
        current_time = time.time()
        self._cleanup_window(user_id, current_time)
        return len(self.history[user_id]) < self.max_requests

    def record_message(self, user_id: str) -> bool:
        current_time = time.time()
        self._cleanup_window(user_id, current_time)
        if len(self.history[user_id]) < self.max_requests:
            self.history[user_id].append(current_time)
            return True
        return False

    def time_until_next_allowed(self, user_id: str) -> float:
        current_time = time.time()
        self._cleanup_window(user_id, current_time)
        if len(self.history[user_id]) < self.max_requests:
            return 0.0
        oldest = self.history[user_id][0]
        return max(0.0, self.window_size - (current_time - oldest))


In [58]:
# Демонстрація роботи
def test_rate_limiter():
    # Створюємо rate limiter: вікно 10 секунд, 1 повідомлення
    limiter = SlidingWindowRateLimiter(window_size=10, max_requests=1)

    # Симулюємо потік повідомлень від користувачів (послідовні ID від 1 до 20)
    print("\n=== Симуляція потоку повідомлень ===")
    for message_id in range(1, 11):
        # Симулюємо різних користувачів (ID від 1 до 5)
        user_id = message_id % 5 + 1

        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))

        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")

        # Невелика затримка між повідомленнями для реалістичності
        time.sleep(random.uniform(0.1, 1.0))

    # Чекаємо, поки вікно очиститься
    print("\nОчікуємо 4 секунди...")
    time.sleep(4)

    print("\n=== Нова серія повідомлень після очікування ===")
    for message_id in range(11, 21):
        user_id = message_id % 5 + 1
        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))
        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        time.sleep(random.uniform(0.1, 1.0))

In [59]:
test_rate_limiter()


=== Симуляція потоку повідомлень ===
Повідомлення  1 | Користувач 2 | ✓
Повідомлення  2 | Користувач 3 | ✓
Повідомлення  3 | Користувач 4 | ✓
Повідомлення  4 | Користувач 5 | ✓
Повідомлення  5 | Користувач 1 | ✓
Повідомлення  6 | Користувач 2 | × (очікування 8.1с)
Повідомлення  7 | Користувач 3 | × (очікування 7.9с)
Повідомлення  8 | Користувач 4 | × (очікування 7.3с)
Повідомлення  9 | Користувач 5 | × (очікування 7.1с)
Повідомлення 10 | Користувач 1 | × (очікування 7.5с)

Очікуємо 4 секунди...

=== Нова серія повідомлень після очікування ===
Повідомлення 11 | Користувач 2 | × (очікування 1.0с)
Повідомлення 12 | Користувач 3 | × (очікування 0.6с)
Повідомлення 13 | Користувач 4 | × (очікування 0.2с)
Повідомлення 14 | Користувач 5 | × (очікування 0.5с)
Повідомлення 15 | Користувач 1 | × (очікування 0.9с)
Повідомлення 16 | Користувач 2 | ✓
Повідомлення 17 | Користувач 3 | ✓
Повідомлення 18 | Користувач 4 | ✓
Повідомлення 19 | Користувач 5 | ✓
Повідомлення 20 | Користувач 1 | ✓


# Завдання 2. Реалізація Rate Limiter з використанням алгоритму Throttling для обмеження частоти повідомлень у чаті

In [60]:
import time
from typing import Dict
import random

In [61]:
class ThrottlingRateLimiter:
    def __init__(self, min_interval: float = 10.0):
        self.min_interval = min_interval
        self.last_message_time: Dict[str, float] = {}

    def can_send_message(self, user_id: str) -> bool:
        current_time = time.time()
        last_time = self.last_message_time.get(user_id)
        if last_time is None:
            return True
        return (current_time - last_time) >= self.min_interval

    def record_message(self, user_id: str) -> bool:
        if self.can_send_message(user_id):
            self.last_message_time[user_id] = time.time()
            return True
        return False

    def time_until_next_allowed(self, user_id: str) -> float:
        current_time = time.time()
        last_time = self.last_message_time.get(user_id)
        if last_time is None:
            return 0.0
        wait_time = self.min_interval - (current_time - last_time)
        return max(0.0, wait_time)

In [62]:
# Демонстрація роботи
def test_throttling_limiter():
    limiter = ThrottlingRateLimiter(min_interval=10.0)

    print("\n=== Симуляція потоку повідомлень (Throttling) ===")
    for message_id in range(1, 11):
        user_id = message_id % 5 + 1

        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))

        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")

        # Випадкова затримка між повідомленнями
        time.sleep(random.uniform(0.1, 1.0))

    print("\nОчікуємо 10 секунд...")
    time.sleep(10)

    print("\n=== Нова серія повідомлень після очікування ===")
    for message_id in range(11, 21):
        user_id = message_id % 5 + 1
        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))
        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        time.sleep(random.uniform(0.1, 1.0))

In [63]:
test_throttling_limiter()


=== Симуляція потоку повідомлень (Throttling) ===
Повідомлення  1 | Користувач 2 | ✓
Повідомлення  2 | Користувач 3 | ✓
Повідомлення  3 | Користувач 4 | ✓
Повідомлення  4 | Користувач 5 | ✓
Повідомлення  5 | Користувач 1 | ✓
Повідомлення  6 | Користувач 2 | × (очікування 7.3с)
Повідомлення  7 | Користувач 3 | × (очікування 7.4с)
Повідомлення  8 | Користувач 4 | × (очікування 7.6с)
Повідомлення  9 | Користувач 5 | × (очікування 7.7с)
Повідомлення 10 | Користувач 1 | × (очікування 7.8с)

Очікуємо 10 секунд...

=== Нова серія повідомлень після очікування ===
Повідомлення 11 | Користувач 2 | ✓
Повідомлення 12 | Користувач 3 | ✓
Повідомлення 13 | Користувач 4 | ✓
Повідомлення 14 | Користувач 5 | ✓
Повідомлення 15 | Користувач 1 | ✓
Повідомлення 16 | Користувач 2 | × (очікування 6.7с)
Повідомлення 17 | Користувач 3 | × (очікування 7.5с)
Повідомлення 18 | Користувач 4 | × (очікування 7.4с)
Повідомлення 19 | Користувач 5 | × (очікування 7.1с)
Повідомлення 20 | Користувач 1 | × (очікування 7.2