Skip to content

Commit

Permalink
Merge pull request #24 from and3rson/v0.8.0
Browse files Browse the repository at this point in the history
add feature in heartbeat_check thread to cleanup replies that remain for longer than timeout duration, it would take from 10-20 loop cycle to clear these remaining message in replies that no handler pick up
  • Loading branch information
0xGosu committed Oct 18, 2020
2 parents 05b3b8c + 0f67d67 commit dc72041
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
2 changes: 1 addition & 1 deletion django_nameko/VERSION
@@ -1 +1 @@
0.7.3
0.8.0
21 changes: 21 additions & 0 deletions django_nameko/rpc.py
Expand Up @@ -217,6 +217,9 @@ def stop(self):
def heartbeat_check(self):
RATE = 2 + math.log(self.heartbeat, 30) if self.heartbeat > 30 else 2.
MIN_SLEEP = 3 # better sleep between 3 seconds, if this loop is running too frequent it may affect performance
loop_count = 0
REPLIES_CLEAN_UP_CYCLE = 10 # how many loop cycle to perform replies clean up
replies_timestamp = {} # hash of correlation_id of replies and its timestamp when first detected
while self.heartbeat and self.state == 'STARTED':
time.sleep(max(self.heartbeat / abs(RATE), MIN_SLEEP))
count_ok = 0
Expand Down Expand Up @@ -244,6 +247,23 @@ def heartbeat_check(self):
ctx = ClusterRpcProxyPool.RpcContext(self, self.config)
else:
count_ok += 1
# this allow only one RPCProxy connection to be cleanup at a time
d = loop_count - count_ok
if d > 0 and d % REPLIES_CLEAN_UP_CYCLE == 0:
count_clean = 0
now = time.time()
# perform cleanup on this RpcProxy connection replies
for msg_correlation_id in ctx._rpc._reply_listener.queue_consumer.replies.keys():
timestamp = replies_timestamp.get(msg_correlation_id)
if timestamp is None:
replies_timestamp[msg_correlation_id] = now
else:
# clean up the reply if its has stay in replies
if now - timestamp > self.timeout:
del ctx._rpc._reply_listener.queue_consumer.replies[msg_correlation_id]
del replies_timestamp[msg_correlation_id]
count_clean += 1
_logger.debug("Perform cleanup remove %d message", count_clean)
finally:
if ctx is not None and self.queue is not None:
self.queue.put_nowait(ctx)
Expand All @@ -255,6 +275,7 @@ def heartbeat_check(self):
_logger.error("%s: %s", type(exc).__name__, exc.args[0])
# just log the error out without raise to keep the heartbeat thread going
_logger.debug("Heart beat %d OK", count_ok)
loop_count += 1

def __del__(self):
if self.state != 'STOPPED':
Expand Down
4 changes: 3 additions & 1 deletion tox.ini
Expand Up @@ -13,7 +13,8 @@ minversion=1.8.0
envlist =

py{27}-django{111}-nameko{211,212}
py{35,36,37}-django{111,20,21,22}-nameko{211,212}
py{35}-django{111,20,21,22}-nameko{211,212}
py{36,37}-django{111,20,21,22}-nameko{211,212,213}
isort
flake8

Expand All @@ -31,6 +32,7 @@ deps =
django22: django >=2.2a1,<2.3
nameko211: nameko >=2.11,<2.12
nameko212: nameko >=2.12,<2.13
nameko213: nameko >=2.13,<2.14


[testenv:package]
Expand Down

0 comments on commit dc72041

Please sign in to comment.