/
weatherapi.py
64 lines (53 loc) · 1.61 KB
/
weatherapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
A source that fetches data from WeatherAPI.
"""
import logging
from datetime import datetime, timezone
import httpx
from senor_octopus.lib import flatten
from senor_octopus.types import Stream
# Module-level logger, named after this module so its output can be filtered
# through the standard logging hierarchy.
_logger = logging.getLogger(__name__)
async def weatherapi(
    access_token: str,
    location: str,
    prefix: str = "hub.weatherapi",
) -> Stream:
    """
    Fetch weather forecast data from weatherapi.com.

    Each invocation performs a single request against the ``forecast.json``
    endpoint asking for two days of data, then yields one event per
    flattened field of the current conditions, followed by one event per
    flattened field of the ``day`` summary of the second forecast day
    (index 1 of ``forecastday``).

    Parameters
    ----------
    access_token
        Access token from https://www.weatherapi.com/my/.
    location
        Location name or ZIP code. It is sent as a query parameter and is
        percent-encoded by the HTTP client, so spaces and reserved
        characters are safe.
    prefix
        Prefix for events from this source

    Yields
    ------
    Event
        Events with weather forecast data. Each event is a dictionary with
        a timezone-aware UTC ``timestamp``, a ``name`` of the form
        ``{prefix}.current.{key}`` or ``{prefix}.forecast.forecastday.{key}``,
        and the corresponding ``value``.

    Raises
    ------
    httpx.HTTPStatusError
        If the API answers with a 4xx or 5xx status code (for example an
        invalid access token or an unknown location).
    KeyError, IndexError
        If a successful response lacks the ``current`` section or a second
        forecast day.
    """
    _logger.info("Fetching weather data")

    # Pass the query as ``params`` instead of interpolating it into the URL:
    # the client percent-encodes every value, so a location containing
    # spaces, ``&`` or ``#`` can neither corrupt the request nor inject
    # extra parameters.
    params = {
        "key": access_token,
        "q": location,
        "days": 2,
        "aqi": "no",
        "alerts": "no",
    }

    async with httpx.AsyncClient() as client:
        # HTTPS keeps the access token out of clear text on the wire.
        response = await client.get(
            "https://api.weatherapi.com/v1/forecast.json",
            params=params,
        )

    # Fail with a descriptive HTTP error rather than an opaque
    # ``KeyError`` further down when the API reports a problem.
    response.raise_for_status()

    payload = response.json()
    _logger.debug("Received %s", payload)

    for key, value in flatten(payload["current"]).items():
        yield {
            "timestamp": datetime.now(timezone.utc),
            "name": f"{prefix}.current.{key}",
            "value": value,
        }

    # Two days were requested above; index 1 is the second forecast day.
    tomorrow = payload["forecast"]["forecastday"][1]
    for key, value in flatten(tomorrow["day"]).items():
        yield {
            "timestamp": datetime.now(timezone.utc),
            "name": f"{prefix}.forecast.forecastday.{key}",
            "value": value,
        }