Skip to content

Commit

Permalink
Merge 0964a93 into 4cc267a
Browse files Browse the repository at this point in the history
  • Loading branch information
0xGosu committed Aug 16, 2019
2 parents 4cc267a + 0964a93 commit 72264f6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 7 deletions.
2 changes: 1 addition & 1 deletion django_nameko/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.6
0.6.0
60 changes: 54 additions & 6 deletions django_nameko/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import copy
import logging
import weakref
from threading import Lock

from amqp.exceptions import ConnectionError
from threading import Lock, Thread
import time
import socket
from amqp.exceptions import ConnectionError # heartbeat failed will raise this error: ConnectionForced
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from nameko.standalone.rpc import ClusterRpcProxy
from nameko.constants import AMQP_URI_CONFIG_KEY, HEARTBEAT_CONFIG_KEY
from six.moves import queue as queue_six
from six.moves import xrange as xrange_six

Expand Down Expand Up @@ -133,6 +135,8 @@ def __init__(self, config, pool_size=None, context_data=None, timeout=0):
self.pool_size = pool_size
self.context_data = copy.deepcopy(context_data)
self.timeout = timeout
self.heartbeat = self.config.get(HEARTBEAT_CONFIG_KEY)
self._heartbeat_check_thread = None
self.state = 'NOT_STARTED'
self.queue = None

Expand All @@ -144,6 +148,10 @@ def start(self):
ctx = ClusterRpcProxyPool.RpcContext(self, self.config)
self.queue.put(ctx)
self.state = 'STARTED'
if self.heartbeat:
self._heartbeat_check_thread = Thread(target=self.heartbeat_check)
self._heartbeat_check_thread.start()
_logger.debug("Heart beat check thread started")

@property
def is_started(self):
Expand Down Expand Up @@ -197,12 +205,52 @@ def stop(self):
break
self.queue.queue.clear()
self.queue = None
self.state = 'STOPPED'
if self._heartbeat_check_thread:
self._heartbeat_check_thread.join()
_logger.debug("Heart beat check thread stopped")

def heartbeat_check(self):
RATE = 2
while self.heartbeat and self.state == 'STARTED':
time.sleep(self.heartbeat/abs(RATE))
_logger.debug("Heart beating all connections")
for _ in xrange_six(self.pool_size):
ctx = None
try:
ctx = self.queue.get_nowait()
except queue_six.Empty:
break
else:
if ctx._rpc:
try:
try:
ctx._rpc._reply_listener.queue_consumer.connection.drain_events(timeout=0.1)
except socket.timeout:
pass
ctx._rpc._reply_listener.queue_consumer.connection.heartbeat_check() # rate=RATE

except ConnectionError as exc:
_logger.info("Heart beat failed. System will auto recover broken connection: %s", str(exc))
ctx.__del__()
ctx = ClusterRpcProxyPool.RpcContext(self, self.config)
finally:
if ctx is not None:
self.queue.put_nowait(ctx)

def __del__(self):
if self.state != 'STOPPED':
try:
self.stop()
except: # ignore any error since the object is being garbage collected
pass


nameko_global_pools = None
create_pool_lock = Lock()

WRONG_CONFIG_MSG = 'NAMEKO_CONFIG must be specified and should include at least "default" config with "AMQP_URI"'
WRONG_CONFIG_MSG = 'NAMEKO_CONFIG must be specified and should include at least "default" config with "%s"'%(AMQP_URI_CONFIG_KEY)


def mergedicts(dict1, dict2):
for k in set(dict1.keys()).union(dict2.keys()):
Expand Down Expand Up @@ -250,8 +298,8 @@ def get_pool(pool_name=None):
raise ImproperlyConfigured(WRONG_CONFIG_MSG)
else:
if 'AMQP_URL' in NAMEKO_CONFIG['default']: # compatible code to prevent typo mistake
NAMEKO_CONFIG['default']['AMQP_URI'] = NAMEKO_CONFIG['default'].pop('AMQP_URL')
if 'AMQP_URI' not in NAMEKO_CONFIG['default']:
NAMEKO_CONFIG['default'][AMQP_URI_CONFIG_KEY] = NAMEKO_CONFIG['default'].pop('AMQP_URL')
if AMQP_URI_CONFIG_KEY not in NAMEKO_CONFIG['default']:
raise ImproperlyConfigured(WRONG_CONFIG_MSG)

default_config = NAMEKO_CONFIG['default']
Expand Down

0 comments on commit 72264f6

Please sign in to comment.