Skip to content

Commit

Permalink
expand websocket auto-pingpong (#1520)
Browse files Browse the repository at this point in the history
* expand websocket auto-pingpong

* add autoPingRestartOnAnyTraffic
  • Loading branch information
oberstet committed Feb 13, 2022
1 parent 5716575 commit 483337d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 20 deletions.
2 changes: 1 addition & 1 deletion autobahn/asyncio/wamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def accept(response):
tcpNoDelay=True,
autoPingInterval=10.,
autoPingTimeout=5.,
autoPingSize=4,
autoPingSize=12,
perMessageCompressionOffers=offers,
perMessageCompressionAccept=accept)
# SSL context for client connection
Expand Down
2 changes: 1 addition & 1 deletion autobahn/twisted/wamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def accept(response):
tcpNoDelay=True,
autoPingInterval=10.,
autoPingTimeout=5.,
autoPingSize=4,
autoPingSize=12,
perMessageCompressionOffers=offers,
perMessageCompressionAccept=accept)

Expand Down
4 changes: 2 additions & 2 deletions autobahn/websocket/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def setProtocolOptions(self,
peer does not respond in time, drop the connection. Set to `0` to disable. (default: `0`).
:type autoPingTimeout: float or None
:param autoPingSize: Payload size for automatic pings/pongs. Must be an integer from `[4, 125]`. (default: `4`).
:param autoPingSize: Payload size for automatic pings/pongs. Must be an integer from `[12, 125]`. (default: `12`).
:type autoPingSize: int or None
:param serveFlashSocketPolicy: Serve the Flash Socket Policy when we receive a policy file request on this protocol. (default: `False`).
Expand Down Expand Up @@ -395,7 +395,7 @@ def setProtocolOptions(self,
peer does not respond in time, drop the connection. Set to `0` to disable. (default: `0`).
:type autoPingTimeout: float or None
:param autoPingSize: Payload size for automatic pings/pongs. Must be an integer from `[4, 125]`. (default: `4`).
:param autoPingSize: Payload size for automatic pings/pongs. Must be an integer from `[12, 125]`. (default: `12`).
:type autoPingSize: int
"""

Expand Down
81 changes: 67 additions & 14 deletions autobahn/websocket/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import pickle
import copy
import json
import time

from pprint import pformat
from collections import deque
Expand All @@ -47,7 +48,7 @@
from autobahn.websocket.types import ConnectionRequest, ConnectionResponse, ConnectionDeny
from autobahn.websocket.types import ConnectingRequest

from autobahn.util import Stopwatch, newid, wildcards2patterns, encode_truncate
from autobahn.util import Stopwatch, wildcards2patterns, encode_truncate
from autobahn.util import _LazyHexFormatter
from autobahn.util import ObservableMixin
from autobahn.websocket.utf8validator import Utf8Validator
Expand Down Expand Up @@ -511,7 +512,8 @@ class WebSocketProtocol(ObservableMixin):
'tcpNoDelay',
'autoPingInterval',
'autoPingTimeout',
'autoPingSize']
'autoPingSize',
'autoPingRestartOnAnyTraffic']
"""
Configuration attributes common to servers and clients.
"""
Expand Down Expand Up @@ -836,6 +838,21 @@ def onCloseHandshakeTimeout(self):
else:
self.log.debug('skipping closing handshake timeout: WebSocket connection is already closed')

def onAutoPong(self, ping_sent, ping_seq, pong_received, pong_rtt, payload):
"""
When doing automatic ping/pongs, this is called upon a successful pong.
:param ping_sent: Posix time in ns when ping was sent.
:param ping_seq: Sequence number of ping that was sent.
:param pong_received: Posix time in ns when pong was received.
:param pong_rtt: Pong roundtrip-time in ms measured.
:param payload: The complete WebSocket ping/pong message payload
(ping_sent 8 bytes big-endian | ping_seq 4 bytes big endian | max. 113 optional random bytes).
"""
self.log.info(
"Auto ping/pong: received pending pong (size={size}) for auto-ping (sent={sent}, seq={seq}, received={}) in RTT of {rtt} ms",
size=len(payload), sent=ping_sent, seq=ping_seq, received=pong_received, rtt=pong_rtt)

def onAutoPingTimeout(self):
"""
When doing automatic ping/pongs to detect broken connection, the peer
Expand Down Expand Up @@ -1068,6 +1085,8 @@ def _connectionMade(self):
self.autoPingTimeoutCall = None
self.autoPingPending = None
self.autoPingPendingCall = None
self.autoPingPendingSeq = 0
self.autoPingPendingSent = None

# set opening handshake timeout handler
if self.openHandshakeTimeout > 0:
Expand Down Expand Up @@ -1681,7 +1700,10 @@ def onFrameEnd(self):

self._onMessageFrameEnd()

if self.autoPingTimeoutCall:
if self.autoPingTimeoutCall and self.autoPingRestartOnAnyTraffic:
# cancel a pending ping timeout already by having received a data frame
# note that this is slightly wrong, but see _cancelAutoPingTimeoutCall and:
# https://github.com/crossbario/autobahn-python/issues/1327
self._cancelAutoPingTimeoutCall()

if self.current_frame.fin:
Expand Down Expand Up @@ -1742,12 +1764,22 @@ def processControlFrame(self):
if self.autoPingPending:
try:
if payload == self.autoPingPending:
self.log.debug("Auto ping/pong: received pending pong for auto-ping/pong")
# self.autoPingPendingSent
ping_sent = struct.unpack('>Q', payload[:8])[0]

# self.autoPingPendingSeq
ping_seq = struct.unpack('>L', payload[8:12])[0]

pong_received = time.time_ns()
pong_rtt = int((pong_received - ping_sent) / 10**6)

self.onAutoPong(ping_sent, ping_seq, pong_received, pong_rtt, payload)

if self.autoPingTimeoutCall:
self.autoPingTimeoutCall.cancel()

self.autoPingPending = None
self.autoPingPendingSent = None
self.autoPingTimeoutCall = None

if self.autoPingInterval:
Expand All @@ -1756,9 +1788,9 @@ def processControlFrame(self):
self._sendAutoPing,
)
else:
self.log.debug("Auto ping/pong: received non-pending pong")
self.log.warn("Auto ping/pong: received non-pending pong")
except:
self.log.debug("Auto ping/pong: received non-pending pong")
self.log.warn("Auto ping/pong: received non-pending pong")

# fire app-level callback
#
Expand Down Expand Up @@ -1886,14 +1918,19 @@ def _sendAutoPing(self):

self.autoPingPendingCall = None

self.autoPingPending = newid(self.autoPingSize).encode('utf8')
self.autoPingPendingSent = time.time_ns()
self.autoPingPendingSeq += 1
self.autoPingPending = b''.join([struct.pack('>Q', self.autoPingPendingSent),
struct.pack('>L', self.autoPingPendingSeq),
os.urandom(self.autoPingSize - 12)])

self.sendPing(self.autoPingPending)

if self.autoPingTimeout:
self.log.debug(
"Expecting ping in {seconds} seconds for auto-ping/pong",
"Expecting pong in {seconds} seconds for auto-ping ({size} bytes)",
seconds=self.autoPingTimeout,
size=len(self.autoPingPending),
)
self.autoPingTimeoutCall = self.factory._batched_timer.call_later(
self.autoPingTimeout,
Expand All @@ -1903,7 +1940,7 @@ def _sendAutoPing(self):
def _cancelAutoPingTimeoutCall(self):
"""
When data is received from client, use it in leu of timely PONG response - cancel pending timeout call
that will drop connection
that will drop connection. See https://github.com/crossbario/autobahn-python/issues/1327
"""
self.log.debug("Cancelling autoPingTimeoutCall due to incoming data")
self.autoPingTimeoutCall.cancel()
Expand Down Expand Up @@ -3222,7 +3259,10 @@ def resetProtocolOptions(self):
#
self.autoPingInterval = 0
self.autoPingTimeout = 0
self.autoPingSize = 4
self.autoPingSize = 12

# see: https://github.com/crossbario/autobahn-python/issues/1327 and _cancelAutoPingTimeoutCall
self.autoPingRestartOnAnyTraffic = True

# check WebSocket origin against this list
self.allowedOrigins = ["*"]
Expand Down Expand Up @@ -3254,6 +3294,7 @@ def setProtocolOptions(self,
autoPingInterval=None,
autoPingTimeout=None,
autoPingSize=None,
autoPingRestartOnAnyTraffic=None,
serveFlashSocketPolicy=None,
flashSocketPolicy=None,
allowedOrigins=None,
Expand Down Expand Up @@ -3320,9 +3361,13 @@ def setProtocolOptions(self,

if autoPingSize is not None and autoPingSize != self.autoPingSize:
assert(type(autoPingSize) == float or type(autoPingSize) == int)
assert(4 <= autoPingSize <= 125)
assert(12 <= autoPingSize <= 125)
self.autoPingSize = autoPingSize

if autoPingRestartOnAnyTraffic is not None and autoPingRestartOnAnyTraffic != self.autoPingRestartOnAnyTraffic:
assert(type(autoPingRestartOnAnyTraffic) == bool)
self.autoPingRestartOnAnyTraffic = autoPingRestartOnAnyTraffic

if serveFlashSocketPolicy is not None and serveFlashSocketPolicy != self.serveFlashSocketPolicy:
self.serveFlashSocketPolicy = serveFlashSocketPolicy

Expand Down Expand Up @@ -3971,7 +4016,10 @@ def resetProtocolOptions(self):
#
self.autoPingInterval = 0
self.autoPingTimeout = 0
self.autoPingSize = 4
self.autoPingSize = 12

# see: https://github.com/crossbario/autobahn-python/issues/1327 and _cancelAutoPingTimeoutCall
self.autoPingRestartOnAnyTraffic = True

def setProtocolOptions(self,
version=None,
Expand All @@ -3992,7 +4040,8 @@ def setProtocolOptions(self,
perMessageCompressionAccept=None,
autoPingInterval=None,
autoPingTimeout=None,
autoPingSize=None):
autoPingSize=None,
autoPingRestartOnAnyTraffic=None):
"""
Implements :func:`autobahn.websocket.interfaces.IWebSocketClientChannelFactory.setProtocolOptions`
"""
Expand Down Expand Up @@ -4061,5 +4110,9 @@ def setProtocolOptions(self,

if autoPingSize is not None and autoPingSize != self.autoPingSize:
assert(type(autoPingSize) == float or type(autoPingSize) == int)
assert(4 <= autoPingSize <= 125)
assert(12 <= autoPingSize <= 125)
self.autoPingSize = autoPingSize

if autoPingRestartOnAnyTraffic is not None and autoPingRestartOnAnyTraffic != self.autoPingRestartOnAnyTraffic:
assert(type(autoPingRestartOnAnyTraffic) == bool)
self.autoPingRestartOnAnyTraffic = autoPingRestartOnAnyTraffic
6 changes: 4 additions & 2 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
Changelog
=========

master
------
22.2.1.dev1
-----------

* new: add auto-ping/pong configuration knob ``autoPingRestartOnAnyTraffic`` (see discussion `here <https://github.com/crossbario/autobahn-python/issues/1327>`_).
* new: extended websocket auto-ping/pong ("heartbeating") with builtin RTT measurement
* new: experimental support for ``transaction_hash`` in WAMP Publish/Call (see discussion `here <https://github.com/wamp-proto/wamp-proto/issues/391#issuecomment-998577967>`_).
* new: support decimal numbers WAMP serialization and round-tripping in both JSON and CBOR
* fix: only depend on cbor2 (for WAMP CBOR serialization), not also cbor
Expand Down

0 comments on commit 483337d

Please sign in to comment.