Skip to content

Commit

Permalink
Merge pull request #12 from Yupeek/master
Browse files Browse the repository at this point in the history
add unittests and support python3 along with python2
  • Loading branch information
0xGosu committed Jul 2, 2019
2 parents 7b401eb + 8fe902f commit 9e0b390
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 59 deletions.
25 changes: 24 additions & 1 deletion .travis.yml
@@ -1,21 +1,44 @@
sudo: false
language: python
dist: xenial
services:
- docker
before_install:
- docker run -d --rm -p 15672:15672 -p 5672:5672 -p 5671:5671 --name nameko-rabbitmq nameko/nameko-rabbitmq:3.6.6

python:
- "2.7"
- "3.5"
- "3.6"
- "3.7"

addons:
apt_packages:
- libenchant-dev

install:
- pip install tox-travis virtualenv tox python-coveralls coveralls

cache:
directories:
- $HOME/.cache/pip

script:
- QUIET=true tox
stages:
- test
- deploy

jobs:
include:
- stage: test
after_success:
- coveralls
- stage: deploy
python: 2.7
script: python2 ./setup.py test
script: skip
install: skip
if: repo = "and3rson/django-nameko"
deploy:
provider: pypi
user: tranvietanh1991
Expand Down
18 changes: 18 additions & 0 deletions README.md
Expand Up @@ -8,6 +8,14 @@

Django wrapper for [Nameko] microservice framework.


# support
tested with

- python 2.7, 3.5, 3.6, 3.7
- django 1.11, 2.0, 2.1, 2.2
- nameko 2.11, 2.12

# How to use

```python
Expand Down Expand Up @@ -86,6 +94,16 @@ rpc.mailer.send_mail(bar='foo')

```

# contribute

to run the tests:
1. run a local rabbitmq
2. execute tox
```bash
docker run -d --rm -p 15672:15672 -p 5672:5672 -p 5671:5671 --name nameko-rabbitmq nameko/nameko-rabbitmq:3.6.6
tox
```

# Credits
Thanks to guys who made an awesome [Nameko] framework.

Expand Down
4 changes: 3 additions & 1 deletion django_nameko/__init__.py
@@ -1,4 +1,6 @@
from rpc import ClusterRpcProxyPool, get_pool, destroy_pool
from __future__ import absolute_import, unicode_literals

from .rpc import ClusterRpcProxyPool, destroy_pool, get_pool

__all__ = [
'ClusterRpcProxyPool',
Expand Down
104 changes: 55 additions & 49 deletions django_nameko/rpc.py
Expand Up @@ -9,16 +9,17 @@
#
from __future__ import absolute_import

import copy
import logging
import weakref
from threading import Lock

from six.moves import xrange as xrange_six, queue as queue_six
from amqp.exceptions import ConnectionError
from nameko.standalone.rpc import ClusterRpcProxy
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
import copy
from nameko.standalone.rpc import ClusterRpcProxy
from six.moves import queue as queue_six
from six.moves import xrange as xrange_six

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,8 +55,11 @@ def __init__(self, pool, config):
self._enable_rpc_call = False

def __del__(self):
if self._proxy:

try:
self._proxy.stop()
except AttributeError:
pass
self._proxy = None
self._rpc = None

Expand All @@ -77,8 +81,8 @@ def __exit__(self, exc_type, exc_value, traceback, **kwargs):
self._enable_rpc_call = False
try:
if exc_type == RuntimeError and (
exc_value == "This consumer has been stopped, and can no longer be used"
or exc_value == "This consumer has been disconnected, and can no longer be used"):
str(exc_value) == "This consumer has been stopped, and can no longer be used"
or str(exc_value) == "This consumer has been disconnected, and can no longer be used"):
self._pool._clear()
self._pool._reload() # reload all worker
self.__del__()
Expand All @@ -93,8 +97,9 @@ def __exit__(self, exc_type, exc_value, traceback, **kwargs):
for key in self._rpc._worker_ctx.data.keys():
del self._rpc._worker_ctx.data[key]
elif len(self._rpc._worker_ctx.data) != len(self._pool.context_data) \
or cmp(self._rpc._worker_ctx.data, self._pool.context_data) != 0:
# ensure that worker_ctx.data is revert back to original pool.context_data when exit of block
or self._rpc._worker_ctx.data != self._pool.context_data:
# ensure that worker_ctx.data is revert back to original
# pool.context_data when exit of block
for key in self._rpc._worker_ctx.data.keys():
if key not in self._pool.context_data:
del self._rpc._worker_ctx.data[key]
Expand All @@ -111,7 +116,7 @@ def __init__(self, config, pool_size=None, context_data=None, timeout=0):
pool_size = getattr(settings, 'NAMEKO_POOL_SIZE', 4)
if context_data is None: # keep this for compatiblity
context_data = getattr(settings, 'NAMEKO_CONTEXT_DATA', None)
if timeout <= 0: # keep this for compatiblity
if timeout is None or timeout <= 0: # keep this for compatiblity
timeout = getattr(settings, 'NAMEKO_TIMEOUT', None)
self.config = copy.deepcopy(config)
self.pool_size = pool_size
Expand Down Expand Up @@ -142,8 +147,8 @@ def _clear(self):
def _reload(self, num_of_worker=0):
""" Reload into pool's queue with number of new worker
:param num_of_worker:
:return:
:param int num_of_worker:
:return: None
"""
if num_of_worker <= 0:
num_of_worker = self.pool_size
Expand All @@ -159,6 +164,7 @@ def next(self, block=True, timeout=None):
""" Fetch next connection.
This method is thread-safe.
:rtype: ClusterRpcProxyPool.RpcContext
"""
return self.queue.get(block=block, timeout=timeout)

Expand Down Expand Up @@ -208,6 +214,7 @@ def get_pool(pool_name=None):
# ...
with get_pool().next() as rpc:
rpc.mailer.send_mail(foo='bar')
:rtype: ClusterRpcProxyPool
"""

global nameko_global_pools
Expand All @@ -218,43 +225,41 @@ def get_pool(pool_name=None):
raise ImproperlyConfigured('NAMEKO_CONFIG must be specified')
NAMEKO_MULTI_POOL = [name for name in NAMEKO_CONFIG.keys() if name.islower()]
# Lazy instantiation, acquire lock first to prevent dupication init
create_pool_lock.acquire()
if not nameko_global_pools: # double check inside lock is importance
if NAMEKO_MULTI_POOL:
nameko_global_pools = dict()
if 'default' not in NAMEKO_CONFIG and 'AMQP_URL' not in NAMEKO_CONFIG['default']:
raise ImproperlyConfigured(
'NAMEKO_CONFIG must be specified and should include at least "default" config with "AMQP_URL"')
default_config = NAMEKO_CONFIG['default']
# default_context_data = NAMEKO_CONFIG['default']['POOL'].get('CONTEXT_DATA', dict())
# multi_context_data = getattr(settings, 'NAMEKO_MULTI_CONTEXT_DATA', dict())
for name, _config in NAMEKO_CONFIG.items():
# each nameko_global_pools will have different config with default config as default
if name != 'default':
# overide default config with nameko_global_pools config by merging 2 dict
pool_config = dict(mergedicts(default_config.copy(), _config))
else:
# default nameko_global_pools
pool_config = default_config.copy()
# extract nameko_global_pools config from RpcCluster config
pool_size = pool_config.pop('POOL_SIZE', None)
pool_context_data = pool_config.pop('POOL_CONTEXT_DATA', None)
pool_timeout = pool_config.pop('POOL_TIMEOUT', 0)
# init nameko_global_pools
_pool = ClusterRpcProxyPool(pool_config, pool_size=pool_size, context_data=pool_context_data,
timeout=pool_timeout)
_pool.start()
# assign nameko_global_pools to corresponding name
nameko_global_pools[name] = _pool
else:
# single nameko_global_pools with old style configuration
if not hasattr(settings, 'NAMEKO_CONFIG') or not settings.NAMEKO_CONFIG:
raise ImproperlyConfigured(
'NAMEKO_CONFIG must be specified and should include at least "AMQP_URL" key.')
nameko_global_pools = ClusterRpcProxyPool(settings.NAMEKO_CONFIG)
nameko_global_pools.start() # start immediately
with create_pool_lock:
if not nameko_global_pools: # double check inside lock is importance
if NAMEKO_MULTI_POOL:
nameko_global_pools = dict()
if 'default' not in NAMEKO_CONFIG or 'AMQP_URL' not in NAMEKO_CONFIG['default']:
raise ImproperlyConfigured(
'NAMEKO_CONFIG must be specified and should '
'include at least "default" config with "AMQP_URL"')
default_config = NAMEKO_CONFIG['default']
# default_context_data = NAMEKO_CONFIG['default']['POOL'].get('CONTEXT_DATA', dict())
# multi_context_data = getattr(settings, 'NAMEKO_MULTI_CONTEXT_DATA', dict())
for name, _config in NAMEKO_CONFIG.items():
# each nameko_global_pools will have different config with default config as default
if name != 'default':
# overide default config with nameko_global_pools config by merging 2 dict
pool_config = dict(mergedicts(default_config.copy(), _config))
else:
# default nameko_global_pools
pool_config = default_config.copy()
# extract nameko_global_pools config from RpcCluster config
pool_size = pool_config.pop('POOL_SIZE', None)
pool_context_data = pool_config.pop('POOL_CONTEXT_DATA', None)
pool_timeout = pool_config.pop('POOL_TIMEOUT', 0)
# init nameko_global_pools
_pool = ClusterRpcProxyPool(pool_config, pool_size=pool_size, context_data=pool_context_data,
timeout=pool_timeout)
_pool.start()
# assign nameko_global_pools to corresponding name
nameko_global_pools[name] = _pool
else:
# single nameko_global_pools with old style configuration

nameko_global_pools = ClusterRpcProxyPool(settings.NAMEKO_CONFIG)
nameko_global_pools.start() # start immediately
# Finish instantiation, release lock
create_pool_lock.release()

if pool_name is not None:
if isinstance(nameko_global_pools, dict) is False or pool_name not in nameko_global_pools:
Expand All @@ -264,9 +269,10 @@ def get_pool(pool_name=None):
_pool = nameko_global_pools[pool_name]
else:
if isinstance(nameko_global_pools, dict):
if len(nameko_global_pools) == 0:
if len(nameko_global_pools) == 0: # pragma: nocover
# this code is unreachable, it's not passilbe to have a dict without a key in it.
raise ImproperlyConfigured('NAMEKO_CONFIG must include at least 1 "default" config')
_pool = nameko_global_pools.get('default', nameko_global_pools.values()[0])
_pool = nameko_global_pools.get('default', next(iter(nameko_global_pools.values())))
else:
_pool = nameko_global_pools

Expand Down
21 changes: 21 additions & 0 deletions setup.py
@@ -1,6 +1,11 @@
#!/usr/bin/env python2
import os

from os import path
this_directory = path.abspath(path.dirname(__file__))
with open(path.join(this_directory, 'README.md'), 'rb') as f:
long_description = f.read().decode('utf8')

with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'django_nameko/VERSION')) as f:
__version__ = f.read()

Expand All @@ -10,6 +15,8 @@
name='django-nameko',
version=__version__,
description=' Django wrapper for nameko microservice framework.',
long_description=long_description,
long_description_content_type='text/markdown',
url='http://github.com/and3rson/django-nameko',
author='Andrew Dunai',
author_email='andrew@dun.ai',
Expand All @@ -24,4 +31,18 @@
],
test_suite='nose.collector',
tests_require=['nose'],
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License v3 (GPLv2)',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Operating System :: OS Independent',
'Topic :: Software Development :: Libraries',
'Topic :: Utilities',
],
)
1 change: 1 addition & 0 deletions tests/__init__.py
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
3 changes: 3 additions & 0 deletions tests/config.yaml
@@ -0,0 +1,3 @@
AMQP_URI: amqp://guest:guest@localhost/

max_workers: 1
15 changes: 15 additions & 0 deletions tests/services.py
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
import logging

from nameko.rpc import rpc

logger = logging.getLogger(__name__)


class EchoService(object):

name = 'echo'

@rpc
def echo(self, *attrs):
return tuple(attrs)

0 comments on commit 9e0b390

Please sign in to comment.