Skip to content

Commit

Permalink
typecheck xknx.py
Browse files Browse the repository at this point in the history
  • Loading branch information
farmio committed Dec 26, 2020
1 parent d70f44c commit e7eb0b8
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 35 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -43,7 +43,7 @@ warn_unused_configs = true

# add the modules below once we add typing for them so that we fail the build in the future if someone changes something without updating the typings
# fully typechecked modules
[mypy-xknx.core.*,xknx.exceptions.*,xknx.io.*,xknx.knxip.*,xknx.telegram.*,]
[mypy-xknx.xknx,xknx.core.*,xknx.exceptions.*,xknx.io.*,xknx.knxip.*,xknx.telegram.*,]
strict = true
ignore_errors = false
warn_unreachable = true
Expand Down
19 changes: 19 additions & 0 deletions xknx/config/__init__.py
Expand Up @@ -17,3 +17,22 @@
WeatherSchema,
XKNXSchema,
)

__all__ = [
"Config",
"ConfigV1",
"BinarySensorSchema",
"ClimateSchema",
"ConnectionSchema",
"CoverSchema",
"DateTimeSchema",
"ExposeSchema",
"FanSchema",
"LightSchema",
"NotificationSchema",
"SceneSchema",
"SensorSchema",
"SwitchSchema",
"WeatherSchema",
"XKNXSchema",
]
12 changes: 8 additions & 4 deletions xknx/config/config.py
Expand Up @@ -6,10 +6,14 @@
"""
from enum import Enum
import logging
from typing import TYPE_CHECKING

from .config_v1 import ConfigV1
from .yaml_loader import load_yaml

if TYPE_CHECKING:
from xknx.xknx import XKNX

logger = logging.getLogger("xknx.log")


Expand All @@ -23,24 +27,24 @@ class Version(Enum):
class Config:
"""Class for parsing xknx.yaml."""

def __init__(self, xknx):
def __init__(self, xknx: "XKNX") -> None:
"""Initialize Config class."""
self.xknx = xknx

def read(self, file="xknx.yaml"):
def read(self, file: str = "xknx.yaml") -> None:
"""Read config."""
logger.debug("Reading %s", file)
doc = load_yaml(file)
self.parse(doc)

@staticmethod
def parse_version(doc):
def parse_version(doc) -> Version:
"""Parse the version of the xknx.yaml."""
if "version" in doc:
return Version(doc["version"])
return Version.VERSION_1

def parse(self, doc):
def parse(self, doc) -> None:
"""Parse the config from the YAML."""
version = Config.parse_version(doc)
if version is Version.VERSION_1:
Expand Down
2 changes: 2 additions & 0 deletions xknx/core/telegram_queue.py
Expand Up @@ -137,6 +137,8 @@ async def _process_all_telegrams(self) -> None:
while not self.xknx.telegrams.empty():
try:
telegram = self.xknx.telegrams.get_nowait()
if telegram is None:
return
if telegram.direction == TelegramDirection.INCOMING:
await self.process_telegram_incoming(telegram)
elif telegram.direction == TelegramDirection.OUTGOING:
Expand Down
23 changes: 23 additions & 0 deletions xknx/devices/__init__.py
Expand Up @@ -17,3 +17,26 @@
from .switch import Switch
from .travelcalculator import TravelCalculator, TravelStatus
from .weather import Weather

__all__ = [
"Action",
"ActionBase",
"ActionCallback",
"BinarySensor",
"Climate",
"ClimateMode",
"Cover",
"DateTime",
"Device",
"Devices",
"ExposeSensor",
"Fan",
"Light",
"Notification",
"Scene",
"Sensor",
"Switch",
"TravelCalculator",
"TravelStatus",
"Weather",
]
12 changes: 9 additions & 3 deletions xknx/devices/devices.py
Expand Up @@ -3,18 +3,24 @@
More or less an array with devices. Adds some search functionality to find devices.
"""
from typing import Awaitable, Callable

from xknx.telegram import Telegram

from .device import Device


class Devices:
"""Class for handling a vector/array of devices."""

def __init__(self):
def __init__(self) -> None:
"""Initialize Devices class."""
self.__devices = []
self.device_updated_cbs = []

def register_device_updated_cb(self, device_updated_cb):
def register_device_updated_cb(
self, device_updated_cb: Callable[[Device], Awaitable[None]]
) -> None:
"""Register callback for devices beeing updated."""
self.device_updated_cbs.append(device_updated_cb)

Expand Down Expand Up @@ -64,7 +70,7 @@ async def device_updated(self, device):
for device_updated_cb in self.device_updated_cbs:
await device_updated_cb(device)

async def process(self, telegram):
async def process(self, telegram: Telegram) -> None:
"""Process telegram."""
for device in self.devices_by_group_address(telegram.destination_address):
await device.process(telegram)
Expand Down
61 changes: 34 additions & 27 deletions xknx/xknx.py
Expand Up @@ -5,17 +5,19 @@
import os
import signal
from sys import platform
from types import TracebackType
from typing import Awaitable, Callable, Optional, Type, Union

from xknx.config import Config
from xknx.core import StateUpdater, TelegramQueue
from xknx.devices import Devices
from xknx.devices import Device, Devices
from xknx.io import (
DEFAULT_MCAST_GRP,
DEFAULT_MCAST_PORT,
ConnectionConfig,
KNXIPInterface,
)
from xknx.telegram import GroupAddressType, IndividualAddress
from xknx.telegram import GroupAddressType, IndividualAddress, Telegram

from .__version__ import __version__ as VERSION

Expand All @@ -32,27 +34,27 @@ class XKNX:

def __init__(
self,
config=None,
own_address=DEFAULT_ADDRESS,
address_format=GroupAddressType.LONG,
telegram_received_cb=None,
device_updated_cb=None,
rate_limit=DEFAULT_RATE_LIMIT,
multicast_group=DEFAULT_MCAST_GRP,
multicast_port=DEFAULT_MCAST_PORT,
log_directory=None,
state_updater=False,
daemon_mode=False,
connection_config=ConnectionConfig(),
):
config: Optional[str] = None,
own_address: Union[str, IndividualAddress] = DEFAULT_ADDRESS,
address_format: GroupAddressType = GroupAddressType.LONG,
telegram_received_cb: Optional[Callable[[Telegram], Awaitable[None]]] = None,
device_updated_cb: Optional[Callable[[Device], Awaitable[None]]] = None,
rate_limit: int = DEFAULT_RATE_LIMIT,
multicast_group: str = DEFAULT_MCAST_GRP,
multicast_port: int = DEFAULT_MCAST_PORT,
log_directory: Optional[str] = None,
state_updater: bool = False,
daemon_mode: bool = False,
connection_config: ConnectionConfig = ConnectionConfig(),
) -> None:
"""Initialize XKNX class."""
# pylint: disable=too-many-arguments
self.devices = Devices()
self.telegrams = asyncio.Queue()
self.telegrams: asyncio.Queue[Optional[Telegram]] = asyncio.Queue()
self.sigint_received = asyncio.Event()
self.telegram_queue = TelegramQueue(self)
self.state_updater = StateUpdater(self)
self.knxip_interface = None
self.knxip_interface: Optional[KNXIPInterface] = None
self.started = asyncio.Event()
self.connected = asyncio.Event()
self.address_format = address_format
Expand All @@ -77,7 +79,7 @@ def __init__(
if device_updated_cb is not None:
self.devices.register_device_updated_cb(device_updated_cb)

def __del__(self):
def __del__(self) -> None:
"""Destructor. Cleaning up if this was not done before."""
if self.started.is_set():
try:
Expand All @@ -86,16 +88,21 @@ def __del__(self):
except RuntimeError as exp:
logger.warning("Could not close loop, reason: %s", exp)

async def __aenter__(self):
async def __aenter__(self) -> "XKNX":
"""Start XKNX from context manager."""
await self.start()
return self

async def __aexit__(self, exc_type, exc, traceback):
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Stop XKNX from context manager."""
await self.stop()

async def start(self):
async def start(self) -> None:
"""Start XKNX module. Connect to KNX/IP devices and start state updater."""
self.knxip_interface = KNXIPInterface(
self, connection_config=self.connection_config
Expand All @@ -113,28 +120,28 @@ async def start(self):
if self.daemon_mode:
await self.loop_until_sigint()

async def join(self):
async def join(self) -> None:
"""Wait until all telegrams were processed."""
await self.telegrams.join()

async def _stop_knxip_interface_if_exists(self):
async def _stop_knxip_interface_if_exists(self) -> None:
"""Stop KNXIPInterface if initialized."""
if self.knxip_interface is not None:
await self.knxip_interface.stop()
self.knxip_interface = None

async def stop(self):
async def stop(self) -> None:
"""Stop XKNX module."""
self.state_updater.stop()
await self.join()
await self.telegram_queue.stop()
await self._stop_knxip_interface_if_exists()
self.started.clear()

async def loop_until_sigint(self):
async def loop_until_sigint(self) -> None:
"""Loop until Crtl-C was pressed."""

def sigint_handler():
def sigint_handler() -> None:
"""End loop."""
self.sigint_received.set()

Expand All @@ -147,7 +154,7 @@ def sigint_handler():
await self.sigint_received.wait()

@staticmethod
def setup_logging(log_directory: str):
def setup_logging(log_directory: str) -> None:
"""Configure logging to file."""
if not os.path.isdir(log_directory):
logger.warning("The provided log directory does not exist.")
Expand Down

0 comments on commit e7eb0b8

Please sign in to comment.