Skip to content
Permalink
Browse files

use txaio

  • Loading branch information
meejah committed Apr 7, 2015
1 parent 0584475 commit cd5dda0dc9acb27d1016022c9c2b83e5b1671482
Showing with 25 additions and 28 deletions.
  1. +7 −7 crossbar/router/broker.py
  2. +7 −7 crossbar/router/dealer.py
  3. +2 −4 crossbar/router/router.py
  4. +9 −10 crossbar/router/session.py
@@ -37,17 +37,17 @@

from autobahn.wamp.message import _URI_PAT_STRICT_NON_EMPTY, \
_URI_PAT_LOOSE_NON_EMPTY, _URI_PAT_STRICT_EMPTY, _URI_PAT_LOOSE_EMPTY
from autobahn.twisted.wamp import FutureMixin

from crossbar.router.observation import UriObservationMap
from crossbar.router.types import RouterOptions
from crossbar.router.interfaces import IRouter

__all__ = ('Broker',)
import txaio

__all__ = ('Broker',)

class Broker(FutureMixin):

class Broker(object):
"""
Basic WAMP broker.
"""
@@ -162,7 +162,7 @@ def processPublish(self, session, publish):

# authorize PUBLISH action
#
d = self._as_future(self._router.authorize, session, publish.topic, IRouter.ACTION_PUBLISH)
d = txaio.as_future(self._router.authorize, session, publish.topic, IRouter.ACTION_PUBLISH)

def on_authorize_success(authorized):

@@ -269,7 +269,7 @@ def on_authorize_error(err):
reply = message.Error(message.Publish.MESSAGE_TYPE, publish.request, ApplicationError.AUTHORIZATION_FAILED, ["failed to authorize session for publishing to topic URI '{0}': {1}".format(publish.topic, err.value)])
session._transport.send(reply)

self._add_future_callbacks(d, on_authorize_success, on_authorize_error)
txaio.add_callbacks(d, on_authorize_success, on_authorize_error)

def processSubscribe(self, session, subscribe):
"""
@@ -296,7 +296,7 @@ def processSubscribe(self, session, subscribe):

# authorize action
#
d = self._as_future(self._router.authorize, session, subscribe.topic, IRouter.ACTION_SUBSCRIBE)
d = txaio.as_future(self._router.authorize, session, subscribe.topic, IRouter.ACTION_SUBSCRIBE)

def on_authorize_success(authorized):
if not authorized:
@@ -343,7 +343,7 @@ def on_authorize_error(err):
reply = message.Error(message.Subscribe.MESSAGE_TYPE, subscribe.request, ApplicationError.AUTHORIZATION_FAILED, ["failed to authorize session for subscribing to topic URI '{0}': {1}".format(subscribe.topic, err.value)])
session._transport.send(reply)

self._add_future_callbacks(d, on_authorize_success, on_authorize_error)
txaio.add_callbacks(d, on_authorize_success, on_authorize_error)

def processUnsubscribe(self, session, unsubscribe):
"""
@@ -39,12 +39,13 @@

from autobahn.wamp.message import _URI_PAT_STRICT_NON_EMPTY, \
_URI_PAT_LOOSE_NON_EMPTY, _URI_PAT_STRICT_EMPTY, _URI_PAT_LOOSE_EMPTY
from autobahn.twisted.wamp import FutureMixin

from crossbar.router.observation import UriObservationMap
from crossbar.router.types import RouterOptions
from crossbar.router.interfaces import IRouter

import txaio

__all__ = ('Dealer',)


@@ -63,8 +64,7 @@ def __init__(self, invoke=message.Register.INVOKE_SINGLE):
self.roundrobin_current = 0


class Dealer(FutureMixin):

class Dealer(object):
"""
Basic WAMP dealer.
"""
@@ -189,7 +189,7 @@ def processRegister(self, session, register):

# authorize action
#
d = self._as_future(self._router.authorize, session, register.procedure, IRouter.ACTION_REGISTER)
d = txaio.as_future(self._router.authorize, session, register.procedure, IRouter.ACTION_REGISTER)

def on_authorize_success(authorized):
if not authorized:
@@ -238,7 +238,7 @@ def on_authorize_error(err):
reply = message.Error(message.Register.MESSAGE_TYPE, register.request, ApplicationError.AUTHORIZATION_FAILED, ["failed to authorize session for registering procedure '{0}': {1}".format(register.procedure, err.value)])
session._transport.send(reply)

self._add_future_callbacks(d, on_authorize_success, on_authorize_error)
txaio.add_callbacks(d, on_authorize_success, on_authorize_error)

def processUnregister(self, session, unregister):
"""
@@ -337,7 +337,7 @@ def processCall(self, session, call):

# authorize CALL action
#
d = self._as_future(self._router.authorize, session, call.procedure, IRouter.ACTION_CALL)
d = txaio.as_future(self._router.authorize, session, call.procedure, IRouter.ACTION_CALL)

def on_authorize_success(authorized):

@@ -411,7 +411,7 @@ def on_authorize_error(err):
reply = message.Error(message.Call.MESSAGE_TYPE, call.request, ApplicationError.AUTHORIZATION_FAILED, ["failed to authorize session for calling procedure '{0}': {1}".format(call.procedure, err.value)])
session._transport.send(reply)

self._add_future_callbacks(d, on_authorize_success, on_authorize_error)
txaio.add_callbacks(d, on_authorize_success, on_authorize_error)

else:
reply = message.Error(message.Call.MESSAGE_TYPE, call.request, ApplicationError.NO_SUCH_PROCEDURE, ["no callee registered for procedure '{0}'".format(call.procedure)])
@@ -33,7 +33,6 @@
from twisted.python import log

from autobahn.wamp import message
from autobahn.twisted.wamp import FutureMixin
from autobahn.wamp.exception import ProtocolError

from crossbar.router.interfaces import IRouter
@@ -44,12 +43,11 @@
CrossbarRouterTrustedRole, CrossbarRouterRoleStaticAuth, \
CrossbarRouterRoleDynamicAuth


class Router(FutureMixin):

class Router(object):
"""
Basic WAMP router.
"""

broker = Broker
"""
The broker class this router will use.
@@ -47,11 +47,12 @@
from autobahn.wamp import message
from autobahn.wamp.exception import ApplicationError
from autobahn.wamp.protocol import BaseSession
from autobahn.twisted.wamp import FutureMixin
from autobahn.wamp.exception import ProtocolError, SessionNotReady
from autobahn.wamp.types import SessionDetails
from autobahn.wamp.interfaces import ITransportHandler

import txaio

from crossbar.router.auth import PendingAuthPersona, \
PendingAuthWampCra, \
PendingAuthTicket
@@ -151,7 +152,7 @@ def send(self, msg):
self._session._authid, self._session._authrole, self._session._authmethod,
self._session._authprovider)

self._session._as_future(self._session.onJoin, details)
txaio.as_future(self._session.onJoin, details)
# self._session.onJoin(details)

# app-to-router
@@ -200,8 +201,7 @@ def send(self, msg):
raise Exception("RouterApplicationSession.send: unhandled message {0}".format(msg))


class RouterSession(FutureMixin, BaseSession):

class RouterSession(BaseSession):
"""
WAMP router session. This class implements :class:`autobahn.wamp.interfaces.ITransportHandler`.
"""
@@ -282,7 +282,7 @@ def welcome(realm, authid=None, authrole=None, authmethod=None, authprovider=Non

details = types.HelloDetails(msg.roles, msg.authmethods, msg.authid, self._pending_session_id)

d = self._as_future(self.onHello, self._realm, details)
d = txaio.as_future(self.onHello, self._realm, details)

def success(res):
msg = None
@@ -302,11 +302,11 @@ def success(res):
if msg:
self._transport.send(msg)

self._add_future_callbacks(d, success, self._onError)
txaio.add_callbacks(d, success, self._onError)

elif isinstance(msg, message.Authenticate):

d = self._as_future(self.onAuthenticate, msg.signature, {})
d = txaio.as_future(self.onAuthenticate, msg.signature, {})

def success(res):
msg = None
@@ -323,7 +323,7 @@ def success(res):
if msg:
self._transport.send(msg)

self._add_future_callbacks(d, success, self._onError)
txaio.add_callbacks(d, success, self._onError)

elif isinstance(msg, message.Abort):

@@ -441,8 +441,7 @@ def onError(self, err):
ITransportHandler.register(RouterSession)


class RouterSessionFactory(FutureMixin):

class RouterSessionFactory(object):
"""
WAMP router session factory.
"""

0 comments on commit cd5dda0

Please sign in to comment.
You can’t perform that action at this time.