Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement call cancelation #299 #1111

Merged
merged 3 commits into from
Jun 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@ class InvocationRequest(object):
Holding information for an individual invocation.
"""

__slots__ = ('id', 'registration', 'caller', 'call', 'callee')
__slots__ = ('id', 'registration', 'caller', 'call', 'callee', 'canceled')

def __init__(self, id, registration, caller, call, callee):
self.id = id
self.registration = registration
self.caller = caller
self.call = call
self.callee = callee
self.canceled = False


class RegistrationExtra(object):
Expand Down Expand Up @@ -129,6 +130,9 @@ def __init__(self, router, options=None):
# _invocations map below! Use the helper methods
# _add_invoke_request and _remove_invoke_request

# map: call -> in-flight invocations
self._invocations_by_call = {}

# pending callee invocation requests
self._invocations = {}

Expand All @@ -145,7 +149,8 @@ def __init__(self, router, options=None):
registration_revocation=True,
payload_transparency=True,
testament_meta_api=True,
payload_encryption_cryptobox=True)
payload_encryption_cryptobox=True,
call_canceling=True)

# store for call queues
if self._router._store:
Expand Down Expand Up @@ -637,6 +642,7 @@ def _add_invoke_request(self, invocation_request_id, registration, session, call
"""
invoke_request = InvocationRequest(invocation_request_id, registration, session, call, callee)
self._invocations[invocation_request_id] = invoke_request
self._invocations_by_call[call.request] = invoke_request
invokes = self._callee_to_invocations.get(callee, [])
invokes.append(invoke_request)
self._callee_to_invocations[callee] = invokes
Expand All @@ -652,15 +658,33 @@ def _remove_invoke_request(self, invocation_request):
if not invokes:
del self._callee_to_invocations[invocation_request.callee]
del self._invocations[invocation_request.id]
del self._invocations_by_call[invocation_request.call.request]

# noinspection PyUnusedLocal
def processCancel(self, session, cancel):
# type: (session.RouterSession, message.Cancel) -> None
"""
Implements :func:`crossbar.router.interfaces.IDealer.processCancel`
"""
assert(session in self._session_to_registrations)
if cancel.request in self._invocations_by_call:
invocation_request = self._invocations_by_call[cancel.request]

if invocation_request.caller is not session:
raise ProtocolError(u"Dealer.processCancel(): CANCEL received for non-owned call request ID {0}".format(cancel.request))

# for those that repeatedly push elevator buttons
if invocation_request.canceled:
return

raise Exception("not implemented")
invocation_request.canceled = True

if 'callee' in session._session_roles and session._session_roles['callee'] and session._session_roles['callee'].call_canceling:
self._router.send(invocation_request.callee, message.Interrupt(
invocation_request.id,
cancel.mode
))

return

def processYield(self, session, yield_):
"""
Expand Down
158 changes: 158 additions & 0 deletions crossbar/router/test/test_dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from autobahn.wamp import message
from autobahn.wamp import role
from autobahn.wamp.exception import ProtocolError

from crossbar.worker.router import RouterRealm
from crossbar.router.router import RouterFactory
Expand Down Expand Up @@ -150,3 +151,160 @@ def test_outstanding_invoke_but_caller_gone(self):
dealer.detach(session)

self.assertEqual([], outstanding.mock_calls)

def test_call_cancel(self):
last_message = {'1': []}

def session_send(msg):
last_message['1'] = msg

session = mock.Mock()
session._transport.send = session_send
session._session_roles = {'callee': role.RoleCalleeFeatures(call_canceling=True)}

dealer = self.router._dealer
dealer.attach(session)

def authorize(*args, **kwargs):
return defer.succeed({u'allow': True, u'disclose': False})

self.router.authorize = mock.Mock(side_effect=authorize)

dealer.processRegister(session, message.Register(
1,
u'com.example.my.proc',
u'exact',
message.Register.INVOKE_SINGLE,
1
))

registered_msg = last_message['1']
self.assertIsInstance(registered_msg, message.Registered)

dealer.processCall(session, message.Call(
2,
u'com.example.my.proc',
[]
))

invocation_msg = last_message['1']
self.assertIsInstance(invocation_msg, message.Invocation)

dealer.processCancel(session, message.Cancel(
2
))

# should receive an INTERRUPT from the dealer now
interrupt_msg = last_message['1']
self.assertIsInstance(interrupt_msg, message.Interrupt)
self.assertEqual(interrupt_msg.request, invocation_msg.request)

dealer.processInvocationError(session, message.Error(
message.Invocation.MESSAGE_TYPE,
invocation_msg.request,
u'wamp.error.canceled'
))

call_error_msg = last_message['1']
self.assertIsInstance(call_error_msg, message.Error)
self.assertEqual(message.Call.MESSAGE_TYPE, call_error_msg.request_type)
self.assertEqual(u'wamp.error.canceled', call_error_msg.error)

def test_call_cancel_without_callee_support(self):
last_message = {'1': []}

def session_send(msg):
last_message['1'] = msg

session = mock.Mock()
session._transport.send = session_send
session._session_roles = {'callee': role.RoleCalleeFeatures()}

dealer = self.router._dealer
dealer.attach(session)

def authorize(*args, **kwargs):
return defer.succeed({u'allow': True, u'disclose': False})

self.router.authorize = mock.Mock(side_effect=authorize)

dealer.processRegister(session, message.Register(
1,
u'com.example.my.proc',
u'exact',
message.Register.INVOKE_SINGLE,
1
))

registered_msg = last_message['1']
self.assertIsInstance(registered_msg, message.Registered)

dealer.processCall(session, message.Call(
2,
u'com.example.my.proc',
[]
))

invocation_msg = last_message['1']
self.assertIsInstance(invocation_msg, message.Invocation)

dealer.processCancel(session, message.Cancel(
2
))

# set message to None to make sure that we get nothing back
last_message['1'] = None

# should NOT receive an INTERRUPT from the dealer now
interrupt_msg = last_message['1']
self.assertIsNone(interrupt_msg)

def test_call_cancel_nonowned_call(self):
last_message = {'1': []}

def session_send(msg):
last_message['1'] = msg

session = mock.Mock()
session._transport.send = session_send
session._session_roles = {'callee': role.RoleCalleeFeatures(call_canceling=True)}

dealer = self.router._dealer
dealer.attach(session)

def authorize(*args, **kwargs):
return defer.succeed({u'allow': True, u'disclose': False})

self.router.authorize = mock.Mock(side_effect=authorize)

dealer.processRegister(session, message.Register(
1,
u'com.example.my.proc',
u'exact',
message.Register.INVOKE_SINGLE,
1
))

registered_msg = last_message['1']
self.assertIsInstance(registered_msg, message.Registered)

dealer.processCall(session, message.Call(
2,
u'com.example.my.proc',
[]
))

invocation_msg = last_message['1']
self.assertIsInstance(invocation_msg, message.Invocation)

bad_session = mock.Mock()
bad_session._session_roles = {'callee': role.RoleCalleeFeatures(call_canceling=True)}

dealer.attach(bad_session)

def attempt_bad_cancel():
dealer.processCancel(bad_session, message.Cancel(
2
))

self.failUnlessRaises(ProtocolError, attempt_bad_cancel)