Adjust MQTT bridge to multi-node/clustering changes #2048

Open
jsanyi opened this issue Sep 8, 2022 · 4 comments

jsanyi commented Sep 8, 2022

I'm running Crossbar.io 22.6.1, installed via pip in a venv, and cannot get the MQTT transport working. The router crashes as soon as an MQTT client connects; see the crash log below.

=====

2022-09-08T10:06:50+0200 [Router      17775] Unhandled Error
Traceback (most recent call last):
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/log.py", line 80, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/posixbase.py", line 683, in _doReadOrWrite
    why = selectable.doRead()
--- <exception caught here> ---
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/tcp.py", line 1410, in doRead
    protocol.makeConnection(transport)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/protocol.py", line 509, in makeConnection
    self.connectionMade()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/bridge/mqtt/wamp.py", line 222, in connectionMade
    self._when_ready()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/bridge/mqtt/wamp.py", line 241, in _when_ready
    self._wamp_session.onOpen(self._wamp_transport)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/router/session.py", line 421, in onOpen
    assert isinstance(transport,
builtins.AssertionError: unexpected router transport type <class 'crossbar.bridge.mqtt.wamp.WampTransport'>

2022-09-08T10:06:50+0200 [Router      17775] WampMQTTServerProtocol.process_connect(packet=Connect(client_id='mqtt-explorer-a16589b3', flags=ConnectFlags(username=False, password=False, will=False, will_retain=False, will_qos=0, clean_session=True, reserved=False), keep_alive=60, will_topic=None, will_message=None, username=None, password=None))
2022-09-08T10:06:50+0200 [Router      17775] Error handling a Connect, dropping connection
2022-09-08T10:06:50+0200 [Router      17775] Unhandled Error
Traceback (most recent call last):
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/worker/main.py", line 351, in _run_command_exec_worker
    reactor.run()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/base.py", line 1318, in run
    self.mainLoop()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/base.py", line 1331, in mainLoop
    reactorBaseSelf.doIteration(t)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/epollreactor.py", line 244, in doPoll
    log.callWithLogger(selectable, _drdw, selectable, fd, event)
--- <exception caught here> ---
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/log.py", line 96, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/log.py", line 80, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/posixbase.py", line 696, in _doReadOrWrite
    self._disconnectSelectable(selectable, why, inRead)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/posixbase.py", line 300, in _disconnectSelectable
    selectable.connectionLost(f)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/tcp.py", line 326, in connectionLost
    protocol.connectionLost(reason)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/bridge/mqtt/wamp.py", line 227, in connectionLost
    self._wamp_session.onMessage(msg)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/router/session.py", line 457, in onMessage
    if not self._pending_session_id:
builtins.AttributeError: 'RouterSession' object has no attribute '_pending_session_id'

=====
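
The two tracebacks appear to be linked: the assertion in `onOpen` fires first, and the later `AttributeError` in `onMessage` looks like a consequence of `onOpen` never having completed. A minimal sketch of that pattern follows. All classes here are hypothetical stand-ins, not Crossbar's actual code, and it assumes `_pending_session_id` is only initialised after the transport type check passes.

```python
class WampTransport:
    """Hypothetical stand-in for crossbar.bridge.mqtt.wamp.WampTransport."""


class KnownTransport:
    """Hypothetical stand-in for a transport type the router session accepts."""


class RouterSession:
    """Simplified, hypothetical model of a router session (not Crossbar's code)."""

    def onOpen(self, transport):
        # If this type check fails, the assignment below never runs.
        assert isinstance(transport, (KnownTransport,)), \
            "unexpected router transport type {}".format(type(transport))
        # Assumption: per-session state is only set up after the check.
        self._pending_session_id = None

    def onMessage(self, msg):
        # Raises AttributeError when onOpen() did not run to completion.
        if not self._pending_session_id:
            return "no session yet"
        return "session pending"


def reproduce():
    """Replay the call order from the crash log and collect the error types."""
    session = RouterSession()
    errors = []
    try:
        # First traceback: connectionMade -> _when_ready -> onOpen
        session.onOpen(WampTransport())
    except AssertionError as exc:
        errors.append(type(exc).__name__)
    try:
        # Second traceback: connectionLost -> onMessage
        session.onMessage("goodbye")
    except AttributeError as exc:
        errors.append(type(exc).__name__)
    return errors
```

If this reading is right, the second error is only a symptom, and the fix belongs at the transport type check rather than in `onMessage`.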

This is my transport configuration:

                {
                    "id": "mqtt001",
                    "type": "mqtt",
                    "endpoint": {
                        "type": "tcp",
                        "port": 1883
                    },
                    "options": {
                        "realm": "realm1",
                        "role": "anonymous",
                        "payload_mapping": {
                            "": {
                                "type": "native",
                                "serializer": "json"
                            }
                        },
                        "auth": {
                            "anonymous": {
                                "type": "static",
                                "role": "anonymous"
                            }
                        }
                    }
                }
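
For reproducing the crash without MQTT Explorer, the CONNECT packet from the log (client ID, clean session, keep-alive 60, no credentials) can be built by hand and sent to the TCP endpoint on port 1883. The sketch below assumes MQTT 3.1.1 (protocol level 4), which the log does not state explicitly; the helper names `encode_remaining_length` and `build_connect` are made up for illustration.

```python
import struct


def encode_remaining_length(n: int) -> bytes:
    """Encode the MQTT 'remaining length' field (7 bits per byte, MSB = more)."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range: {}".format(n))
    out = bytearray()
    while True:
        n, digit = divmod(n, 128)
        if n > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        out.append(digit)
        if n == 0:
            return bytes(out)


def _utf8_field(text: str) -> bytes:
    """MQTT string: 2-byte big-endian length prefix followed by UTF-8 data."""
    raw = text.encode("utf-8")
    return struct.pack("!H", len(raw)) + raw


def build_connect(client_id: str, keep_alive: int = 60,
                  clean_session: bool = True) -> bytes:
    """Build an MQTT 3.1.1 CONNECT packet without will, username or password."""
    flags = 0x02 if clean_session else 0x00
    variable_header = (
        _utf8_field("MQTT")             # protocol name
        + bytes([4, flags])             # protocol level 4 (assumed), connect flags
        + struct.pack("!H", keep_alive)
    )
    body = variable_header + _utf8_field(client_id)
    # 0x10 = packet type CONNECT (1) in the high nibble, reserved flags 0
    return bytes([0x10]) + encode_remaining_length(len(body)) + body
```

Sending `build_connect("mqtt-explorer-a16589b3")` over a plain TCP socket to port 1883 should be enough to trigger the same code path as in the log.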

oberstet commented Sep 9, 2022

Ah, right, it looks like WampMQTTServerProtocol needs some adjustment for the recent multi-node/clustering changes (proxy workers, rlinks) in Crossbar. Nothing big, but someone would need to dig into it.

oberstet changed the title from "MQTT crash after connect" to "Adjust MQTT bridge to multi-node/clustering changes" on Sep 9, 2022

richard-pianka commented

Has there been any progress on this issue?

richard-pianka commented

Apologies for the double comment. This is blocking us from upgrading to newer versions of Crossbar. We're willing to pay to have this issue resolved if someone with more expertise is willing to fix it.

/cc @oberstet

oberstet commented

@richard-pianka Hi Richard, sorry for not reacting before, but I'm busy these days with mundane things like making money ;) However, since you mention you would pay for fixing this, I'd be happy to resolve it. Please get in touch with me via email: "tobias dot oberstein at gmail dot com".

As mentioned before, I don't expect this to be complex, but it is tricky to fix in the right location in the right way. I know the code base inside out, and as part of fixing the issue I'd also create proper, automated test cases using your MQTT client library (or any standard one that fits the bill, like the one we already use in crossbar-examples), actually running Crossbar.io in a multi-process/multi-node clustered configuration (multiple proxy and router workers), which is the whole reason for the original change that led to this regression ...
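
As a rough idea of what the smallest such automated check could look like, the sketch below sends a pre-built CONNECT packet to the MQTT transport and verifies that the broker answers with an accepting CONNACK instead of dropping the connection. It uses only the standard library rather than a real MQTT client library; the function names, the default host and port, and the MQTT 3.1.1 framing are assumptions for illustration, not part of Crossbar's test suite.

```python
import socket


def parse_connack(data: bytes):
    """Parse an MQTT 3.1.1 CONNACK and return (session_present, return_code)."""
    # A CONNACK is always 4 bytes: type 0x20, remaining length 2, flags, code.
    if len(data) != 4 or data[0] != 0x20 or data[1] != 0x02:
        raise ValueError("not a CONNACK packet: {!r}".format(data))
    return bool(data[2] & 0x01), data[3]


def connect_is_accepted(connect_packet: bytes, host: str = "127.0.0.1",
                        port: int = 1883, timeout: float = 5.0) -> bool:
    """Send a raw CONNECT packet and report whether the broker accepted it.

    Returns False if the broker closes the connection before a full CONNACK
    arrives, which is the behaviour described in this issue. Network errors
    such as a refused connection or a timeout propagate to the caller.
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(connect_packet)
        data = b""
        while len(data) < 4:
            chunk = sock.recv(4 - len(data))
            if not chunk:  # peer closed the connection early
                return False
            data += chunk
    _session_present, return_code = parse_connack(data)
    return return_code == 0  # 0 = connection accepted
```

A regression test could start a node with the clustered configuration, call `connect_is_accepted(...)` against the MQTT endpoint, and fail if it returns `False`.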
