Skip to content

Commit

Permalink
fix: lock on wrong redis params
Browse files Browse the repository at this point in the history
  • Loading branch information
Dima Kryukov committed Nov 6, 2022
1 parent 96648bc commit 4429f01
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 22 deletions.
8 changes: 4 additions & 4 deletions cashews/backend_settings.py
Expand Up @@ -3,7 +3,7 @@

from .backends.interface import Backend
from .backends.memory import Memory
from .exceptions import BackendNotAvailable
from .exceptions import BackendNotAvailableError

try:
from .backends.client_side import BcastClientSide
Expand Down Expand Up @@ -50,22 +50,22 @@ def _redis_fabric(**params: Any) -> Union[Redis, BcastClientSide]:
def settings_url_parse(url):
    """Parse a cache-settings URL into a ``(backend_class, params)`` pair.

    The URL scheme selects the backend alias; query-string values are
    coerced from strings by ``serialize_params``.

    :raises BackendNotAvailableError: for an unknown alias, or an alias
        whose optional dependency is not installed.

    NOTE(review): this span held both the pre- and post-commit lines of a
    diff (``_fix_params_types`` vs ``serialize_params``,
    ``BackendNotAvailable`` vs ``BackendNotAvailableError``); resolved to
    the post-commit version.
    """
    parse_result = urlparse(url)
    params = dict(parse_qsl(parse_result.query))
    params = serialize_params(params)

    alias = parse_result.scheme
    if alias == "":
        # No scheme at all -> fall back to an in-memory backend that is disabled.
        return Memory, {"disable": True}

    if alias not in _BACKENDS:
        error = _CUSTOM_ERRORS.get(alias, f"wrong backend alias {alias}")
        raise BackendNotAvailableError(error)
    backend_class, pass_uri = _BACKENDS[alias]
    if pass_uri:
        # Backends that take the raw URI get it without the query string.
        params["address"] = url.split("?")[0]
    return backend_class, params


def _fix_params_types(params: Dict[str, str]) -> Dict[str, Union[str, int, bool, float]]:
def serialize_params(params: Dict[str, str]) -> Dict[str, Union[str, int, bool, float]]:
new_params = {}
bool_keys = ("safe", "enable", "disable", "client_side")
true_values = (
Expand Down
22 changes: 15 additions & 7 deletions cashews/backends/client_side.py
Expand Up @@ -57,7 +57,7 @@ def __init__(self, *args: Any, local_cache=None, client_side_prefix: str = _DEFA
self._local_cache = Memory() if local_cache is None else local_cache
self._prefix = client_side_prefix
self._recently_update = Memory(size=500, check_interval=5)
self.__listen_task = None
self._listen_task = None
self._listen_started = None
super().__init__(*args, **kwargs)

Expand All @@ -66,8 +66,12 @@ async def init(self):
await self._local_cache.init()
await self._recently_update.init()
await super().init()
self.__listen_task = asyncio.create_task(self._listen_invalidate_forever())
await self._listen_started.wait()
self.__is_init = False
self._listen_task = asyncio.create_task(self._listen_invalidate_forever())
await asyncio.wait([self._listen_started.wait()], timeout=2)
if self._listen_task.done():
raise self._listen_task.exception()
self.__is_init = True

async def _mark_as_recently_updated(self, key: str):
    """Remember for a short window (5s) that *key* was just written by us."""
    recently = self._recently_update
    await recently.set(key, True, expire=5)
Expand All @@ -84,9 +88,13 @@ async def _listen_invalidate_forever(self):
await self._listen_invalidate()
except (RedisConnectionError, ConnectionRefusedError):
logger.error("broken connection with redis. Clearing client side storage")
self._listen_started = asyncio.Event()
self._listen_started.clear()
await self._local_cache.clear()
await asyncio.sleep(_RECONNECT_WAIT)
except Exception:
self._listen_started.clear()
await self._local_cache.clear()
raise

async def _get_channel(self):
pubsub = self._client.pubsub()
Expand Down Expand Up @@ -238,9 +246,9 @@ async def clear(self):
return await super().clear()

def close(self):
    """Cancel the invalidation-listener task and close the helper caches.

    NOTE(review): diff residue resolved — only the post-commit
    single-underscore ``_listen_task`` attribute is kept (the hunk also
    showed the removed ``__listen_task`` variant).
    """
    if self._listen_task is not None:
        self._listen_task.cancel()
        self._listen_task = None
    self._local_cache.close()
    self._recently_update.close()
    super().close()
6 changes: 5 additions & 1 deletion cashews/exceptions.py
Expand Up @@ -2,10 +2,14 @@ class CacheError(Exception):
pass


class BackendNotAvailableError(CacheError):
    """Raised for a wrong or not-available cache backend alias."""
    # NOTE(review): diff residue resolved — the hunk showed both the old
    # ``BackendNotAvailable`` and the renamed ``BackendNotAvailableError``
    # class lines; only the post-commit name is kept.


class NotConfiguredError(CacheError):
    """Raised when the cache is used before it has been configured."""


class UnsupportedPicklerError(CacheError):
    """Raised for an unknown or unsupported pickle type."""

Expand Down
6 changes: 5 additions & 1 deletion cashews/wrapper.py
@@ -1,6 +1,6 @@
import asyncio
from contextlib import contextmanager
from functools import partial, wraps
from functools import lru_cache, partial, wraps
from typing import Any, AsyncIterator, Callable, Dict, Iterable, Mapping, Optional, Tuple, Type, Union

from . import decorators, validation
Expand All @@ -10,6 +10,7 @@
from .cache_condition import create_time_condition, get_cache_condition
from .commands import Command
from .disable_control import _is_disable_middleware
from .exceptions import NotConfiguredError
from .ttl import ttl_to_seconds

try:
Expand Down Expand Up @@ -85,10 +86,13 @@ def is_disable(self, *cmds: Command, prefix: str = ""):
def is_enable(self, *cmds: Command, prefix: str = ""):
    """Inverse of ``is_disable`` for the same commands and prefix."""
    disabled = self.is_disable(*cmds, prefix=prefix)
    return not disabled

@lru_cache(maxsize=1000)
def _get_backend_and_config(self, key: str) -> Tuple[Backend, Tuple[Callable]]:
    """Resolve *key* to its registered (backend, config) entry by prefix match.

    Prefixes are tried in reverse-sorted order, so a longer, more specific
    prefix wins over a shorter one that it extends.

    :raises NotConfiguredError: when no backend is registered under the
        default prefix, i.e. ``cache.setup(...)`` was never called.

    NOTE(review): ``lru_cache`` on an instance method keys on ``self`` and
    keeps every instance alive for the cache's lifetime (flake8-bugbear
    B019); it will also serve stale entries if ``self._backends`` changes
    after the first lookup — confirm ``setup()`` always runs before any
    cache access, or that it invalidates this cache.
    """
    for prefix in sorted(self._backends.keys(), reverse=True):
        if key.startswith(prefix):
            return self._backends[prefix]
    if self.default_prefix not in self._backends:
        raise NotConfiguredError("run `cache.setup(...)` before using cache")
    return self._backends[self.default_prefix]

def _get_backend(self, key: str) -> Backend:
Expand Down
39 changes: 32 additions & 7 deletions examples/bench_app.py
@@ -1,18 +1,15 @@
import asyncio
import random
import string
import time
from datetime import timedelta

from fastapi import FastAPI

from cashews import Cache, LockedError, RateLimitError, cache, context_cache_detect, mem, utils # noqa: F401
from cashews import Cache, Command, LockedError, RateLimitError, cache, context_cache_detect, mem, utils # noqa: F401

app = FastAPI()
cache.setup("mem://")
# cache.setup("disk://")
# cache.setup("redis://?safe=True&maxsize=20&create_connection_timeout=0.01")
# cache.setup("redis://?safe=True&maxsize=20&create_connection_timeout=0.01", client_side=True)
# cache.setup("redis://?safe=True&maxsize=20&create_connection_timeout=0.01", client_side=True, local_cache=Cache("disk").setup("disk://")) # noqa: 139
cache.setup("redis://", client_side=True)


@app.get("/")
Expand All @@ -30,12 +27,40 @@ async def early():


@app.get("/hit")
@cache.hit(ttl=timedelta(minutes=1), cache_hits=100, update_before=80)
@cache.hit(ttl=timedelta(minutes=1), cache_hits=100, update_after=80)
async def hit():
await asyncio.sleep(1)
return "".join([random.choice(string.ascii_letters) for _ in range(10)])


@app.middleware("http")
async def add_process_time_header(request, call_next):
start_time = time.perf_counter()
response = await call_next(request)
process_time = time.perf_counter() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response


@app.middleware("http")
async def add_from_cache_headers(request, call_next):
with cache.detect as detector:
response = await call_next(request)
if request.method.lower() != "get":
return response
if detector.calls:
response.headers["X-From-Cache-keys"] = ";".join(detector.calls.keys())
return response


@app.middleware("http")
async def disable_middleware(request, call_next):
if request.headers.get("X-No-Cache"):
with cache.disabling(Command.GET):
return await call_next(request)
return await call_next(request)


if __name__ == "__main__":
import uvicorn

Expand Down
2 changes: 2 additions & 0 deletions tests/test_disable_control.py
Expand Up @@ -38,6 +38,8 @@ async def test_init_disable(cache):
cache.setup("mem://localhost?disable=1")
assert cache.is_disable()


async def test_init_enable(cache):
    """Setup with an ``enable=1`` query param must leave the cache enabled."""
    url = "mem://localhost?enable=1"
    cache.setup(url)

    assert cache.is_enable()
    assert not cache.is_disable()
Expand Down
4 changes: 2 additions & 2 deletions tests/test_settings_url.py
@@ -1,6 +1,6 @@
import pytest

from cashews.backend_settings import BackendNotAvailable, settings_url_parse
from cashews.backend_settings import BackendNotAvailableError, settings_url_parse
from cashews.backends.memory import Memory


Expand Down Expand Up @@ -32,7 +32,7 @@ def test_url(url, params):
),
)
def test_url_but_backend_dependency_is_not_installed(url, error):
    """An alias whose optional dependency is missing raises the renamed error.

    NOTE(review): diff residue resolved — the hunk showed both the old
    ``BackendNotAvailable`` and new ``BackendNotAvailableError`` context
    lines; only the post-commit exception name is kept.
    """
    with pytest.raises(BackendNotAvailableError) as excinfo:
        settings_url_parse(url)

    assert str(excinfo.value) == error
Expand Down

0 comments on commit 4429f01

Please sign in to comment.