Skip to content

Commit

Permalink
POC: Custom delivery options
Browse files Browse the repository at this point in the history
  • Loading branch information
Artur Stawiarski committed Nov 6, 2015
1 parent b037751 commit e676f11
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions nameko/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,23 +300,35 @@ class RpcProxy(DependencyProvider):

rpc_reply_listener = ReplyListener()

def __init__(self, target_service):
def __init__(self, target_service, delivery_options=None):
self.target_service = target_service
self.delivery_options = delivery_options

def get_dependency(self, worker_ctx):
return ServiceProxy(worker_ctx, self.target_service,
self.rpc_reply_listener)
self.rpc_reply_listener, self.delivery_options)


class ServiceProxy(object):
def __init__(self, worker_ctx, service_name, reply_listener):
def __init__(self, worker_ctx, service_name, reply_listener,
delivery_options=None):
self.worker_ctx = worker_ctx
self.service_name = service_name
self.reply_listener = reply_listener
self.delivery_options = delivery_options

def set_delivery_options(self, expire=None, delivery='persistent',
compression=None):
self.delivery_options = {
'expiration': expire,
'delivery_mode': delivery,
'compression': compression
}

def __getattr__(self, name):
return MethodProxy(
self.worker_ctx, self.service_name, name, self.reply_listener)
self.worker_ctx, self.service_name, name, self.reply_listener,
self.delivery_options)


class RpcReply(object):
Expand All @@ -340,21 +352,33 @@ def result(self):

class MethodProxy(HeaderEncoder):

def __init__(self, worker_ctx, service_name, method_name, reply_listener):
def __init__(self, worker_ctx, service_name, method_name, reply_listener,
delivery_options=None):
self.worker_ctx = worker_ctx
self.service_name = service_name
self.method_name = method_name
self.reply_listener = reply_listener
self.delivery_options = delivery_options or {}

def __call__(self, *args, **kwargs):
reply = self._call(*args, **kwargs)
reply = self._call(args, kwargs, self.delivery_options)
return reply.result()

def async(self, *args, **kwargs):
reply = self._call(*args, **kwargs)
reply = self._call(args, kwargs, self.delivery_options)
return reply

def send(self, args, kwargs, expire=None, delivery='persistent',
compression=None):
delivery_options = {
'expiration': expire,
'delivery_mode': delivery,
'compression': compression
}
reply = self._call(args, kwargs, delivery_options)
return reply

def _call(self, *args, **kwargs):
def _call(self, args, kwargs, delivery_options):
_log.debug('invoking %s', self)

worker_ctx = self.worker_ctx
Expand Down Expand Up @@ -408,7 +432,8 @@ def _call(self, *args, **kwargs):
headers=headers,
correlation_id=correlation_id,
retry=True,
retry_policy=DEFAULT_RETRY_POLICY
retry_policy=DEFAULT_RETRY_POLICY,
**delivery_options
)

# This used to do .empty() to check if the queue is empty
Expand Down

0 comments on commit e676f11

Please sign in to comment.