Skip to content

Commit

Permalink
Documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerimoq committed May 8, 2019
1 parent 3dd2019 commit e67913e
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions mqttools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ def __init__(self,
self._connack_event = None
self._pingresp_event = None
self.transactions = None
self.messages = None
self._messages = None
self._connack = None
self._next_packet_identifier = None
self._broker_receive_maximum = None
Expand Down Expand Up @@ -954,7 +954,7 @@ async def start(self):
self._connack_event = asyncio.Event()
self._pingresp_event = asyncio.Event()
self.transactions = {}
self.messages = asyncio.Queue()
self._messages = asyncio.Queue()
self._connack = None
self._next_packet_identifier = 1
self._broker_receive_maximum = None
Expand All @@ -964,15 +964,33 @@ async def start(self):
self._host,
self._port,
**self._kwargs)
self._reader_task = asyncio.create_task(self.reader_main())
self._reader_task = asyncio.create_task(self._reader_main())

if self._keep_alive_s != 0:
self._keep_alive_task = asyncio.create_task(self.keep_alive_main())
self._keep_alive_task = asyncio.create_task(self._keep_alive_main())
else:
self._keep_alive_task = None

await self.connect()

@property
def messages(self):
"""Received messages from the broker. Each message is a tuple of the
topic and the message.
>>> await client.messages.get()
('/my/topic', b'my-message')
A ``(None, None)`` message is put in the queue if the broker
connection is lost.
>>> await client.messages.get()
(None, None)
"""

return self._messages

async def stop(self):
"""Try to cleanly disconnect from the broker and then close the TCP
connection.
Expand Down Expand Up @@ -1057,6 +1075,10 @@ def disconnect(self):
async def subscribe(self, topic, qos):
"""Subscribe to given topic with given QoS.
>>> await client.subscribe('/my/topic', 0)
>>> await client.messages.get()
('/my/topic', b'my-message')
"""

with Transaction(self) as transaction:
Expand Down Expand Up @@ -1117,6 +1139,8 @@ async def publish_qos_2(self, topic, alias, message):
async def publish(self, topic, message, qos):
"""Publish given message to given topic with given QoS.
>>> await client.publish('/my/topic', b'my-message', 0)
"""

if topic in self._broker_topic_aliases:
Expand Down Expand Up @@ -1184,11 +1208,11 @@ async def on_publish(self, flags, payload):
return

if qos == 0:
await self.messages.put((topic, message))
await self._messages.put((topic, message))
elif qos == 1:
self._write_packet(pack_puback(packet_identifier,
PubackReasonCode.SUCCESS))
await self.messages.put((topic, message))
await self._messages.put((topic, message))
elif qos == 2:
if packet_identifier in self._on_publish_qos_2_transactions:
reason = PubrecReasonCode.PACKET_IDENTIFIER_IN_USE
Expand Down Expand Up @@ -1244,7 +1268,7 @@ async def on_pubrel(self, payload):
PubcompReasonCode.SUCCESS))
task, message = self._on_publish_qos_2_transactions[packet_identifier]
task.cancel()
await self.messages.put(message)
await self._messages.put(message)

del self._on_publish_qos_2_transactions[packet_identifier]
else:
Expand Down Expand Up @@ -1308,7 +1332,7 @@ async def reader_loop(self):
control_packet_type_to_string(packet_type),
payload.getvalue())

async def reader_main(self):
async def _reader_main(self):
"""Read packets from the broker.
"""
Expand All @@ -1318,7 +1342,7 @@ async def reader_main(self):
except Exception as e:
LOGGER.info('Reader task stopped by %r.', e)
self._writer.close()
await self.messages.put((None, None))
await self._messages.put((None, None))

async def keep_alive_loop(self):
while True:
Expand All @@ -1331,7 +1355,7 @@ async def keep_alive_loop(self):
await asyncio.wait_for(self._pingresp_event.wait(),
self.response_timeout)

async def keep_alive_main(self):
async def _keep_alive_main(self):
"""Ping the broker periodically to keep the connection alive.
"""
Expand All @@ -1341,7 +1365,7 @@ async def keep_alive_main(self):
except Exception as e:
LOGGER.info('Keep alive task stopped by %r.', e)
self._writer.close()
await self.messages.put((None, None))
await self._messages.put((None, None))

def _write_packet(self, message):
if LOGGER.isEnabledFor(logging.DEBUG):
Expand Down

0 comments on commit e67913e

Please sign in to comment.