Skip to content

Commit

Permalink
Merge c32e7ff into 07c4c70
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Kriegerowski committed Jan 10, 2021
2 parents 07c4c70 + c32e7ff commit 0af4970
Show file tree
Hide file tree
Showing 42 changed files with 774 additions and 982 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
@@ -1,11 +1,13 @@
dist: trusty
dist: xenial
sudo: false
language: python

python:
- "3.4"
- "3.5"
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "nightly"
matrix:
allow_failures:
Expand Down
5 changes: 2 additions & 3 deletions docs/references/broker.rst
Expand Up @@ -16,10 +16,9 @@ The following example shows how to start a broker using the default configuratio
from hbmqtt.broker import Broker
@asyncio.coroutine
def broker_coro():
async def broker_coro():
broker = Broker()
yield from broker.start()
await broker.start()
if __name__ == '__main__':
Expand Down
35 changes: 16 additions & 19 deletions docs/references/mqttclient.rst
Expand Up @@ -22,23 +22,22 @@ The example below shows how to write a simple MQTT client which subscribes a top
logger = logging.getLogger(__name__)
@asyncio.coroutine
def uptime_coro():
async def uptime_coro():
C = MQTTClient()
yield from C.connect('mqtt://test.mosquitto.org/')
await C.connect('mqtt://test.mosquitto.org/')
# Subscribe to '$SYS/broker/uptime' with QOS=1
# Subscribe to '$SYS/broker/load/#' with QOS=2
yield from C.subscribe([
await C.subscribe([
('$SYS/broker/uptime', QOS_1),
('$SYS/broker/load/#', QOS_2),
])
try:
for i in range(1, 100):
message = yield from C.deliver_message()
message = await C.deliver_message()
packet = message.publish_packet
print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
yield from C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
yield from C.disconnect()
await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
await C.disconnect()
except ClientException as ce:
logger.error("Client exception: %s" % ce)
Expand Down Expand Up @@ -71,31 +70,29 @@ This example also shows to method for publishing message asynchronously.
logger = logging.getLogger(__name__)
@asyncio.coroutine
def test_coro():
async def test_coro():
C = MQTTClient()
yield from C.connect('mqtt://test.mosquitto.org/')
await C.connect('mqtt://test.mosquitto.org/')
tasks = [
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
]
yield from asyncio.wait(tasks)
await asyncio.wait(tasks)
logger.info("messages published")
yield from C.disconnect()
await C.disconnect()
@asyncio.coroutine
def test_coro2():
async def test_coro2():
try:
C = MQTTClient()
ret = yield from C.connect('mqtt://test.mosquitto.org:1883/')
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
message = yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
ret = await C.connect('mqtt://test.mosquitto.org:1883/')
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
#print(message)
logger.info("messages published")
yield from C.disconnect()
await C.disconnect()
except ConnectException as ce:
logger.error("Connection failed: %s" % ce)
asyncio.get_event_loop().stop()
Expand Down
57 changes: 22 additions & 35 deletions hbmqtt/adapters.py
Expand Up @@ -16,8 +16,7 @@ class ReaderAdapter:
Reader adapters are used to adapt read operations on the network depending on the protocol used
"""

@asyncio.coroutine
def read(self, n=-1) -> bytes:
async def read(self, n=-1) -> bytes:
"""
Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.
If the EOF was received and the internal buffer is empty, return an empty bytes object.
Expand All @@ -42,8 +41,7 @@ def write(self, data):
write some data to the protocol layer
"""

@asyncio.coroutine
def drain(self):
async def drain(self):
"""
Let the write buffer of the underlying transport a chance to be flushed.
"""
Expand All @@ -53,8 +51,7 @@ def get_peer_info(self):
Return peer socket info (remote address and remote port as tuple
"""

@asyncio.coroutine
def close(self):
async def close(self):
"""
Close the protocol connection
"""
Expand All @@ -69,22 +66,20 @@ def __init__(self, protocol: WebSocketCommonProtocol):
self._protocol = protocol
self._stream = io.BytesIO(b'')

@asyncio.coroutine
def read(self, n=-1) -> bytes:
yield from self._feed_buffer(n)
async def read(self, n=-1) -> bytes:
await self._feed_buffer(n)
data = self._stream.read(n)
return data

@asyncio.coroutine
def _feed_buffer(self, n=1):
async def _feed_buffer(self, n=1):
"""
Feed the data buffer by reading a Websocket message.
:param n: if given, feed buffer until it contains at least n bytes
"""
buffer = bytearray(self._stream.read())
while len(buffer) < n:
try:
message = yield from self._protocol.recv()
message = await self._protocol.recv()
except ConnectionClosed:
message = None
if message is None:
Expand All @@ -110,22 +105,20 @@ def write(self, data):
"""
self._stream.write(data)

@asyncio.coroutine
def drain(self):
async def drain(self):
"""
Let the write buffer of the underlying transport a chance to be flushed.
"""
data = self._stream.getvalue()
if len(data):
yield from self._protocol.send(data)
await self._protocol.send(data)
self._stream = io.BytesIO(b'')

def get_peer_info(self):
return self._protocol.remote_address

@asyncio.coroutine
def close(self):
yield from self._protocol.close()
async def close(self):
await self._protocol.close()


class StreamReaderAdapter(ReaderAdapter):
Expand All @@ -137,12 +130,11 @@ class StreamReaderAdapter(ReaderAdapter):
def __init__(self, reader: StreamReader):
self._reader = reader

@asyncio.coroutine
def read(self, n=-1) -> bytes:
async def read(self, n=-1) -> bytes:
if n == -1:
data = yield from self._reader.read(n)
data = await self._reader.read(n)
else:
data = yield from self._reader.readexactly(n)
data = await self._reader.readexactly(n)
return data

def feed_eof(self):
Expand All @@ -164,24 +156,22 @@ def write(self, data):
if not self.is_closed:
self._writer.write(data)

@asyncio.coroutine
def drain(self):
async def drain(self):
if not self.is_closed:
yield from self._writer.drain()
await self._writer.drain()

def get_peer_info(self):
extra_info = self._writer.get_extra_info('peername')
return extra_info[0], extra_info[1]

@asyncio.coroutine
def close(self):
async def close(self):
if not self.is_closed:
self.is_closed = True # we first mark this closed so yields below don't cause races with waiting writes
yield from self._writer.drain()
await self._writer.drain()
if self._writer.can_write_eof():
self._writer.write_eof()
self._writer.close()
try: yield from self._writer.wait_closed() # py37+
try: await self._writer.wait_closed() # py37+
except AttributeError: pass


Expand All @@ -193,8 +183,7 @@ class BufferReader(ReaderAdapter):
def __init__(self, buffer: bytes):
self._stream = io.BytesIO(buffer)

@asyncio.coroutine
def read(self, n=-1) -> bytes:
async def read(self, n=-1) -> bytes:
return self._stream.read(n)


Expand All @@ -212,8 +201,7 @@ def write(self, data):
"""
self._stream.write(data)

@asyncio.coroutine
def drain(self):
async def drain(self):
pass

def get_buffer(self):
Expand All @@ -222,6 +210,5 @@ def get_buffer(self):
def get_peer_info(self):
return "BufferWriter", 0

@asyncio.coroutine
def close(self):
async def close(self):
self._stream.close()

0 comments on commit 0af4970

Please sign in to comment.