Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert dhcp watcher to asyncio #109938

Merged
merged 19 commits into from Feb 9, 2024
242 changes: 39 additions & 203 deletions homeassistant/components/dhcp/__init__.py
Expand Up @@ -3,28 +3,24 @@

from abc import ABC, abstractmethod
import asyncio
from collections.abc import Callable, Iterable
import contextlib
from collections.abc import Callable
from dataclasses import dataclass
from datetime import timedelta
from fnmatch import translate
from functools import lru_cache
import itertools
import logging
import os
import re
import threading
from typing import TYPE_CHECKING, Any, Final, cast
from typing import Any, Final

import aiodhcpwatcher
from aiodiscover import DiscoverHosts
from aiodiscover.discovery import (
HOSTNAME as DISCOVERY_HOSTNAME,
IP_ADDRESS as DISCOVERY_IP_ADDRESS,
MAC_ADDRESS as DISCOVERY_MAC_ADDRESS,
)
from cached_ipaddress import cached_ip_addresses
from scapy.config import conf
from scapy.error import Scapy_Exception

from homeassistant import config_entries
from homeassistant.components.device_tracker import (
Expand Down Expand Up @@ -61,20 +57,13 @@

from .const import DOMAIN

if TYPE_CHECKING:
from scapy.packet import Packet
from scapy.sendrecv import AsyncSniffer

CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)

FILTER = "udp and (port 67 or 68)"
REQUESTED_ADDR = "requested_addr"
MESSAGE_TYPE = "message-type"
HOSTNAME: Final = "hostname"
MAC_ADDRESS: Final = "macaddress"
IP_ADDRESS: Final = "ip"
REGISTERED_DEVICES: Final = "registered_devices"
DHCP_REQUEST = 3
SCAN_INTERVAL = timedelta(minutes=60)


Expand Down Expand Up @@ -144,22 +133,25 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
# everything else starts up or we will miss events
for passive_cls in (DeviceTrackerRegisteredWatcher, DeviceTrackerWatcher):
passive_watcher = passive_cls(hass, address_data, integration_matchers)
await passive_watcher.async_start()
passive_watcher.async_start()
watchers.append(passive_watcher)

async def _initialize(event: Event) -> None:
async def _async_initialize(event: Event) -> None:
await aiodhcpwatcher.async_init()

for active_cls in (DHCPWatcher, NetworkWatcher):
active_watcher = active_cls(hass, address_data, integration_matchers)
await active_watcher.async_start()
active_watcher.async_start()
watchers.append(active_watcher)

async def _async_stop(event: Event) -> None:
@callback
def _async_stop(event: Event) -> None:
for watcher in watchers:
await watcher.async_stop()
watcher.async_stop()

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop)

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _initialize)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_initialize)
return True


Expand All @@ -178,21 +170,20 @@ def __init__(
self.hass = hass
self._integration_matchers = integration_matchers
self._address_data = address_data
self._unsub: Callable[[], None] | None = None

@abstractmethod
async def async_stop(self) -> None:
"""Stop the watcher."""
@callback
def async_stop(self) -> None:
"""Stop scanning for new devices on the network."""
if self._unsub:
self._unsub()
self._unsub = None

@abstractmethod
async def async_start(self) -> None:
@callback
def async_start(self) -> None:
"""Start the watcher."""

def process_client(self, ip_address: str, hostname: str, mac_address: str) -> None:
"""Process a client."""
self.hass.loop.call_soon_threadsafe(
self.async_process_client, ip_address, hostname, mac_address
)

@callback
def async_process_client(
self, ip_address: str, hostname: str, mac_address: str
Expand Down Expand Up @@ -291,20 +282,19 @@ def __init__(
) -> None:
"""Initialize class."""
super().__init__(hass, address_data, integration_matchers)
self._unsub: Callable[[], None] | None = None
self._discover_hosts: DiscoverHosts | None = None
self._discover_task: asyncio.Task | None = None

async def async_stop(self) -> None:
@callback
def async_stop(self) -> None:
"""Stop scanning for new devices on the network."""
if self._unsub:
self._unsub()
self._unsub = None
super().async_stop()
if self._discover_task:
self._discover_task.cancel()
self._discover_task = None

async def async_start(self) -> None:
@callback
def async_start(self) -> None:
"""Start scanning for new devices on the network."""
self._discover_hosts = DiscoverHosts()
self._unsub = async_track_time_interval(
Expand Down Expand Up @@ -336,23 +326,8 @@ async def async_discover(self) -> None:
class DeviceTrackerWatcher(WatcherBase):
"""Class to watch dhcp data from routers."""

def __init__(
self,
hass: HomeAssistant,
address_data: dict[str, dict[str, str]],
integration_matchers: DhcpMatchers,
) -> None:
"""Initialize class."""
super().__init__(hass, address_data, integration_matchers)
self._unsub: Callable[[], None] | None = None

async def async_stop(self) -> None:
"""Stop watching for new device trackers."""
if self._unsub:
self._unsub()
self._unsub = None

async def async_start(self) -> None:
@callback
def async_start(self) -> None:
"""Stop watching for new device trackers."""
self._unsub = async_track_state_added_domain(
self.hass, [DEVICE_TRACKER_DOMAIN], self._async_process_device_event
Expand Down Expand Up @@ -391,23 +366,8 @@ def _async_process_device_state(self, state: State | None) -> None:
class DeviceTrackerRegisteredWatcher(WatcherBase):
"""Class to watch data from device tracker registrations."""

def __init__(
self,
hass: HomeAssistant,
address_data: dict[str, dict[str, str]],
integration_matchers: DhcpMatchers,
) -> None:
"""Initialize class."""
super().__init__(hass, address_data, integration_matchers)
self._unsub: Callable[[], None] | None = None

async def async_stop(self) -> None:
"""Stop watching for device tracker registrations."""
if self._unsub:
self._unsub()
self._unsub = None

async def async_start(self) -> None:
@callback
def async_start(self) -> None:
"""Stop watching for device tracker registrations."""
self._unsub = async_dispatcher_connect(
self.hass, CONNECTED_DEVICE_REGISTERED, self._async_process_device_data
Expand All @@ -429,148 +389,24 @@ def _async_process_device_data(self, data: dict[str, str | None]) -> None:
class DHCPWatcher(WatcherBase):
"""Class to watch dhcp requests."""

def __init__(
self,
hass: HomeAssistant,
address_data: dict[str, dict[str, str]],
integration_matchers: DhcpMatchers,
) -> None:
"""Initialize class."""
super().__init__(hass, address_data, integration_matchers)
self._sniffer: AsyncSniffer | None = None
self._started = threading.Event()

async def async_stop(self) -> None:
"""Stop watching for new device trackers."""
await self.hass.async_add_executor_job(self._stop)

def _stop(self) -> None:
"""Stop the thread."""
if self._started.is_set():
assert self._sniffer is not None
self._sniffer.stop()

async def async_start(self) -> None:
"""Start watching for dhcp packets."""
await self.hass.async_add_executor_job(self._start)

def _start(self) -> None:
"""Start watching for dhcp packets."""
# Local import because importing from scapy has side effects such as opening
# sockets
from scapy import arch # pylint: disable=import-outside-toplevel # noqa: F401
from scapy.layers.dhcp import DHCP # pylint: disable=import-outside-toplevel
from scapy.layers.inet import IP # pylint: disable=import-outside-toplevel
from scapy.layers.l2 import Ether # pylint: disable=import-outside-toplevel

#
# Importing scapy.sendrecv will cause a scapy resync which will
# import scapy.arch.read_routes which will import scapy.sendrecv
#
# We avoid this circular import by importing arch above to ensure
# the module is loaded and avoid the problem
#
from scapy.sendrecv import ( # pylint: disable=import-outside-toplevel
AsyncSniffer,
)

def _handle_dhcp_packet(packet: Packet) -> None:
"""Process a dhcp packet."""
if DHCP not in packet:
return

options_dict = _dhcp_options_as_dict(packet[DHCP].options)
if options_dict.get(MESSAGE_TYPE) != DHCP_REQUEST:
# Not a DHCP request
return

ip_address = options_dict.get(REQUESTED_ADDR) or cast(str, packet[IP].src)
assert isinstance(ip_address, str)
hostname = ""
if (hostname_bytes := options_dict.get(HOSTNAME)) and isinstance(
hostname_bytes, bytes
):
with contextlib.suppress(AttributeError, UnicodeDecodeError):
hostname = hostname_bytes.decode()
mac_address = _format_mac(cast(str, packet[Ether].src))

if ip_address is not None and mac_address is not None:
self.process_client(ip_address, hostname, mac_address)

# disable scapy promiscuous mode as we do not need it
conf.sniff_promisc = 0

try:
_verify_l2socket_setup(FILTER)
except (Scapy_Exception, OSError) as ex:
if os.geteuid() == 0:
_LOGGER.error("Cannot watch for dhcp packets: %s", ex)
else:
_LOGGER.debug(
"Cannot watch for dhcp packets without root or CAP_NET_RAW: %s", ex
)
return

try:
_verify_working_pcap(FILTER)
except (Scapy_Exception, ImportError) as ex:
_LOGGER.error(
"Cannot watch for dhcp packets without a functional packet filter: %s",
ex,
)
return

self._sniffer = AsyncSniffer(
filter=FILTER,
started_callback=self._started.set,
prn=_handle_dhcp_packet,
store=0,
@callback
def _async_process_dhcp_request(self, response: aiodhcpwatcher.DHCPRequest) -> None:
"""Process a dhcp request."""
self.async_process_client(
response.ip_address, response.hostname, _format_mac(response.mac_address)
)

self._sniffer.start()
if self._sniffer.thread:
self._sniffer.thread.name = self.__class__.__name__


def _dhcp_options_as_dict(
dhcp_options: Iterable[tuple[str, int | bytes | None]],
) -> dict[str, str | int | bytes | None]:
"""Extract data from packet options as a dict."""
return {option[0]: option[1] for option in dhcp_options if len(option) >= 2}
@callback
def async_start(self) -> None:
"""Start watching for dhcp packets."""
self._unsub = aiodhcpwatcher.start(self._async_process_dhcp_request)


def _format_mac(mac_address: str) -> str:
"""Format a mac address for matching."""
return format_mac(mac_address).replace(":", "")


def _verify_l2socket_setup(cap_filter: str) -> None:
"""Create a socket using the scapy configured l2socket.

Try to create the socket
to see if we have permissions
since AsyncSniffer will do it another
thread so we will not be able to capture
any permission or bind errors.
"""
conf.L2socket(filter=cap_filter)


def _verify_working_pcap(cap_filter: str) -> None:
"""Verify we can create a packet filter.

If we cannot create a filter we will be listening for
all traffic which is too intensive.
"""
# Local import because importing from scapy has side effects such as opening
# sockets
from scapy.arch.common import ( # pylint: disable=import-outside-toplevel
compile_filter,
)

compile_filter(cap_filter)


@lru_cache(maxsize=4096, typed=True)
def _compile_fnmatch(pattern: str) -> re.Pattern:
"""Compile a fnmatch pattern."""
Expand Down
10 changes: 8 additions & 2 deletions homeassistant/components/dhcp/manifest.json
Expand Up @@ -5,10 +5,16 @@
"documentation": "https://www.home-assistant.io/integrations/dhcp",
"integration_type": "system",
"iot_class": "local_push",
"loggers": ["aiodiscover", "dnspython", "pyroute2", "scapy"],
"loggers": [
"aiodiscover",
"aiodhcpwatcher",
"dnspython",
"pyroute2",
"scapy"
],
"quality_scale": "internal",
"requirements": [
"scapy==2.5.0",
"aiodhcpwatcher==0.8.0",
"aiodiscover==1.6.1",
"cached_ipaddress==0.3.0"
]
Expand Down
2 changes: 1 addition & 1 deletion homeassistant/package_constraints.txt
@@ -1,5 +1,6 @@
# Automatically generated by gen_requirements_all.py, do not edit

aiodhcpwatcher==0.8.0
aiodiscover==1.6.1
aiohttp-fast-url-dispatcher==0.3.0
aiohttp-zlib-ng==0.3.1
Expand Down Expand Up @@ -51,7 +52,6 @@ PyTurboJPEG==1.7.1
pyudev==0.23.2
PyYAML==6.0.1
requests==2.31.0
scapy==2.5.0
SQLAlchemy==2.0.25
typing-extensions>=4.9.0,<5.0
ulid-transform==0.9.0
Expand Down