In [None]:
# імпорт базових модулей
from tqdm import tqdm
import numpy as np

# вимикаємо зайві попередження
import warnings
warnings.filterwarnings("ignore")

# друк всіх результатів в одній комірці а не тільки останнього
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"


In [None]:
# імпорт додаткових модулів
import random
import array
import time
from copy import deepcopy

In [3]:
def make_queries(n, q, hot_pool=30, p_hot=0.95, p_update=0.03):
    """Функція для генерації запитів до масиву."""
    hot = [
        (random.randint(0, n // 2), random.randint(n // 2, n - 1))
        for _ in range(hot_pool)
    ]
    queries = []
    for _ in range(q):
        if random.random() < p_update:  # ~3% запитів — Update
            idx = random.randint(0, n - 1)
            val = random.randint(1, 100)
            queries.append(("Update", idx, val))
        else:  # ~97% — Range
            if random.random() < p_hot:  # 95% — «гарячі» діапазони
                left, right = random.choice(hot)
            else:  # 5% — випадкові діапазони
                left = random.randint(0, n - 1)
                right = random.randint(left, n - 1)
            queries.append(("Range", left, right))
    return queries

# Створення масиву запитів
queries = make_queries(100000, 50000,)
# перевірка створеня запитів
len(queries), queries[:10]

(50000,
 [('Range', 23556, 72533),
  ('Range', 31931, 72603),
  ('Range', 49065, 59476),
  ('Range', 24882, 55050),
  ('Range', 49065, 59476),
  ('Range', 49065, 59476),
  ('Range', 18365, 77213),
  ('Range', 49065, 59476),
  ('Range', 9644, 61246),
  ('Range', 39054, 99662)])

In [4]:
# кастомний клас для реалізації LRU-кешу з конспекту
class Node:
    def __init__(self, key, value):
        self.data = (key, value)
        self.next = None
        self.prev = None


class DoublyLinkedList:
    def __init__(self):
        self.head = None
        self.tail = None

    def push(self, key, value):
        new_node = Node(key, value)
        new_node.next = self.head
        if self.head:
            self.head.prev = new_node
        else:
            self.tail = new_node
        self.head = new_node
        return new_node

    def remove(self, node):
        if node.prev:
            node.prev.next = node.next
        else:
            self.head = node.next
        if node.next:
            node.next.prev = node.prev
        else:
            self.tail = node.prev
        node.prev = None
        node.next = None

    def move_to_front(self, node):
        if node != self.head:
            self.remove(node)
            node.next = self.head
            self.head.prev = node
            self.head = node

    def remove_last(self):
        if self.tail:
            last = self.tail
            self.remove(last)
            return last
        return None


class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}
        self.list = DoublyLinkedList()

    def get(self, key):
        if key in self.cache:
            node = self.cache[key]
            self.list.move_to_front(node)
            return node.data[1]
        return -1

    def put(self, key, value):
        if key in self.cache:
            node = self.cache[key]
            node.data = (key, value)
            self.list.move_to_front(node)
        else:
            if len(self.cache) >= self.capacity:
                last = self.list.remove_last()
                if last:
                    del self.cache[last.data[0]]
            new_node = self.list.push(key, value)
            self.cache[key] = new_node

In [5]:
# клас-наслідувач для реалізації LRU-кешу з видаленням діапазону
class LRUCacheWithRangeDeleting(LRUCache):
    def __init__(self, capacity):
        super().__init__(capacity)

    def delete_range(self, index):
        current = self.list.head
        while current:
            if current.data[0][0] <= index <= current.data[0][1]:
                next_node = current.next
                self.list.remove(current)
                del self.cache[current.data[0]]
                current = next_node
            else:
                current = current.next

In [6]:
def range_sum_no_cache(array, left, right):
    """Функція для обчислення суми елементів масиву в діапазоні [left, right] без використання кешу.   """
    return sum(array[left:right + 1])

In [7]:
def update_no_cache(array, index, value):
    """Функція для оновлення значення елемента масиву без використання кешу."""
    array[index] = value
    return array

In [8]:
def range_sum_with_cache(array, left, right):
    """Функція для обчислення суми елементів масиву в діапазоні [left, right] з використанням кешу."""
    from_cache = arr_cache.get((left, right))
    if from_cache == -1:
        new_value = sum(array[left : right + 1])
        arr_cache.put((left, right), new_value)
        return new_value
    else:
        return from_cache

In [9]:
def update_with_cache(array, index, value):
    """Функція для оновлення значення елемента масиву з видаленням з кешу всіх елементів в діапазон яких входить індекс."""
    arr_cache.delete_range(index)
    array[index] = value
    return array

In [10]:
# створення масиву з випадковими значеннями
arr = array.array("B", np.random.randint(0, 255, size=100000))
# створення кешу
arr_cache = LRUCacheWithRangeDeleting(capacity=1000)

# перевірка створення масиву
arr[:10]

array('B', [186, 83, 87, 228, 129, 179, 135, 12, 5, 148])

In [11]:
# замір часу виконання запитів без кешу

# створення копії масиву для роботи без кешу
no_cache_arr = deepcopy(arr)

nocache_start_time = time.time()

for query in tqdm(queries):
    if query[0] == "Update":
        arr = update_no_cache(no_cache_arr, query[1], query[2])
    elif query[0] == "Range":
        _ = range_sum_no_cache(no_cache_arr, query[1], query[2])

nocache_end_time = time.time()
# print("No cache time:", nocache_end_time - nocache_start_time)

100%|██████████| 50000/50000 [00:12<00:00, 4151.02it/s]


In [12]:
# замір часу виконання запитів з кешем

# створення копії масиву для роботи з кешем
cache_arr = deepcopy(arr)

cache_start_time = time.time()

for query in tqdm(queries):
    if query[0] == "Update":
        arr = update_with_cache(cache_arr, query[1], query[2])
    elif query[0] == "Range":
        _ = range_sum_with_cache(cache_arr, query[1], query[2])

cache_end_time = time.time()
# print("Cache time:", cache_end_time - cache_start_time)

100%|██████████| 50000/50000 [00:04<00:00, 11658.15it/s]


In [13]:
# вивід результатів замірів часу

print(f"Час обробки {len(queries)} запитів складає:")
print(f"Без кешу    : {nocache_end_time - nocache_start_time:.2f} секунд")
print(
    f"З LRU-кешем : {cache_end_time - cache_start_time:.2f} секунд (прискорення x{((nocache_end_time - nocache_start_time) / (cache_end_time - cache_start_time)):.1f})"
)

Час обробки 50000 запитів складає:
Без кешу    : 12.05 секунд
З LRU-кешем : 4.29 секунд (прискорення x2.8)
