Skip to content

Commit

Permalink
MQTT311 extra connection callbacks (#442)
Browse files Browse the repository at this point in the history
Adds V1-SDK like callbacks to the MQTT311 connection for easier migration from V1 to V2.
  • Loading branch information
TwistedTwigleg committed Jun 9, 2023
1 parent ce22185 commit 3cad101
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 18 deletions.
89 changes: 88 additions & 1 deletion awscrt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,39 @@ def __init__(self, topic, qos, payload, retain):
self.retain = retain


@dataclass
class OnConnectionSuccessData:
"""Dataclass containing data related to a on_connection_success Callback
Args:
return_code (ConnectReturnCode): Connect return. code received from the server.
session_present (bool): True if the connection resumes an existing session.
False if new session. Note that the server has forgotten all previous subscriptions
if this is False.
Subscriptions can be re-established via resubscribe_existing_topics() if the connection was a reconnection.
"""
return_code: ConnectReturnCode = None
session_present: bool = False


@dataclass
class OnConnectionFailureData:
"""Dataclass containing data related to a on_connection_failure Callback
Args:
error (ConnectReturnCode): Error code with reason for connection failure
"""
error: awscrt.exceptions.AwsCrtError = None


@dataclass
class OnConnectionClosedData:
"""Dataclass containing data related to a on_connection_closed Callback.
Currently unused.
"""
pass


class Client(NativeResource):
"""MQTT client.
Expand Down Expand Up @@ -213,6 +246,31 @@ class Connection(NativeResource):
* `**kwargs` (dict): Forward-compatibility kwargs.
on_connection_success: Optional callback invoked whenever the connection successfully connects.
This callback is invoked for every successful connect and every successful reconnect.
Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `callback_data` (:class:`OnConnectionSuccessData`): The data returned from the connection success.
on_connection_failure: Optional callback invoked whenever the connection fails to connect.
This callback is invoked for every failed connect and every failed reconnect.
Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `callback_data` (:class:`OnConnectionFailureData`): The data returned from the connection failure.
on_connection_closed: Optional callback invoked whenever the connection has been disconnected and shutdown successfully.
Function should take the following arguments and return nothing:
* `connection` (:class:`Connection`): This MQTT Connection
* `callback_data` (:class:`OnConnectionClosedData`): The data returned from the connection close.
reconnect_min_timeout_secs (int): Minimum time to wait between reconnect attempts.
Must be <= `reconnect_max_timeout_secs`.
Wait starts at min and doubles with each attempt until max is reached.
Expand Down Expand Up @@ -286,7 +344,10 @@ def __init__(self,
use_websockets=False,
websocket_proxy_options=None,
websocket_handshake_transform=None,
proxy_options=None
proxy_options=None,
on_connection_success=None,
on_connection_failure=None,
on_connection_closed=None
):

assert isinstance(client, Client)
Expand All @@ -297,6 +358,9 @@ def __init__(self,
assert isinstance(websocket_proxy_options, HttpProxyOptions) or websocket_proxy_options is None
assert isinstance(proxy_options, HttpProxyOptions) or proxy_options is None
assert callable(websocket_handshake_transform) or websocket_handshake_transform is None
assert callable(on_connection_success) or on_connection_success is None
assert callable(on_connection_failure) or on_connection_failure is None
assert callable(on_connection_closed) or on_connection_closed is None

if reconnect_min_timeout_secs > reconnect_max_timeout_secs:
raise ValueError("'reconnect_min_timeout_secs' cannot exceed 'reconnect_max_timeout_secs'")
Expand All @@ -316,6 +380,9 @@ def __init__(self,
self._on_connection_resumed_cb = on_connection_resumed
self._use_websockets = use_websockets
self._ws_handshake_transform_cb = websocket_handshake_transform
self._on_connection_success_cb = on_connection_success
self._on_connection_failure_cb = on_connection_failure
self._on_connection_closed_cb = on_connection_closed

# may be changed at runtime, take effect the the next time connect/reconnect occurs
self.client_id = client_id
Expand Down Expand Up @@ -385,6 +452,26 @@ def _on_complete(f):
if not future.done():
transform_args.set_done(e)

def _on_connection_closed(self):
if self:
if self._on_connection_closed_cb:
data = OnConnectionClosedData()
self._on_connection_closed_cb(connection=self, callback_data=data)

def _on_connection_success(self, return_code, session_present):
if self:
if self._on_connection_success_cb:
data = OnConnectionSuccessData(
return_code=ConnectReturnCode(return_code),
session_present=session_present)
self._on_connection_success_cb(connection=self, callback_data=data)

def _on_connection_failure(self, error_code):
if self:
if self._on_connection_failure_cb:
data = OnConnectionFailureData(error=awscrt.exceptions.from_code(error_code))
self._on_connection_failure_cb(connection=self, callback_data=data)

def connect(self):
"""Open the actual connection to the server (async).
Expand Down
Loading

0 comments on commit 3cad101

Please sign in to comment.