Skip to content

Commit

Permalink
Support client disconnects with multiple servers (Fixes miguelgrinber…
Browse files Browse the repository at this point in the history
  • Loading branch information
KoraLinSar authored and miguelgrinberg committed Aug 27, 2022
1 parent 65fb9d0 commit ebefd23
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 6 deletions.
3 changes: 3 additions & 0 deletions socketio/asyncio_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

class AsyncManager(BaseManager):
"""Manage a client list for an asyncio server."""
async def can_disconnect(self, sid, namespace):
return self.is_connected(sid, namespace)

async def emit(self, event, data, namespace, room=None, skip_sid=None,
callback=None, **kwargs):
"""Emit a message to a single client, a room, or all the clients
Expand Down
22 changes: 22 additions & 0 deletions socketio/asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id})

async def can_disconnect(self, sid, namespace):
await self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})

async def disconnect(self, sid, namespace=None):
"""Disconnect a client."""
# this is a bit weird, the can_disconnect call on pubsub managers just
# issues a disconnect request to the message queue and returns None,
# indicating that the client cannot disconnect immediately. The
# server(s) listening on the queue will get this request and carry out
# the disconnect appropriately.
await self.can_disconnect(sid, namespace)

async def close_room(self, room, namespace=None):
await self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
Expand Down Expand Up @@ -128,6 +141,11 @@ async def _return_callback(self, host_id, sid, namespace, callback_id,
'sid': sid, 'namespace': namespace,
'id': callback_id, 'args': args})

async def _handle_disconnect(self, message):
await self.server.disconnect(sid=message.get('sid'),
namespace=message.get('namespace'),
ignore_queue=True)

async def _handle_close_room(self, message):
await super().close_room(
room=message.get('room'), namespace=message.get('namespace'))
Expand Down Expand Up @@ -155,9 +173,13 @@ async def _thread(self):
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)
10 changes: 8 additions & 2 deletions socketio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,23 @@ async def __aexit__(self, *args):

return _session_context_manager(self, sid, namespace)

async def disconnect(self, sid, namespace=None):
async def disconnect(self, sid, namespace=None, ignore_queue=False):
"""Disconnect a client.
:param sid: Session ID of the client.
:param namespace: The Socket.IO namespace to disconnect. If this
argument is omitted the default namespace is used.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the disconnect is processed
locally, without broadcasting on the queue. It is
recommended to always leave this parameter with
its default value of ``False``.
Note: this method is a coroutine.
"""
namespace = namespace or '/'
if self.manager.is_connected(sid, namespace=namespace):
if (ignore_queue and self.manager.is_connected(sid, namespace)) or \
await self.manager.can_disconnect(sid, namespace):
self.logger.info('Disconnecting %s [%s]', sid, namespace)
self.manager.pre_disconnect(sid, namespace=namespace)
await self._send_packet(sid, packet.Packet(packet.DISCONNECT,
Expand Down
3 changes: 3 additions & 0 deletions socketio/base_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def is_connected(self, sid, namespace):
except KeyError:
pass

def can_disconnect(self, sid, namespace):
return self.is_connected(sid, namespace)

def pre_disconnect(self, sid, namespace):
"""Put the client in the to-be-disconnected list.
Expand Down
22 changes: 22 additions & 0 deletions socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id})

def can_disconnect(self, sid, namespace):
self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})

def disconnect(self, sid, namespace=None):
"""Disconnect a client."""
# this is a bit weird, the can_disconnect call on pubsub managers just
# issues a disconnect request to the message queue and returns None,
# indicating that the client cannot disconnect immediately. The
# server(s) listening on the queue will get this request and carry out
# the disconnect appropriately.
self.can_disconnect(sid, namespace)

def close_room(self, room, namespace=None):
self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
Expand Down Expand Up @@ -125,6 +138,11 @@ def _return_callback(self, host_id, sid, namespace, callback_id, *args):
'sid': sid, 'namespace': namespace, 'id': callback_id,
'args': args})

def _handle_disconnect(self, message):
self.server.disconnect(sid=message.get('sid'),
namespace=message.get('namespace'),
ignore_queue=True)

def _handle_close_room(self, message):
super(PubSubManager, self).close_room(
room=message.get('room'), namespace=message.get('namespace'))
Expand All @@ -146,9 +164,13 @@ def _thread(self):
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'callback':
self._handle_callback(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)
10 changes: 8 additions & 2 deletions socketio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,21 @@ def __exit__(self, *args):

return _session_context_manager(self, sid, namespace)

def disconnect(self, sid, namespace=None):
def disconnect(self, sid, namespace=None, ignore_queue=False):
"""Disconnect a client.
:param sid: Session ID of the client.
:param namespace: The Socket.IO namespace to disconnect. If this
argument is omitted the default namespace is used.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the disconnect is processed
locally, without broadcasting on the queue. It is
recommended to always leave this parameter with
its default value of ``False``.
"""
namespace = namespace or '/'
if self.manager.is_connected(sid, namespace=namespace):
if (ignore_queue and self.manager.is_connected(sid, namespace)) or \
self.manager.can_disconnect(sid, namespace):
self.logger.info('Disconnecting %s [%s]', sid, namespace)
self.manager.pre_disconnect(sid, namespace=namespace)
self._send_packet(sid, packet.Packet(packet.DISCONNECT,
Expand Down
18 changes: 17 additions & 1 deletion tests/asyncio/test_asyncio_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
def setUp(self):
mock_server = mock.MagicMock()
mock_server._emit_internal = AsyncMock()
mock_server.disconnect = AsyncMock()
self.pm = asyncio_pubsub_manager.AsyncPubSubManager()
self.pm._publish = AsyncMock()
self.pm.set_server(mock_server)
Expand Down Expand Up @@ -115,6 +116,11 @@ def test_emit_with_ignore_queue(self):
self.pm.server._emit_internal.mock.assert_called_once_with(
'123', 'foo', 'bar', '/', None)

def test_disconnect(self):
_run(self.pm.disconnect('123', '/foo'))
self.pm._publish.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})

def test_close_room(self):
_run(self.pm.close_room('foo'))
self.pm._publish.mock.assert_called_once_with(
Expand Down Expand Up @@ -142,7 +148,7 @@ def test_handle_emit_with_namespace(self):
self.pm, 'foo', 'bar', namespace='/baz', room=None,
skip_sid=None, callback=None)

def test_handle_emiti_with_room(self):
def test_handle_emit_with_room(self):
with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
new=AsyncMock()) as super_emit:
_run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
Expand Down Expand Up @@ -216,6 +222,12 @@ def test_handle_callback_missing_args(self):
'host_id': host_id}))
self.assertEqual(trigger.mock.call_count, 0)

def test_handle_disconnect(self):
_run(self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123',
'namespace': '/foo'}))
self.pm.server.disconnect.mock.assert_called_once_with(
sid='123', namespace='/foo', ignore_queue=True)

def test_handle_close_room(self):
with mock.patch.object(asyncio_manager.AsyncManager, 'close_room',
new=AsyncMock()) as super_close_room:
Expand All @@ -236,13 +248,15 @@ def test_handle_close_room_with_namespace(self):
def test_background_thread(self):
self.pm._handle_emit = AsyncMock()
self.pm._handle_callback = AsyncMock()
self.pm._handle_disconnect = AsyncMock()
self.pm._handle_close_room = AsyncMock()

def messages():
import pickle
yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
yield {'method': 'bogus'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
Expand All @@ -258,5 +272,7 @@ def messages():
{'method': 'emit', 'value': 'foo'})
self.pm._handle_callback.mock.assert_called_once_with(
{'method': 'callback', 'value': 'bar'})
self.pm._handle_disconnect.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
self.pm._handle_close_room.mock.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'})
1 change: 1 addition & 0 deletions tests/asyncio/test_asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def tearDown(self):

def _get_mock_manager(self):
mgr = mock.MagicMock()
mgr.can_disconnect = AsyncMock()
mgr.emit = AsyncMock()
mgr.close_room = AsyncMock()
mgr.trigger_callback = AsyncMock()
Expand Down
18 changes: 17 additions & 1 deletion tests/common/test_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ def test_emit_with_ignore_queue(self):
self.pm.server._emit_internal.assert_called_once_with('123', 'foo',
'bar', '/', None)

def test_disconnect(self):
self.pm.disconnect('123', '/foo')
self.pm._publish.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})

def test_close_room(self):
self.pm.close_room('foo')
self.pm._publish.assert_called_once_with(
Expand All @@ -137,7 +142,7 @@ def test_handle_emit_with_namespace(self):
room=None, skip_sid=None,
callback=None)

def test_handle_emiti_with_room(self):
def test_handle_emit_with_room(self):
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
'room': 'baz'})
Expand Down Expand Up @@ -204,6 +209,13 @@ def test_handle_callback_missing_args(self):
'host_id': host_id})
self.assertEqual(trigger.call_count, 0)

def test_handle_disconnect(self):
self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123',
'namespace': '/foo'})
self.pm.server.disconnect.assert_called_once_with(sid='123',
namespace='/foo',
ignore_queue=True)

def test_handle_close_room(self):
with mock.patch.object(base_manager.BaseManager, 'close_room') \
as super_close_room:
Expand All @@ -223,13 +235,15 @@ def test_handle_close_room_with_namespace(self):
def test_background_thread(self):
self.pm._handle_emit = mock.MagicMock()
self.pm._handle_callback = mock.MagicMock()
self.pm._handle_disconnect = mock.MagicMock()
self.pm._handle_close_room = mock.MagicMock()

def messages():
import pickle
yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
yield {'method': 'bogus'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
Expand All @@ -245,5 +259,7 @@ def messages():
{'method': 'emit', 'value': 'foo'})
self.pm._handle_callback.assert_called_once_with(
{'method': 'callback', 'value': 'bar'})
self.pm._handle_disconnect.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
self.pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'})

0 comments on commit ebefd23

Please sign in to comment.