-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Álvaro Fernández Rojas <noltari@gmail.com>
- Loading branch information
Showing
9 changed files
with
642 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
"""Airzone Local API simulation.""" | ||
|
||
from aiohttp import web | ||
from demo import AirzoneDemo | ||
from helpers import api_json_error | ||
from hvac import AirzoneACSStatus, AirzoneHVAC | ||
from integration import AirzoneIntegration | ||
from version import AirzoneVersion | ||
from webserver import AirzoneWebServer | ||
|
||
from aioairzone.common import TemperatureUnit | ||
from aioairzone.const import API_ERROR_METHOD_NOT_SUPPORTED, API_V1 | ||
|
||
# Airzone device simulation | ||
|
||
|
||
class Airzone: | ||
"""Airzone Local API.""" | ||
|
||
def __init__(self): | ||
"""Local API Version init.""" | ||
self.demo: AirzoneDemo = AirzoneDemo() | ||
self.hvac: AirzoneHVAC = AirzoneHVAC() | ||
self.integration: AirzoneIntegration = AirzoneIntegration() | ||
self.version: AirzoneVersion = AirzoneVersion() | ||
self.webserver: AirzoneWebServer = AirzoneWebServer() | ||
|
||
|
||
airzone = Airzone() | ||
airzone.hvac.add_zone(1, 1, TemperatureUnit.CELSIUS) | ||
# airzone.acs.set_status(AirzoneACSStatus.DISABLED) | ||
|
||
# Airzone Local API simulation | ||
|
||
routes = web.RouteTableDef() | ||
|
||
|
||
@routes.get("/{tail:.*}") | ||
async def root_get_handler(request): | ||
"""GET root.""" | ||
raise web.HTTPInternalServerError() | ||
|
||
|
||
@routes.post(f"/{API_V1}/demo") | ||
async def demo_post_handler(request): | ||
"""POST /demo.""" | ||
return await airzone.demo.post(request) | ||
|
||
|
||
@routes.post(f"/{API_V1}/hvac") | ||
async def hvac_post_handler(request): | ||
"""POST /hvac.""" | ||
return await airzone.hvac.post(request) | ||
|
||
|
||
@routes.put(f"/{API_V1}/hvac") | ||
async def hvac_put_handler(request): | ||
"""PUT /hvac.""" | ||
return await airzone.hvac.put(request) | ||
|
||
|
||
@routes.post(f"/{API_V1}/integration") | ||
async def integration_post_handler(request): | ||
"""POST /integration.""" | ||
return await airzone.integration.post(request) | ||
|
||
|
||
@routes.put(f"/{API_V1}/integration") | ||
async def integration_put_handler(request): | ||
"""PUT /integration.""" | ||
return await airzone.integration.put(request) | ||
|
||
|
||
@routes.post(f"/{API_V1}/version") | ||
async def version_post_handler(request): | ||
"""POST /version.""" | ||
return await airzone.version.post(request) | ||
|
||
|
||
@routes.post(f"/{API_V1}/webserver") | ||
async def webserver_post_handler(request): | ||
"""POST /webserver.""" | ||
return await airzone.webserver.post(request) | ||
|
||
|
||
@routes.post("/{tail:.*}") | ||
async def root_post_handler(request): | ||
"""POST root.""" | ||
return api_json_error(API_ERROR_METHOD_NOT_SUPPORTED) | ||
|
||
|
||
app = web.Application() | ||
app.add_routes(routes) | ||
web.run_app(app, port=3000) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
"""Airzone Local API Demo.""" | ||
|
||
from typing import Any | ||
|
||
from aiohttp import web | ||
from helpers import api_json_response | ||
|
||
from aioairzone.const import API_DATA | ||
|
||
|
||
class AirzoneDemo: | ||
"""Airzone Local API Demo.""" | ||
|
||
def __init__(self): | ||
"""Local API Demo init.""" | ||
|
||
def data(self) -> dict[str, Any]: | ||
"""Return Local API Demo data.""" | ||
return { | ||
API_DATA: [ | ||
{ | ||
"systemID": 1, | ||
"zoneID": 1, | ||
"name": "zona01", | ||
"on": 1, | ||
"coolsetpoint": 25, | ||
"coolmaxtemp": 30, | ||
"coolmintemp": 18, | ||
"heatsetpoint": 22, | ||
"heatmaxtemp": 30, | ||
"heatmintemp": 15, | ||
"maxTemp": 30, | ||
"minTemp": 15, | ||
"setpoint": 26, | ||
"roomTemp": 20, | ||
"modes": [1, 2, 3, 4], | ||
"mode": 3, | ||
"speeds": 5, | ||
"speed": 2, | ||
"speed_values": [0, 1, 2, 3, 4, 5], | ||
"speed_type": 0, | ||
"coldStages": 3, | ||
"coldStage": 2, | ||
"heatStages": 3, | ||
"heatStage": 1, | ||
"humidity": 43, | ||
"units": 0, | ||
"errors": [ | ||
{"Zone": "Error 3"}, | ||
{"Zone": "Error 4"}, | ||
{"Zone": "Error 5"}, | ||
{"Zone": "Error 6"}, | ||
{"Zone": "Error 7"}, | ||
{"Zone": "Error 8"}, | ||
{"Zone": "Presence alarm"}, | ||
{"Zone": "Window alarm"}, | ||
{"Zone": "Anti-freezing alarm"}, | ||
{"Zone": "Zone without thermostat"}, | ||
{"Zone": "Low battery"}, | ||
{"Zone": "Active dew"}, | ||
{"Zone": "Active dew protection"}, | ||
{"Zone": "F05-H"}, | ||
{"Zone": "F06-H"}, | ||
{"Zone": "F05-C"}, | ||
{"Zone": "F06-C"}, | ||
{"system": "Error 9"}, | ||
{"system": "Error 13"}, | ||
{"system": "Error 11"}, | ||
{"system": "Error 15"}, | ||
{"system": "Error 16"}, | ||
{"system": "Error C09"}, | ||
{"system": "Error C11"}, | ||
{"system": "IU error IU4"}, | ||
{"system": "Error IAQ1"}, | ||
{"system": "Error IAQ4"}, | ||
{"Zone": "Error IAQ2"}, | ||
{"Zone": "Error IAQ3"}, | ||
], | ||
"air_demand": 1, | ||
"floor_demand": 1, | ||
"aq_mode": 2, | ||
"aq_quality": 2, | ||
"aq_thrlow": 10, | ||
"aq_thrhigh": 40, | ||
"slats_vertical": 2, | ||
"slats_horizontal": 3, | ||
"slats_vswing": 1, | ||
"slats_hswing": 0, | ||
"antifreeze": 1, | ||
"eco_adapt": "manual", | ||
} | ||
] | ||
} | ||
|
||
async def post(self, request: web.Request) -> Any: | ||
"""POST Local API Demo.""" | ||
return api_json_response(self.data()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
"""Airzone Local API Helpers.""" | ||
|
||
from collections.abc import Mapping | ||
import json | ||
from typing import Any | ||
|
||
from aiohttp import web | ||
|
||
from aioairzone.const import API_ERROR, API_ERRORS | ||
|
||
|
||
def api_error_dict(error: str) -> dict[str, Any]: | ||
"""Local API error dict.""" | ||
return { | ||
API_ERRORS: [ | ||
{ | ||
API_ERROR: error, | ||
}, | ||
], | ||
} | ||
|
||
|
||
def api_filter_dict(data: Any, keep: list[str]) -> Any: | ||
"""Local API dict filter.""" | ||
|
||
if not isinstance(data, (Mapping, list)): | ||
return data | ||
|
||
if isinstance(data, list): | ||
return [api_filter_dict(val, keep) for val in data] | ||
|
||
filtered = {**data} | ||
|
||
keys = list(filtered) | ||
for key in keys: | ||
if key not in keep: | ||
filtered.pop(key) | ||
elif isinstance(filtered[key], Mapping): | ||
filtered[key] = api_filter_dict(filtered[key], keep) | ||
elif isinstance(filtered[key], list): | ||
filtered[key] = [api_filter_dict(item, keep) for item in filtered[key]] | ||
|
||
return filtered | ||
|
||
|
||
def api_json_dumps(obj: Any) -> Any: | ||
"""Local API JSON dumps.""" | ||
return json.dumps(obj, indent=4, sort_keys=True) | ||
|
||
|
||
def api_json_error(error: str) -> dict[str, Any]: | ||
"""Local API error.""" | ||
return api_json_response(api_error_dict(error)) | ||
|
||
|
||
def api_json_response(data: dict[str, Any]) -> Any: | ||
"""Return Local API error.""" | ||
return web.json_response(data, dumps=api_json_dumps) |
Oops, something went wrong.