From e9cf9583ad360b0ad5ce3d43d4931fc85a02624e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quang=20T=C3=B9ng?= Date: Mon, 8 Jan 2024 16:47:12 +0700 Subject: [PATCH] Using linter pylint (#12) * Add linter pylint * Drop 3.7 support --- .github/workflows/python.yml | 2 +- Makefile | 1 + memproxy/__init__.py | 3 ++ memproxy/item.py | 36 +++++++++++++++++--- memproxy/memproxy.py | 46 ++++++++++++++++++++----- memproxy/proxy/__init__.py | 3 ++ memproxy/proxy/proxy.py | 66 ++++++++++++++++++++++++++++++------ memproxy/proxy/replicated.py | 26 ++++++++++---- memproxy/proxy/route.py | 52 ++++++++++++++++++++++++---- memproxy/proxy/stats.py | 22 ++++++++---- memproxy/redis.py | 46 +++++++++++++++++++------ memproxy/session.py | 15 ++++++-- requirements.txt | 7 ++++ setup.py | 2 +- 14 files changed, 270 insertions(+), 57 deletions(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 1b57eba..25b80c2 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: [ "3.7", "3.8" ] + python-version: [ "3.8", "3.9" ] services: redis: image: redis:6.0.8 diff --git a/Makefile b/Makefile index f65cef3..3ae3101 100644 --- a/Makefile +++ b/Makefile @@ -3,6 +3,7 @@ all: lint test lint: + pylint memproxy mypy . test: diff --git a/memproxy/__init__.py b/memproxy/__init__.py index 6b6af03..294302a 100644 --- a/memproxy/__init__.py +++ b/memproxy/__init__.py @@ -1,3 +1,6 @@ +""" +A Caching Library that Focuses on Consistency, Performance & High Availability. +""" from .item import Item, new_json_codec, ItemCodec, new_multi_get_filler, FillerFunc from .memproxy import LeaseGetResponse, LeaseSetResponse, DeleteResponse from .memproxy import LeaseGetResult diff --git a/memproxy/item.py b/memproxy/item.py index 624c149..05dd96b 100644 --- a/memproxy/item.py +++ b/memproxy/item.py @@ -1,3 +1,7 @@ +""" +Main package for accessing cache. +Clients should mostly use this package for accessing cached data. +""" from __future__ import annotations import dataclasses @@ -18,25 +22,30 @@ @dataclass class ItemCodec(Generic[T]): + """Item encoder & decoder for data in cache.""" encode: Callable[[T], bytes] decode: Callable[[bytes], T] class DataclassJSONEncoder(json.JSONEncoder): + """Custom JSON Encoder for dataclasses.""" + def default(self, o): + """implement default function of json.Encoder.""" if dataclasses.is_dataclass(o): return dataclasses.asdict(o) return super().default(o) def new_json_codec(cls: Type[T]) -> ItemCodec[T]: + """Creates a simple ItemCodec for dataclasses.""" return ItemCodec( encode=lambda x: json.dumps(x, cls=DataclassJSONEncoder).encode(), decode=lambda d: cls(**json.loads(d)), ) -class _ItemConfig(Generic[T, K]): +class _ItemConfig(Generic[T, K]): # pylint: disable=too-few-public-methods,too-many-instance-attributes __slots__ = ( 'pipe', 'key_fn', 'sess', 'codec', 'filler', 'hit_count', 'fill_count', 'cache_error_count', 'decode_error_count', @@ -121,7 +130,7 @@ def __call__(self) -> None: try: self.result = self.conf.codec.decode(get_resp[1]) return - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught self.conf.decode_error_count += 1 resp_error = f'Decode error. {str(e)}' @@ -136,6 +145,7 @@ def __call__(self) -> None: self._handle_filling() def result_func(self) -> T: + """Execute the session and map the result back to clients.""" if self.conf.sess.is_dirty: self.conf.sess.execute() @@ -144,7 +154,12 @@ def result_func(self) -> T: class Item(Generic[T, K]): - __slots__ = '_conf' + """ + Item object is for accessing cache keys. + Cache key will be filled for DB if cache miss, with intelligent batching. + Also providing stats for better monitoring. + """ + __slots__ = ('_conf',) _conf: _ItemConfig[T, K] @@ -157,6 +172,7 @@ def __init__( self._conf = _ItemConfig(pipe=pipe, key_fn=key_fn, filler=filler, codec=codec) def get(self, key: K) -> Promise[T]: + """Get data from cache key and fill from DB if it missed.""" # do init item state state: _ItemState[T, K] = _ItemState() @@ -171,6 +187,7 @@ def get(self, key: K) -> Promise[T]: return state.result_func def get_multi(self, keys: List[K]) -> Callable[[], List[T]]: + """Get multi cache keys at once. Equivalent to calling get() multiple times.""" conf = self._conf key_fn = conf.key_fn pipe = conf.pipe @@ -201,30 +218,36 @@ def result_func() -> List[T]: return result_func def compute_key_name(self, key: K) -> str: + """Calling the key name function, mostly for testing purpose.""" return self._conf.key_fn(key) @property def hit_count(self) -> int: + """Number of times cache get hit.""" return self._conf.hit_count @property def fill_count(self) -> int: + """Number of times cache get missed and need to fill from DB.""" return self._conf.fill_count @property def cache_error_count(self) -> int: + """Number of times cache servers return errors""" return self._conf.cache_error_count @property def decode_error_count(self) -> int: + """Number of times decode function raises errors.""" return self._conf.decode_error_count @property def bytes_read(self) -> int: + """Number of bytes read from the cache servers.""" return self._conf.bytes_read -class _MultiGetState(Generic[T, K]): +class _MultiGetState(Generic[T, K]): # pylint: disable=too-few-public-methods __slots__ = ('keys', 'result', 'completed') keys: List[K] @@ -237,6 +260,7 @@ def __init__(self): self.result = {} def add_key(self, key: K): + """Add key to the state of multi-get filler.""" self.keys.append(key) @@ -244,7 +268,7 @@ def add_key(self, key: K): GetKeyFunc = Callable[[T], K] # T -> K -class _MultiGetFunc(Generic[T, K]): +class _MultiGetFunc(Generic[T, K]): # pylint: disable=too-few-public-methods __slots__ = '_state', '_fill_func', '_get_key_func', '_default' _state: Optional[_MultiGetState[T, K]] @@ -269,6 +293,7 @@ def _get_state(self) -> _MultiGetState: return self._state def result_func(self, key: K) -> Promise[T]: + """Function that implement the filler function signature.""" state = self._get_state() state.add_key(key) @@ -294,5 +319,6 @@ def new_multi_get_filler( get_key_func: Callable[[T], K], # T -> K default: Callable[[], T], # () -> T ) -> Callable[[K], Promise[T]]: # K -> () -> T + """Helper function for creating Item object with a multi get filler.""" fn = _MultiGetFunc(fill_func=fill_func, key_func=get_key_func, default=default) return fn.result_func diff --git a/memproxy/memproxy.py b/memproxy/memproxy.py index f15d07a..6250943 100644 --- a/memproxy/memproxy.py +++ b/memproxy/memproxy.py @@ -1,7 +1,11 @@ +""" +Basic Data Types of Memproxy. +""" from abc import abstractmethod from dataclasses import dataclass from enum import Enum from typing import Callable, TypeVar, Optional, Tuple + from typing_extensions import Protocol from .session import Session @@ -18,6 +22,7 @@ class LeaseSetStatus(Enum): + """Status when calling Pipeline.lease_set().""" OK = 1 ERROR = 2 NOT_FOUND = 3 # key not found @@ -26,11 +31,13 @@ class LeaseSetStatus(Enum): @dataclass class LeaseSetResponse: + """Response Object when calling Pipeline.lease_set().""" status: LeaseSetStatus error: Optional[str] = None class DeleteStatus(Enum): + """Status when calling Pipeline.delete().""" OK = 1 ERROR = 2 NOT_FOUND = 3 # key not found @@ -38,48 +45,69 @@ class DeleteStatus(Enum): @dataclass class DeleteResponse: + """Response Object when calling Pipeline.delete().""" status: DeleteStatus error: Optional[str] = None +# pylint: disable=too-few-public-methods class LeaseGetResult(Protocol): + """Response Object when calling Pipeline.lease_get().""" + @abstractmethod - def result(self) -> LeaseGetResponse: pass + def result(self) -> LeaseGetResponse: + """When call will return the lease get response object.""" +# pylint: disable=too-few-public-methods class LeaseGetResultFunc: + """Mostly for testing purpose.""" + _fn: Promise[LeaseGetResponse] def __init__(self, fn: Promise[LeaseGetResponse]): self._fn = fn def result(self) -> LeaseGetResponse: + """Return lease get result.""" return self._fn() class Pipeline(Protocol): + """A Cache Pipeline.""" + @abstractmethod - def lease_get(self, key: str) -> LeaseGetResult: pass + def lease_get(self, key: str) -> LeaseGetResult: + """Returns data or a cas (lease id) number when not found.""" @abstractmethod - def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]: pass + def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]: + """Set data for the key when cas number is matched.""" @abstractmethod - def delete(self, key: str) -> Promise[DeleteResponse]: pass + def delete(self, key: str) -> Promise[DeleteResponse]: + """Delete key from cache servers.""" @abstractmethod - def lower_session(self) -> Session: pass + def lower_session(self) -> Session: + """Returns a session with lower priority.""" @abstractmethod - def finish(self) -> None: pass + def finish(self) -> None: + """Do clean up, for example, flush pending operations, e.g. set, delete.""" @abstractmethod - def __enter__(self): pass + def __enter__(self): + """Do clean up but using with.""" @abstractmethod - def __exit__(self, exc_type, exc_val, exc_tb): pass + def __exit__(self, exc_type, exc_val, exc_tb): + """Do clean up but using with.""" class CacheClient(Protocol): + """Cache Client is a class to create Pipeline objects.""" + @abstractmethod - def pipeline(self, sess: Optional[Session] = None) -> Pipeline: pass + def pipeline(self, sess: Optional[Session] = None) -> Pipeline: + """Create a new pipeline, create a new Session if input sess is None.""" diff --git a/memproxy/proxy/__init__.py b/memproxy/proxy/__init__.py index 4456d80..99c23ce 100644 --- a/memproxy/proxy/__init__.py +++ b/memproxy/proxy/__init__.py @@ -1,3 +1,6 @@ +""" +Implementation of Cache Client & Pipeline supporting Cache Replication. +""" from .proxy import ProxyCacheClient from .replicated import ReplicatedRoute, ReplicatedSelector from .route import Route, Selector, Stats diff --git a/memproxy/proxy/proxy.py b/memproxy/proxy/proxy.py index 4a7e8b5..0a59e55 100644 --- a/memproxy/proxy/proxy.py +++ b/memproxy/proxy/proxy.py @@ -1,3 +1,6 @@ +""" +CacheClient & Pipeline implementation as a proxy for multiple cache servers. +""" from typing import Dict, List, Callable, Optional from memproxy import LeaseGetResult @@ -7,7 +10,7 @@ from .route import Selector, Route -class _ClientConfig: +class _ClientConfig: # pylint: disable=too-few-public-methods __slots__ = ('clients', 'route') clients: Dict[int, CacheClient] @@ -18,8 +21,9 @@ def __init__(self, clients: Dict[int, CacheClient], route: Route): self.route = route -class _LeaseSetServer: - __slots__ = 'server_id' +class _LeaseSetServer: # pylint: disable=too-few-public-methods + """Store server id of cache key for lease set.""" + __slots__ = ('server_id',) server_id: Optional[int] @@ -27,8 +31,12 @@ def __init__(self, server_id: int): self.server_id = server_id -class _PipelineConfig: - __slots__ = ('conf', 'sess', 'pipe_sess', 'selector', '_pipelines', '_set_servers', 'pipe', 'server_id') +class _PipelineConfig: # pylint: disable=too-many-instance-attributes + """Config object for pipeline actions.""" + __slots__ = ( + 'conf', 'sess', 'pipe_sess', 'selector', + '_pipelines', '_set_servers', 'pipe', 'server_id', + ) conf: _ClientConfig sess: Session @@ -61,6 +69,7 @@ def __init__(self, conf: _ClientConfig, sess: Optional[Session]): self.server_id = None def get_pipeline(self, server_id: int) -> Pipeline: + """New pipeline object if not already created.""" pipe = self._pipelines.get(server_id) self.pipe = pipe if pipe: @@ -77,6 +86,7 @@ def _get_servers(self) -> Dict[str, _LeaseSetServer]: return self._set_servers def add_set_server(self, key: str, server_id: int): + """Add the server id to the key for lease_set.""" servers = self._get_servers() existing = servers.get(key) if existing and existing.server_id != server_id: @@ -86,6 +96,7 @@ def add_set_server(self, key: str, server_id: int): servers[key] = _LeaseSetServer(server_id=server_id) def get_set_server(self, key: str) -> Optional[int]: + """Find the server id for lease set.""" servers = self._get_servers() state = servers.get(key) @@ -95,13 +106,15 @@ def get_set_server(self, key: str) -> Optional[int]: return state.server_id def execute(self): + """execute pipeline stage.""" self.pipe = None self.server_id = None self.sess.execute() self.selector.reset() def finish(self): - for server_id in self._pipelines: + """finish pipeline stage.""" + for server_id in self._pipelines: # pylint: disable=consider-using-dict-items self._pipelines[server_id].finish() @@ -124,6 +137,8 @@ def _handle_resp(self): self.conf.add_set_server(self.key, self.server_id) def __call__(self) -> None: + """Get next func.""" + self.resp = self.fn.result() if self.resp[0] == 1: @@ -149,6 +164,8 @@ def next_again_func(): self.conf.sess.add_next_call(next_again_func) def result(self) -> LeaseGetResponse: + """Get response func.""" + conf = self.conf if conf.sess.is_dirty: conf.execute() @@ -169,9 +186,11 @@ def __init__(self, conf: _PipelineConfig, fn: Promise[LeaseSetResponse]): self.fn = fn def next_func(self): + """Set next func.""" self.resp = self.fn() def return_func(self) -> LeaseSetResponse: + """Set response func.""" self.conf.execute() return self.resp @@ -186,12 +205,18 @@ class _DeleteState: resp: DeleteResponse - def __init__(self, conf: _PipelineConfig, fn_list: List[Promise[DeleteResponse]], servers: List[int]): + def __init__( + self, + conf: _PipelineConfig, + fn_list: List[Promise[DeleteResponse]], + servers: List[int], + ): self.conf = conf self.fn_list = fn_list self.servers = servers def next_func(self) -> None: + """Delete next func.""" resp = DeleteResponse(status=DeleteStatus.NOT_FOUND) for i, resp_fn in enumerate(self.fn_list): new_resp = resp_fn() @@ -204,12 +229,15 @@ def next_func(self) -> None: self.resp = resp def return_func(self) -> DeleteResponse: + """Delete return func.""" self.conf.execute() return self.resp class ProxyPipeline: - __slots__ = '_conf' + """An Implementation of Pipeline.""" + + __slots__ = ('_conf',) _conf: _PipelineConfig @@ -217,6 +245,8 @@ def __init__(self, conf: _ClientConfig, sess: Optional[Session]): self._conf = _PipelineConfig(conf=conf, sess=sess) def lease_get(self, key: str) -> LeaseGetResult: + """Implement Pipeline.lease_get().""" + state = _LeaseGetState() # do init get state @@ -242,10 +272,15 @@ def lease_get(self, key: str) -> LeaseGetResult: return state def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]: + """Implement Pipeline.lease_set().""" + server_id = self._conf.get_set_server(key) if not server_id: def lease_set_error() -> LeaseSetResponse: - return LeaseSetResponse(status=LeaseSetStatus.ERROR, error='proxy: can not do lease set') + return LeaseSetResponse( + status=LeaseSetStatus.ERROR, + error='proxy: can not do lease set', + ) return lease_set_error @@ -259,6 +294,8 @@ def lease_set_error() -> LeaseSetResponse: return state.return_func def delete(self, key: str) -> Promise[DeleteResponse]: + """Implement Pipeline.delete().""" + servers = self._conf.selector.select_servers_for_delete(key) fn_list: List[Promise[DeleteResponse]] = [] @@ -273,9 +310,11 @@ def delete(self, key: str) -> Promise[DeleteResponse]: return state.return_func def lower_session(self) -> Session: + """get session with lower priority.""" return self._conf.sess.get_lower() def finish(self) -> None: + """finish pending actions in the Pipeline.""" self._conf.finish() def __enter__(self): @@ -285,8 +324,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.finish() +# pylint: disable=too-few-public-methods class ProxyCacheClient: - __slots__ = '_conf' + """An implementation of CacheClient supporting cache replication.""" + + __slots__ = ('_conf',) _conf: _ClientConfig @@ -307,4 +349,8 @@ def __init__( ) def pipeline(self, sess: Optional[Session] = None) -> Pipeline: + """ + :param sess: optional session object, if None will create a new session + :return: Pipeline object that handles cache replication + """ return ProxyPipeline(conf=self._conf, sess=sess) diff --git a/memproxy/proxy/replicated.py b/memproxy/proxy/replicated.py index 62bcf62..72cb79c 100644 --- a/memproxy/proxy/replicated.py +++ b/memproxy/proxy/replicated.py @@ -1,3 +1,6 @@ +""" +Implementation of Route and Selector Protocol. +""" from __future__ import annotations import random @@ -9,6 +12,8 @@ class ReplicatedSelector: + """Implement Selector Protocol that deals with cache replication.""" + __slots__ = '_conf', '_chosen_server', '_failed_servers', '_rand_func' _conf: _RouteConfig @@ -46,10 +51,10 @@ def _compute_chosen_server(self) -> bool: ok = False if all(w < 1.0 for w in weights): - for i in range(len(weights)): + for i in range(len(weights)): # pylint: disable=consider-using-enumerate weights[i] = 1.0 - recompute_weights_with_min_percent(weights, self._conf.min_percent) + _recompute_weights_with_min_percent(weights, self._conf.min_percent) # accumulate for i in range(1, len(weights)): @@ -62,7 +67,7 @@ def _compute_chosen_server(self) -> bool: chosen_weight = max_weight * pos - for i in range(len(weights)): + for i in range(len(weights)): # pylint: disable=consider-using-enumerate if weights[i] > chosen_weight: self._chosen_server = remaining[i] return ok @@ -71,6 +76,7 @@ def _compute_chosen_server(self) -> bool: return ok def set_failed_server(self, server_id: int) -> None: + """Implement the Selector.set_failed_server().""" if server_id in self._failed_servers: return self._failed_servers.add(server_id) @@ -78,6 +84,7 @@ def set_failed_server(self, server_id: int) -> None: self.reset() def select_server(self, _: str) -> Tuple[int, bool]: + """Implement the Selector.select_server().""" if self._chosen_server: return self._chosen_server, True @@ -86,6 +93,7 @@ def select_server(self, _: str) -> Tuple[int, bool]: return self._chosen_server, ok def select_servers_for_delete(self, key: str) -> List[int]: + """Implement the Selector.select_servers_for_delete().""" self.select_server(key) result: List[int] = [] @@ -96,6 +104,7 @@ def select_servers_for_delete(self, key: str) -> List[int]: return result def reset(self) -> None: + """Implement the Selector.reset().""" self._chosen_server = None @@ -105,6 +114,7 @@ def reset(self) -> None: def default_rand_func_factory() -> RandFunc: + """Returns default random with seed using time_ns().""" r = random.Random(time.time_ns()) return r.randrange @@ -117,8 +127,11 @@ class _RouteConfig: min_percent: float +# pylint: disable=too-few-public-methods class ReplicatedRoute: - __slots__ = '_conf' + """An implementation of Route Protocol that deals with replication.""" + + __slots__ = ('_conf',) _conf: _RouteConfig @@ -138,10 +151,11 @@ def __init__( ) def new_selector(self) -> Selector: + """Create a Selector for cache replication.""" return ReplicatedSelector(conf=self._conf) -def recompute_weights_with_min_percent(weights: List[float], min_percent: float) -> None: +def _recompute_weights_with_min_percent(weights: List[float], min_percent: float) -> None: total = 0.0 for w in weights: total += w @@ -153,7 +167,7 @@ def recompute_weights_with_min_percent(weights: List[float], min_percent: float) k += 1 new_weight = total / (100.0 - float(k) * min_percent) - for i in range(len(weights)): + for i in range(len(weights)): # pylint: disable=consider-using-enumerate w = weights[i] if w < minimum: weights[i] = new_weight diff --git a/memproxy/proxy/route.py b/memproxy/proxy/route.py index f83a730..6eac3ed 100644 --- a/memproxy/proxy/route.py +++ b/memproxy/proxy/route.py @@ -1,30 +1,68 @@ +""" +Basic Date Types and Declaration of Protocols. +""" from abc import abstractmethod from typing import Tuple, Optional, List + from typing_extensions import Protocol class Stats(Protocol): + """A Protocol for getting memory usage from Redis servers.""" + @abstractmethod - def get_mem_usage(self, server_id: int) -> Optional[float]: pass + def get_mem_usage(self, server_id: int) -> Optional[float]: + """ + :param server_id: server id + :return: memory usage in bytes, return None if server not connected + """ @abstractmethod - def notify_server_failed(self, server_id: int) -> None: pass + def notify_server_failed(self, server_id: int) -> None: + """ + :param server_id: server id + + Notify server for fast detecting failed servers. + """ class Selector(Protocol): + """A Protocol for selecting cache servers to get & delete.""" + @abstractmethod - def set_failed_server(self, server_id: int) -> None: pass + def set_failed_server(self, server_id: int) -> None: + """ + :param server_id: server id + + Set server id to not do get from in the next pipeline stage. + """ @abstractmethod - def select_server(self, key: str) -> Tuple[int, bool]: pass + def select_server(self, key: str) -> Tuple[int, bool]: + """ + :param key: Redis cache key + :return: a tuple includes server id and a boolean value + whether that server id can be connected + """ @abstractmethod - def select_servers_for_delete(self, key: str) -> List[int]: pass + def select_servers_for_delete(self, key: str) -> List[int]: + """ + :param key: Redis cache key + :return: list of server ids for deletion + """ @abstractmethod - def reset(self) -> None: pass + def reset(self) -> None: + """ + reset() is called whenever a pipeline stage is finished. + """ +# pylint: disable=too-few-public-methods class Route(Protocol): + """Object for selecting cache server for getting data.""" + @abstractmethod - def new_selector(self) -> Selector: pass + def new_selector(self) -> Selector: + """Create a new selector, best use on request scope.""" diff --git a/memproxy/proxy/stats.py b/memproxy/proxy/stats.py index ecf1b66..72f31a5 100644 --- a/memproxy/proxy/stats.py +++ b/memproxy/proxy/stats.py @@ -1,3 +1,7 @@ +""" +Implementation of ServerStats. +Get RAM usage of cache servers to do load-balancing. +""" import logging import random import threading @@ -28,27 +32,30 @@ def __init__(self, r: redis.Redis): self.mem = 0 def compute_next_wake_up(self, sleep_min: int, sleep_max: int): + """next wake time point for sleeping.""" rand = random.Random(time.time_ns()) d = rand.randint(sleep_min, sleep_max) self.next_wake_up = time.time() + float(d) def get_mem_usage(self, server_id, mem_logger: MemLogger): + """get RAM usage of cache servers.""" try: usage = self.client.info('memory').get('used_memory') if usage: self.mem = usage mem_logger(server_id, usage) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught self.mem = None self.error = str(e) - logging.error(f'Server Stats error. {str(e)}') + logging.error('Server Stats error. %s', str(e)) def _empty_logger(_server_id: int, _mem: float): pass -class ServerStats: +class ServerStats: # pylint: disable=too-many-instance-attributes + """ServerStats periodic get RAM of cache servers for load-balancing.""" _servers: List[int] _states: Dict[int, _ServerState] @@ -79,7 +86,7 @@ def __init__( self._finished = threading.Semaphore(value=0) - servers = [server_id for server_id in clients] + servers = list(clients) servers.sort() self._servers = servers @@ -152,22 +159,25 @@ def _run(self) -> None: self._finished.release() return - notify_list = [server_id for server_id in self._notified] + notify_list = list(self._notified) self._notified.clear() self._notify_servers(timeout_servers, True) self._notify_servers(notify_list, False) def get_mem_usage(self, server_id: int) -> Optional[float]: + """Get RAM usage in bytes.""" return self._states[server_id].mem def notify_server_failed(self, server_id: int) -> None: + """Notify a server id has been returning errors.""" with self._mut: self._notified.add(server_id) self._cond.notify() def shutdown(self): + """Shutdown the server stats thread and wait for finishing.""" with self._mut: self._closed = True self._cond.notify() - self._finished.acquire() + self._finished.acquire() # pylint: disable=consider-using-with diff --git a/memproxy/redis.py b/memproxy/redis.py index e373f71..b0057dd 100644 --- a/memproxy/redis.py +++ b/memproxy/redis.py @@ -1,3 +1,6 @@ +""" +Implementation of CacheClient using redis. +""" from __future__ import annotations import random @@ -62,13 +65,20 @@ @dataclass class SetInput: + """Params for setting keys to redis.""" key: str cas: int val: bytes ttl: int -class RedisPipelineState: +class RedisPipelineState: # pylint: disable=too-many-instance-attributes + """ + State between pipeline stages. + A pipeline stage is a duration start with collecting operations, e.g. lease get/set, + then finish by executing it. + """ + __slots__ = ('_pipe', 'completed', 'keys', 'get_result', '_set_inputs', 'set_result', '_delete_keys', 'delete_result', 'redis_error') @@ -97,19 +107,22 @@ def __init__(self, pipe: RedisPipeline): self.redis_error = None def add_set_op(self, key: str, cas: int, val: bytes, ttl: int) -> int: + """Add set key operation.""" index = len(self._set_inputs) self._set_inputs.append(SetInput(key=key, cas=cas, val=val, ttl=ttl)) return index def add_delete_op(self, key: str) -> int: + """Add delete operation.""" index = len(self._delete_keys) self._delete_keys.append(key) return index def execute(self) -> None: + """Execute collected operations.""" try: self._execute_in_try() - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught self.redis_error = str(e) def _execute_in_try(self) -> None: @@ -119,7 +132,7 @@ def _execute_in_try(self) -> None: if len(self._set_inputs) > 0: self._execute_lease_set() - if len(self._delete_keys): + if len(self._delete_keys) > 0: with self._pipe.client.pipeline(transaction=False) as pipe: for key in self._delete_keys: pipe.delete(key) @@ -174,7 +187,7 @@ def _execute_lease_set_inputs(self, inputs: List[SetInput], client) -> List[byte return self._pipe.set_script(keys=keys, args=args, client=client) -class _RedisGetResult: +class _RedisGetResult: # pylint: disable=too-few-public-methods __slots__ = 'pipe', 'state', 'index' pipe: RedisPipeline @@ -182,9 +195,10 @@ class _RedisGetResult: index: int def result(self) -> LeaseGetResponse: + """Implementation of LeaseGetResult protocol.""" state = self.state if not state.completed: - self.pipe.execute(state) + self.pipe.execute(state) # pylint: disable=no-member if state.redis_error is not None: return 3, b'', 0, f'Redis Get: {state.redis_error}' @@ -201,11 +215,13 @@ def result(self) -> LeaseGetResponse: cas = int(num_str) return 2, b'', cas, None - else: - return 1, get_resp, 0, None + return 1, get_resp, 0, None + + +class RedisPipeline: # pylint: disable=too-many-instance-attributes + """A implementation of Pipeline using redis.""" -class RedisPipeline: __slots__ = ('client', 'get_script', 'set_script', '_sess', '_min_ttl', '_max_ttl', 'max_keys_per_batch', '_state', '_rand') @@ -224,7 +240,7 @@ class RedisPipeline: _state: Optional[RedisPipelineState] _rand: Optional[random.Random] - def __init__( + def __init__( # pylint: disable=too-many-arguments self, r: redis.Redis, get_script: Any, set_script: Any, min_ttl: int, max_ttl: int, @@ -251,11 +267,13 @@ def _get_state(self) -> RedisPipelineState: return self._state def execute(self, state: RedisPipelineState): + """Executing the pipeline state.""" if not state.completed: state.execute() self._state = None def lease_get(self, key: str) -> LeaseGetResult: + """Lease get from cache.""" if self._state is None: self._state = RedisPipelineState(self) @@ -273,6 +291,7 @@ def lease_get(self, key: str) -> LeaseGetResult: return result def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]: + """Set data into cache if cas number is matched.""" state = self._get_state() if self._rand is None: @@ -305,6 +324,7 @@ def lease_set_fn() -> LeaseSetResponse: return lease_set_fn def delete(self, key: str) -> Promise[DeleteResponse]: + """Delete cache key.""" state = self._get_state() index = state.add_delete_op(key) @@ -323,20 +343,25 @@ def delete_fn() -> DeleteResponse: return delete_fn def lower_session(self) -> Session: + """Returns the lower priority session""" return self._sess def finish(self) -> None: + """Do clean up.""" if self._state is not None: self.execute(self._state) def __enter__(self): + """Do clean up using with keyword.""" return self def __exit__(self, exc_type, exc_val, exc_tb): + """Do clean up using with keyword.""" self.finish() -class RedisClient: +class RedisClient: # pylint: disable=too-few-public-methods + """An implementation of Cache Client using redis.""" __slots__ = ('_client', '_get_script', '_set_script', '_min_ttl', '_max_ttl', '_max_keys_per_batch') _client: redis.Redis @@ -359,6 +384,7 @@ def __init__( self._max_keys_per_batch = max_keys_per_batch def pipeline(self, sess: Optional[Session] = None) -> Pipeline: + """Creates a new pipeline.""" return RedisPipeline( r=self._client, get_script=self._get_script, diff --git a/memproxy/session.py b/memproxy/session.py index 760504d..26f8db2 100644 --- a/memproxy/session.py +++ b/memproxy/session.py @@ -1,3 +1,6 @@ +""" +Session implementation. +""" from __future__ import annotations from typing import List, Callable, Optional @@ -6,6 +9,8 @@ class Session: + """Session class is for deferring function calls.""" + __slots__ = 'next_calls', '_lower', '_higher', 'is_dirty' next_calls: List[NextCallFunc] @@ -20,6 +25,7 @@ def __init__(self): self.is_dirty = False def add_next_call(self, fn: NextCallFunc) -> None: + """Add delay call to the list of defer funcs.""" self.next_calls.append(fn) if self.is_dirty: @@ -28,9 +34,13 @@ def add_next_call(self, fn: NextCallFunc) -> None: s: Optional[Session] = self while s and not s.is_dirty: s.is_dirty = True - s = s._lower + s = s._lower # pylint: disable=protected-access def execute(self) -> None: + """ + Execute defer funcs. + Those defer functions can itself call the add_next_call() inside of them. + """ if not self.is_dirty: return @@ -47,7 +57,8 @@ def execute(self) -> None: fn() def get_lower(self) -> Session: + """Returns a lower priority session.""" if self._lower is None: self._lower = Session() - self._lower._higher = self + self._lower._higher = self # pylint: disable=protected-access return self._lower diff --git a/requirements.txt b/requirements.txt index 91313e8..27207a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,19 @@ +astroid==3.0.1 cffi==1.15.1 coverage==7.2.7 cryptography==41.0.5 +dill==0.3.7 hiredis==2.2.3 +isort==5.12.0 +mccabe==0.7.0 mypy==1.4.1 mypy-extensions==1.0.0 +platformdirs==4.0.0 pycparser==2.21 +pylint==3.0.2 redis==3.2.0 tomli==2.0.1 +tomlkit==0.12.3 types-pyOpenSSL==23.3.0.0 types-redis==4.6.0.9 typing-extensions==4.7.1 diff --git a/setup.py b/setup.py index 24904b7..209df0a 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ 'memproxy.proxy': ['py.typed'], }, packages=['memproxy', 'memproxy.proxy'], - python_requires=">=3.7", + python_requires=">=3.8", setup_requires=['wheel'], install_requires=[ 'redis>=3.2.0',