Skip to content

Commit

Permalink
Using linter pylint (#12)
Browse files Browse the repository at this point in the history
* Add linter pylint

* Drop 3.7 support
  • Loading branch information
QuangTung97 committed Jan 8, 2024
1 parent fde7ff8 commit e9cf958
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [ "3.7", "3.8" ]
python-version: [ "3.8", "3.9" ]
services:
redis:
image: redis:6.0.8
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
all: lint test

lint:
pylint memproxy
mypy .

test:
Expand Down
3 changes: 3 additions & 0 deletions memproxy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
A Caching Library that Focuses on Consistency, Performance & High Availability.
"""
from .item import Item, new_json_codec, ItemCodec, new_multi_get_filler, FillerFunc
from .memproxy import LeaseGetResponse, LeaseSetResponse, DeleteResponse
from .memproxy import LeaseGetResult
Expand Down
36 changes: 31 additions & 5 deletions memproxy/item.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Main package for accessing cache.
Clients should mostly use this package for accessing cached data.
"""
from __future__ import annotations

import dataclasses
Expand All @@ -18,25 +22,30 @@

@dataclass
class ItemCodec(Generic[T]):
"""Item encoder & decoder for data in cache."""
encode: Callable[[T], bytes]
decode: Callable[[bytes], T]


class DataclassJSONEncoder(json.JSONEncoder):
"""Custom JSON Encoder for dataclasses."""

def default(self, o):
"""implement default function of json.Encoder."""
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)


def new_json_codec(cls: Type[T]) -> ItemCodec[T]:
"""Creates a simple ItemCodec for dataclasses."""
return ItemCodec(
encode=lambda x: json.dumps(x, cls=DataclassJSONEncoder).encode(),
decode=lambda d: cls(**json.loads(d)),
)


class _ItemConfig(Generic[T, K]):
class _ItemConfig(Generic[T, K]): # pylint: disable=too-few-public-methods,too-many-instance-attributes
__slots__ = (
'pipe', 'key_fn', 'sess', 'codec', 'filler',
'hit_count', 'fill_count', 'cache_error_count', 'decode_error_count',
Expand Down Expand Up @@ -121,7 +130,7 @@ def __call__(self) -> None:
try:
self.result = self.conf.codec.decode(get_resp[1])
return
except Exception as e:
except Exception as e: # pylint: disable=broad-exception-caught
self.conf.decode_error_count += 1
resp_error = f'Decode error. {str(e)}'

Expand All @@ -136,6 +145,7 @@ def __call__(self) -> None:
self._handle_filling()

def result_func(self) -> T:
"""Execute the session and map the result back to clients."""
if self.conf.sess.is_dirty:
self.conf.sess.execute()

Expand All @@ -144,7 +154,12 @@ def result_func(self) -> T:


class Item(Generic[T, K]):
__slots__ = '_conf'
"""
Item object is for accessing cache keys.
Cache key will be filled for DB if cache miss, with intelligent batching.
Also providing stats for better monitoring.
"""
__slots__ = ('_conf',)

_conf: _ItemConfig[T, K]

Expand All @@ -157,6 +172,7 @@ def __init__(
self._conf = _ItemConfig(pipe=pipe, key_fn=key_fn, filler=filler, codec=codec)

def get(self, key: K) -> Promise[T]:
"""Get data from cache key and fill from DB if it missed."""
# do init item state
state: _ItemState[T, K] = _ItemState()

Expand All @@ -171,6 +187,7 @@ def get(self, key: K) -> Promise[T]:
return state.result_func

def get_multi(self, keys: List[K]) -> Callable[[], List[T]]:
"""Get multi cache keys at once. Equivalent to calling get() multiple times."""
conf = self._conf
key_fn = conf.key_fn
pipe = conf.pipe
Expand Down Expand Up @@ -201,30 +218,36 @@ def result_func() -> List[T]:
return result_func

def compute_key_name(self, key: K) -> str:
"""Calling the key name function, mostly for testing purpose."""
return self._conf.key_fn(key)

@property
def hit_count(self) -> int:
"""Number of times cache get hit."""
return self._conf.hit_count

@property
def fill_count(self) -> int:
"""Number of times cache get missed and need to fill from DB."""
return self._conf.fill_count

@property
def cache_error_count(self) -> int:
"""Number of times cache servers return errors"""
return self._conf.cache_error_count

@property
def decode_error_count(self) -> int:
"""Number of times decode function raises errors."""
return self._conf.decode_error_count

@property
def bytes_read(self) -> int:
"""Number of bytes read from the cache servers."""
return self._conf.bytes_read


class _MultiGetState(Generic[T, K]):
class _MultiGetState(Generic[T, K]): # pylint: disable=too-few-public-methods
__slots__ = ('keys', 'result', 'completed')

keys: List[K]
Expand All @@ -237,14 +260,15 @@ def __init__(self):
self.result = {}

def add_key(self, key: K):
"""Add key to the state of multi-get filler."""
self.keys.append(key)


MultiGetFillFunc = Callable[[List[K]], List[T]] # [K] -> [T]
GetKeyFunc = Callable[[T], K] # T -> K


class _MultiGetFunc(Generic[T, K]):
class _MultiGetFunc(Generic[T, K]): # pylint: disable=too-few-public-methods
__slots__ = '_state', '_fill_func', '_get_key_func', '_default'

_state: Optional[_MultiGetState[T, K]]
Expand All @@ -269,6 +293,7 @@ def _get_state(self) -> _MultiGetState:
return self._state

def result_func(self, key: K) -> Promise[T]:
"""Function that implement the filler function signature."""
state = self._get_state()
state.add_key(key)

Expand All @@ -294,5 +319,6 @@ def new_multi_get_filler(
get_key_func: Callable[[T], K], # T -> K
default: Callable[[], T], # () -> T
) -> Callable[[K], Promise[T]]: # K -> () -> T
"""Helper function for creating Item object with a multi get filler."""
fn = _MultiGetFunc(fill_func=fill_func, key_func=get_key_func, default=default)
return fn.result_func
46 changes: 37 additions & 9 deletions memproxy/memproxy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""
Basic Data Types of Memproxy.
"""
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Callable, TypeVar, Optional, Tuple

from typing_extensions import Protocol

from .session import Session
Expand All @@ -18,6 +22,7 @@


class LeaseSetStatus(Enum):
"""Status when calling Pipeline.lease_set()."""
OK = 1
ERROR = 2
NOT_FOUND = 3 # key not found
Expand All @@ -26,60 +31,83 @@ class LeaseSetStatus(Enum):

@dataclass
class LeaseSetResponse:
"""Response Object when calling Pipeline.lease_set()."""
status: LeaseSetStatus
error: Optional[str] = None


class DeleteStatus(Enum):
"""Status when calling Pipeline.delete()."""
OK = 1
ERROR = 2
NOT_FOUND = 3 # key not found


@dataclass
class DeleteResponse:
"""Response Object when calling Pipeline.delete()."""
status: DeleteStatus
error: Optional[str] = None


# pylint: disable=too-few-public-methods
class LeaseGetResult(Protocol):
"""Response Object when calling Pipeline.lease_get()."""

@abstractmethod
def result(self) -> LeaseGetResponse: pass
def result(self) -> LeaseGetResponse:
"""When call will return the lease get response object."""


# pylint: disable=too-few-public-methods
class LeaseGetResultFunc:
"""Mostly for testing purpose."""

_fn: Promise[LeaseGetResponse]

def __init__(self, fn: Promise[LeaseGetResponse]):
self._fn = fn

def result(self) -> LeaseGetResponse:
"""Return lease get result."""
return self._fn()


class Pipeline(Protocol):
"""A Cache Pipeline."""

@abstractmethod
def lease_get(self, key: str) -> LeaseGetResult: pass
def lease_get(self, key: str) -> LeaseGetResult:
"""Returns data or a cas (lease id) number when not found."""

@abstractmethod
def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]: pass
def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]:
"""Set data for the key when cas number is matched."""

@abstractmethod
def delete(self, key: str) -> Promise[DeleteResponse]: pass
def delete(self, key: str) -> Promise[DeleteResponse]:
"""Delete key from cache servers."""

@abstractmethod
def lower_session(self) -> Session: pass
def lower_session(self) -> Session:
"""Returns a session with lower priority."""

@abstractmethod
def finish(self) -> None: pass
def finish(self) -> None:
"""Do clean up, for example, flush pending operations, e.g. set, delete."""

@abstractmethod
def __enter__(self): pass
def __enter__(self):
"""Do clean up but using with."""

@abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb): pass
def __exit__(self, exc_type, exc_val, exc_tb):
"""Do clean up but using with."""


class CacheClient(Protocol):
"""Cache Client is a class to create Pipeline objects."""

@abstractmethod
def pipeline(self, sess: Optional[Session] = None) -> Pipeline: pass
def pipeline(self, sess: Optional[Session] = None) -> Pipeline:
"""Create a new pipeline, create a new Session if input sess is None."""
3 changes: 3 additions & 0 deletions memproxy/proxy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
Implementation of Cache Client & Pipeline supporting Cache Replication.
"""
from .proxy import ProxyCacheClient
from .replicated import ReplicatedRoute, ReplicatedSelector
from .route import Route, Selector, Stats
Expand Down
Loading

0 comments on commit e9cf958

Please sign in to comment.