Skip to content

Commit

Permalink
Remove QoS 1 and 2 support as they didn't work. Client persistent sta…
Browse files Browse the repository at this point in the history
…te is needed. Added session resume functionality.
  • Loading branch information
eerimoq committed May 8, 2019
1 parent 334941a commit 7a7c31e
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 675 deletions.
9 changes: 5 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ MQTT Tools

Features:

- ``asyncio`` MQTT 5.0 client.
- ``asyncio`` MQTT 5.0 client. Only QoS 0 (fire and forget) is
supported.

Requires Python 3.7 or later!

Expand Down Expand Up @@ -60,7 +61,7 @@ benchmark the client and the broker.

.. code-block:: text
$ mqttools publish --count 100 --qos 1 /test/mqttools/foo
$ mqttools publish --count 100 /test/mqttools/foo
Published 100 message(s) in 0.39 seconds from 10 concurrent task(s).
Monitor
Expand Down Expand Up @@ -99,7 +100,7 @@ An example connecting to an MQTT broker, subscribing to the topic
client = mqttools.Client('broker.hivemq.com', 1883)
await client.start()
await client.subscribe('/test/#', 0)
await client.subscribe('/test/#')
while True:
topic, message = await client.messages.get()
Expand Down Expand Up @@ -128,7 +129,7 @@ An example connecting to an MQTT broker and publishing the message
client = mqttools.Client('broker.hivemq.com', 1883)
await client.start()
await client.publish('/test/mqttools/foo', b'bar', 0)
await client.publish('/test/mqttools/foo', b'bar')
await client.stop()
asyncio.run(publisher())
Expand Down
7 changes: 3 additions & 4 deletions examples/incrementer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async def main():
client = mqttools.Client(HOST, PORT)
await client.start()
print(f'Connected to {HOST}:{PORT}.')
await client.subscribe('/mqttools/incrementer/value/request', 0)
await client.subscribe('/mqttools/incrementer/value/request')

print('Subscribed to topic /mqttools/incrementer/value/request.')

Expand All @@ -25,9 +25,8 @@ async def main():
print(f'Request count: {count}')
count += 1
print(f'Response count: {count}')
await client.publish('/mqttools/counter-client/value/response',
str(count).encode('ascii'),
0)
client.publish('/mqttools/counter-client/value/response',
str(count).encode('ascii'))


asyncio.run(main())
2 changes: 1 addition & 1 deletion examples/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ async def publisher():
client = mqttools.Client('broker.hivemq.com', 1883)

await client.start()
await client.publish('/test/mqttools/foo', b'bar', 0)
client.publish('/test/mqttools/foo', b'bar')
await client.stop()
print("Successfully published b'bar' on /test/mqttools/foo.")

Expand Down
40 changes: 40 additions & 0 deletions examples/resume_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import mqttools
from uuid import uuid1


async def resume_session():
client = mqttools.Client('broker.hivemq.com',
1883,
client_id='mqttools-{}'.format(uuid1().node),
session_expiry_interval=15)

try:
await client.start(resume_session=True)
print('Session resumed.')
except mqttools.SessionResumeError:
print('No session to resume. Subscribing to topics.')

# Subscribe to three topics in parallel.
print('Subscribing to topics.')
await asyncio.gather(
client.subscribe('$SYS/#'),
client.subscribe('/test/mqttools/foo')
)

print('Waiting for messages.')

while True:
topic, message = await client.messages.get()

if topic is None:
print('Broker connection lost!')
break

print(f'Topic: {topic}')
print(f'Message: {message}')

import logging
logging.basicConfig(level=logging.DEBUG)

asyncio.run(resume_session())
7 changes: 4 additions & 3 deletions examples/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ async def subscriber():

# Subscribe to three topics in parallel.
await asyncio.gather(
client.subscribe('$SYS/broker/uptime', 0),
client.subscribe('$SYS/broker/bytes/sent', 0),
client.subscribe('/test/mqttools/foo', 0)
client.subscribe('$SYS/#'),
client.subscribe('/test/mqttools/foo')
)

print('Waiting for messages.')

while True:
topic, message = await client.messages.get()

Expand Down
8 changes: 4 additions & 4 deletions examples/topic_alias.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ async def publisher():
])

await client.start()
await client.publish('/test/mqttools/foo', b'sets-alias-in-broker', 0)
await client.publish('/test/mqttools/foo', b'published-with-alias', 0)
await client.publish('/test/mqttools/fie', b'not-using-alias', 0)
await client.publish('/test/mqttools/fie', b'not-using-alias', 0)
client.publish('/test/mqttools/foo', b'sets-alias-in-broker')
client.publish('/test/mqttools/foo', b'published-with-alias')
client.publish('/test/mqttools/fie', b'not-using-alias')
client.publish('/test/mqttools/fie', b'not-using-alias')
await client.stop()
print("Successfully published b'bar' on /test/mqttools/foo.")

Expand Down
2 changes: 1 addition & 1 deletion examples/unsubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async def unsubscriber():
await client.start()

print('Subscribing to /test/mqttools/foo.')
await client.subscribe('/test/mqttools/foo', 0)
await client.subscribe('/test/mqttools/foo')
topic, message = await client.messages.get()

print(f'Topic: {topic}')
Expand Down
2 changes: 1 addition & 1 deletion examples/will.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async def will():
1883,
will_topic='/my/will/topic',
will_message=b'my-will-message',
will_qos=0)
will_qos=mqttools.QoS.AT_MOST_ONCE)

await client.start()
await client.stop()
Expand Down
1 change: 1 addition & 0 deletions mqttools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .client import Client
from .client import MalformedPacketError
from .client import ConnectError
from .client import SessionResumeError
from .client import PublishError
from .client import QoS

Expand Down

0 comments on commit 7a7c31e

Please sign in to comment.