Skip to content

Commit

Permalink
WebSocket testing support, via Agent-style interface (#1186)
Browse files Browse the repository at this point in the history
* proof-of-concept

* missing arg

* fixups

* an example

* leftovers

* client-side is_open

* cleanup

* cleanup

* cleanup

* proof-of-concept memory testing agent

* there's MemoryReactorClock it turns out

* move test-server-protocol to the test

* make tests work for real

* a different way to structure tests

* refactor; pass in reactor, proper test resolver

* refactor tests (Echo one doesn't work)

* refactor; echo server example works

* cleanup

* Use an IOPumper to flush data (thanks to @exarkun suggestions)

* separate pumper

* flake8

* IHostnameResolver is twisted 17.1.0+ only

* py27

* flake8

* a test of wss://

* cleanups
  • Loading branch information
meejah authored and oberstet committed May 7, 2019
1 parent 82848e0 commit c8c7420
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 1 deletion.
77 changes: 77 additions & 0 deletions autobahn/twisted/test/test_websocket_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from twisted.trial import unittest

try:
from autobahn.twisted.testing import create_memory_agent, MemoryReactorClockResolver, create_pumper
HAVE_TESTING = True
except ImportError:
HAVE_TESTING = False

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.websocket import WebSocketServerProtocol


class TestAgent(unittest.TestCase):

skip = not HAVE_TESTING

def setUp(self):
self.pumper = create_pumper()
self.reactor = MemoryReactorClockResolver()
return self.pumper.start()

def tearDown(self):
return self.pumper.stop()

@inlineCallbacks
def test_echo_server(self):

class EchoServer(WebSocketServerProtocol):
def onMessage(self, msg, is_binary):
self.sendMessage(msg)

agent = create_memory_agent(self.reactor, self.pumper, EchoServer)
proto = yield agent.open(u"ws://localhost:1234/ws", dict())

messages = []

def got(msg, is_binary):
messages.append(msg)
proto.on("message", got)

proto.sendMessage(b"hello")

if True:
# clean close
proto.sendClose()
else:
# unclean close
proto.transport.loseConnection()
yield proto.is_closed
self.assertEqual([b"hello"], messages)

@inlineCallbacks
def test_secure_echo_server(self):

class EchoServer(WebSocketServerProtocol):
def onMessage(self, msg, is_binary):
self.sendMessage(msg)

agent = create_memory_agent(self.reactor, self.pumper, EchoServer)
proto = yield agent.open(u"wss://localhost:1234/ws", dict())

messages = []

def got(msg, is_binary):
messages.append(msg)
proto.on("message", got)

proto.sendMessage(b"hello")

if True:
# clean close
proto.sendClose()
else:
# unclean close
proto.transport.loseConnection()
yield proto.is_closed
self.assertEqual([b"hello"], messages)
290 changes: 290 additions & 0 deletions autobahn/twisted/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

from __future__ import absolute_import

# IHostnameResolver et al. were added in Twisted 17.1.0 .. before
# that, it was IResolverSimple only.

try:
from twisted.internet.interfaces import IHostnameResolver
except ImportError:
raise ImportError(
"Twisted 17.1.0 or later required for autobahn.twisted.testing"
)

from twisted.internet.defer import Deferred
from twisted.internet.address import IPv4Address
from twisted.internet._resolver import HostResolution # "internal" class, but it's simple
from twisted.internet.interfaces import ISSLTransport, IReactorPluggableNameResolver
from twisted.test.proto_helpers import MemoryReactorClock
from twisted.test import iosim

from zope.interface import directlyProvides, implementer

from autobahn.websocket.interfaces import IWebSocketClientAgent
from autobahn.twisted.websocket import _TwistedWebSocketClientAgent
from autobahn.twisted.websocket import WebSocketServerProtocol
from autobahn.twisted.websocket import WebSocketServerFactory


__all__ = (
'create_pumper',
'create_memory_agent',
'MemoryReactorClockResolver',
)


@implementer(IHostnameResolver)
class _StaticTestResolver(object):
def resolveHostName(self, receiver, hostName, portNumber=0):
"""
Implement IHostnameResolver which always returns 127.0.0.1:31337
"""
resolution = HostResolution(hostName)
receiver.resolutionBegan(resolution)
receiver.addressResolved(
IPv4Address('TCP', '127.0.0.1', 31337 if portNumber == 0 else portNumber)
)
receiver.resolutionComplete()


@implementer(IReactorPluggableNameResolver)
class _TestNameResolver(object):
"""
A test version of IReactorPluggableNameResolver
"""

_resolver = None

@property
def nameResolver(self):
if self._resolver is None:
self._resolver = _StaticTestResolver()
return self._resolver

def installNameResolver(self, resolver):
old = self._resolver
self._resolver = resolver
return old


class MemoryReactorClockResolver(MemoryReactorClock, _TestNameResolver):
"""
Combine MemoryReactor, Clock and an IReactorPluggableNameResolver
together.
"""
pass


class _TwistedWebMemoryAgent(IWebSocketClientAgent):
"""
A testing agent which will hook up an instance of
`server_protocol` for every client that is created via the `open`
API call.
:param reactor: the reactor to use for tests (usually an instance
of MemoryReactorClockResolver)
:param pumper: an implementation IPumper (e.g. as returned by
`create_pumper`)
:param server_protocol: the server-side WebSocket protocol class
to instantiate (e.g. a subclass of `WebSocketServerProtocol`
"""

def __init__(self, reactor, pumper, server_protocol):
self._reactor = reactor
self._server_protocol = server_protocol
self._pumper = pumper

# our "real" underlying agent under test
self._agent = _TwistedWebSocketClientAgent(self._reactor)
self._pumps = set()
self._servers = dict() # client -> server

def open(self, transport_config, options, protocol_class=None):
"""
Implement IWebSocketClientAgent with in-memory transports.
:param transport_config: a string starting with 'wss://' or
'ws://'
:param options: a dict containing options
:param protocol_class: the client protocol class to
instantiate (or `None` for defaults, which is to use
`WebSocketClientProtocol`)
"""
is_secure = transport_config.startswith("wss://")

# call our "real" agent
real_client_protocol = self._agent.open(
transport_config, options,
protocol_class=protocol_class,
)

if is_secure:
host, port, factory, context_factory, timeout, bindAddress = self._reactor.sslClients[-1]
else:
host, port, factory, timeout, bindAddress = self._reactor.tcpClients[-1]

server_address = IPv4Address('TCP', '127.0.0.1', port)
client_address = IPv4Address('TCP', '127.0.0.1', 31337)

server_protocol = self._server_protocol()
server_protocol.factory = WebSocketServerFactory()

server_transport = iosim.FakeTransport(
server_protocol, isServer=True,
hostAddress=server_address, peerAddress=client_address)
clientProtocol = factory.buildProtocol(None)
client_transport = iosim.FakeTransport(
clientProtocol, isServer=False,
hostAddress=client_address, peerAddress=server_address)

if is_secure:
directlyProvides(server_transport, ISSLTransport)
directlyProvides(client_transport, ISSLTransport)

pump = iosim.connect(
server_protocol, server_transport, clientProtocol, client_transport)
self._pumper.add(pump)

def add_mapping(proto):
self._servers[proto] = server_protocol
return proto
real_client_protocol.addCallback(add_mapping)
return real_client_protocol


class _Kalamazoo(object):
"""
Feeling whimsical about class names, see https://en.wikipedia.org/wiki/Handcar
This is 'an IOPump pumper', an object which causes a series of
IOPumps it is monitoring to do their I/O operations
periodically. This needs the 'real' reactor which trial drives,
because reasons:
- so @inlineCallbacks / async-def functions work
(if I could explain exactly why here, I would)
- we need to 'break the loop' of synchronous calls somewhere and
polluting the tests themselves with that is bad
- get rid of e.g. .flush() calls in tests themselves (thus
'teaching' the tests about details of I/O scheduling that they
shouldn't know).
"""

def __init__(self):
self._pumps = set()
self._pumping = False
self._waiting_for_stop = []
from twisted.internet import reactor as global_reactor
self._global_reactor = global_reactor

def add(self, p):
"""
Add a new IOPump. It will be removed when both its client and
server are disconnected.
"""
self._pumps.add(p)

def start(self):
"""
Begin triggering I/O in all IOPump instances we have. We will keep
periodically 'pumping' our IOPumps until `.stop()` is
called. Call from `setUp()` for example.
"""
if self._pumping:
return
self._pumping = True
self._global_reactor.callLater(0, self._pump_once)

def stop(self):
"""
:returns: a Deferred that fires when we have stopped pump()-ing
Call from `tearDown()`, for example.
"""
if self._pumping or len(self._waiting_for_stop):
d = Deferred()
self._waiting_for_stop.append(d)
self._pumping = False
return d
d = Deferred()
d.callback(None)
return d

def _pump_once(self):
"""
flush all data from all our IOPump instances and schedule another
iteration on the global reactor
"""
if self._pumping:
self._flush()
self._global_reactor.callLater(0.1, self._pump_once)
else:
for d in self._waiting_for_stop:
d.callback(None)
self._waiting_for_stop = []

def _flush(self):
"""
Flush all data between pending client/server pairs.
"""
old_pumps = self._pumps
new_pumps = self._pumps = set()
for p in old_pumps:
p.flush()
if p.clientIO.disconnected and p.serverIO.disconnected:
continue
new_pumps.add(p)


def create_pumper():
"""
return a new instance implementing IPumper
"""
return _Kalamazoo()


def create_memory_agent(reactor, pumper, server_protocol):
"""
return a new instance implementing `IWebSocketClientAgent`.
connection attempts will be satisfied by traversing the Upgrade
request path starting at `resource` to find a `WebSocketResource`
and then exchange data between client and server using purely
in-memory buffers.
"""
# Note, we currently don't actually do any "resource traversing"
# and basically accept any path at all to our websocket resource
if server_protocol is None:
server_protocol = WebSocketServerProtocol
return _TwistedWebMemoryAgent(reactor, pumper, server_protocol)

0 comments on commit c8c7420

Please sign in to comment.