adafruit_minimqtt produces non-descript error inside _wait_for_msg on Pico W #6988

Closed
sgbaird opened this issue Oct 4, 2022 · 23 comments · Fixed by #7048

@sgbaird

sgbaird commented Oct 4, 2022

CircuitPython version

Adafruit CircuitPython 8.0.0-beta.1; Raspberry Pi Pico W

Code/REPL

"""
Based on https://gist.github.com/sammachin/b67cc4f395265bccd9b2da5972663e6d
http://www.steves-internet-guide.com/into-mqtt-python-client/
"""
import json
from secrets import secrets

import board
import digitalio
import busio
import socketpool
import wifi
import microcontroller
from binascii import hexlify
from adafruit_minimqtt.adafruit_minimqtt import MQTT

SSID, PASSWORD = secrets["ssid"], secrets["password"]

BROKER = "test.mosquitto.org"

TEXT_URL = "http://wifitest.adafruit.com/testwifi/index.html"

pool = socketpool.SocketPool(wifi.radio)

wifi.radio.connect(SSID, PASSWORD)

uid = microcontroller.cpu.uid  # unique ID bytes of this board's processor
my_id = hexlify(uid).decode()

prefix = f"sdl-demo/picow/{my_id}/"
print(f"prefix: {prefix}")

onboard_led = digitalio.DigitalInOut(board.LED)  # only works for Pico W
onboard_led.direction = digitalio.Direction.OUTPUT

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(prefix + "GPIO/#", qos=0)


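def callback(client, topic, msg):
    # MiniMQTT calls topic callbacks with (client, topic, message); topic is a str.
    # str.lstrip() strips a set of characters, not a prefix, so slice instead.
    t = topic[len(prefix):] if topic.startswith(prefix) else topic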
    print(t)

    if t[:5] == "GPIO/":
        p = int(t[5:])
        print(msg)
        data = json.loads(msg)
        onboard_led.value = data["value"]
        payload = {"onboard_led_value": onboard_led.value}
        print(payload)
        client.publish(prefix + "onboard_led/", json.dumps(payload), qos=0)


# The callback for when a PUBLISH message is received from the server.
def on_message(client, topic, message):
    print(topic + " " + str(message))


# Set up a MiniMQTT Client
client = MQTT(
    client_id=prefix,
    broker=BROKER,
    username=None,
    password=None,
    is_ssl=False,
    socket_pool=pool,
)

client.connect()

client.add_topic_callback(prefix + "GPIO/#", callback)
client.on_connect = on_connect
client.on_message = on_message
client.subscribe(prefix + "GPIO/#")

while True:
    client.check_msg()

Behavior

prefix: sdl-demo/picow/<PICO ID>/
Traceback (most recent call last):
  File "<stdin>", line 87, in <module>
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 741, in subscribe
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 876, in _wait_for_msg
MMQTTException:

For reference:
File "<stdin>", line 87, in <module> is:

client.connect()

EDIT:

client.subscribe(prefix + "GPIO/#")

https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/blob/f2cb3bbe830f05cb75cd397fe83803ee1e59946b/adafruit_minimqtt/adafruit_minimqtt.py#L741

https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/blob/f2cb3bbe830f05cb75cd397fe83803ee1e59946b/adafruit_minimqtt/adafruit_minimqtt.py#L861-L876

Description

  • adafruit_minimqtt fails for Pico W

Additional information

I've been porting my MicroPython implementation at https://github.com/sparks-baird/self-driving-lab-demo/blob/main/src/public_mqtt_sdl_demo/main.py to CircuitPython. Part of the appeal of using CircuitPython is to use ArduCam, which only has C and CircuitPython support and, as far as I can tell, no MicroPython support (see ArduCAM/PICO_SPI_CAM#8 and namato/micropython-ov2640#6). I'd be OK with another basic color camera, but had trouble finding one that looked like it would work as easily as ArduCam.

adafruit_minimqtt documentation

@sgbaird sgbaird added the bug label Oct 4, 2022
@sgbaird
Author

sgbaird commented Oct 4, 2022

Same issue with a slightly modified version of https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/blob/main/examples/minimqtt_simpletest.py:

# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT

# Add a secrets.py to your filesystem that has a dictionary called secrets with "ssid" and
# "password" keys with your WiFi credentials. DO NOT share that file or commit it into Git or other
# source control.
# pylint: disable=no-name-in-module,wrong-import-order
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

print("Connecting to %s" % secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!" % secrets["ssid"])

### Topic Setup ###

# MQTT Topic
# Use this topic if you'd like to connect to a standard MQTT broker
mqtt_topic = "test/topic"

### Code ###
# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connect(mqtt_client, userdata, flags, rc):
    # This function will be called when the mqtt_client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))


def disconnect(mqtt_client, userdata, rc):
    # This method is called when the mqtt_client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")


def subscribe(mqtt_client, userdata, topic, granted_qos):
    # This method is called when the mqtt_client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))


def unsubscribe(mqtt_client, userdata, topic, pid):
    # This method is called when the mqtt_client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))


def publish(mqtt_client, userdata, topic, pid):
    # This method is called when the mqtt_client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))


def message(client, topic, message):
    # Method called when a client's subscribed feed has a new value.
    print("New message on topic {0}: {1}".format(topic, message))


# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)

# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker="test.mosquitto.org",
    username=None,
    password=None,
    socket_pool=pool,
    is_ssl=False,
)

# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message

print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()

print("Subscribing to %s" % mqtt_topic)
mqtt_client.subscribe(mqtt_topic)

print("Publishing to %s" % mqtt_topic)
mqtt_client.publish(mqtt_topic, "Hello Broker!")

print("Unsubscribing from %s" % mqtt_topic)
mqtt_client.unsubscribe(mqtt_topic)

print("Disconnecting from %s" % mqtt_client.broker)
mqtt_client.disconnect()

Connecting to <SSID>
Connected to <SSID>!
Attempting to connect to test.mosquitto.org
Connected to MQTT Broker!
Flags: 0
 RC: 0
Subscribing to test/topic
Traceback (most recent call last):
  File "<stdin>", line 88, in <module>
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 741, in subscribe
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 876, in _wait_for_msg
MMQTTException:

@anecdata
Member

anecdata commented Oct 4, 2022

I don't know why the exception is blank, but it's possible it's an ETIMEDOUT from losing the wifi connection (which typically manifests as OSError: [Errno 2] No such file/directory). You could try adding explicit wifi reconnects before polling. I'd suggest a wifi connection function with its own exception handling.
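A minimal sketch of such a connection helper follows. It assumes `radio` behaves like CircuitPython's `wifi.radio` and that a failed `connect()` raises `OSError` or a subclass such as `ConnectionError`. The function name `connect_wifi` and its `retries`, `delay`, and `sleep` parameters are hypothetical, chosen only for illustration.

```python
import time


def connect_wifi(radio, ssid, password, retries=3, delay=1.0, sleep=time.sleep):
    """Try radio.connect() up to `retries` times; return True on success.

    `radio` is expected to behave like CircuitPython's `wifi.radio`
    (assumption: failures raise OSError or a subclass of it).
    """
    for attempt in range(1, retries + 1):
        try:
            radio.connect(ssid, password)
            return True
        except OSError as error:
            # Report the failure here instead of letting it surface mid-poll.
            print("wifi connect attempt", attempt, "failed:", repr(error))
            sleep(delay)
    return False
```

On a board this could be called as `connect_wifi(wifi.radio, SSID, PASSWORD)` before each polling pass. Passing the radio in as an argument also makes the helper easy to exercise with a fake object on a desktop.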

@sgbaird
Author

sgbaird commented Oct 4, 2022

@anecdata thanks for the quick response!

Would that be something similar to a MicroPython implementation:

import network
from time import sleep

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)

# Wait for connect or fail
max_wait = 30
while max_wait > 0:  # type: ignore
    if wlan.status() < 0 or wlan.status() >= 3:  # type: ignore
        break
    max_wait -= 1
    print("waiting for connection...")
    sleep(1)

# Handle connection error
if wlan.status() != 3:  # type: ignore
    raise RuntimeError("network connection failed")
else:
    print("connected")
    status = wlan.ifconfig()
    ip = status[0]  # type: ignore
    print(f"ip: {ip}")

except wrapped in a function and using:

wifi.radio.connect(SSID, PASSWORD) # is it this line or is it `pool = socketpool.SocketPool(wifi.radio)`?

inside of an on_subscribe function?

@anecdata
Member

anecdata commented Oct 4, 2022

Well, the paradigm is a little different, and the implementation is early. There is no equivalent to MicroPython's wlan.status. At the moment, you would try wifi.radio.connect(SSID, PASSWORD) blindly. There is no harm in trying to connect if already connected; it becomes a NOP internally.

In the CircuitPython espressif port, the presence of an IP address signifies connection to the AP, but I believe the uP/CYW43 driver has issues with that (see micropython#9455 (comment)).

addendum: (I'm really not sure how pedantically CircuitPython raspberrypi API is going to try to mirror the espressif API. It should be close, but there may be limitations due to the driver.)
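The "IP address means connected" convention described for the espressif port can be sketched as below. This assumes `radio` exposes an `ipv4_address` attribute that is `None` when there is no connection, as `wifi.radio` does on espressif. The helper name `is_connected` is hypothetical, and per the caveat above the check may not be reliable on the Pico W at this stage.

```python
def is_connected(radio):
    """Return True if the radio currently reports an IPv4 address.

    Assumption: a missing or None `ipv4_address` means "not connected".
    """
    return getattr(radio, "ipv4_address", None) is not None
```

A polling loop could call `is_connected(wifi.radio)` and only attempt a reconnect when it returns False, although a blind `wifi.radio.connect(...)` is also described above as harmless.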

@sgbaird
Author

sgbaird commented Oct 4, 2022

Oof, I made a mistake 🤦‍♂️. It's erroring out during client.subscribe(...) not during client.connect(...). I tried adding wifi.radio.connect(SSID, PASSWORD) to all the functions (subscribe, connect, etc.) but it gave the same error.

@dhalbert
Collaborator

dhalbert commented Oct 4, 2022

There is no equivalent to MicroPython's wlan.status.

Would this be helpful in writing network libraries or user code? If it's possible, maybe we should consider adding it.

@anecdata
Member

anecdata commented Oct 4, 2022

@dhalbert It would be helpful for testing for now at least. Also some equivalent of uP wlan.isconnected() perhaps. My vote would be ideally / ultimately to mirror espressif and manage the existence of IP address to signify valid connection, but the cyw43 driver may not make that easy. Or alter espressif in some way so these two APIs match. It may not be practical, I really don't know, but it seems like a decision that should be made explicitly one way or the other as this port proceeds.

@sgbaird
Author

sgbaird commented Oct 4, 2022

I redefined _wait_for_msg in adafruit_minimqtt.py as:

    def _wait_for_msg(self, timeout=0.1):
        """Reads and processes network events."""
        # CPython socket module contains a timeout attribute
        if hasattr(self._socket_pool, "timeout"):
            print("self._socket_pool.timeout exists")
            try:
                res = self._sock_exact_recv(1)
            except self._socket_pool.timeout:
                return None
        else:  # socketpool, esp32spi
            print("self._socket_pool.timeout does not exist")
            try:
                res = self._sock_exact_recv(1)
            except OSError as error:
                if error.errno in (errno.ETIMEDOUT, errno.EAGAIN):
                    # raised by a socket timeout if 0 bytes were present
                    return None
                print(error)
                raise MMQTTException from error

and ran minimqtt_simpletest.py:

Connecting to <SSID>
Connected to <SSID>!
Attempting to connect to test.mosquitto.org
self._socket_pool.timeout does not exist
Connected to MQTT Broker!
Flags: 0
 RC: 0
Subscribing to test/topic
self._socket_pool.timeout does not exist
-1

In other words, the error is OSError: -1

@sgbaird
Author

sgbaird commented Oct 4, 2022

Turns out it was an issue with the timeout during _wait_for_msg. I increased the default timeout in _wait_for_msg from 0.1 to 1.0, reverted the other changes I made (print statements, etc.) and got the following:

Connecting to <SSID>
Connected to <SSID>!
Attempting to connect to test.mosquitto.org
Connected to MQTT Broker!
Flags: 0
 RC: 0
Subscribing to test/topic
Subscribed to test/topic with QOS level 0
Publishing to test/topic
Published to test/topic with PID 1
Unsubscribing from test/topic
New message on topic test/topic: moshi moshi
New message on topic test/topic: Hello Broker!
Unsubscribed from test/topic with PID 2
Disconnecting from test.mosquitto.org
Disconnected from MQTT Broker!

Also, loving the "moshi moshi"
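For comparison, here is a rough desktop-Python sketch of the "read one byte, treat a timeout as no data" step. It uses CPython's `socket` module, where a timed-out read raises `socket.timeout`; CircuitPython's socketpool reports timeouts as an `OSError` with an errno instead. The helper name `recv_one_byte` is hypothetical, and a local socket pair stands in for the broker connection.

```python
import socket


def recv_one_byte(sock, timeout):
    """Return one byte from `sock`, or None if nothing arrives within `timeout` seconds."""
    sock.settimeout(timeout)
    buf = bytearray(1)
    try:
        count = sock.recv_into(buf, 1)
    except socket.timeout:
        # No data arrived in time; treat it as "nothing to read", not a failure.
        return None
    return bytes(buf[:count])


# A connected pair of local sockets stands in for the client/broker link.
client_side, broker_side = socket.socketpair()
print(recv_one_byte(client_side, 0.05))  # nothing sent yet -> None
broker_side.sendall(b"\x90")             # 0x90 is the first byte of a SUBACK
print(recv_one_byte(client_side, 0.05))  # -> b'\x90'
client_side.close()
broker_side.close()
```

If the reply takes longer than the timeout, the first call's result is what the caller sees, which matches the observation that a longer timeout lets the subscribe acknowledgement arrive in time.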

@anecdata
Member

anecdata commented Oct 4, 2022

Looking less like the wifi connection is dropping. Since Pico W is alpha, do you have an ESP32-S2 to try this code on for comparison?

@sgbaird
Author

sgbaird commented Oct 4, 2022

@anecdata, unfortunately I don't own an ESP32-S2

@anecdata
Member

anecdata commented Oct 4, 2022

_wait_for_msg() can take a timeout, but it looks like none of the code that calls it uses that kwarg. I think typically there would be a call to .loop() in user code, and that takes a timeout, but I don't see it getting passed down. That's just from a cursory look. Maybe this issue should be moved to the library?
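The difference between accepting a timeout and actually forwarding it can be illustrated with a toy class. `TinyClient` and both of its `loop_*` methods are hypothetical and are not MiniMQTT's real code; they only show the pattern being described.

```python
class TinyClient:
    """Hypothetical stand-in for an MQTT client; not MiniMQTT's real code."""

    def __init__(self):
        self.seen_timeouts = []

    def _wait_for_msg(self, timeout=0.1):
        # Record the timeout that actually reaches the low-level read.
        self.seen_timeouts.append(timeout)
        return None

    def loop_ignoring_timeout(self, timeout=1):
        # The caller's timeout is accepted but never forwarded.
        return self._wait_for_msg()

    def loop_forwarding_timeout(self, timeout=1):
        # The caller's timeout is handed down to the low-level read.
        return self._wait_for_msg(timeout=timeout)


client = TinyClient()
client.loop_ignoring_timeout(timeout=5)
client.loop_forwarding_timeout(timeout=5)
print(client.seen_timeouts)  # [0.1, 5]
```

In the first call the low-level read still sees the default 0.1, which is why changing the value passed to the outer call would have no effect on the read.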

@sgbaird
Author

sgbaird commented Oct 4, 2022

_wait_for_msg() can take a timeout, but it looks like none of the code that calls it uses that kwarg

At first glance, timeout is present but looks inaccessible to me:
https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/blob/f2cb3bbe830f05cb75cd397fe83803ee1e59946b/adafruit_minimqtt/adafruit_minimqtt.py#L879

Maybe the call to _wait_for_msg during connect uses the timeout value? I recognize I might be way off here.

Changing timeout to a low value like 0.01 (or the default, 0.1) reproducibly gave the error. Higher values reproducibly removed the error for the simple test. Interestingly, with 0.5 it got part-way through, printing the moshi moshi message but then failing before the Hello Broker! message.

However, when I move away from the simple example and towards a script that listens in an ongoing sense for new messages, I'm getting that same error despite larger timeout values.

while True:
    client.loop()
    heartbeat(False)
    sign_of_life(False)

If I use a try:except:

while True:
    try:
        client.loop()
    except Exception as e:
        pass
    heartbeat(False)
    sign_of_life(False)

Then it loops without erroring, but when the companion host code publishes messages (which works well in tandem with my MicroPython implementation), they are not received by the CircuitPython implementation.
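A bare `except ... pass` hides which error occurred. A small sketch that reports the error instead is shown below. The helper name `poll_once` and its `recoverable` parameter are hypothetical, and `client` is assumed to be any object with a `loop()` method.

```python
def poll_once(client, recoverable=(Exception,)):
    """Run one client.loop() pass; report caught errors instead of hiding them.

    Returns True if the pass completed, False if an error was caught.
    """
    try:
        client.loop()
        return True
    except recoverable as error:
        # repr() shows the exception type even when its message is empty.
        cause = getattr(error, "__cause__", None)
        print("loop failed:", repr(error), "caused by:", repr(cause))
        return False
```

In the loop above this would replace the `try`/`except` around `client.loop()`. Narrowing `recoverable` to the exception types actually expected, for example the library's `MMQTTException` plus `OSError`, avoids masking unrelated bugs.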

Also, I tried digging a bit more and found:

error.errno:  -1
errno.ETIMEDOUT:  116
errno.EAGAIN:  11

@sgbaird
Author

sgbaird commented Oct 4, 2022

That last part is in the context of these lines:

                if error.errno in (errno.ETIMEDOUT, errno.EAGAIN):
                    # raised by a socket timeout if 0 bytes were present
                    return None
                raise MMQTTException from error
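The errno values reported in this thread show why the check falls through: -1 is neither 116 nor 11, so the error is re-raised as a blank MMQTTException. A small sketch of that membership test is below. The constants are the values printed on the Pico W in this thread (they differ between platforms), and `is_benign_timeout` with its `extra_codes` parameter is a hypothetical helper.

```python
# Values printed on the Pico W in this thread; they differ between platforms.
PICO_W_ETIMEDOUT = 116
PICO_W_EAGAIN = 11


def is_benign_timeout(error, extra_codes=()):
    """Return True if `error` looks like "no data yet" rather than a real failure."""
    benign = (PICO_W_ETIMEDOUT, PICO_W_EAGAIN) + tuple(extra_codes)
    return error.errno in benign


read_error = OSError(-1, "socket read failed")
print(is_benign_timeout(read_error))                     # False
print(is_benign_timeout(read_error, extra_codes=(-1,)))  # True
```

Treating -1 as benign is only a stopgap, since -1 carries no information about what actually went wrong in the socket layer.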

@sgbaird
Author

sgbaird commented Oct 4, 2022

I'm hitting my limit here for now. I'm having trouble tracking down why it's giving OSError: -1 and what that means. If you think this should get transferred to minimqtt repo, I'm fine with that.

I've gotten as far as:

        if not self._backwards_compatible_sock:
            # CPython/Socketpool Impl.
            rc = bytearray(bufsize)
            self._sock.recv_into(rc, bufsize) # OSError originating from here (probably)

@calcut

calcut commented Oct 4, 2022

yeah, this seems related to #101
Sounds like you are seeing this consistently every time on the Pico-W(?)

I don't typically get OSError at the same point you are seeing it (during subscribe)
I do get OSError occasionally at other times, though I think in my case that is due to a connectivity error generally, not MQTT specific.

I'm also running a watchdog timer to reset the system if it hangs. Which happens too often for my liking.
Particularly in _wait_for_msg and _sock_exact_recv. But I guess that is another issue!

I've been using ESP32S2, and getting best results using timeout=0
But I had to modify the loop() function so timeout=0 is actually passed to _wait_for_msg
See my PR adafruit/Adafruit_CircuitPython_MiniMQTT#122

I realise Circuitpython isn't targeting 100% uptime applications... but I've had so many stability problems that I'm reluctantly going to give this product a go for handling the networking.
https://blues.io/products/wifi-notecard/
That said, it would be awesome if we could get CP to the point where this wasn't needed

@sgbaird
Author

sgbaird commented Oct 4, 2022

yeah, this seems related to #101 Sounds like you are seeing this consistently every time on the Pico-W(?)

I've been seeing it consistently on the Pico-W when I'm using MQTT with a _wait_for_msg timeout of 0.1 or less for a single subscribe+publish test example. However, regardless of whatever timeout I set, when I try to use while True: loop() or some variant of try-except or sleep(...), I get the OSError (which seems to be detrimental to MQTT properly receiving messages when I ignore it via try-except).

I don't typically get OSError at the same point you are seeing it (during subscribe) I do get OSError occasionally at other times, though I think in my case that is due to a connectivity error generally, not MQTT specific.

I'm also running a watchdog timer to reset the system if it hangs. Which happens too often for my liking. Particularly in _wait_for_msg and _sock_exact_recv. But I guess that is another issue!

I've been using ESP32S2, and getting best results using timeout=0 But I had to modify the loop() function so timeout=0 is actually passed to _wait_for_msg See my PR adafruit/Adafruit_CircuitPython_MiniMQTT#122

Maybe I'll give timeout=0 a try. When you do this, you still use sleep in the while loop, correct?

EDIT: timeout=0 hardcoded into _wait_for_msg causes OSError: -1 during client.connect(), so looks like I'd need to use your PR to verify.

EDIT: I tried with your PR, which allowed the first client.connect() and client.subscribe() to work OK, but the following is still producing the non-descript MMQTTException (which I'm guessing is the same OSError: -1 that is getting hidden, since that seems to be the recurring underlying error).

while True:
    sleep(2.0)
    client.loop(timeout=0.0)
    heartbeat(False)
    sign_of_life(False)

I realise Circuitpython isn't targeting 100% uptime applications... but I've had so many stability problems that I'm reluctantly going to give this product a go for handling the networking. blues.io/products/wifi-notecard That said, it would be awesome if we could get CP to the point where this wasn't needed

On a side note, thank you for mentioning this! I had been looking around for something like it but was having trouble finding something reasonably priced. A project/tutorial I'm developing would ideally be implemented in classrooms by instructors or as 100% uptime machines in university settings, but the Pico W doesn't support Enterprise-WPA authentication which is typical of many universities (e.g. Eduroam). Agreed about getting CP (and MP) to the point where it can handle the otherwise (usually) unavoidable stability issues.

@calcut

calcut commented Oct 5, 2022

Maybe I'll give timeout=0 a try. When you do this, you still use sleep in the while loop, correct?

Sleep should be fine. Although I actually use the pattern

if time.monotonic() - timestamp > interval:
    timestamp = time.monotonic()
    loop()

(so it doesn't block other code from running)
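The interval pattern above can be wrapped so the timestamp bookkeeping lives in one place. This is a minimal sketch; the class name `IntervalPoller` and its `clock` parameter are hypothetical, with `clock` defaulting to `time.monotonic` as in the snippet above.

```python
import time


class IntervalPoller:
    """Call `action` at most once per `interval` seconds without blocking."""

    def __init__(self, action, interval, clock=time.monotonic):
        self.action = action
        self.interval = interval
        self.clock = clock
        self.timestamp = clock()

    def tick(self):
        """Run `action` if the interval has elapsed; return True if it ran."""
        now = self.clock()
        if now - self.timestamp > self.interval:
            self.timestamp = now
            self.action()
            return True
        return False
```

In a main loop, something like `poller = IntervalPoller(client.loop, 2.0)` followed by `poller.tick()` on every pass would let other work, such as the heartbeat calls, run between polls.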

@dhalbert dhalbert added this to the 8.0.0 milestone Oct 5, 2022
@georgboe

It seems to me like errno should be returned here instead of ret when something goes wrong:
https://github.com/adafruit/circuitpython/blob/main/ports/raspberrypi/common-hal/socketpool/Socket.c#L1057-L1082

That way we would get MP_EAGAIN and MP_ETIMEDOUT instead of -1.


A workaround for now seems to be to add -1 to the ignored errors in _wait_for_msg() in adafruit_minimqtt.py, like so:

try:
    res = self._sock_exact_recv(1)
except OSError as error:
    if error.errno in (errno.ETIMEDOUT, errno.EAGAIN, -1):
        # raised by a socket timeout if 0 bytes were present
        return None
    raise MMQTTException from error

@rdagger

rdagger commented Oct 24, 2022

I'm getting the same problem on a QT Py ESP32-S2 after upgrading from CircuitPython 7.3.3 to 8.0.0-beta.3.
My program subscribes to a topic on Adafruit IO. It's been very reliable in CircuitPython 7 but is unusable in 8.
I tried hard coding a 1 second timeout in the _wait_for_msg() method, and it fixed the MMQTTException during subscribing. However, the loop() method then fails with:

  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 864, in loop
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 894, in _wait_for_msg
10.0.7.41 | 894@/lib/adafruit_minimqtt/adafruit_ MMQTTException | 8.0.0-beta.3

I tried hard coding longer timeouts and zero timeout in the loop() method but I can't get it to work.

@georgboe

@rdagger Can you please try again with latest nightly build?

@rdagger

rdagger commented Oct 26, 2022

@georgboe Looks like it works now, thanks!
Using 8.0.0-beta.3-15-g0bc986ea7
I reverted back to the unmodified version of adafruit_minimqtt.mpy and it still works without having to modify the timeouts.
I'll let it run for a couple of days to test the stability.
