/
awair.py
99 lines (87 loc) · 2.9 KB
/
awair.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
A source that reads data from an Awair Element monitor.
"""
import logging
from datetime import datetime, timezone
import httpx
from marshmallow import Schema, fields
from senor_octopus.lib import configuration_schema
from senor_octopus.types import Stream
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
class AwairConfig(Schema):  # pylint: disable=too-few-public-methods
    """
    Configuration schema for the Awair source.

    Declares the options the source accepts: the API access token, the
    ID and type of the device to query, and the prefix used when naming
    emitted events.
    """

    # Mandatory: token sent as a bearer credential with each request.
    access_token = fields.String(
        required=True,
        default=None,
        title="Awair API access token",
        description=(
            "An Awair API access token. "
            "Can be obtained from "
            "https://developer.getawair.com/console/access-token."
        ),
    )

    # Mandatory: numeric identifier of the monitor to read.
    device_id = fields.Integer(
        required=True,
        default=None,
        title="Device ID",
        description=(
            "The ID of the device to read data from. "
            "To find the device ID: "
            "`curl 'https://developer-apis.awair.is/v1/users/self/devices' "
            "-H 'Authorization: Bearer example-token'`"
        ),
    )

    # Optional: device type segment of the API path.
    device_type = fields.String(
        required=False,
        default="awair-element",
        title="Device type",
        description="The type of device to read data from.",
    )

    # Optional: leading part of every emitted event name.
    prefix = fields.String(
        required=False,
        default="hub.awair",
        title="The prefix for events from this source",
        description=(
            "The prefix for events from this source. "
            "For example, if the prefix is `awair` an event name "
            "`awair.score` will be emitted for the air quality score."
        ),
    )
@configuration_schema(AwairConfig())
async def awair(
    access_token: str,
    device_id: int,
    device_type: str = "awair-element",
    prefix: str = "hub.awair",
) -> Stream:
    """
    Fetch air quality data from an Awair Element monitor.

    Each invocation performs a single GET request against the Awair
    developer API "latest air data" endpoint for the given device and
    turns the response into events.

    :param access_token: Awair API access token, sent as a bearer token.
    :param device_id: ID of the device to read data from.
    :param device_type: Device type used in the request path.
    :param prefix: Prefix prepended to the name of every emitted event.

    For every row in the response's ``data`` list this yields one event
    named ``{prefix}.score`` carrying the row's score, followed by one
    event per entry in the row's ``sensors`` list, named
    ``{prefix}.{comp}`` and carrying that sensor's value. All events of
    a row share the row's timestamp, interpreted as UTC.

    :raises httpx.HTTPStatusError: if the API answers with an error
        status (for example an invalid token or unknown device).
    """
    _logger.info("Fetching air quality data")

    url = (
        "https://developer-apis.awair.is/v1/users/self/devices/"
        f"{device_type}/{device_id}/air-data/latest?fahrenheit=false"
    )
    headers = {"Authorization": f"Bearer {access_token}"}

    async with httpx.AsyncClient() as client:
        response = await client.get(url, headers=headers)

    # Surface HTTP failures as such; without this an error response would
    # only show up later as a misleading ``KeyError`` on ``payload["data"]``
    # or as a JSON decoding error.
    response.raise_for_status()

    payload = response.json()
    _logger.debug("Received %s", payload)

    # Timestamps carry fractional seconds and a literal "Z" suffix.
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"

    for row in payload["data"]:
        # ``strptime`` yields a naive datetime; the "Z" suffix means UTC,
        # so attach the timezone explicitly.
        timestamp = datetime.strptime(
            row["timestamp"],
            timestamp_format,
        ).replace(tzinfo=timezone.utc)

        yield {
            "timestamp": timestamp,
            "name": f"{prefix}.score",
            "value": row["score"],
        }

        for sensor in row["sensors"]:
            yield {
                "timestamp": timestamp,
                "name": f"{prefix}.{sensor['comp']}",
                "value": sensor["value"],
            }