-
-
Notifications
You must be signed in to change notification settings - Fork 28.5k
/
api.py
51 lines (40 loc) · 1.72 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""API for Ondilo ICO bound to Home Assistant OAuth."""
from asyncio import run_coroutine_threadsafe
import logging
from typing import Any
from ondilo import Ondilo
from homeassistant import config_entries, core
from homeassistant.helpers import config_entry_oauth2_flow
_LOGGER = logging.getLogger(__name__)
class OndiloClient(Ondilo):
"""Provide Ondilo ICO authentication tied to an OAuth2 based config entry."""
def __init__(
self,
hass: core.HomeAssistant,
config_entry: config_entries.ConfigEntry,
implementation: config_entry_oauth2_flow.AbstractOAuth2Implementation,
) -> None:
"""Initialize Ondilo ICO Auth."""
self.hass = hass
self.config_entry = config_entry
self.session = config_entry_oauth2_flow.OAuth2Session(
hass, config_entry, implementation
)
super().__init__(self.session.token)
def refresh_tokens(self) -> dict:
"""Refresh and return new Ondilo ICO tokens using Home Assistant OAuth2 session."""
run_coroutine_threadsafe(
self.session.async_ensure_token_valid(), self.hass.loop
).result()
return self.session.token
def get_all_pools_data(self) -> list[dict[str, Any]]:
"""Fetch pools and add pool details and last measures to pool data."""
pools = self.get_pools()
for pool in pools:
_LOGGER.debug(
"Retrieving data for pool/spa: %s, id: %d", pool["name"], pool["id"]
)
pool["ICO"] = self.get_ICO_details(pool["id"])
pool["sensors"] = self.get_last_pool_measures(pool["id"])
_LOGGER.debug("Retrieved the following sensors data: %s", pool["sensors"])
return pools