In [1]:
# вимикаємо зайві попередження
import warnings
warnings.filterwarnings("ignore")

# друк всіх результатів в одній комірці а не тільки останнього
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [3]:
import random
from typing import Dict
import time
from collections import deque

class ThrottlingRateLimiter:
    """Клас для обмеження швидкості запитів користувачів за допомогою підходу Throttling.
    Увага!!!: цей клас не є потокобезпечним і призначений для використання в однопоточному середовищі.
    Використовується для обмеження кількості запитів від користувачів протягом певного часу.
    Не зберігає повідомлення, які були відхилені через обмеження частоти запитів (не вимагається умовами завдання).
    Тож для обробки відхилених повідомлень використовуйте інші додаткові механізми, наприклад, чергу або логування.
    Реалізація дуже схожа на код з Завдання 1, але повністю відповідає вимогам Завдання 2. Всі претензії - до методолога.
    """

    def __init__(self, min_interval: float = 10.0):
        self.min_interval = min_interval  # розмір інтервалу в секундах
        self.user_requests: Dict[str, float] = {} # словник для зберігання інформації про час останнього повідомлення користувачів

    def can_send_message(self, user_id: str) -> bool:
        """Перевіряє, чи може користувач надіслати повідомлення."""
        if self.user_requests.get(user_id) is None:
            return True # якщо користувача немає в словнику, то можемо дозволити надсилання повідомлення
        elif time.time() > (self.user_requests[user_id] + self.min_interval): # якщо поточний час більше, ніж час останнього запиту + мінімальний інтервал, то дозволяємо надсилання
            return True
        else:
            return False

    def record_message(self, user_id: str, message_id: str) -> bool:
        """Записує повідомлення користувача в історію, якщо дозволено його надіслати.
          якщо ні - повертає False, повідомлення не записується."""
        if self.can_send_message(user_id):
            message_deque.append((user_id, time.time(), message_id))
            self.user_requests[user_id] = time.time() # оновлюємо час останнього запиту користувача
            return True
        else:
            return False

    def time_until_next_allowed(self, user_id: str) -> float:
        """Повертає час до наступного дозволеного запиту для користувача."""
        if not self.can_send_message(user_id):
            return (self.user_requests[user_id] + self.min_interval) - time.time()
        return 0.0

# Демонстрація роботи
def test_throttling_limiter():
    # Створюємо rate limiter: вікно 10 секунд, 1 повідомлення
    limiter = ThrottlingRateLimiter(min_interval=10.0)

    # Симулюємо потік повідомлень від користувачів (послідовні ID від 1 до 20)
    print("\n=== Симуляція потоку повідомлень (Throttling) ===")

    for message_id in range(1, 11):
        # Симулюємо різних користувачів (ID від 1 до 5)
        user_id = message_id % 5 + 1

        result = limiter.record_message(str(user_id), str(message_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))

        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")

        # Невелика затримка між повідомленнями для реалістичності
        # Випадкова затримка від 0.1 до 1 секунди
        time.sleep(random.uniform(0.1, 1.0))

    print("\nОчікуємо 10 секунд...")
    time.sleep(10)

    print("\n=== Нова серія повідомлень після очікування ===")

    for message_id in range(11, 21):
        user_id = message_id % 5 + 1
        result = limiter.record_message(str(user_id), str(message_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))

        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        # Випадкова затримка від 0.1 до 1 секунди
        time.sleep(random.uniform(0.1, 0.5))

    print("\n=== Кінець симуляції ===")

if __name__ == "__main__":

	# черга з історією повідомлень
    message_deque = deque(maxlen=100)

    test_throttling_limiter()

    print("\n=== Повідомлення в історії ===")
    for user_id, timestamp, message_id in message_deque:
        print(f"Користувач {user_id} | Час: {time.ctime(timestamp)} | Повідомлення: {message_id}")



=== Симуляція потоку повідомлень (Throttling) ===
Повідомлення  1 | Користувач 2 | ✓
Повідомлення  2 | Користувач 3 | ✓
Повідомлення  3 | Користувач 4 | ✓
Повідомлення  4 | Користувач 5 | ✓
Повідомлення  5 | Користувач 1 | ✓
Повідомлення  6 | Користувач 2 | × (очікування 6.9с)
Повідомлення  7 | Користувач 3 | × (очікування 6.6с)
Повідомлення  8 | Користувач 4 | × (очікування 6.6с)
Повідомлення  9 | Користувач 5 | × (очікування 6.9с)
Повідомлення 10 | Користувач 1 | × (очікування 7.5с)

Очікуємо 10 секунд...

=== Нова серія повідомлень після очікування ===
Повідомлення 11 | Користувач 2 | ✓
Повідомлення 12 | Користувач 3 | ✓
Повідомлення 13 | Користувач 4 | ✓
Повідомлення 14 | Користувач 5 | ✓
Повідомлення 15 | Користувач 1 | ✓
Повідомлення 16 | Користувач 2 | × (очікування 8.9с)
Повідомлення 17 | Користувач 3 | × (очікування 8.8с)
Повідомлення 18 | Користувач 4 | × (очікування 8.5с)
Повідомлення 19 | Користувач 5 | × (очікування 8.2с)
Повідомлення 20 | Користувач 1 | × (очікування 8.1