-
-
Notifications
You must be signed in to change notification settings - Fork 29.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
husqvarna_automower_ble: Initial commit
Signed-off-by: Alistair Francis <alistair@alistair23.me>
- Loading branch information
1 parent
addc4a8
commit fe9ca25
Showing
20 changed files
with
988 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"domain": "husqvarna", | ||
"name": "Husqvarna", | ||
"integrations": ["husqvarna_automower", "husqvarna_automower_ble"] | ||
} |
77 changes: 77 additions & 0 deletions
77
homeassistant/components/husqvarna_automower_ble/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
"""The Husqvarna Autoconnect Bluetooth integration.""" | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
|
||
from automower_ble.mower import Mower | ||
from bleak import BleakError | ||
from bleak_retry_connector import close_stale_connections_by_address, get_device | ||
|
||
from homeassistant.components import bluetooth | ||
from homeassistant.config_entries import ConfigEntry | ||
from homeassistant.const import CONF_ADDRESS, CONF_CLIENT_ID, Platform | ||
from homeassistant.core import HomeAssistant | ||
from homeassistant.exceptions import ConfigEntryNotReady | ||
from homeassistant.helpers.device_registry import DeviceInfo | ||
|
||
from .const import DOMAIN, MANUFACTURER | ||
from .coordinator import HusqvarnaCoordinator | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
PLATFORMS = [ | ||
Platform.LAWN_MOWER, | ||
] | ||
|
||
|
||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: | ||
"""Set up Husqvarna Autoconnect Bluetooth from a config entry.""" | ||
address = entry.data[CONF_ADDRESS] | ||
channel_id = entry.data[CONF_CLIENT_ID] | ||
|
||
mower = Mower(channel_id, address) | ||
|
||
await close_stale_connections_by_address(address) | ||
|
||
LOGGER.debug("connecting to %s with channel ID %s", address, str(channel_id)) | ||
try: | ||
device = bluetooth.async_ble_device_from_address( | ||
hass, address, connectable=True | ||
) or await get_device(address) | ||
if not await mower.connect(device): | ||
raise ConfigEntryNotReady | ||
except (TimeoutError, BleakError) as exception: | ||
raise ConfigEntryNotReady( | ||
f"Unable to connect to device {address} due to {exception}" | ||
) from exception | ||
LOGGER.debug("connected and paired") | ||
|
||
model = await mower.get_model() | ||
LOGGER.info("Connected to Automower: %s", model) | ||
|
||
device_info = DeviceInfo( | ||
identifiers={(DOMAIN, str(address) + str(channel_id))}, | ||
manufacturer=MANUFACTURER, | ||
model=model, | ||
) | ||
|
||
coordinator = HusqvarnaCoordinator(hass, LOGGER, mower, device_info, address, model) | ||
|
||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator | ||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) | ||
await coordinator.async_refresh() | ||
|
||
return True | ||
|
||
|
||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: | ||
"""Unload a config entry.""" | ||
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): | ||
try: | ||
coordinator: HusqvarnaCoordinator = hass.data[DOMAIN].pop(entry.entry_id) | ||
await coordinator.async_shutdown() | ||
except KeyError: | ||
return unload_ok | ||
|
||
return unload_ok |
116 changes: 116 additions & 0 deletions
116
homeassistant/components/husqvarna_automower_ble/config_flow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
"""Config flow for Husqvarna Bluetooth integration.""" | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
import random | ||
from typing import Any | ||
|
||
from bleak import BleakError | ||
from automower_ble.mower import Mower | ||
import voluptuous as vol | ||
|
||
from homeassistant.components import bluetooth | ||
from homeassistant.components.bluetooth import BluetoothServiceInfo | ||
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult | ||
from homeassistant.const import CONF_ADDRESS, CONF_CLIENT_ID | ||
from homeassistant.data_entry_flow import AbortFlow | ||
|
||
from .const import DOMAIN | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def _is_supported(discovery_info: BluetoothServiceInfo): | ||
"""Check if device is supported.""" | ||
|
||
_LOGGER.debug( | ||
"%s manufacturer data: %s", | ||
discovery_info.address, | ||
discovery_info.manufacturer_data, | ||
) | ||
|
||
return any(key == 1062 for key in discovery_info.manufacturer_data) | ||
|
||
|
||
class HusqvarnaAutomowerBleConfigFlow(ConfigFlow, domain=DOMAIN): | ||
"""Handle a config flow for Husqvarna Bluetooth.""" | ||
|
||
VERSION = 1 | ||
|
||
def __init__(self) -> None: | ||
"""Initialize the config flow.""" | ||
self.address: str | None | ||
|
||
async def async_step_bluetooth( | ||
self, discovery_info: BluetoothServiceInfo | ||
) -> ConfigFlowResult: | ||
"""Handle the bluetooth discovery step.""" | ||
|
||
_LOGGER.debug("Discovered device: %s", discovery_info) | ||
if not _is_supported(discovery_info): | ||
return self.async_abort(reason="no_devices_found") | ||
|
||
self.address = discovery_info.address | ||
await self.async_set_unique_id(self.address) | ||
self._abort_if_unique_id_configured() | ||
return await self.async_step_confirm() | ||
|
||
async def async_step_confirm( | ||
self, user_input: dict[str, Any] | None = None | ||
) -> ConfigFlowResult: | ||
"""Confirm discovery.""" | ||
assert self.address | ||
|
||
device = bluetooth.async_ble_device_from_address( | ||
self.hass, self.address, connectable=True | ||
) | ||
channel_id = random.randint(1, 0xFFFFFFFF) | ||
|
||
try: | ||
(manufacture, device_type, model) = await Mower( | ||
channel_id, self.address | ||
).probe_gatts(device) | ||
except (BleakError, TimeoutError) as exception: | ||
raise AbortFlow( | ||
"cannot_connect", description_placeholders={"error": str(exception)} | ||
) from exception | ||
|
||
title = manufacture + " " + device_type | ||
|
||
_LOGGER.info("Found device: %s", title) | ||
|
||
if user_input is not None: | ||
return self.async_create_entry( | ||
title=title, | ||
data={CONF_ADDRESS: self.address, CONF_CLIENT_ID: channel_id}, | ||
) | ||
|
||
self.context["title_placeholders"] = { | ||
"name": title, | ||
} | ||
|
||
self._set_confirm_only() | ||
return self.async_show_form( | ||
step_id="confirm", | ||
description_placeholders=self.context["title_placeholders"], | ||
) | ||
|
||
async def async_step_user( | ||
self, user_input: dict[str, Any] | None = None | ||
) -> ConfigFlowResult: | ||
"""Handle the initial step.""" | ||
if user_input is not None: | ||
self.address = user_input[CONF_ADDRESS] | ||
await self.async_set_unique_id(self.address, raise_on_progress=False) | ||
self._abort_if_unique_id_configured() | ||
return await self.async_step_confirm() | ||
|
||
return self.async_show_form( | ||
step_id="user", | ||
data_schema=vol.Schema( | ||
{ | ||
vol.Required(CONF_ADDRESS): str, | ||
}, | ||
), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
"""Constants for the Husqvarna Automower Bluetooth integration.""" | ||
|
||
DOMAIN = "husqvarna_automower_ble" | ||
MANUFACTURER = "Husqvarna" |
127 changes: 127 additions & 0 deletions
127
homeassistant/components/husqvarna_automower_ble/coordinator.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
"""Provides the DataUpdateCoordinator.""" | ||
|
||
from __future__ import annotations | ||
|
||
from datetime import timedelta | ||
import logging | ||
from typing import Any | ||
|
||
from automower_ble.mower import Mower | ||
from bleak import BleakError | ||
from bleak_retry_connector import close_stale_connections_by_address | ||
|
||
from homeassistant.components import bluetooth | ||
from homeassistant.core import HomeAssistant | ||
from homeassistant.helpers.device_registry import DeviceInfo | ||
from homeassistant.helpers.update_coordinator import ( | ||
CoordinatorEntity, | ||
DataUpdateCoordinator, | ||
UpdateFailed, | ||
) | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
SCAN_INTERVAL = timedelta(seconds=60) | ||
|
||
|
||
class HusqvarnaCoordinator(DataUpdateCoordinator[dict[str, bytes]]): | ||
"""Class to manage fetching data.""" | ||
|
||
def __init__( | ||
self, | ||
hass: HomeAssistant, | ||
logger: logging.Logger, | ||
mower: Mower, | ||
device_info: DeviceInfo, | ||
address: str, | ||
model: str, | ||
) -> None: | ||
"""Initialize global data updater.""" | ||
super().__init__( | ||
hass=hass, | ||
logger=logger, | ||
name="Husqvarna Automower BLE Data Update Coordinator", | ||
update_interval=SCAN_INTERVAL, | ||
) | ||
self.address = address | ||
self.model = model | ||
self.mower = mower | ||
self.device_info = device_info | ||
|
||
async def async_shutdown(self) -> None: | ||
"""Shutdown coordinator and any connection.""" | ||
_LOGGER.debug("Shutdown") | ||
await super().async_shutdown() | ||
if self.mower.is_connected(): | ||
await self.mower.disconnect() | ||
|
||
async def _async_find_device(self): | ||
_LOGGER.debug("Trying to reconnect") | ||
await close_stale_connections_by_address(self.address) | ||
|
||
device = bluetooth.async_ble_device_from_address( | ||
self.hass, self.address, connectable=True | ||
) | ||
if not device: | ||
_LOGGER.error("Can't find device") | ||
raise UpdateFailed("Can't find device") | ||
|
||
try: | ||
if not await self.mower.connect(device): | ||
raise UpdateFailed("Failed to connect") | ||
except BleakError as err: | ||
raise UpdateFailed("Failed to connect") from err | ||
|
||
async def _async_update_data(self) -> dict[str, bytes]: | ||
"""Poll the device.""" | ||
_LOGGER.debug("Polling device") | ||
|
||
data: dict[str, bytes] = {} | ||
|
||
try: | ||
if not self.mower.is_connected(): | ||
await self._async_find_device() | ||
except BleakError as err: | ||
raise UpdateFailed("Failed to connect") from err | ||
|
||
try: | ||
data["battery_level"] = await self.mower.battery_level() | ||
_LOGGER.debug(data["battery_level"]) | ||
if data["battery_level"] is None: | ||
await self._async_find_device() | ||
raise UpdateFailed("Error getting data from device") | ||
|
||
data["activity"] = await self.mower.mower_activity() | ||
_LOGGER.debug(data["activity"]) | ||
if data["activity"] is None: | ||
await self._async_find_device() | ||
raise UpdateFailed("Error getting data from device") | ||
|
||
data["state"] = await self.mower.mower_state() | ||
_LOGGER.debug(data["state"]) | ||
if data["state"] is None: | ||
await self._async_find_device() | ||
raise UpdateFailed("Error getting data from device") | ||
|
||
except BleakError as err: | ||
_LOGGER.error("Error getting data from device") | ||
await self._async_find_device() | ||
raise UpdateFailed("Error getting data from device") from err | ||
|
||
return data | ||
|
||
|
||
class HusqvarnaAutomowerBleEntity(CoordinatorEntity[HusqvarnaCoordinator]): | ||
"""HusqvarnaCoordinator entity for Husqvarna Automower Bluetooth.""" | ||
|
||
_attr_has_entity_name = True | ||
|
||
def __init__(self, coordinator: HusqvarnaCoordinator, context: Any = None) -> None: | ||
"""Initialize coordinator entity.""" | ||
super().__init__(coordinator, context) | ||
self._attr_device_info = coordinator.device_info | ||
|
||
@property | ||
def available(self) -> bool: | ||
"""Return if entity is available.""" | ||
return self.coordinator.mower.is_connected() |
Oops, something went wrong.