# api.py
# NOTE: the upstream repository was archived by its owner on Mar 25, 2024 and
# is now read-only.
"""Define a base object for interacting with the Eufy camera API."""
from datetime import datetime
import logging
from typing import Dict, Optional
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientError
from .camera import Camera
from .errors import InvalidCredentialsError, RequestError, raise_error
# Module-level logger, named after this module.
_LOGGER: logging.Logger = logging.getLogger(__name__)
# Root URL of the API; request URLs are built as f"{API_BASE}/{endpoint}".
API_BASE: str = "https://mysecurity.eufylife.com/api/v1"
class API:
    """Define the API object.

    Holds the account credentials, the current access token and a mapping of
    device serial numbers to ``Camera`` objects, and funnels every HTTP call
    through :meth:`request`.
    """

    def __init__(
        self, email: str, password: str, websession: ClientSession
    ) -> None:
        """Initialize.

        Args:
            email: Account email sent to the login endpoint.
            password: Account password sent to the login endpoint.
            websession: Session used to issue every HTTP request.
        """
        self._email: str = email
        self._password: str = password
        # True only while a 401-triggered re-authentication + retry is in
        # flight; a second 401 seen in that window is treated as fatal.
        self._retry_on_401: bool = False
        self._session: ClientSession = websession
        self._token: Optional[str] = None
        self._token_expiration: Optional[datetime] = None
        # Cameras keyed by their "device_sn" value.
        self.cameras: Dict[str, Camera] = {}

    async def async_authenticate(self) -> None:
        """Authenticate and get an access token.

        Stores the returned token and its expiration time on the instance.
        """
        auth_resp = await self.request(
            "post",
            "passport/login",
            json={"email": self._email, "password": self._password},
        )

        # NOTE: the retry flag is deliberately NOT reset here. It is owned by
        # request(); clearing it after a successful login would let a request
        # that keeps returning 401 re-authenticate and retry without bound.
        auth_data = auth_resp["data"]
        self._token = auth_data["auth_token"]
        self._token_expiration = datetime.fromtimestamp(
            auth_data["token_expires_at"]
        )

    async def async_get_history(self) -> dict:
        """Get the camera's history.

        Returns:
            The "data" member of the history response.
        """
        history_resp = await self.request(
            "post", "event/app/get_all_history_record"
        )
        return history_resp["data"]

    async def async_update_device_info(self) -> None:
        """Get the latest device info.

        Cameras already known (matched on "device_sn") have their
        ``camera_info`` refreshed; unknown ones are added to ``self.cameras``.
        """
        devices_resp = await self.request("post", "app/get_devs_list")

        for device_info in devices_resp["data"]:
            serial = device_info["device_sn"]
            if serial in self.cameras:
                self.cameras[serial].camera_info = device_info
            else:
                self.cameras[serial] = Camera(self, device_info)

    async def request(
        self,
        method: str,
        endpoint: str,
        *,
        headers: Optional[dict] = None,
        json: Optional[dict] = None,
    ) -> dict:
        """Make a request against the API.

        Args:
            method: HTTP method name passed to the session.
            endpoint: Path appended to ``API_BASE``.
            headers: Optional extra headers; the passed dict is not modified.
            json: Optional JSON body.

        Returns:
            The decoded JSON payload.

        Raises:
            InvalidCredentialsError: A 401 was received again right after
                re-authenticating.
            RequestError: The response was empty, or the request failed with
                a non-401 client error.
        """
        if self._token_expiration and datetime.now() >= self._token_expiration:
            _LOGGER.info("Access token expired; fetching a new one")
            # Clear both first so the nested login request neither sends the
            # stale token nor re-enters this branch.
            self._token = None
            self._token_expiration = None
            await self.async_authenticate()

        url: str = f"{API_BASE}/{endpoint}"

        # Work on a copy so the caller's dict never picks up our auth header.
        request_headers: dict = dict(headers) if headers else {}
        if self._token:
            request_headers["x-auth-token"] = self._token

        async with self._session.request(
            method, url, headers=request_headers, json=json
        ) as resp:
            try:
                resp.raise_for_status()
                data: dict = await resp.json(content_type=None)

                if not data:
                    raise RequestError(
                        f"No response while requesting {endpoint}"
                    )

                _raise_on_error(data)

                return data
            except ClientError as err:
                # Inspect the status code carried by the error rather than
                # searching its text, which may contain "401" by coincidence
                # (e.g. inside the URL).
                if getattr(err, "status", None) != 401:
                    raise RequestError(
                        f"There was an unknown error while requesting {endpoint}: {err}"
                    ) from None

                if self._retry_on_401:
                    raise InvalidCredentialsError(
                        "Token failed multiple times"
                    ) from err
                # First 401: fall through and re-authenticate once the
                # response has been released by the context manager.

        # Only reachable after a first 401. Keep the flag raised for the whole
        # re-auth + retry so that another 401 in that window raises instead of
        # recursing, and always lower it again afterwards.
        self._retry_on_401 = True
        try:
            # The token was just rejected; do not send it with the login call.
            self._token = None
            self._token_expiration = None
            await self.async_authenticate()
            return await self.request(
                method, endpoint, headers=headers, json=json
            )
        finally:
            self._retry_on_401 = False
def _raise_on_error(data: dict) -> None:
"""Raise appropriately when a returned data payload contains an error."""
if data["code"] == 0:
return
raise_error(data)
async def async_login(
    email: str, password: str, websession: ClientSession
) -> API:
    """Build an ``API`` client, authenticate it, and load its device list.

    The returned object has already completed ``async_authenticate`` and
    ``async_update_device_info``.
    """
    client: API = API(email, password, websession)
    # Order matters: the device list is requested after authentication.
    await client.async_authenticate()
    await client.async_update_device_info()
    return client