Skip to content

Commit 285df05

Browse files
authored
Merge pull request #1 from clef/initial-logic
feat(flask_nameko): adds FlaskPooledClusterRpcProxy
2 parents 89698bb + 9aa8e95 commit 285df05

17 files changed

+314
-66
lines changed

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
include HISTORY.rst
22
include LICENSE
3-
include README.rst
3+
include README.md
44

55
recursive-include tests *
66
recursive-exclude * __pycache__

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ clean-test:
4545
rm -fr htmlcov/
4646

4747
lint:
48-
flake8 flask-nameko tests
48+
flake8 flask_nameko tests
4949

5050
develop: clean
51-
virtualenv venv
51+
virtualenv --python=python2.7 venv
5252
. venv/bin/activate && pip install -r requirements_dev.txt
5353

5454
test:
@@ -61,7 +61,7 @@ test-all:
6161
tox
6262

6363
coverage:
64-
coverage run --source flask-nameko setup.py test
64+
coverage run --source flask_nameko setup.py test
6565
coverage report -m
6666
coverage html
6767
$(BROWSER) htmlcov/index.html

README.md

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,68 @@
1-
# flask-nameko
1+
# flask_nameko
22

33
A wrapper for using nameko services with Flask
44

55
## Installation
66

77
Install it via Clef's PyPI:
88

9-
pip install flask-nameko
9+
pip install flask_nameko
1010

1111
## Usage
12-
import flask-nameko
1312

14-
# Add more usage information here.
15-
# Good code is bad unless it's documented.
13+
To start using `flask_nameko`, you need to create and configure a new `FlaskPooledClusterRpcProxy` singleton, which you'll use to communicate with your Nameko cluster.
14+
15+
# __init__.py
16+
from flask import Flask
17+
from flask_nameko import FlaskPooledClusterRpcProxy
18+
19+
rpc = FlaskPooledClusterRpcProxy()
20+
21+
def create_app():
22+
app = Flask(__name__)
23+
app.config.update(dict(
24+
NAMEKO_AMQP_URI='amqp://localhost'
25+
))
26+
27+
rpc.init_app(app)
28+
29+
app = create_app()
30+
31+
Then, you can use the `FlaskPooledClusterRpcProxy` singleton just as you would normally use a `ClusterRpcProxy`, by accessing individual services by name and calling methods on them:
32+
33+
# routes.py
34+
35+
from . import (
36+
app,
37+
rpc
38+
)
39+
40+
@app.route('/'):
41+
def index():
42+
result = rpc.service.do_something('test')
43+
return result
44+
45+
## API
46+
47+
### Configuration
48+
49+
`FlaskPooledClusterRpcProxy` accepts all nameko configuration values, prefixed with the `NAMEKO_` prefix. In addition, it exposes additional configuration options:
50+
51+
* `INITIAL_CONNECTIONS (int, default=2)` - the number of initial connections to the Nameko cluster to create
52+
* `MAX_CONNECTIONS (int, default=8)` - the max number of connections to the Nameko cluster to create before raises an error
53+
54+
### Proxies
55+
56+
*flask_nameko.**FlaskPooledClusterRpcProxy**(app=None)*
57+
58+
This class is used to create a pool of connections to a Nameko cluster.
59+
60+
*init_app(app=None)*
61+
62+
Configure the proxy for a given app.
1663

1764
## Development
1865

19-
$ git clone git@github.com:clef/flask-nameko.git flask-nameko
20-
$ cd flask-nameko
66+
$ git clone git@github.com:clef/flask_nameko.git flask_nameko
67+
$ cd flask_nameko
2168
$ make develop

circle.yml

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,3 @@
1-
machine:
2-
environment:
3-
PIP_CERT: /home/ubuntu/.pypi.pem
4-
PIP_INTERNAL_CERT: |
5-
-----BEGIN CERTIFICATE-----
6-
MIIDVDCCAjwCCQDgHdRtPoGIUDANBgkqhkiG9w0BAQsFADBsMQswCQYDVQQGEwJV
7-
UzETMBEGA1UECBMKQ2FsaWZvcm5pYTENMAsGA1UEChMEQ2xlZjE5MDcGA1UEAxMw
8-
Y2xlZi1weXBpLTE3ODA3NjY5MjUudXMtZWFzdC0xLmVsYi5hbWF6b25hd3MuY29t
9-
MB4XDTE2MDQwODIzMjkyMVoXDTE3MDQwODIzMjkyMVowbDELMAkGA1UEBhMCVVMx
10-
EzARBgNVBAgTCkNhbGlmb3JuaWExDTALBgNVBAoTBENsZWYxOTA3BgNVBAMTMGNs
11-
ZWYtcHlwaS0xNzgwNzY2OTI1LnVzLWVhc3QtMS5lbGIuYW1hem9uYXdzLmNvbTCC
12-
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJ3vXy95gtyKgvsLI9G9H69s
13-
bFxnhdCbbQWLTFUypLWN92qssP73UZUHQsg7NvGLvUK6r99BtojsomC6KGqrt37S
14-
6gLQ0FnGmaMeGZw2f8T4Gdn+txegbZryNttxsEOX4uD7J9V818x/zv/1USheD/qV
15-
MrQENRvX0BbveJN7hIbTLN1+IQlSSoxLXSqf/gshUK99jhKSOaqxwSuweEuRInn8
16-
2jFh6GwLJL3mfpGYv6PnK+io3hgxljcC329ae4N/iegg3UawzGTJNT5AGpRjBAos
17-
vRERpu2wjqZ+y0YsWya1YSYwoleq7UYV9xXRAlavWy0XmSwXNGdHOXkiQjUUvmkC
18-
AwEAATANBgkqhkiG9w0BAQsFAAOCAQEAITaAnchir51dItbTAkKoJbNxS3kUmh57
19-
Y168Z+MblOoRPxjpeue+QXwErVMAALqAKgC9vOofDThSO0Z8jcKRG5OXXbH0fbIz
20-
WqYznjrbycAP3ecrcNtIB3gLPHCYabRHto/j0Zcn+7QwAmAVBNGvtRgldOL9TOm7
21-
JHrRDF3/thMtdRQfhbh1S1dBP+uuH+ouGfOJgP0LN5zu35vSKmKpLrSerTvW9yOK
22-
q4vttoEKuILwTc2ED94T9aw7Fy8zzh9477lb0fQUbh/aSF5oKX+SYQP0B0rB9qyJ
23-
klFCOKVEmTqvPdUG8VfEd+YO9EQI48S4HLJwwgnWIfV5lVjGgwpJ+w==
24-
-----END CERTIFICATE-----
25-
post:
26-
# Set up pip to use our private PyPI server
27-
- echo -e "$PIP_INTERNAL_CERT" | cat - `python -m pip._vendor.requests.certs` > $PIP_CERT
281
dependencies:
292
cache_directories:
303
- .tox

flask-nameko/flask-nameko.py

Lines changed: 0 additions & 1 deletion
This file was deleted.

flask-nameko/__init__.py renamed to flask_nameko/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@
33
__author__ = 'Jesse Pollak'
44
__email__ = 'jesse@getclef.com'
55
__version__ = '0.1.0'
6+
7+
from errors import *
8+
from proxies import FlaskPooledClusterRpcProxy

flask_nameko/connection_pool.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from Queue import Queue, Empty
2+
from threading import Lock
3+
from .errors import ClientUnavailableError
4+
5+
class ConnectionPool(object):
6+
def __init__(self, get_connection, initial_connections=2, max_connections=8):
7+
"""
8+
Create a new pool
9+
:param func get_connection: The function that returns a connection
10+
:param int initial_connections: The initial number of connection objects to create
11+
:param int max_connections: The maximum amount of connections to create. These
12+
connections will only be created on demand and will potentially be
13+
destroyed once they have been returned via a call to
14+
:meth:`release_connection`
15+
constructor
16+
"""
17+
self._get_connection = get_connection
18+
self._queue = Queue()
19+
self._current_connections = 0
20+
self._max_connections = max_connections
21+
self._lock = Lock()
22+
23+
for x in range(initial_connections):
24+
connection = self._make_connection()
25+
self._queue.put(connection)
26+
27+
def _make_connection(self):
28+
ret = self._get_connection()
29+
self._current_connections += 1
30+
return ret
31+
32+
def get_connection(self, initial_timeout=0.05, next_timeout=1):
33+
"""
34+
Wait until a connection instance is available
35+
:param float initial_timeout:
36+
how long to wait initially for an existing connection to complete
37+
:param float next_timeout:
38+
if the pool could not obtain a connection during the initial timeout,
39+
and we have allocated the maximum available number of connections, wait
40+
this long until we can retrieve another one
41+
:return: A connection object
42+
"""
43+
try:
44+
return self._queue.get(True, initial_timeout)
45+
except Empty:
46+
try:
47+
self._lock.acquire()
48+
if self._current_connections == self._max_connections:
49+
raise ClientUnavailableError("Too many connections in use")
50+
cb = self._make_connection()
51+
return cb
52+
except ClientUnavailableError as ex:
53+
try:
54+
return self._queue.get(True, next_timeout)
55+
except Empty:
56+
raise ex
57+
finally:
58+
self._lock.release()
59+
60+
def release_connection(self, cb):
61+
"""
62+
Return a Connection object to the pool
63+
:param Connection cb: the connection to release
64+
"""
65+
self._queue.put(cb, True)

flask_nameko/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
class BadConfigurationError(Exception):
2+
pass
3+
class ClientUnavailableError(Exception):
4+
pass

flask_nameko/proxies.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import re
2+
from flask import g
3+
from nameko.standalone.rpc import ClusterRpcProxy
4+
from .connection_pool import ConnectionPool
5+
from .errors import (
6+
BadConfigurationError
7+
)
8+
9+
class PooledClusterRpcProxy(object):
10+
11+
_pool = None
12+
_config = None
13+
14+
def __init__(self, config=None):
15+
if config:
16+
self.configure(config)
17+
18+
def configure(self, config):
19+
if not config.get('AMQP_URI'):
20+
raise BadConfigurationError("Please provide a valid configuration.")
21+
22+
self._config = config
23+
self._pool = ConnectionPool(
24+
self._get_nameko_connection,
25+
initial_connections=config.get('INITIAL_CONNECTIONS', 2),
26+
max_connections=config.get('MAX_CONNETIONS', 8)
27+
)
28+
29+
def _get_nameko_connection(self):
30+
proxy = ClusterRpcProxy(self._config)
31+
return proxy.start()
32+
33+
def get_connection(self):
34+
if not self._pool:
35+
raise ClusterNotConfiguredError("Please configure your cluster beore requesting a connection.")
36+
return self._pool.get_connection()
37+
38+
def release_connection(self, connection):
39+
return self._pool.release_connection(connection)
40+
41+
42+
class FlaskPooledClusterRpcProxy(PooledClusterRpcProxy):
43+
def __init__(self, app=None):
44+
if app:
45+
self.init_app(app)
46+
47+
def init_app(self, app):
48+
config = dict()
49+
for key, val in app.config.iteritems():
50+
match = re.match(r"NAMEKO\_(?P<name>.*)", key)
51+
if match:
52+
config[match.group('name')] = val
53+
self.configure(config)
54+
app.teardown_appcontext(self._teardown_nameko_connection)
55+
56+
def get_connection(self):
57+
connection = getattr(g, '_nameko_connection', None)
58+
if not connection:
59+
connection = super(FlaskPooledClusterRpcProxy, self).get_connection()
60+
g._nameko_connection = connection
61+
return connection
62+
63+
def _teardown_nameko_connection(self, exception):
64+
connection = getattr(g, '_nameko_connection', None)
65+
if connection is not None:
66+
self.release_connection(connection)
67+
68+
def __getattr__(self, name):
69+
return getattr(self.get_connection(), name)
70+
71+
def __getitem__(self, name):
72+
return getattr(self.get_connection(), name)
73+

pytest.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
[pytest]
22
norecursedirs = venv .git *.egg *site-packages*
3+
addopts = -s

0 commit comments

Comments
 (0)