/
mqtt.py
56 lines (48 loc) · 1.47 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""
A sink that sends events to an MQTT topic.
"""
import logging
from typing import Optional
from asyncio_mqtt import Client
from senor_octopus.types import Stream
# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
async def mqtt(  # pylint: disable=too-many-arguments
    stream: Stream,
    topic: str,
    host: str = "localhost",
    port: int = 1883,
    username: Optional[str] = None,
    password: Optional[str] = None,
    qos: int = 1,
) -> None:
    """
    Send events as messages to an MQTT topic.

    This sink can be used to send events to an MQTT topic. The value of
    each event is sent as the message payload; the event name is ignored.

    Values of type ``str``, ``bytes``, ``bytearray``, ``int`` or ``float``,
    as well as ``None``, are handed to the client unchanged. Any other
    value is converted with ``str()`` before being published.

    A single client connection is opened before the first event is read
    and is kept open for as long as the stream is being consumed.

    Parameters
    ----------
    stream
        The incoming stream of events
    topic
        The MQTT topic where messages are sent to
    host
        Host where the MQTT server is running
    port
        Port the MQTT server is listening on
    username
        Optional username to use when connecting to the MQTT server
    password
        Optional password to use when connecting to the MQTT server
    qos
        Quality of Service (QoS) level
    """
    async with Client(host, port, username=username, password=password) as client:
        async for event in stream:  # pragma: no cover
            value = event["value"]
            # ``bytes`` has to pass through untouched: stringifying it would
            # publish its repr text (``"b'...'"``) instead of the raw data.
            if value is not None and not isinstance(
                value,
                (str, bytes, bytearray, int, float),
            ):
                value = str(value)
            await client.publish(topic, value, qos=qos)