Skip to content

Commit

Permalink
Make library thread safe (#8)
Browse files Browse the repository at this point in the history
* Make this library thread safe
  • Loading branch information
QuangTung97 committed Dec 1, 2023
1 parent 851bb16 commit 934bd9e
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 49 deletions.
13 changes: 1 addition & 12 deletions memproxy/proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,9 @@ def result(self) -> LeaseGetResponse:
conf.execute()

resp = self.resp

if len(_P) < 4096:
_P.append(self)

return resp


get_state_pool: List[_LeaseGetState] = []
_P = get_state_pool


class _LeaseSetState:
__slots__ = 'conf', 'fn', 'resp'

Expand Down Expand Up @@ -225,10 +217,7 @@ def __init__(self, conf: _ClientConfig, sess: Optional[Session]):
self._conf = _PipelineConfig(conf=conf, sess=sess)

def lease_get(self, key: str) -> LeaseGetResult:
if len(_P) == 0:
state = _LeaseGetState()
else:
state = _P.pop()
state = _LeaseGetState()

# do init get state
conf = self._conf
Expand Down
17 changes: 13 additions & 4 deletions memproxy/proxy/replicated.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
from __future__ import annotations

import random
import time
from dataclasses import dataclass
from typing import Optional, List, Tuple, Callable, Set

from .route import Selector, Stats


class ReplicatedSelector:
__slots__ = '_conf', '_chosen_server', '_failed_servers'
__slots__ = '_conf', '_chosen_server', '_failed_servers', '_rand_func'

_conf: _RouteConfig

_chosen_server: Optional[int]
_failed_servers: Set[int]
_rand_func: RandFunc

def __init__(self, conf: _RouteConfig):
self._conf = conf
self._chosen_server = None
self._failed_servers = set()
self._rand_func = conf.rand()

def _compute_chosen_server(self) -> bool:
remaining: List[int] = []
Expand Down Expand Up @@ -54,7 +57,7 @@ def _compute_chosen_server(self) -> bool:

max_weight = weights[-1]

val = self._conf.rand(RAND_MAX)
val = self._rand_func(RAND_MAX)
pos = float(val) / float(RAND_MAX)

chosen_weight = max_weight * pos
Expand Down Expand Up @@ -98,13 +101,19 @@ def reset(self) -> None:

RAND_MAX = 1_000_000
RandFunc = Callable[[int], int] # (n) -> int, random from 0 -> n - 1
RandomFactory = Callable[[], RandFunc]


def default_rand_func_factory() -> RandFunc:
r = random.Random(time.time_ns())
return r.randrange


@dataclass
class _RouteConfig:
servers: List[int]
stats: Stats
rand: RandFunc
rand: RandomFactory
min_percent: float


Expand All @@ -115,7 +124,7 @@ class ReplicatedRoute:

def __init__(
self, server_ids: List[int], stats: Stats,
rand: RandFunc = random.randrange,
rand: RandomFactory = default_rand_func_factory,
min_percent: float = 1.0,
):
if len(server_ids) == 0:
Expand Down
3 changes: 2 additions & 1 deletion memproxy/proxy/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def __init__(self, r: redis.Redis):
self.mem = 0

def compute_next_wake_up(self, sleep_min: int, sleep_max: int):
d = random.randint(sleep_min, sleep_max)
rand = random.Random(time.time_ns())
d = rand.randint(sleep_min, sleep_max)
self.next_wake_up = time.time() + float(d)

def get_mem_usage(self, server_id, mem_logger: MemLogger):
Expand Down
33 changes: 10 additions & 23 deletions memproxy/redis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import random
import time
from dataclasses import dataclass
from typing import List, Optional, Union, Any

Expand Down Expand Up @@ -186,15 +187,10 @@ def result(self) -> LeaseGetResponse:
self.pipe.execute(state)

if state.redis_error is not None:
release_get_result(self)
return 3, b'', 0, f'Redis Get: {state.redis_error}'

get_resp = state.get_result[self.index]

# release to pool
if len(_P) < 4096:
_P.append(self)

if get_resp.startswith(b'val:'):
return 1, get_resp[len(b'val:'):], 0, None

Expand All @@ -209,19 +205,10 @@ def result(self) -> LeaseGetResponse:
return 1, get_resp, 0, None


get_result_pool: List[_RedisGetResult] = []
_P = get_result_pool


def release_get_result(r: _RedisGetResult):
if len(_P) >= 4096:
return
_P.append(r)


class RedisPipeline:
__slots__ = ('client', 'get_script', 'set_script', '_sess',
'_min_ttl', '_max_ttl', 'max_keys_per_batch', '_state')
'_min_ttl', '_max_ttl', 'max_keys_per_batch',
'_state', '_rand')

client: redis.Redis
get_script: Any
Expand All @@ -235,6 +222,7 @@ class RedisPipeline:
max_keys_per_batch: int

_state: Optional[RedisPipelineState]
_rand: Optional[random.Random]

def __init__(
self, r: redis.Redis,
Expand All @@ -255,6 +243,7 @@ def __init__(
self.max_keys_per_batch = max_keys_per_batch

self._state = None
self._rand = None

def _get_state(self) -> RedisPipelineState:
if self._state is None:
Expand All @@ -275,12 +264,7 @@ def lease_get(self, key: str) -> LeaseGetResult:
index = len(state.keys)
state.keys.append(key)

# do init get result
if len(_P) == 0:
result = _RedisGetResult()
else:
result = _P.pop()

result = _RedisGetResult()
result.pipe = self
result.state = state
result.index = index
Expand All @@ -291,7 +275,10 @@ def lease_get(self, key: str) -> LeaseGetResult:
def lease_set(self, key: str, cas: int, data: bytes) -> Promise[LeaseSetResponse]:
state = self._get_state()

ttl = random.randrange(self._min_ttl, self._max_ttl + 1)
if self._rand is None:
self._rand = random.Random(time.time_ns())

ttl = self._rand.randrange(self._min_ttl, self._max_ttl + 1)

index = state.add_set_op(key=key, cas=cas, val=data, ttl=ttl)

Expand Down
5 changes: 4 additions & 1 deletion test/proxy/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ def setUp(self) -> None:
23: 100,
}

self.route = ReplicatedRoute(self.server_ids, self.stats, rand=self.rand_func)
self.route = ReplicatedRoute(self.server_ids, self.stats, rand=self.rand_factory)
self.client: CacheClient = ProxyCacheClient(self.server_ids, self.new_func, self.route)

self.pipe = self.client.pipeline()

def rand_factory(self):
return self.rand_func

def rand_func(self, _: int):
return self.rand_val

Expand Down
13 changes: 6 additions & 7 deletions test/proxy/test_proxy_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ def setUp(self) -> None:

stats = ServerStats(clients=clients)

route = ReplicatedRoute(server_ids=server_ids, stats=stats)
r = random.Random(113)

def random_factory():
return r.randrange

route = ReplicatedRoute(server_ids=server_ids, stats=stats, rand=random_factory)

def new_redis_client(server_id: int):
return RedisClient(r=clients[server_id])
Expand Down Expand Up @@ -129,8 +134,6 @@ def key_fn(k: RoleKey) -> str:
)

def test_normal(self) -> None:
random.seed(113)

for i in range(100):
self.reset_pipe()
user_item = self.new_user_item()
Expand Down Expand Up @@ -250,8 +253,6 @@ def test_get_error(self) -> None:
)

def test_get_both_user_and_role(self) -> None:
random.seed(113)

key1 = RoleKey(tenant='TENANT01', code='CODE01')
key2 = RoleKey(tenant='TENANT02', code='CODE02')

Expand Down Expand Up @@ -322,8 +323,6 @@ def test_get_both_user_and_role(self) -> None:
], fn())

def test_get_role(self) -> None:
random.seed(113)

key1 = RoleKey(tenant='TENANT01', code='CODE01')
key2 = RoleKey(tenant='TENANT02', code='CODE02')
key3 = RoleKey(tenant='TENANT01', code='CODE02')
Expand Down
5 changes: 4 additions & 1 deletion test/proxy/test_replicated.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ def setUp(self) -> None:
self.rand_calls = []
self.rand_val = 0

self.route: Route = ReplicatedRoute(self.servers, self.stats, rand=self.rand_func)
self.route: Route = ReplicatedRoute(self.servers, self.stats, rand=self.rand_factory)
self.selector = self.route.new_selector()

def rand_factory(self):
return self.rand_func

def rand_func(self, n: int) -> int:
self.rand_calls.append(n)
return self.rand_val
Expand Down

0 comments on commit 934bd9e

Please sign in to comment.