-
-
Notifications
You must be signed in to change notification settings - Fork 38
/
command.py
190 lines (160 loc) · 5.85 KB
/
command.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Base command."""
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, final
from .authentication import Authenticator
from .const import PATH_API_IOT_DEVMANAGER, REQUEST_HEADERS
from .events.event_bus import EventBus
from .logging_filter import get_logger
from .message import HandlingResult, HandlingState
from .models import DeviceInfo
_LOGGER = get_logger(__name__)
@dataclass(frozen=True)
class CommandResult(HandlingResult):
"""Command result object."""
requested_commands: list["Command"] = field(default_factory=lambda: [])
@classmethod
def success(cls) -> "CommandResult":
"""Create result with handling success."""
return CommandResult(HandlingState.SUCCESS)
@classmethod
def analyse(cls) -> "CommandResult":
"""Create result with handling analyse."""
return CommandResult(HandlingState.ANALYSE)
class Command(ABC):
"""Abstract command object."""
_targets_bot: bool = True
def __init__(self, args: dict | list | None = None) -> None:
if args is None:
args = {}
self._args = args
@property # type: ignore[misc]
@classmethod
@abstractmethod
def name(cls) -> str:
"""Command name."""
@final
async def execute(
self, authenticator: Authenticator, device_info: DeviceInfo, event_bus: EventBus
) -> bool:
"""Execute command.
Returns:
bot_reached (bool): True if the command was targeting the bot and it responded in time. False otherwise.
This value is not indicating if the command was executed successfully.
"""
try:
result = await self._execute(authenticator, device_info, event_bus)
if result.state == HandlingState.SUCCESS:
# Execute command which are requested by the handler
async with asyncio.TaskGroup() as tg:
for requested_command in result.requested_commands:
tg.create_task(
requested_command.execute(
authenticator, device_info, event_bus
)
)
return self._targets_bot
except Exception: # pylint: disable=broad-except
_LOGGER.warning(
"Could not execute command %s",
self.name,
exc_info=True,
)
return False
async def _execute(
self, authenticator: Authenticator, device_info: DeviceInfo, event_bus: EventBus
) -> CommandResult:
"""Execute command."""
response = await self._execute_api_request(authenticator, device_info)
result = self.__handle_response(event_bus, response)
if result.state == HandlingState.ANALYSE:
_LOGGER.debug(
"ANALYSE: Could not handle command: %s with %s", self.name, response
)
return CommandResult(
HandlingState.ANALYSE_LOGGED,
result.args,
result.requested_commands,
)
return result
def _get_payload(self) -> dict[str, Any] | list:
payload = {
"header": {
"pri": "1",
"ts": datetime.now().timestamp(),
"tzm": 480,
"ver": "0.0.50",
}
}
if len(self._args) > 0:
payload["body"] = {"data": self._args}
return payload
async def _execute_api_request(
self, authenticator: Authenticator, device_info: DeviceInfo
) -> dict[str, Any]:
json = {
"cmdName": self.name,
"payload": self._get_payload(),
"payloadType": "j",
"td": "q",
"toId": device_info.did,
"toRes": device_info.resource,
"toType": device_info.get_class,
}
credentials = await authenticator.authenticate()
query_params = {
"mid": json["toType"],
"did": json["toId"],
"td": json["td"],
"u": credentials.user_id,
"cv": "1.67.3",
"t": "a",
"av": "1.3.1",
}
return await authenticator.post_authenticated(
PATH_API_IOT_DEVMANAGER,
json,
query_params=query_params,
headers=REQUEST_HEADERS,
)
def __handle_response(
self, event_bus: EventBus, response: dict[str, Any]
) -> CommandResult:
"""Handle response from a command.
:return: A message response
"""
try:
result = self._handle_response(event_bus, response)
if result.state == HandlingState.ANALYSE:
_LOGGER.debug(
"ANALYSE: Could not handle command: %s with %s", self.name, response
)
return CommandResult(
HandlingState.ANALYSE_LOGGED,
result.args,
result.requested_commands,
)
return result
except Exception: # pylint: disable=broad-except
_LOGGER.warning(
"Could not parse response for %s: %s",
self.name,
response,
exc_info=True,
)
return CommandResult(HandlingState.ERROR)
@abstractmethod
def _handle_response(
self, event_bus: EventBus, response: dict[str, Any]
) -> CommandResult:
"""Handle response from a command.
:return: A message response
"""
def __eq__(self, obj: object) -> bool:
if isinstance(obj, Command):
return self.name == obj.name and self._args == obj._args
return False
def __hash__(self) -> int:
return hash(self.name) + hash(self._args)