add bounded cache (beta)
VermiIIi0n committed Apr 7, 2023
1 parent 66e7f11 commit c8da054
Showing 8 changed files with 367 additions and 141 deletions.
176 changes: 173 additions & 3 deletions asynctinydb/modifier.py
@@ -1,16 +1,19 @@
"""Modifier class for TinyDB."""

from __future__ import annotations
from typing import Any, Callable, TypeVar, overload
import time
from typing import Any, Callable, Mapping, Sequence, TypeVar, overload
import datetime as dt
import cachetools
from warnings import warn
from functools import partial
from .storages import Storage, StorageWithWriteReadPrePostHooks
from cachetools import Cache
from vermils.asynctools import async_run
from vermils.collections.fridge import FrozenDict
from vermils.gadgets import sort_class
from .storages import Storage, StorageWithWriteReadPrePostHooks
from .database import TinyDB
from .table import Table, IncreID, Document, BaseDocument
from .table import BaseID, Table, IncreID, Document, BaseDocument


T = TypeVar("T", bound=Table)
@@ -417,3 +420,170 @@ def access_time(_: str, tab: Table, doc: BaseDocument):
doc[fields["access"]] = get_time()

return tab

class Caching:
"""
## Bounded Caching
**WARNING: This is NOT CachingMiddleware; it PURGES data from the database
once it expires or is evicted.
If you only want to cache data for performance, use `CachingMiddleware` instead.**
**Note: the modifiers in this class are still UNDER DEVELOPMENT. The current
implementation is flawed and only works well if you access data by `doc_id`;
conditional searching/updating disturbs the cache order and may purge the wrong data.**
This class provides modifiers that turn TinyDB instances into a cache system
with a bounded size.
You can choose algorithms such as `LRUCache`, `LFUCache`, etc.,
or even implement your own.
"""

@staticmethod
def _add_cache(
tab: Table | TinyDB,
cacheT: type[Cache],
maxsize: int,
getsizeof: Callable[[Any], float] | None,
**kw):

if isinstance(tab, TinyDB):
tab = tab.default_table
tab._cook  # plain attribute access; fails fast with AttributeError if the table has no _cook hook
if tab.no_dbcache:
raise ValueError("Modifier relies on db-level cache")

def _cook(raw: Mapping[Any, Mapping]
) -> Cache[BaseID, BaseDocument]:
nonlocal tab
doc_cls = tab.document_class
id_cls = tab.document_id_class
cache = cacheT(
maxsize=maxsize,
getsizeof=getsizeof,
**kw)
for rid, rdoc in raw.items():
doc_id = id_cls(rid)
doc = doc_cls(rdoc, doc_id)
cache[doc_id] = doc
return cache

tab._cook = _cook # type: ignore[method-assign]

@classmethod
def LRUCache(
cls,
tab: Table | TinyDB,
maxsize: int,
getsizeof: Callable[[Any], float] = None,
) -> Table[BaseID, BaseDocument]:
"""
### LRUCache
Least Recently Used Cache
"""
cls._add_cache(tab, cachetools.LRUCache, maxsize, getsizeof)
return tab

@classmethod
def LFUCache(
cls,
tab: Table | TinyDB,
maxsize: int,
getsizeof: Callable[[Any], float] = None,
) -> Table[BaseID, BaseDocument]:
"""
### LFUCache
Least Frequently Used Cache
"""
cls._add_cache(tab, cachetools.LFUCache, maxsize, getsizeof)
return tab

@classmethod
def RRCache(
cls,
tab: Table | TinyDB,
maxsize: int,
getsizeof: Callable[[Any], float] = None,
choice: Callable[[Sequence], Any] = None,
) -> Table[BaseID, BaseDocument]:
"""
### RRCache
Random Replacement Cache
"""
cls._add_cache(tab, cachetools.RRCache, maxsize, getsizeof,
choice=choice)
return tab

@classmethod
def TTLCache(
cls,
tab: Table | TinyDB,
maxsize: int,
ttl: float,
getsizeof: Callable[[Any], float] = None,
timer: Callable[[], float] = time.monotonic,
) -> Table[BaseID, BaseDocument]:
"""
### TTLCache
Time To Live Cache
"""
cls._add_cache(tab, cachetools.TTLCache, maxsize, getsizeof,
ttl=ttl, timer=timer)
return tab

@classmethod
def TLRUCache(
cls,
tab: Table | TinyDB,
maxsize: int,
ttu: Callable[[Any, Any, float], float],
getsizeof: Callable[[Any], float] = None,
timer: Callable[[], float] = time.monotonic,
) -> Table[BaseID, BaseDocument]:
"""
### TLRUCache
Time-aware Least Recently Used Cache (per-item TTL computed by `ttu`)
```
from datetime import datetime, timedelta
def my_ttu(_key, value, now):
# assume value.ttl contains the item's time-to-live in hours
return now + timedelta(hours=value.ttl)
cache = TLRUCache(maxsize=10, ttu=my_ttu, timer=datetime.now)
```
"""
cls._add_cache(tab, cachetools.TLRUCache, maxsize, getsizeof,
ttu=ttu, timer=timer)
return tab

@classmethod
def FIFOCache(
cls,
tab: Table | TinyDB,
maxsize: int,
getsizeof: Callable[[Any], float] = None,
) -> Table[BaseID, BaseDocument]:
"""
### FIFOCache
First In First Out Cache
"""
cls._add_cache(tab, cachetools.FIFOCache, maxsize, getsizeof)
return tab

@classmethod
def MRUCache(
cls,
tab: Table | TinyDB,
maxsize: int,
getsizeof: Callable[[Any], float] = None,
) -> Table[BaseID, BaseDocument]:
"""
### MRUCache
Most Recently Used Cache
"""
cls._add_cache(tab, cachetools.MRUCache, maxsize, getsizeof)
return tab
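
For context, here is a minimal usage sketch of the new modifiers, based only on the signatures added above. The import paths, the `await`-based calls (the async API is assumed to mirror tinydb's), and the `maxsize`/`ttl` values are assumptions for illustration; if the project exposes `Caching` as a nested class of its `Modifier` class, access it as `Modifier.Caching` instead.

```python
# Usage sketch -- assumptions: asynctinydb mirrors tinydb's API asynchronously,
# and Caching is importable from asynctinydb.modifier, where this commit defines it.
import asyncio
from asynctinydb import TinyDB
from asynctinydb.modifier import Caching

async def main():
    db = TinyDB("cache.json")

    # Bound the default table to the 256 most recently used documents.
    # Evicted documents are PURGED from the database, not just from memory.
    Caching.LRUCache(db, maxsize=256)

    # A time-bounded table: entries are dropped roughly 60 seconds after insertion.
    sessions = Caching.TTLCache(db.table("sessions"), maxsize=1024, ttl=60.0)

    doc_id = await db.insert({"user": "alice"})
    await sessions.insert({"token": "abc123"})
    print(await db.get(doc_id=doc_id))

asyncio.run(main())
```

As the class docstring warns, this turns the table itself into the cache: eviction deletes documents from the underlying storage, unlike `CachingMiddleware`, which only caches in front of it.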
75 changes: 35 additions & 40 deletions asynctinydb/table.py
@@ -112,7 +112,6 @@ def clear_cache(cls, table: Table):

class UUID(uuid.UUID, BaseID):
"""ID class using uuid4 UUIDs."""
_cache: dict[str, set[uuid.UUID]] = {}

def __init__(self, value: str | uuid.UUID): # skipcq: PYL-W0231
super().__init__(str(value))
@@ -122,24 +121,15 @@ def __hash__(self):

@classmethod
def next_id(cls, table: Table, keys: Collection[UUID]) -> UUID:
if table.name not in cls._cache:
cls._cache[table.name] = set()
while True:
new_id = cls(uuid.uuid4())
if (new_id not in cls._cache[table.name] # pragma: no branch
and new_id not in keys):
cls._cache[table.name].add(new_id)
return new_id
return cls(uuid.uuid4())

@classmethod
def mark_existed(cls, table: Table, new_id: UUID):
cache = cls._cache.get(table.name, set())
cache.add(new_id)
cls._cache[table.name] = cache
...

@classmethod
def clear_cache(cls, table: Table):
cls._cache.pop(table.name, None)
...


class BaseDocument(MutableMapping[IDVar, Any]):
@@ -251,7 +241,7 @@ def __init__(
"""Whether to disable the DB-level cache for this table."""
self._storage = storage
self._name = name
self._cache: dict[IDVar, DocVar] = None # type: ignore
self._cache: MutableMapping[IDVar, DocVar] = None # type: ignore[assignment]
"""Cache for documents in this table."""
self._query_cache: LRUCache[QueryLike, tuple[IDVar, ...]] \
= self.query_cache_class(capacity=cache_size)
@@ -322,7 +312,7 @@ async def insert(self, document: Mapping) -> IDVar:

doc_id: IDVar = None # type: ignore

def updater(table: dict[IDVar, DocVar]):
def updater(table: MutableMapping[IDVar, DocVar]):
# Now, we update the table and add the document
nonlocal doc_id
nonlocal document
@@ -364,7 +354,7 @@ async def insert_multiple(self, documents: Iterable[Mapping]) -> list[IDVar]:

doc_ids = []

def updater(table: dict[IDVar, DocVar]):
def updater(table: MutableMapping[IDVar, DocVar]):
existing_keys = table.keys()
for document in documents:

@@ -480,7 +470,7 @@ async def contains(

async def update(
self,
fields: Mapping | Callable[[Mapping], None],
fields: Mapping | Callable[[MutableMapping], None],
cond: QueryLike = None,
doc_ids: Iterable[IDVar] = None,
) -> list[IDVar]:
@@ -496,12 +486,12 @@ async def update(

# Define the function that will perform the update
if callable(fields):
def perform_update(table: dict[IDVar, DocVar], doc_id: IDVar):
def perform_update(table: MutableMapping[IDVar, DocVar], doc_id: IDVar):
# Update documents by calling the update function provided by
# the user
fields(table[doc_id]) # type: ignore
else:
def perform_update(table: dict[IDVar, DocVar], doc_id: IDVar):
def perform_update(table: MutableMapping[IDVar, DocVar], doc_id: IDVar):
nonlocal fields
if self._isolevel >= 2:
fields = deepcopy(fields)
@@ -513,7 +503,7 @@ def perform_update(table: dict[IDVar, DocVar], doc_id: IDVar):

updated_ids = []

def updater(table: dict[IDVar, DocVar]):
def updater(table: MutableMapping[IDVar, DocVar]):
# Process all documents
for doc_id in ids:
# Add ID to list of updated documents
@@ -543,7 +533,7 @@ async def update_multiple(

# Define the function that will perform the update
def perform_update(fields: Callable[[Mapping], None] | Mapping,
table: dict[IDVar, DocVar], doc_id: IDVar):
table: MutableMapping[IDVar, DocVar], doc_id: IDVar):
if callable(fields):
# Update documents by calling the update function provided
# by the user
@@ -560,7 +550,7 @@ def perform_update(fields: Callable[[Mapping], None] | Mapping,
# Collect affected doc_ids
updated_ids = []

def updater(table: dict[IDVar, DocVar]):
def updater(table: MutableMapping[IDVar, DocVar]):
# We need to convert the keys iterator to a list because
# we may remove entries from the ``table`` dict during
# iteration and doing this without the list conversion would
@@ -645,7 +635,7 @@ async def remove(
docs = await self.search(cond, doc_ids=doc_ids)
ids = [doc.doc_id for doc in docs]

def rm_updater(table: dict[IDVar, DocVar]):
def rm_updater(table: MutableMapping[IDVar, DocVar]):
for doc_id in ids:
# Other threads may have already removed the document
with suppress(KeyError):
@@ -752,7 +742,7 @@ def __del__(self):
self._sink.close()

def _search(self, cond: QueryLike | None,
docs: dict[IDVar, DocVar],
docs: MutableMapping[IDVar, DocVar],
limit: int | None,
doc_ids: Iterable[IDVar] | None) -> dict[IDVar, DocVar]:
limit = len(docs) if limit is None else limit
@@ -807,9 +797,13 @@ def _search(self, cond: QueryLike | None,

# deepcopy if isolation level is >= 2,
# otherwise return a shallow copy in case no sieve has been applied
# (note: sieves already perform a shallow copy)
if not isinstance(docs, dict):
docs = dict(docs)

return deepcopy(docs) if self._isolevel >= 2 else docs.copy()

async def _read_table(self) -> dict[IDVar, DocVar]:
async def _read_table(self) -> MutableMapping[IDVar, DocVar]:
"""
Read the table data from the underlying storage
if the cache does not exist.
@@ -823,20 +817,20 @@ async def _read_table(self) -> dict[IDVar, DocVar]:
raw = await self._read_raw_table()
cooked: dict[IDVar, DocVar] | None = None

def cook():
nonlocal cooked
doc_cls = self.document_class
id_cls = self.document_id_class
cooked = {
id_cls(doc_id): doc_cls(rdoc, doc_id=id_cls(doc_id))
for doc_id, rdoc in raw.items()
}
if not self.no_dbcache:
# Caching if no_dbcache is not set
self._cache = cooked

await self._run_with_iso(cook)
return self._cache or cooked
cooked = await self._run_with_iso(self._cook, raw)
if not self.no_dbcache:
# Caching if no_dbcache is not set
self._cache = cooked
return cooked

def _cook(self, raw: Mapping[Any, Mapping]
) -> MutableMapping[IDVar, DocVar]:
doc_cls = self.document_class
id_cls = self.document_id_class
return {
id_cls(doc_id): doc_cls(rdoc, doc_id=id_cls(doc_id))
for doc_id, rdoc in raw.items()
}

async def _read_raw_table(self) -> MutableMapping[Any, Mapping]:
"""
@@ -857,7 +851,8 @@ async def _read_raw_table(self) -> MutableMapping[Any, Mapping]:
# Retrieve the current table's data
return tables.get(self.name, {})

async def _update_table(self, updater: Callable[[dict[IDVar, DocVar]], None]):
async def _update_table(self,
updater: Callable[[MutableMapping[IDVar, DocVar]], None]):
"""
Perform a table update operation.
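
The table.py changes above are what make the new modifier possible: the DB-level cache is typed as `MutableMapping` instead of `dict`, and document construction is funnelled through the new `_cook` hook that `Caching._add_cache` overrides. The snippet below is explanatory only (not part of the commit) and shows why `_search` now normalises non-dict mappings with `dict(docs)` before copying: a `cachetools` cache is a `MutableMapping` but not a `dict`, so it has no `copy()` of its own.

```python
# Illustration: cachetools caches are MutableMappings, not dicts.
from collections.abc import MutableMapping
import cachetools

cache = cachetools.LRUCache(maxsize=2)
cache["a"] = 1
cache["b"] = 2
cache["c"] = 3                      # exceeds maxsize, evicts the least recently used key "a"

assert isinstance(cache, MutableMapping)
assert not isinstance(cache, dict)
assert not hasattr(cache, "copy")   # hence the dict(docs) conversion in _search

docs = dict(cache)                  # plain-dict snapshot, as _search now does
shallow = docs.copy()               # shallow copy returned to callers
print(shallow)                      # {'b': 2, 'c': 3}
```

Replacing `_cook` is therefore enough to swap the plain dict for any bounded `MutableMapping`, which is exactly what each `Caching.*Cache` modifier does.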
