Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repeat subscribe when reconnected to MQTT broker #337

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mqtt_io/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ class StreamDataSentEvent(Event):
stream_name: str
data: bytes

@dataclass
class StreamDataSubscribeEvent(Event):
"""
Trigger MQTT subscribe
"""

@dataclass
class DigitalSubscribeEvent(Event):
"""
Trigger MQTT subscribe
"""


class EventBus:
"""
Expand Down
90 changes: 89 additions & 1 deletion mqtt_io/mqtt/asyncio_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,27 @@


def _map_exception(func: Func) -> Func:
"""
Creates a decorator that wraps a function and maps any raised `MqttError`
exception to a `MQTTException`.

:param func: The function to be wrapped.
:type func: Func
:return: The wrapped function.
:rtype: Func
"""
@wraps(func)
async def inner(*args: Any, **kwargs: Any) -> Any:
"""
Decorator for asynchronous functions that catches `MqttError` exceptions
and raises `MQTTException` instead.

Parameters:
func (Callable): The function to be decorated.

Returns:
Callable: The decorated function.
"""
try:
await func(*args, **kwargs)
except MqttError as exc:
Expand All @@ -42,6 +61,15 @@ class MQTTClient(AbstractMQTTClient):
"""

def __init__(self, options: MQTTClientOptions):
"""
Initializes a new instance of the MQTTClient class.

Args:
options (MQTTClientOptions): The options for the MQTT client.

Returns:
None
"""
super().__init__(options)
protocol_map = {
MQTTProtocol.V31: paho.MQTTv31,
Expand All @@ -66,7 +94,7 @@ def __init__(self, options: MQTTClientOptions):
username=options.username,
password=options.password,
client_id=options.client_id,
# keepalive=options.keepalive, # This isn't implemented yet on 0.8.1
#keepalive=options.keepalive,
tls_context=tls_context,
protocol=protocol_map[options.protocol],
will=will,
Expand All @@ -76,28 +104,82 @@ def __init__(self, options: MQTTClientOptions):

@_map_exception
async def connect(self, timeout: int = 10) -> None:
"""
Connects to the client asynchronously.

Args:
timeout (int): The timeout value in seconds (default: 10).

Returns:
None: This function does not return anything.
"""
await self._client.connect(timeout=timeout)

@_map_exception
async def disconnect(self) -> None:
"""
This function is an asynchronous method that handles the disconnection of the client.

Parameters:
self: The current instance of the class.

Returns:
None
"""
try:
await self._client.disconnect()
except TimeoutError:
await self._client.force_disconnect()

@_map_exception
async def subscribe(self, topics: List[Tuple[str, int]]) -> None:
"""
Subscribe to the given list of topics.

Args:
topics (List[Tuple[str, int]]): A list of tuples representing the topics
to subscribe to.
Each tuple should contain a string representing the topic name and
an integer representing the QoS level.

Returns:
None: This function does not return anything.

Raises:
Exception: If there is an error while subscribing to the topics.

"""
await self._client.subscribe(topics)

@_map_exception
async def publish(self, msg: MQTTMessageSend) -> None:
"""
Publishes an MQTT message to the specified topic.

Args:
msg (MQTTMessageSend): The MQTT message to be published.

Returns:
None: This function does not return anything.
"""
await self._client.publish(
topic=msg.topic, payload=msg.payload, qos=msg.qos, retain=msg.retain
)

def _on_message(
self, client: paho.Client, userdata: Any, msg: paho.MQTTMessage
) -> None:
"""
Callback function that is called when a message is received through MQTT.

Args:
client (paho.Client): The MQTT client instance.
userdata (Any): The user data associated with the client.
msg (paho.MQTTMessage): The received MQTT message.

Returns:
None: This function does not return anything.
"""
if self._message_queue is None:
_LOG.warning("Discarding MQTT message because queue is not initialised")
return
Expand All @@ -111,6 +193,12 @@ def _on_message(

@property
def message_queue(self) -> "asyncio.Queue[MQTTMessage]":
"""
Returns the message queue for receiving MQTT messages.

:return: The message queue for receiving MQTT messages.
:rtype: asyncio.Queue[MQTTMessage]
"""
if self._message_queue is None:
self._message_queue = asyncio.Queue(self._options.message_queue_size)
# pylint: disable=protected-access
Expand Down
Loading
Loading