# Caching. Time based least recently used algorithm

In [1]:
import time
import string
import random
import functools
import sys

from collections import OrderedDict
from cachetools import cached, TTLCache

sys.setrecursionlimit(5000)

## 1. Реализация

In [2]:
class TLRU:
    
    def __init__(self, size=100, time_to_use=60):
        self.cache = OrderedDict()
        self.size = size
        self.time_to_use = time_to_use
        
    def create_value(self , value):
        return {'value': value, 'ts': time.time()}
        
    def store(self, key, value):
        # если размер кэша превышает заданное максимальное значение
        if len(self.cache) >= self.size:
            # удаляем первый элемент в кэше вне зависимоти от его валидности по времени
            self.cache.popitem(last=False)
        # если элемент в кэше присутствует
        if key in self.cache:
            # удаляем его для переноса в конец кэша
            del self.cache[key]
        self.cache[key] = self.create_value(value)
                
    def get(self, key):
        # очистка кэша от невалидных значений
        self.update_cache()
        if key not in self.cache:
            raise KeyError(f'Key {key} is not in the cache.')
        # переносим запрашиваемый элемент из начала в конец кэша
        value = self.cache.pop(key)['value']
        self.cache[key] = self.create_value(value)
        return value
    
    def update_cache(self):
        ts_now = time.time()
        while True:
            try:
                # найдем первый элемент в кэше
                key = next(iter(self.cache))
            # если кэш пуст
            except StopIteration:
                break
            # оценка валидности кэша по времени
            if ts_now - self.cache[key]['ts'] > self.time_to_use:
                # удаляем элемент в случае невалидности
                del self.cache[key]
            else:
                break

## 2. Проверка корректности работы

In [3]:
def get_initial_cache():
    cache = TLRU(size=3, time_to_use=3)
    keys = ['A', 'B', 'C']
    values = [1, 2, 3]
    timeouts = [1, 1, 0]

    for key, value, timeout in zip(keys, values, timeouts):
        cache.store(key, value)
        time.sleep(timeout)
    return cache

### Тест №1

- Корректность работы очищения кэша по таймауту
- 1ый запрос - все элементы в кэше (без обновления)
- 2ой запрос - нет первого элемента
- 3ий запрос - пустой кэш

In [4]:
cache = get_initial_cache()

# первый и последний элемент в кэше
assert cache.cache['A']['value'] == 1
assert cache.cache['C']['value'] == 3

time.sleep(2)

# элемент A удален
try:
    print(cache.get('A'))
except KeyError:
    pass

time.sleep(2)

# элементы B, С удалены
try:
    print(cache.get('B'))
except KeyError:
    pass

# кэш пуст
assert not cache.cache

### Тест №2

- Вытеснение элемента с валидным временем
- Заполним кэш размером 3
- Добавим 4ый элемент

In [5]:
cache = get_initial_cache()
    
# первый элемент в кэше еще можно использовать
assert cache.cache['A']['ts'] - time.time() < 3

# добавим еще один элемент
cache.store('D', 4)

# первый элемент кэша удален
try:
    print(cache.get('A'))
except KeyError:
    pass 

### Тест №3

- Обновление метки времени при запросе элемента
- Заполним кэш размером 3
- Запросим присутствующий элемент

In [6]:
cache = get_initial_cache()

# элемент A первый в кэше
assert list(cache.cache.keys())[0] == 'A'

cache.get('A')

# элемент A последний в кэше
assert list(cache.cache.keys())[-1] == 'A'
# первым стал элемент B
assert list(cache.cache.keys())[0] == 'B'

### Тест №4

- Запись присутствующего в кэше элемента
- Заполним кэш размером 3
- Обновим присутствующий элемент

In [7]:
cache = get_initial_cache()

# элемент A первый в кэше
assert list(cache.cache.keys())[0] == 'A'
assert cache.cache['A']['value'] == 1

cache.store('A', 4)

# элемент A последний в кэше с новым значением
assert list(cache.cache.keys())[-1] == 'A'
assert cache.cache['A']['value'] == 4

# первым стал элемент B
assert list(cache.cache.keys())[0] == 'B'

## 3. Тест эффективности работы в зависимости от размера и таймаута

In [8]:
def get_cache_performance(size, timeout, n):
    cache = TLRU(size=size, time_to_use=timeout)
    hits = 0
    misses = 0
    for i in range(n):
        random_element = random.choice(string.ascii_uppercase)
        operation = random.choice(['s', 'g'])
        if operation == 's':
            cache.store(random_element, random_element)
        else:
            try:
                cache.get(random_element)
                hits += 1
            except KeyError:
                misses += 1
    print('Эффективность кэширования:', round(hits / (hits + misses), 2))

### Тест №1

- 26 типов элементов
- Размер кэша 13
- Таймаут 0.00001 с

In [9]:
SIZE = 13
TIMEOUT = 0.00001
N = 100000

get_cache_performance(SIZE, TIMEOUT, N)

Эффективность кэширования: 0.07


### Тест №2

- 26 типов элементов
- Размер кэша 13
- Таймаут 0.0001 с (увеличено в 10 раз)

In [10]:
SIZE = 13
TIMEOUT = 0.0001
N = 100000

get_cache_performance(SIZE, TIMEOUT, N)

Эффективность кэширования: 0.48


### Тест №3

- 26 типов элементов
- Размер кэша 26 (увеличено в 2 раза)
- Таймаут 0.0001 с

In [11]:
SIZE = 26
TIMEOUT = 0.0001
N = 100000

get_cache_performance(SIZE, TIMEOUT, N)

Эффективность кэширования: 0.66


### Тест №4

- 26 типов элементов
- Размер кэша 26
- Таймаут 0.001 с (увеличено в 10 раз)

In [12]:
SIZE = 26
TIMEOUT = 0.001
N = 100000

get_cache_performance(SIZE, TIMEOUT, N)

Эффективность кэширования: 0.96


## 4. Сравнение с другими реализациями кэширования

### LRU из модуля functools

In [13]:
%%timeit -r 3 -n 3 -o -q

@functools.lru_cache(maxsize=100)
def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)

fib(1000)

<TimeitResult : 424 µs ± 87.9 µs per loop (mean ± std. dev. of 3 runs, 3 loops each)>

### TTLCache из модуля cachetools

In [14]:
%%timeit -r 3 -n 3 -o -q

cache = TTLCache(maxsize=100, ttl=1)

@cached(cache)
def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)

fib(1000)

<TimeitResult : 15.7 ms ± 852 µs per loop (mean ± std. dev. of 3 runs, 3 loops each)>

### Собственная реализация

In [15]:
%%timeit -r 3 -n 3 -o -q

cache = TLRU(size=100, time_to_use=1)

def fib(n):
    if n < 2:
        return n
    try:
        return cache.get(n-1) + cache.get(n-2)
    except:
        a = fib(n-1)
        b = fib(n-2)
        cache.store(n-1, a)
        cache.store(n-2, b)
        return a + b
    
fib(1000)

<TimeitResult : 8.79 ms ± 1.19 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)>