Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional timeout to subscribe() #631

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions src/paho/mqtt/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#
# Contributors:
# Roger Light - initial API and implementation
# Joachim Baumann - added timeout to subscribe.simple()

"""
This module provides some helper functions to allow straightforward subscribing
Expand All @@ -22,6 +23,7 @@

from .. import mqtt
from . import client as paho
from threading import Lock


def _on_connect_v5(client, userdata, flags, rc, properties):
Expand All @@ -35,17 +37,19 @@ def _on_connect_v5(client, userdata, flags, rc, properties):
else:
client.subscribe(userdata['topics'], userdata['qos'])


def _on_connect(client, userdata, flags, rc):
"""Internal v5 callback"""
_on_connect_v5(client, userdata, flags, rc, None)


def _on_message_callback(client, userdata, message):
"""Internal callback"""
userdata['callback'](client, userdata['userdata'], message)
userdata['callback'](client, userdata['userdata'],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. Existing user of subscribe.callback will be required to change the signature of their callback function to accept an additional parameter "lock".

The idea I see to avoid any breaking change and still support this feature (while simple() still just call callback()) would be to add support for a special exception "_StopSubscriber" that the user callback could raise to disconnect. The callback "_on_message_simple" could then raise that exception, this would continue to make simple() a normal user of callback().
So here we could do:

try:
  userdata['callback'](client, userdata['userdata'], message)
except _StopSubscriber:
  lock.release()

I suggest to name it "_StopSubscriber" because I would like to kept the existence of this special exception undocumented/unsupported for now, because I think the true solution would be to call "client.disconnect()" instead of this exception. But doing the disconnect will not work because currently the received message is acknowledged after the on_message callback so disconnecting from inside the callback will cause the acknowledge to be dropped.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I thought that since it is an internal function I could change the signature.

I could very simply determine whether userdata['lock'] is set, and if not call with the original signature instead. This would keep it compatible and would simply add the new functionality.

Using an exception here is not something I'd like to do. An exception always models something that is not part of the normal execution path, and in addition, is normally quite a bit slower. I think the approach with checking the value of 'lock' would be better. Or is there another motivation for using the exception?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not only an internal function or I did I missunderstood the code ? It's the callback passed to subscribe.callback() which is user provided.

In subscribe.simple() I agree it's internal, but we added timeout & the lock also on subscribe.callback().

I just realize we recently added a user_data_get on client. It's a bit broken in that use-case, since it will return the "internal" userdata instead of the user-userdata (userdata["userdata"]). But that probably solve our issue. We can access to userdata["lock"] by using client.user_data_get()["lock"]. This avoid any change in callback signature.

message, userdata['lock'])


def _on_message_simple(client, userdata, message):
def _on_message_simple(client, userdata, message, lock):
"""Internal callback"""

if userdata['msg_count'] == 0:
Expand All @@ -60,22 +64,27 @@ def _on_message_simple(client, userdata, message):
if userdata['messages'] is None and userdata['msg_count'] == 0:
userdata['messages'] = message
client.disconnect()
if lock:
lock.release()
return

userdata['messages'].append(message)
if userdata['msg_count'] == 0:
client.disconnect()
if lock:
lock.release()


def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp",
clean_session=True, proxy_args=None):
clean_session=True, proxy_args=None, timeout=None):
"""Subscribe to a list of topics and process them in a callback function.

This function creates an MQTT client, connects to a broker and subscribes
to a list of topics. Incoming messages are processed by the user provided
callback. This is a blocking function and will never return.
callback. This is a blocking function and will only return after the
timeout. If no timeout is given, the function will never return.

callback : function of the form "on_message(client, userdata, message)" for
processing the messages received.
Expand Down Expand Up @@ -132,16 +141,25 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
Defaults to True.

proxy_args: a dictionary that will be given to the client.

timeout: the timeout value after which the client disconnects from the
broker. If no timeout is given, the client disconnects only
after "msg_count" messages have been received.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no msg_count in the subscribe.callback()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I got this from the simple() function. I will remove this.

"""

if qos < 0 or qos > 2:
raise ValueError('qos must be in the range 0-2')

lock = None
if timeout is not None:
lock = Lock()

callback_userdata = {
'callback':callback,
'topics':topics,
'qos':qos,
'userdata':userdata}
'callback': callback,
'topics': topics,
'qos': qos,
'lock': lock,
'userdata': userdata}

client = paho.Client(client_id=client_id, userdata=callback_userdata,
protocol=protocol, transport=transport,
Expand Down Expand Up @@ -180,18 +198,27 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
client.tls_set_context(tls)

client.connect(hostname, port, keepalive)
client.loop_forever()

if timeout == None:
client.loop_forever()
else:
lock.acquire()
client.loop_start()
lock.acquire(timeout=timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
lock.acquire(timeout=timeout)
event.wait(timeout=timeout)

And using a threading.Event() rather than threading.Lock(). An Event seems a better fit than an Lock

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a one-on-one communication I believe that a Lock is actually modelling the relationship better. Do you have any additional use in mind e.g., an observer/debugger watching these events? Otherwise I would tend to see Lock as better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that an Event is even better at modelling this: the finishing processing is an event and we wait for that event.

With the lock, we have two consecutive call to acquire within the same thread isn't usual way to work with a lock.

client.loop_stop()
client.disconnect()


def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp",
clean_session=True, proxy_args=None):
clean_session=True, proxy_args=None, timeout=None):
"""Subscribe to a list of topics and return msg_count messages.

This function creates an MQTT client, connects to a broker and subscribes
to a list of topics. Once "msg_count" messages have been received, it
disconnects cleanly from the broker and returns the messages.
to a list of topics. Once "msg_count" messages have been received or the
timeout has been reached, it disconnects cleanly from the broker and
returns the received messages.

topics : either a string containing a single topic to subscribe to, or a
list of topics to subscribe to.
Expand Down Expand Up @@ -253,6 +280,10 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
Defaults to True.

proxy_args: a dictionary that will be given to the client.

timeout: the timeout value after which the client disconnects from the
broker. If no timeout is given, the client disconnects only
after "msg_count" messages have been received.
"""

if msg_count < 1:
Expand All @@ -265,10 +296,11 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
else:
messages = []

userdata = {'retained':retained, 'msg_count':msg_count, 'messages':messages}
userdata = {'retained': retained,
'msg_count': msg_count, 'messages': messages}

callback(_on_message_simple, topics, qos, userdata, hostname, port,
client_id, keepalive, will, auth, tls, protocol, transport,
clean_session, proxy_args)
clean_session, proxy_args, timeout)

return userdata['messages']