From bc69ed0ddc5639866e0314c252c8fdb162812bdc Mon Sep 17 00:00:00 2001 From: "tung.tq" Date: Fri, 1 Dec 2023 14:26:04 +0700 Subject: [PATCH 1/4] Make this library thread safe --- memproxy/proxy/proxy.py | 13 +------------ memproxy/redis.py | 25 +++---------------------- 2 files changed, 4 insertions(+), 34 deletions(-) diff --git a/memproxy/proxy/proxy.py b/memproxy/proxy/proxy.py index 26d2b58..4a7e8b5 100644 --- a/memproxy/proxy/proxy.py +++ b/memproxy/proxy/proxy.py @@ -154,17 +154,9 @@ def result(self) -> LeaseGetResponse: conf.execute() resp = self.resp - - if len(_P) < 4096: - _P.append(self) - return resp -get_state_pool: List[_LeaseGetState] = [] -_P = get_state_pool - - class _LeaseSetState: __slots__ = 'conf', 'fn', 'resp' @@ -225,10 +217,7 @@ def __init__(self, conf: _ClientConfig, sess: Optional[Session]): self._conf = _PipelineConfig(conf=conf, sess=sess) def lease_get(self, key: str) -> LeaseGetResult: - if len(_P) == 0: - state = _LeaseGetState() - else: - state = _P.pop() + state = _LeaseGetState() # do init get state conf = self._conf diff --git a/memproxy/redis.py b/memproxy/redis.py index 87a120e..a043d82 100644 --- a/memproxy/redis.py +++ b/memproxy/redis.py @@ -186,15 +186,10 @@ def result(self) -> LeaseGetResponse: self.pipe.execute(state) if state.redis_error is not None: - release_get_result(self) return 3, b'', 0, f'Redis Get: {state.redis_error}' get_resp = state.get_result[self.index] - # release to pool - if len(_P) < 4096: - _P.append(self) - if get_resp.startswith(b'val:'): return 1, get_resp[len(b'val:'):], 0, None @@ -209,19 +204,10 @@ def result(self) -> LeaseGetResponse: return 1, get_resp, 0, None -get_result_pool: List[_RedisGetResult] = [] -_P = get_result_pool - - -def release_get_result(r: _RedisGetResult): - if len(_P) >= 4096: - return - _P.append(r) - - class RedisPipeline: __slots__ = ('client', 'get_script', 'set_script', '_sess', - '_min_ttl', '_max_ttl', 'max_keys_per_batch', '_state') + '_min_ttl', '_max_ttl', 'max_keys_per_batch', + '_state') client: redis.Redis get_script: Any @@ -275,12 +261,7 @@ def lease_get(self, key: str) -> LeaseGetResult: index = len(state.keys) state.keys.append(key) - # do init get result - if len(_P) == 0: - result = _RedisGetResult() - else: - result = _P.pop() - + result = _RedisGetResult() result.pipe = self result.state = state result.index = index From 0b57512b211dcfe1f57a9f8d7584a5558ca921ec Mon Sep 17 00:00:00 2001 From: "tung.tq" Date: Fri, 1 Dec 2023 15:15:28 +0700 Subject: [PATCH 2/4] Non thread safe random --- memproxy/proxy/replicated.py | 17 +++++++++++++---- memproxy/proxy/stats.py | 3 ++- memproxy/redis.py | 10 ++++++++-- test/proxy/test_proxy.py | 5 ++++- test/proxy/test_replicated.py | 5 ++++- 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/memproxy/proxy/replicated.py b/memproxy/proxy/replicated.py index 77ee97c..62bcf62 100644 --- a/memproxy/proxy/replicated.py +++ b/memproxy/proxy/replicated.py @@ -1,6 +1,7 @@ from __future__ import annotations import random +import time from dataclasses import dataclass from typing import Optional, List, Tuple, Callable, Set @@ -8,17 +9,19 @@ class ReplicatedSelector: - __slots__ = '_conf', '_chosen_server', '_failed_servers' + __slots__ = '_conf', '_chosen_server', '_failed_servers', '_rand_func' _conf: _RouteConfig _chosen_server: Optional[int] _failed_servers: Set[int] + _rand_func: RandFunc def __init__(self, conf: _RouteConfig): self._conf = conf self._chosen_server = None self._failed_servers = set() + self._rand_func = conf.rand() def _compute_chosen_server(self) -> bool: remaining: List[int] = [] @@ -54,7 +57,7 @@ def _compute_chosen_server(self) -> bool: max_weight = weights[-1] - val = self._conf.rand(RAND_MAX) + val = self._rand_func(RAND_MAX) pos = float(val) / float(RAND_MAX) chosen_weight = max_weight * pos @@ -98,13 +101,19 @@ def reset(self) -> None: RAND_MAX = 1_000_000 RandFunc = Callable[[int], int] # (n) -> int, random from 0 -> n - 1 +RandomFactory = Callable[[], RandFunc] + + +def default_rand_func_factory() -> RandFunc: + r = random.Random(time.time_ns()) + return r.randrange @dataclass class _RouteConfig: servers: List[int] stats: Stats - rand: RandFunc + rand: RandomFactory min_percent: float @@ -115,7 +124,7 @@ class ReplicatedRoute: def __init__( self, server_ids: List[int], stats: Stats, - rand: RandFunc = random.randrange, + rand: RandomFactory = default_rand_func_factory, min_percent: float = 1.0, ): if len(server_ids) == 0: diff --git a/memproxy/proxy/stats.py b/memproxy/proxy/stats.py index e8923e2..ecf1b66 100644 --- a/memproxy/proxy/stats.py +++ b/memproxy/proxy/stats.py @@ -28,7 +28,8 @@ def __init__(self, r: redis.Redis): self.mem = 0 def compute_next_wake_up(self, sleep_min: int, sleep_max: int): - d = random.randint(sleep_min, sleep_max) + rand = random.Random(time.time_ns()) + d = rand.randint(sleep_min, sleep_max) self.next_wake_up = time.time() + float(d) def get_mem_usage(self, server_id, mem_logger: MemLogger): diff --git a/memproxy/redis.py b/memproxy/redis.py index a043d82..e373f71 100644 --- a/memproxy/redis.py +++ b/memproxy/redis.py @@ -1,6 +1,7 @@ from __future__ import annotations import random +import time from dataclasses import dataclass from typing import List, Optional, Union, Any @@ -207,7 +208,7 @@ def result(self) -> LeaseGetResponse: class RedisPipeline: __slots__ = ('client', 'get_script', 'set_script', '_sess', '_min_ttl', '_max_ttl', 'max_keys_per_batch', - '_state') + '_state', '_rand') client: redis.Redis get_script: Any @@ -221,6 +222,7 @@ class RedisPipeline: max_keys_per_batch: int _state: Optional[RedisPipelineState] + _rand: Optional[random.Random] def __init__( self, r: redis.Redis, @@ -241,6 +243,7 @@ def __init__( self.max_keys_per_batch = max_keys_per_batch self._state = None + self._rand = None def _get_state(self) -> RedisPipelineState: if self._state is None: @@ -272,7 +275,10 @@ def lease_get(self, key: str) -> LeaseGetResult: def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]: state = self._get_state() - ttl = random.randrange(self._min_ttl, self._max_ttl + 1) + if self._rand is None: + self._rand = random.Random(time.time_ns()) + + ttl = self._rand.randrange(self._min_ttl, self._max_ttl + 1) index = state.add_set_op(key=key, cas=cas, val=data, ttl=ttl) diff --git a/test/proxy/test_proxy.py b/test/proxy/test_proxy.py index 95a64fb..0756764 100644 --- a/test/proxy/test_proxy.py +++ b/test/proxy/test_proxy.py @@ -36,11 +36,14 @@ def setUp(self) -> None: 23: 100, } - self.route = ReplicatedRoute(self.server_ids, self.stats, rand=self.rand_func) + self.route = ReplicatedRoute(self.server_ids, self.stats, rand=self.rand_factory) self.client: CacheClient = ProxyCacheClient(self.server_ids, self.new_func, self.route) self.pipe = self.client.pipeline() + def rand_factory(self): + return self.rand_func + def rand_func(self, _: int): return self.rand_val diff --git a/test/proxy/test_replicated.py b/test/proxy/test_replicated.py index 6249eee..dbbcc72 100644 --- a/test/proxy/test_replicated.py +++ b/test/proxy/test_replicated.py @@ -17,9 +17,12 @@ def setUp(self) -> None: self.rand_calls = [] self.rand_val = 0 - self.route: Route = ReplicatedRoute(self.servers, self.stats, rand=self.rand_func) + self.route: Route = ReplicatedRoute(self.servers, self.stats, rand=self.rand_factory) self.selector = self.route.new_selector() + def rand_factory(self): + return self.rand_func + def rand_func(self, n: int) -> int: self.rand_calls.append(n) return self.rand_val From 23ba34590da2cad6b621ffc263db13dacdb3dd0c Mon Sep 17 00:00:00 2001 From: "tung.tq" Date: Fri, 1 Dec 2023 15:55:29 +0700 Subject: [PATCH 3/4] Fix Test Seed --- test/proxy/test_proxy_property.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/proxy/test_proxy_property.py b/test/proxy/test_proxy_property.py index be56ce5..0633e07 100644 --- a/test/proxy/test_proxy_property.py +++ b/test/proxy/test_proxy_property.py @@ -59,7 +59,11 @@ def setUp(self) -> None: stats = ServerStats(clients=clients) - route = ReplicatedRoute(server_ids=server_ids, stats=stats) + def random_factory(): + r = random.Random(113) + return r.randrange + + route = ReplicatedRoute(server_ids=server_ids, stats=stats, rand=random_factory) def new_redis_client(server_id: int): return RedisClient(r=clients[server_id]) @@ -129,8 +133,6 @@ def key_fn(k: RoleKey) -> str: ) def test_normal(self) -> None: - random.seed(113) - for i in range(100): self.reset_pipe() user_item = self.new_user_item() @@ -250,8 +252,6 @@ def test_get_error(self) -> None: ) def test_get_both_user_and_role(self) -> None: - random.seed(113) - key1 = RoleKey(tenant='TENANT01', code='CODE01') key2 = RoleKey(tenant='TENANT02', code='CODE02') @@ -322,8 +322,6 @@ def test_get_both_user_and_role(self) -> None: ], fn()) def test_get_role(self) -> None: - random.seed(113) - key1 = RoleKey(tenant='TENANT01', code='CODE01') key2 = RoleKey(tenant='TENANT02', code='CODE02') key3 = RoleKey(tenant='TENANT01', code='CODE02') From d47ecb1133cf02835510cf6a02407da528d70281 Mon Sep 17 00:00:00 2001 From: "tung.tq" Date: Fri, 1 Dec 2023 16:03:29 +0700 Subject: [PATCH 4/4] Fix Test --- test/proxy/test_proxy_property.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/proxy/test_proxy_property.py b/test/proxy/test_proxy_property.py index 0633e07..591c092 100644 --- a/test/proxy/test_proxy_property.py +++ b/test/proxy/test_proxy_property.py @@ -59,8 +59,9 @@ def setUp(self) -> None: stats = ServerStats(clients=clients) + r = random.Random(113) + def random_factory(): - r = random.Random(113) return r.randrange route = ReplicatedRoute(server_ids=server_ids, stats=stats, rand=random_factory)