# Домашнє завдання до модуля “Алгоритми оптимізації та керування ресурсами” (теми 7 та 8)

## Завдання 1. Оптимізація доступу до даних за допомогою LRU-кешу



Крок 1. Імпорти та допоміжні функції форматування часу

In [1]:
# Крок 1. Імпорти та хелпери
import time
import random
import itertools
from collections import OrderedDict
from typing import List, Tuple, Iterable

def pretty_seconds(sec: float) -> str:
    """
    Повертає час у зручному форматі: сек (якщо >=1с) або мс, округлення для читабельності.
    """
    if sec >= 1.0:
        return f"{sec:6.2f} c"
    return f"{sec*1000:6.0f} ms"

Крок 2. Реалізація класу LRUCache (ємність K = 1000)

In [2]:
# Крок 2. Реалізація LRU-кешу на базі OrderedDict
class LRUCache:
    """
    Простіший LRU-кеш.
    - Ключ: кортеж (left, right) — межі діапазону для суми.
    - Значення: обчислена сума на цьому діапазоні.
    """
    def __init__(self, capacity: int = 1000):
        self.capacity = capacity
        self._od = OrderedDict()

    def get(self, key: Tuple[int, int]) -> int:
        """
        Повертає значення з кешу або -1 у випадку промаху (miss).
        Одночасно оновлює «свіжість» ключа (MRU).
        """
        if key not in self._od:
            return -1
        value = self._od.pop(key)
        self._od[key] = value  # переміщаємо в кінець як «найсвіжіший»
        return value

    def put(self, key: Tuple[int, int], value: int) -> None:
        """
        Додає/оновлює значення; за переповнення — викидає найстаріший (LRU) елемент.
        """
        if key in self._od:
            self._od.pop(key)
        elif len(self._od) >= self.capacity:
            self._od.popitem(last=False)  # видаляємо LRU (перший)
        self._od[key] = value

    def keys(self) -> Iterable[Tuple[int, int]]:
        """Поточні ключі (використаємо для інвалідації)."""
        return self._od.keys()

    def delete_key(self, key: Tuple[int, int]) -> None:
        """Видаляє ключ, якщо він є в кеші."""
        if key in self._od:
            del self._od[key]

    def __len__(self):
        return len(self._od)

Крок 3. Функція генерації запитів

In [3]:
# Крок 3. Генерація запитів за заданою моделлю (гарячі/випадкові діапазони + рідкі оновлення)
def make_queries(n, q, hot_pool=30, p_hot=0.95, p_update=0.03):
    """
    Повертає список запитів довжини q для масиву розміру n:
      - ~3%: ("Update", index, value)
      - ~97%: ("Range", left, right), із яких 95% — з «гарячого» пулу
    """
    hot = [(random.randint(0, n//2), random.randint(n//2, n-1)) for _ in range(hot_pool)]
    queries = []
    for _ in range(q):
        if random.random() < p_update:     # ~3% — Update
            idx = random.randint(0, n-1)
            val = random.randint(1, 100)
            queries.append(("Update", idx, val))
        else:                               # ~97% — Range
            if random.random() < p_hot:     # 95% — з «гарячого» пулу
                left, right = random.choice(hot)
            else:                           # 5% — повністю випадкові
                left = random.randint(0, n-1)
                right = random.randint(left, n-1)
            queries.append(("Range", left, right))
    return queries

Крок 4. Функції без кешу (range_sum_no_cache, update_no_cache)

In [4]:
# Крок 4. Базові функції без кешування

def range_sum_no_cache(array: List[int], left: int, right: int) -> int:
    """
    Обчислює суму на [left, right] без кешу.
    Важливо: уникаємо зрізів списку (які копіюють пам'ять) — беремо islice.
    """
    return sum(itertools.islice(array, left, right + 1))

def update_no_cache(array: List[int], index: int, value: int) -> None:
    """
    Оновлює елемент у масиві без будь-якої інвалідації (бо кешу немає).
    """
    array[index] = value

Крок 5. Функції з кешем (range_sum_with_cache, update_with_cache + інвалідація)

In [5]:
# Крок 5. Функції з кешем (використовують LRUCache)

def range_sum_with_cache(array: List[int], left: int, right: int, cache: LRUCache) -> int:
    """
    1) Пробує взяти значення з LRU-кешу за ключем (left, right).
    2) Якщо промах — рахує суму, кладе в кеш, повертає результат.
    """
    key = (left, right)
    cached = cache.get(key)
    if cached != -1:
        return cached
    s = sum(itertools.islice(array, left, right + 1))
    cache.put(key, s)
    return s

def update_with_cache(array: List[int], index: int, value: int, cache: LRUCache) -> None:
    """
    1) Оновлює елемент масиву.
    2) Інвалідовує ВСІ кешовані діапазони, що накривають змінений index.
       Інвалідація — лінійний прохід по ключах кешу (за вимогою ТЗ).
    """
    array[index] = value
    to_delete = []
    # list(...) щоб "заморозити" поточні ключі під час ітерації
    for (l, r) in list(cache.keys()):
        if l <= index <= r:
            to_delete.append((l, r))
    for k in to_delete:
        cache.delete_key(k)

Крок 6. Бенчмарк: формуємо дані, запускаємо послідовність запитів без кешу і з кешем

In [6]:
# Крок 6. Запуск продуктивності й коректності (XOR-агрегація результатів для чесності порівняння)

def run_benchmark(n=100_000, q=50_000, seed=42, cache_capacity=1000):
    """
    Створює масив, генерує запити, потім виконує:
      - Послідовність БЕЗ кешу: заміри часу.
      - Послідовність З кешем: заміри часу.
    Друк підсумків у зручному вигляді + перевірка тотожності результатів (XOR).
    """
    random.seed(seed)

    # 1) Дані
    array = [random.randint(1, 100) for _ in range(n)]
    queries = make_queries(n, q)

    # 2) Окремі копії масиву для кожного режиму (щоб оновлення не «змішувались»)
    arr_no_cache = list(array)
    arr_cache = list(array)

    # 3) Без кешу
    t0 = time.perf_counter()
    acc_no_cache = 0  # XOR, щоб «використати» результат і уникнути оптимізацій
    for typ, a, b in queries:
        if typ == "Range":
            acc_no_cache ^= range_sum_no_cache(arr_no_cache, a, b)
        else:  # Update
            update_no_cache(arr_no_cache, a, b)
    t1 = time.perf_counter()
    t_no = t1 - t0

    # 4) З кешем
    cache = LRUCache(cache_capacity)
    t2 = time.perf_counter()
    acc_cache = 0
    for typ, a, b in queries:
        if typ == "Range":
            acc_cache ^= range_sum_with_cache(arr_cache, a, b, cache)
        else:
            update_with_cache(arr_cache, a, b, cache)
    t3 = time.perf_counter()
    t_with = t3 - t2

    # 5) Перевірка коректності
    ok = (acc_no_cache == acc_cache)

    # 6) Вивід результатів
    print("=== Результати (Завдання 1: LRU-кеш) ===")
    print(f"Без кешу : {pretty_seconds(t_no)}")
    print(f"LRU-кеш  : {pretty_seconds(t_with)}  (прискорення ×{(t_no / t_with):.2f})")
    print(f"Перевірка тотожності результатів (XOR): {'OK' if ok else 'FAILED'}")
    print(f"Розмір кешу після виконання: {len(cache)} ключів")

Крок 7. Швидка міні-перевірка (маленькі параметри) + повний прогін

In [7]:
# Крок 7. Швидкий прогін + повний тест
def quick_sanity():
    """
    Невелика швидка перевірка на малих n, q — щоб миттєво побачити формат виводу.
    """
    print("Швидка перевірка на малих параметрах (n=2_000, q=2_000)...")
    run_benchmark(n=2_000, q=2_000, seed=123, cache_capacity=100)
    print("Готово. Запускаємо великий тест...\n")

# ---- Запуск у Colab (спершу mini, потім full) ----
quick_sanity()
run_benchmark(n=100_000, q=50_000, seed=42, cache_capacity=1000)

Швидка перевірка на малих параметрах (n=2_000, q=2_000)...
=== Результати (Завдання 1: LRU-кеш) ===
Без кешу :     37 ms
LRU-кеш  :     12 ms  (прискорення ×3.04)
Перевірка тотожності результатів (XOR): OK
Розмір кешу після виконання: 51 ключів
Готово. Запускаємо великий тест...

=== Результати (Завдання 1: LRU-кеш) ===
Без кешу :  44.44 c
LRU-кеш  :  16.18 c  (прискорення ×2.75)
Перевірка тотожності результатів (XOR): OK
Розмір кешу після виконання: 59 ключів


## Завдання 2. Реалізація Rate Limiter з використанням алгоритму Sliding Window для обмеження частоти повідомлень у чаті


Крок 1. Імпорти

In [8]:
# Крок 1. Імпорти
import time
import random
from collections import deque
from typing import Dict, Deque

Крок 2. Реалізація класу SlidingWindowRateLimiter

In [9]:
# Крок 2. Реалізація алгоритму Sliding Window
class SlidingWindowRateLimiter:
    """
    Обмеження частоти повідомлень на користувача в межах ковзного вікна.
    - window_size: довжина вікна у секундах (за замовчуванням 10с)
    - max_requests: скільки повідомлень дозволено у вікні (за замовчуванням 1)
    - history: user_id -> deque з таймстемпами надсилань
    """
    def __init__(self, window_size: int = 10, max_requests: int = 1):
        self.window_size = int(window_size)
        self.max_requests = int(max_requests)
        self.history: Dict[str, Deque[float]] = {}

    def _cleanup_window(self, user_id: str, current_time: float) -> None:
        """
        Видаляє з черги користувача всі події, що вийшли за межі
        [current_time - window_size, current_time].
        Якщо після очищення подій не залишилось — видаляємо ключ користувача.
        """
        dq = self.history.get(user_id)
        if not dq:
            return
        threshold = current_time - self.window_size
        while dq and dq[0] <= threshold:
            dq.popleft()
        if not dq:
            # якщо історія спорожніла — прибираємо запис про користувача
            del self.history[user_id]

    def can_send_message(self, user_id: str) -> bool:
        """
        Перевіряє, чи може користувач відправити повідомлення просто зараз.
        Повертає True, якщо кількість подій у вікні < max_requests.
        """
        now = time.time()
        self._cleanup_window(user_id, now)
        dq = self.history.get(user_id, deque())
        return len(dq) < self.max_requests

    def record_message(self, user_id: str) -> bool:
        """
        Фіксує нове повідомлення, якщо ліміт не перевищено.
        - Якщо можна — додає таймстемп і повертає True.
        - Якщо не можна — нічого не записує і повертає False.
        """
        now = time.time()
        self._cleanup_window(user_id, now)
        dq = self.history.get(user_id)
        if dq is None:
            dq = deque()
            self.history[user_id] = dq

        if len(dq) < self.max_requests:
            dq.append(now)
            return True
        return False

    def time_until_next_allowed(self, user_id: str) -> float:
        """
        Скільки секунд залишилось чекати до наступного дозволеного повідомлення.
        Якщо обмеження не діє — 0.0
        """
        now = time.time()
        self._cleanup_window(user_id, now)
        dq = self.history.get(user_id)
        if not dq:
            return 0.0
        oldest = dq[0]
        wait = (oldest + self.window_size) - now
        return max(0.0, wait)

Крок 3. Демонстраційна функція з прикладу

In [10]:
# Крок 3. Демонстрація з умови (значення часу можуть трохи відрізнятися через випадкові затримки)
def test_rate_limiter():
    limiter = SlidingWindowRateLimiter(window_size=10, max_requests=1)

    print("\n=== Симуляція потоку повідомлень ===")
    for message_id in range(1, 11):
        user_id = message_id % 5 + 1  # користувачі 1..5 по колу
        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))
        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        time.sleep(random.uniform(0.1, 1.0))  # невелика випадкова пауза

    print("\nОчікуємо 4 секунди...")
    time.sleep(4)

    print("\n=== Нова серія повідомлень після очікування ===")
    for message_id in range(11, 21):
        user_id = message_id % 5 + 1
        result = limiter.record_message(str(user_id))
        wait_time = limiter.time_until_next_allowed(str(user_id))
        print(f"Повідомлення {message_id:2d} | Користувач {user_id} | "
              f"{'✓' if result else f'× (очікування {wait_time:.1f}с)'}")
        time.sleep(random.uniform(0.1, 1.0))

Крок 4. Міні-перевірки критеріїв прийняття

In [12]:
# Крок 4. Автоматичні перевірки критеріїв
def run_criteria_checks():
    limiter = SlidingWindowRateLimiter(window_size=2, max_requests=1)  # вікно коротше, щоб тести були швидкі
    uid = "u1"

    # 2) Перше повідомлення завжди True
    assert limiter.can_send_message(uid) is True, "Перше can_send_message має бути True"
    assert limiter.record_message(uid) is True, "Перше record_message має бути True"

    # 1) Раніше, ніж через window_size сек — має бути False
    assert limiter.can_send_message(uid) is False, "Повторне can_send_message в межах вікна має бути False"
    assert limiter.record_message(uid) is False, "Повторне record_message в межах вікна має бути False"
    t_wait = limiter.time_until_next_allowed(uid)
    assert t_wait > 0.0, "time_until_next_allowed має показувати позитивний час очікування"

    # Чекаємо, щоб подія вийшла з вікна
    time.sleep(limiter.time_until_next_allowed(uid) + 0.05)

    # 3) Після очищення вікна запис про користувача має зникнути
    # (це перевіримо непрямо: відправимо знову — повинно бути True, і в history створиться нова черга)
    assert limiter.record_message(uid) is True, "Після очікування повідомлення має дозволятись"
    assert uid in limiter.history and len(limiter.history[uid]) == 1, "Історія має містити 1 новий запис"

    # Ще раз дочекаємось очищення та перевіримо, що запис видаляється зі структури
    time.sleep(limiter.time_until_next_allowed(uid) + limiter.window_size + 0.05)
    # викличемо cleanup непрямо
    _ = limiter.can_send_message(uid)
    assert uid not in limiter.history, "Порожній користувач має бути видалений з history"

    print("Усі критерії прийняття пройдені.")

Крок 5. Запуск

In [13]:
# Крок 5. Запуск перевірок та демо
run_criteria_checks()
test_rate_limiter()

Усі критерії прийняття пройдені.

=== Симуляція потоку повідомлень ===
Повідомлення  1 | Користувач 2 | ✓
Повідомлення  2 | Користувач 3 | ✓
Повідомлення  3 | Користувач 4 | ✓
Повідомлення  4 | Користувач 5 | ✓
Повідомлення  5 | Користувач 1 | ✓
Повідомлення  6 | Користувач 2 | × (очікування 7.3с)
Повідомлення  7 | Користувач 3 | × (очікування 7.3с)
Повідомлення  8 | Користувач 4 | × (очікування 7.7с)
Повідомлення  9 | Користувач 5 | × (очікування 7.2с)
Повідомлення 10 | Користувач 1 | × (очікування 6.9с)

Очікуємо 4 секунди...

=== Нова серія повідомлень після очікування ===
Повідомлення 11 | Користувач 2 | × (очікування 0.4с)
Повідомлення 12 | Користувач 3 | × (очікування 0.2с)
Повідомлення 13 | Користувач 4 | × (очікування 0.9с)
Повідомлення 14 | Користувач 5 | × (очікування 0.8с)
Повідомлення 15 | Користувач 1 | × (очікування 1.2с)
Повідомлення 16 | Користувач 2 | ✓
Повідомлення 17 | Користувач 3 | ✓
Повідомлення 18 | Користувач 4 | ✓
Повідомлення 19 | Користувач 5 | ✓
Повідомлення