@@ -86,7 +86,13 @@ def publish(self, message: Message) -> None:
8686
8787class MQTTSubscriber (Subscriber ):
8888 def __init__ (self , client : MQTTClient ):
89- self ._client = client
89+ self .client = client
90+ # Save the active subscriptions on subscribe() so we can resubscribe
91+ # after reconnect
92+ self .subscriptions : dict = {}
93+
94+ self .client .on_connect = self .on_connect
95+ self .client .user_data_set (self .subscriptions )
9096
9197 def subscribe (
9298 self , message_class : Type [Message ], callback : Callable [[Message ], None ]
@@ -96,8 +102,21 @@ def subscribe(
96102
97103 logger .debug ("Subscribing to topic %s" , message_class .topic )
98104
99- self ._client .subscribe (message_class .topic , qos = QOS_AT_LEAST_ONCE )
100- self ._client .message_callback_add (message_class .topic , func )
105+ self .client .subscribe (message_class .topic , qos = QOS_AT_LEAST_ONCE )
106+ self .client .message_callback_add (message_class .topic , func )
107+
108+ self .subscriptions [message_class .topic ] = func
109+
110+ @staticmethod
111+ def on_connect (_client , _userdata , _flags , rc , _properties ):
112+ if rc == 0 :
113+ # If we previously had active subscription we subscribe to them
114+ # again because they got lost after a broker disconnect.
115+ # Userdata is set in __init__() and filled in subscribe()
116+ if _userdata :
117+ for topic , func in _userdata .items ():
118+ _client .subscribe (topic , qos = QOS_AT_LEAST_ONCE )
119+ _client .message_callback_add (topic , func )
101120
102121 @staticmethod
103122 def _handle_message (
0 commit comments