From 9e69c2a056d4d96c57d5dfdd8404683ee13f5410 Mon Sep 17 00:00:00 2001 From: zariiii9003 Date: Fri, 10 Oct 2025 11:28:50 +0200 Subject: [PATCH] drop support for Python 3.9 --- .github/dependabot.yml | 5 +- .github/workflows/ci.yml | 1 - README.rst | 1 + can/_entry_points.py | 21 ++------ can/broadcastmanager.py | 37 ++++++------- can/bus.py | 43 +++++++-------- can/cli.py | 26 ++++----- can/ctypesutil.py | 7 +-- can/exceptions.py | 5 +- can/interface.py | 10 ++-- can/interfaces/canalystii.py | 20 +++---- can/interfaces/cantact.py | 12 ++--- can/interfaces/etas/__init__.py | 14 +++-- can/interfaces/gs_usb.py | 7 +-- can/interfaces/iscan.py | 9 ++-- can/interfaces/ixxat/canlib.py | 31 ++++++----- can/interfaces/ixxat/canlib_vcinpl.py | 13 +++-- can/interfaces/ixxat/canlib_vcinpl2.py | 27 +++++----- can/interfaces/kvaser/canlib.py | 5 +- can/interfaces/nican.py | 11 ++-- can/interfaces/nixnet.py | 16 +++--- can/interfaces/pcan/pcan.py | 14 +++-- can/interfaces/robotell.py | 3 +- can/interfaces/serial/serial_can.py | 8 ++- can/interfaces/slcan.py | 26 ++++----- can/interfaces/socketcan/socketcan.py | 37 ++++++------- can/interfaces/socketcan/utils.py | 5 +- can/interfaces/udp_multicast/bus.py | 18 +++---- can/interfaces/udp_multicast/utils.py | 4 +- can/interfaces/usb2can/usb2canInterface.py | 9 ++-- can/interfaces/vector/canlib.py | 61 ++++++++++------------ can/interfaces/vector/exceptions.py | 6 +-- can/interfaces/virtual.py | 8 ++- can/io/asc.py | 14 ++--- can/io/blf.py | 18 +++---- can/io/canutils.py | 10 ++-- can/io/csv.py | 6 +-- can/io/generic.py | 42 +++++++-------- can/io/logger.py | 15 +++--- can/io/mf4.py | 10 ++-- can/io/printer.py | 4 +- can/io/sqlite.py | 4 +- can/io/trc.py | 32 ++++++------ can/listener.py | 11 +--- can/logger.py | 3 +- can/message.py | 14 ++--- can/notifier.py | 23 ++++---- can/thread_safe_bus.py | 18 +++---- can/typechecking.py | 25 ++++----- can/util.py | 31 +++++------ can/viewer.py | 4 +- doc/changelog.d/1996.removed.md | 1 + pyproject.toml | 12 ++--- tox.ini | 2 +- 54 files changed, 363 insertions(+), 456 deletions(-) create mode 100644 doc/changelog.d/1996.removed.md diff --git a/.github/dependabot.yml b/.github/dependabot.yml index e2781e2ef..dbe907783 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,11 +5,12 @@ version: 2 updates: - - package-ecosystem: "uv" + - package-ecosystem: "pip" # Enable version updates for development dependencies directory: "/" schedule: interval: "monthly" + versioning-strategy: "increase-if-necessary" groups: dev-deps: patterns: @@ -23,4 +24,4 @@ updates: groups: github-actions: patterns: - - "*" \ No newline at end of file + - "*" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5078a33ee..12022ba35 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,6 @@ jobs: matrix: os: [ubuntu-latest, macos-latest, windows-latest] env: [ - "py39", "py310", "py311", "py312", diff --git a/README.rst b/README.rst index 2579871b9..6e75d8d7d 100644 --- a/README.rst +++ b/README.rst @@ -61,6 +61,7 @@ Library Version Python 4.0+ 3.7+ 4.3+ 3.8+ 4.6+ 3.9+ + main branch 3.10+ ============================== =========== diff --git a/can/_entry_points.py b/can/_entry_points.py index 6320b797b..fd1a62d24 100644 --- a/can/_entry_points.py +++ b/can/_entry_points.py @@ -1,5 +1,4 @@ import importlib -import sys from dataclasses import dataclass from importlib.metadata import entry_points from typing import Any @@ -16,19 +15,7 @@ def load(self) -> Any: return getattr(module, self.class_name) -# See https://docs.python.org/3/library/importlib.metadata.html#entry-points, -# "Compatibility Note". -if sys.version_info >= (3, 10): - - def read_entry_points(group: str) -> list[_EntryPoint]: - return [ - _EntryPoint(ep.name, ep.module, ep.attr) for ep in entry_points(group=group) - ] - -else: - - def read_entry_points(group: str) -> list[_EntryPoint]: - return [ - _EntryPoint(ep.name, *ep.value.split(":", maxsplit=1)) - for ep in entry_points().get(group, []) # pylint: disable=no-member - ] +def read_entry_points(group: str) -> list[_EntryPoint]: + return [ + _EntryPoint(ep.name, ep.module, ep.attr) for ep in entry_points(group=group) + ] diff --git a/can/broadcastmanager.py b/can/broadcastmanager.py index a71f6fd11..1fea9ac50 100644 --- a/can/broadcastmanager.py +++ b/can/broadcastmanager.py @@ -12,13 +12,10 @@ import threading import time import warnings -from collections.abc import Sequence +from collections.abc import Callable, Sequence from typing import ( TYPE_CHECKING, - Callable, Final, - Optional, - Union, cast, ) @@ -78,7 +75,7 @@ def wait_inf(self, event: _Pywin32Event) -> None: ) -PYWIN32: Optional[_Pywin32] = None +PYWIN32: _Pywin32 | None = None if sys.platform == "win32" and sys.version_info < (3, 11): try: PYWIN32 = _Pywin32() @@ -105,9 +102,7 @@ class CyclicSendTaskABC(CyclicTask, abc.ABC): Message send task with defined period """ - def __init__( - self, messages: Union[Sequence[Message], Message], period: float - ) -> None: + def __init__(self, messages: Sequence[Message] | Message, period: float) -> None: """ :param messages: The messages to be sent periodically. @@ -125,7 +120,7 @@ def __init__( @staticmethod def _check_and_convert_messages( - messages: Union[Sequence[Message], Message], + messages: Sequence[Message] | Message, ) -> tuple[Message, ...]: """Helper function to convert a Message or Sequence of messages into a tuple, and raises an error when the given value is invalid. @@ -164,9 +159,9 @@ def _check_and_convert_messages( class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC): def __init__( self, - messages: Union[Sequence[Message], Message], + messages: Sequence[Message] | Message, period: float, - duration: Optional[float], + duration: float | None, ) -> None: """Message send task with a defined duration and period. @@ -181,7 +176,7 @@ def __init__( """ super().__init__(messages, period) self.duration = duration - self.end_time: Optional[float] = None + self.end_time: float | None = None class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC): @@ -215,7 +210,7 @@ def _check_modified_messages(self, messages: tuple[Message, ...]) -> None: "from when the task was created" ) - def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: + def modify_data(self, messages: Sequence[Message] | Message) -> None: """Update the contents of the periodically sent messages, without altering the timing. @@ -242,7 +237,7 @@ class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC): def __init__( self, channel: typechecking.Channel, - messages: Union[Sequence[Message], Message], + messages: Sequence[Message] | Message, count: int, # pylint: disable=unused-argument initial_period: float, # pylint: disable=unused-argument subsequent_period: float, @@ -272,12 +267,12 @@ def __init__( self, bus: "BusABC", lock: threading.Lock, - messages: Union[Sequence[Message], Message], + messages: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, - on_error: Optional[Callable[[Exception], bool]] = None, + duration: float | None = None, + on_error: Callable[[Exception], bool] | None = None, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> None: """Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`. @@ -298,13 +293,13 @@ def __init__( self.bus = bus self.send_lock = lock self.stopped = True - self.thread: Optional[threading.Thread] = None + self.thread: threading.Thread | None = None self.on_error = on_error self.modifier_callback = modifier_callback self.period_ms = int(round(period * 1000, 0)) - self.event: Optional[_Pywin32Event] = None + self.event: _Pywin32Event | None = None if PYWIN32: if self.period_ms == 0: # A period of 0 would mean that the timer is signaled only once @@ -338,7 +333,7 @@ def start(self) -> None: self.thread = threading.Thread(target=self._run, name=name) self.thread.daemon = True - self.end_time: Optional[float] = ( + self.end_time: float | None = ( time.perf_counter() + self.duration if self.duration else None ) diff --git a/can/bus.py b/can/bus.py index ec9eb09b7..03425caaa 100644 --- a/can/bus.py +++ b/can/bus.py @@ -6,14 +6,11 @@ import logging import threading from abc import ABC, abstractmethod -from collections.abc import Iterator, Sequence +from collections.abc import Callable, Iterator, Sequence from enum import Enum, auto from time import time from types import TracebackType from typing import ( - Callable, - Optional, - Union, cast, ) @@ -68,7 +65,7 @@ class BusABC(ABC): def __init__( self, channel: can.typechecking.Channel, - can_filters: Optional[can.typechecking.CanFilters] = None, + can_filters: can.typechecking.CanFilters | None = None, **kwargs: object, ): """Construct and open a CAN bus instance of the specified type. @@ -101,7 +98,7 @@ def __init__( def __str__(self) -> str: return self.channel_info - def recv(self, timeout: Optional[float] = None) -> Optional[Message]: + def recv(self, timeout: float | None = None) -> Message | None: """Block waiting for a message from the Bus. :param timeout: @@ -139,9 +136,7 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]: return None - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: """ Read a message from the bus and tell whether it was filtered. This methods may be called by :meth:`~can.BusABC.recv` @@ -184,7 +179,7 @@ def _recv_internal( raise NotImplementedError("Trying to read from a write only bus?") @abstractmethod - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """Transmit a message to the CAN bus. Override this method to enable the transmit path. @@ -205,12 +200,12 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: def send_periodic( self, - msgs: Union[Message, Sequence[Message]], + msgs: Message | Sequence[Message], period: float, - duration: Optional[float] = None, + duration: float | None = None, store_task: bool = True, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -297,11 +292,11 @@ def wrapped_stop_method(remove_task: bool = True) -> None: def _send_periodic_internal( self, - msgs: Union[Sequence[Message], Message], + msgs: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, + duration: float | None = None, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Default implementation of periodic message sending using threading. @@ -378,7 +373,7 @@ def __iter__(self) -> Iterator[Message]: yield msg @property - def filters(self) -> Optional[can.typechecking.CanFilters]: + def filters(self) -> can.typechecking.CanFilters | None: """ Modify the filters of this bus. See :meth:`~can.BusABC.set_filters` for details. @@ -386,12 +381,10 @@ def filters(self) -> Optional[can.typechecking.CanFilters]: return self._filters @filters.setter - def filters(self, filters: Optional[can.typechecking.CanFilters]) -> None: + def filters(self, filters: can.typechecking.CanFilters | None) -> None: self.set_filters(filters) - def set_filters( - self, filters: Optional[can.typechecking.CanFilters] = None - ) -> None: + def set_filters(self, filters: can.typechecking.CanFilters | None = None) -> None: """Apply filtering to all messages received by this Bus. All messages that match at least one filter are returned. @@ -417,7 +410,7 @@ def set_filters( with contextlib.suppress(NotImplementedError): self._apply_filters(self._filters) - def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None: + def _apply_filters(self, filters: can.typechecking.CanFilters | None) -> None: """ Hook for applying the filters to the underlying kernel or hardware if supported/implemented by the interface. @@ -484,9 +477,9 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, ) -> None: self.shutdown() diff --git a/can/cli.py b/can/cli.py index 6e3850354..d0ff70126 100644 --- a/can/cli.py +++ b/can/cli.py @@ -1,7 +1,7 @@ import argparse import re from collections.abc import Sequence -from typing import Any, Optional, Union +from typing import Any import can from can.typechecking import CanFilter, TAdditionalCliArgs @@ -12,8 +12,8 @@ def add_bus_arguments( parser: argparse.ArgumentParser, *, filter_arg: bool = False, - prefix: Optional[str] = None, - group_title: Optional[str] = None, + prefix: str | None = None, + group_title: str | None = None, ) -> None: """Adds CAN bus configuration options to an argument parser. @@ -144,7 +144,7 @@ def add_bus_arguments( def create_bus_from_namespace( namespace: argparse.Namespace, *, - prefix: Optional[str] = None, + prefix: str | None = None, **kwargs: Any, ) -> can.BusABC: """Creates and returns a CAN bus instance based on the provided namespace and arguments. @@ -192,8 +192,8 @@ def __call__( self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, - values: Union[str, Sequence[Any], None], - option_string: Optional[str] = None, + values: str | Sequence[Any] | None, + option_string: str | None = None, ) -> None: if not isinstance(values, list): raise argparse.ArgumentError(self, "Invalid filter argument") @@ -222,8 +222,8 @@ def __call__( self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, - values: Union[str, Sequence[Any], None], - option_string: Optional[str] = None, + values: str | Sequence[Any] | None, + option_string: str | None = None, ) -> None: if not isinstance(values, list): raise argparse.ArgumentError(self, "Invalid --timing argument") @@ -252,13 +252,13 @@ def __call__( self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, - values: Union[str, Sequence[Any], None], - option_string: Optional[str] = None, + values: str | Sequence[Any] | None, + option_string: str | None = None, ) -> None: if not isinstance(values, list): raise argparse.ArgumentError(self, "Invalid --bus-kwargs argument") - bus_kwargs: dict[str, Union[str, int, float, bool]] = {} + bus_kwargs: dict[str, str | int | float | bool] = {} for arg in values: try: @@ -281,7 +281,7 @@ def __call__( def _add_extra_args( - parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup], + parser: argparse.ArgumentParser | argparse._ArgumentGroup, ) -> None: parser.add_argument( "extra_args", @@ -301,7 +301,7 @@ def _split_arg(_arg: str) -> tuple[str, str]: left, right = _arg.split("=", 1) return left.lstrip("-").replace("-", "_"), right - args: dict[str, Union[str, int, float, bool]] = {} + args: dict[str, str | int | float | bool] = {} for key, string_val in map(_split_arg, unknown_args): args[key] = cast_from_string(string_val) return args diff --git a/can/ctypesutil.py b/can/ctypesutil.py index 8336941be..0798de910 100644 --- a/can/ctypesutil.py +++ b/can/ctypesutil.py @@ -5,7 +5,8 @@ import ctypes import logging import sys -from typing import Any, Callable, Optional, Union +from collections.abc import Callable +from typing import Any log = logging.getLogger("can.ctypesutil") @@ -20,7 +21,7 @@ class CLibrary(_LibBase): - def __init__(self, library_or_path: Union[str, ctypes.CDLL]) -> None: + def __init__(self, library_or_path: str | ctypes.CDLL) -> None: self.func_name: Any if isinstance(library_or_path, str): @@ -33,7 +34,7 @@ def map_symbol( func_name: str, restype: Any = None, argtypes: tuple[Any, ...] = (), - errcheck: Optional[Callable[..., Any]] = None, + errcheck: Callable[..., Any] | None = None, ) -> Any: """ Map and return a symbol (function) from a C library. A reference to the diff --git a/can/exceptions.py b/can/exceptions.py index 8abc75147..696701399 100644 --- a/can/exceptions.py +++ b/can/exceptions.py @@ -17,7 +17,6 @@ from collections.abc import Generator from contextlib import contextmanager -from typing import Optional class CanError(Exception): @@ -51,7 +50,7 @@ class CanError(Exception): def __init__( self, message: str = "", - error_code: Optional[int] = None, + error_code: int | None = None, ) -> None: self.error_code = error_code super().__init__( @@ -108,7 +107,7 @@ class CanTimeoutError(CanError, TimeoutError): @contextmanager def error_check( - error_message: Optional[str] = None, + error_message: str | None = None, exception_type: type[CanError] = CanOperationError, ) -> Generator[None, None, None]: """Catches any exceptions and turns them into the new type while preserving the stack trace.""" diff --git a/can/interface.py b/can/interface.py index 4aa010a36..efde5b214 100644 --- a/can/interface.py +++ b/can/interface.py @@ -8,7 +8,7 @@ import importlib import logging from collections.abc import Callable, Iterable, Sequence -from typing import Any, Optional, Union, cast +from typing import Any, cast from . import util from .bus import BusABC @@ -64,9 +64,9 @@ def _get_class_for_interface(interface: str) -> type[BusABC]: context="config_context", ) def Bus( # noqa: N802 - channel: Optional[Channel] = None, - interface: Optional[str] = None, - config_context: Optional[str] = None, + channel: Channel | None = None, + interface: str | None = None, + config_context: str | None = None, ignore_config: bool = False, **kwargs: Any, ) -> BusABC: @@ -140,7 +140,7 @@ def Bus( # noqa: N802 def detect_available_configs( - interfaces: Union[None, str, Iterable[str]] = None, + interfaces: None | str | Iterable[str] = None, timeout: float = 5.0, ) -> Sequence[AutoDetectedConfig]: """Detect all configurations/channels that the interfaces could diff --git a/can/interfaces/canalystii.py b/can/interfaces/canalystii.py index d85211130..e2bf7555e 100644 --- a/can/interfaces/canalystii.py +++ b/can/interfaces/canalystii.py @@ -3,7 +3,7 @@ from collections import deque from collections.abc import Sequence from ctypes import c_ubyte -from typing import Any, Optional, Union +from typing import Any import canalystii as driver @@ -21,12 +21,12 @@ class CANalystIIBus(BusABC): ) def __init__( self, - channel: Union[int, Sequence[int], str] = (0, 1), + channel: int | Sequence[int] | str = (0, 1), device: int = 0, - bitrate: Optional[int] = None, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, - can_filters: Optional[CanFilters] = None, - rx_queue_size: Optional[int] = None, + bitrate: int | None = None, + timing: BitTiming | BitTimingFd | None = None, + can_filters: CanFilters | None = None, + rx_queue_size: int | None = None, **kwargs: dict[str, Any], ): """ @@ -94,7 +94,7 @@ def __init__( # system. RX_POLL_DELAY = 0.020 - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """Send a CAN message to the bus :param msg: message to send @@ -166,8 +166,8 @@ def poll_received_messages(self) -> None: ) def _recv_internal( - self, timeout: Optional[float] = None - ) -> tuple[Optional[Message], bool]: + self, timeout: float | None = None + ) -> tuple[Message | None, bool]: """ :param timeout: float in seconds @@ -194,7 +194,7 @@ def _recv_internal( return (None, False) - def flush_tx_buffer(self, channel: Optional[int] = None) -> None: + def flush_tx_buffer(self, channel: int | None = None) -> None: """Flush the TX buffer of the device. :param channel: diff --git a/can/interfaces/cantact.py b/can/interfaces/cantact.py index 08fe15b72..ee01fbf94 100644 --- a/can/interfaces/cantact.py +++ b/can/interfaces/cantact.py @@ -5,7 +5,7 @@ import logging import time from collections.abc import Sequence -from typing import Any, Optional, Union +from typing import Any from unittest.mock import Mock from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message @@ -56,7 +56,7 @@ def __init__( bitrate: int = 500_000, poll_interval: float = 0.01, monitor: bool = False, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, + timing: BitTiming | BitTimingFd | None = None, **kwargs: Any, ) -> None: """ @@ -123,9 +123,7 @@ def __init__( **kwargs, ) - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: if timeout is None: raise TypeError( f"{self.__class__.__name__} expects a numeric `timeout` value." @@ -149,7 +147,7 @@ def _recv_internal( ) return msg, False - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: with error_check("Cannot send message"): self.interface.send( self.channel, @@ -166,7 +164,7 @@ def shutdown(self) -> None: self.interface.stop() -def mock_recv(timeout: int) -> Optional[dict[str, Any]]: +def mock_recv(timeout: int) -> dict[str, Any] | None: if timeout > 0: return { "id": 0x123, diff --git a/can/interfaces/etas/__init__.py b/can/interfaces/etas/__init__.py index 9d4d0bd2a..f8364a3fd 100644 --- a/can/interfaces/etas/__init__.py +++ b/can/interfaces/etas/__init__.py @@ -1,5 +1,5 @@ import time -from typing import Optional +from typing import Any import can from can.exceptions import CanInitializationError @@ -11,12 +11,12 @@ class EtasBus(can.BusABC): def __init__( self, channel: str, - can_filters: Optional[can.typechecking.CanFilters] = None, + can_filters: can.typechecking.CanFilters | None = None, receive_own_messages: bool = False, bitrate: int = 1000000, fd: bool = True, data_bitrate: int = 2000000, - **kwargs: dict[str, any], + **kwargs: dict[str, Any], ): self.receive_own_messages = receive_own_messages self._can_protocol = can.CanProtocol.CAN_FD if fd else can.CanProtocol.CAN_20 @@ -120,9 +120,7 @@ def __init__( # Super call must be after child init since super calls set_filters super().__init__(channel=channel, **kwargs) - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[can.Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, bool]: ociMsgs = (ctypes.POINTER(OCI_CANMessageEx) * 1)() ociMsg = OCI_CANMessageEx() ociMsgs[0] = ctypes.pointer(ociMsg) @@ -189,7 +187,7 @@ def _recv_internal( return (msg, True) - def send(self, msg: can.Message, timeout: Optional[float] = None) -> None: + def send(self, msg: can.Message, timeout: float | None = None) -> None: ociMsgs = (ctypes.POINTER(OCI_CANMessageEx) * 1)() ociMsg = OCI_CANMessageEx() ociMsgs[0] = ctypes.pointer(ociMsg) @@ -219,7 +217,7 @@ def send(self, msg: can.Message, timeout: Optional[float] = None) -> None: OCI_WriteCANDataEx(self.txQueue, OCI_NO_TIME, ociMsgs, 1, None) - def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None: + def _apply_filters(self, filters: can.typechecking.CanFilters | None) -> None: if self._oci_filters: OCI_RemoveCANFrameFilterEx(self.rxQueue, self._oci_filters, 1) diff --git a/can/interfaces/gs_usb.py b/can/interfaces/gs_usb.py index 4ab541f43..6297fc1f5 100644 --- a/can/interfaces/gs_usb.py +++ b/can/interfaces/gs_usb.py @@ -1,5 +1,4 @@ import logging -from typing import Optional import usb from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG @@ -82,7 +81,7 @@ def __init__( **kwargs, ) - def send(self, msg: can.Message, timeout: Optional[float] = None): + def send(self, msg: can.Message, timeout: float | None = None): """Transmit a message to the CAN bus. :param Message msg: A message object. @@ -117,9 +116,7 @@ def send(self, msg: can.Message, timeout: Optional[float] = None): except usb.core.USBError as exc: raise CanOperationError("The message could not be sent") from exc - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[can.Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, bool]: """ Read a message from the bus and tell whether it was filtered. This methods may be called by :meth:`~can.BusABC.recv` diff --git a/can/interfaces/iscan.py b/can/interfaces/iscan.py index 79b4f754d..2fa19942a 100644 --- a/can/interfaces/iscan.py +++ b/can/interfaces/iscan.py @@ -5,7 +5,6 @@ import ctypes import logging import time -from typing import Optional, Union from can import ( BusABC, @@ -82,7 +81,7 @@ class IscanBus(BusABC): def __init__( self, - channel: Union[str, int], + channel: str | int, bitrate: int = 500000, poll_interval: float = 0.01, **kwargs, @@ -115,9 +114,7 @@ def __init__( **kwargs, ) - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: raw_msg = MessageExStruct() end_time = time.time() + timeout if timeout is not None else None while True: @@ -147,7 +144,7 @@ def _recv_internal( ) return msg, False - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: raw_msg = MessageExStruct( msg.arbitration_id, bool(msg.is_extended_id), diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index 6192367c4..528e86d5e 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -1,5 +1,4 @@ -from collections.abc import Sequence -from typing import Callable, Optional, Union +from collections.abc import Callable, Sequence import can.interfaces.ixxat.canlib_vcinpl as vcinpl import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2 @@ -26,20 +25,20 @@ def __init__( channel: int, can_filters=None, receive_own_messages: bool = False, - unique_hardware_id: Optional[int] = None, + unique_hardware_id: int | None = None, extended: bool = True, fd: bool = False, - rx_fifo_size: Optional[int] = None, - tx_fifo_size: Optional[int] = None, + rx_fifo_size: int | None = None, + tx_fifo_size: int | None = None, bitrate: int = 500000, data_bitrate: int = 2000000, - sjw_abr: Optional[int] = None, - tseg1_abr: Optional[int] = None, - tseg2_abr: Optional[int] = None, - sjw_dbr: Optional[int] = None, - tseg1_dbr: Optional[int] = None, - tseg2_dbr: Optional[int] = None, - ssp_dbr: Optional[int] = None, + sjw_abr: int | None = None, + tseg1_abr: int | None = None, + tseg2_abr: int | None = None, + sjw_dbr: int | None = None, + tseg1_dbr: int | None = None, + tseg2_dbr: int | None = None, + ssp_dbr: int | None = None, **kwargs, ): """ @@ -147,16 +146,16 @@ def _recv_internal(self, timeout): """Read a message from IXXAT device.""" return self.bus._recv_internal(timeout) - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: return self.bus.send(msg, timeout) def _send_periodic_internal( self, - msgs: Union[Sequence[Message], Message], + msgs: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, + duration: float | None = None, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> CyclicSendTaskABC: return self.bus._send_periodic_internal( msgs, period, duration, autostart, modifier_callback diff --git a/can/interfaces/ixxat/canlib_vcinpl.py b/can/interfaces/ixxat/canlib_vcinpl.py index 59f98417d..7c4becafd 100644 --- a/can/interfaces/ixxat/canlib_vcinpl.py +++ b/can/interfaces/ixxat/canlib_vcinpl.py @@ -15,8 +15,7 @@ import sys import time import warnings -from collections.abc import Sequence -from typing import Callable, Optional, Union +from collections.abc import Callable, Sequence from can import ( BusABC, @@ -436,7 +435,7 @@ def __init__( channel: int, can_filters=None, receive_own_messages: bool = False, - unique_hardware_id: Optional[int] = None, + unique_hardware_id: int | None = None, extended: bool = True, rx_fifo_size: int = 16, tx_fifo_size: int = 16, @@ -770,7 +769,7 @@ def _recv_internal(self, timeout): return rx_msg, True - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """ Sends a message on the bus. The interface may buffer the message. @@ -805,11 +804,11 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: def _send_periodic_internal( self, - msgs: Union[Sequence[Message], Message], + msgs: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, + duration: float | None = None, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> CyclicSendTaskABC: """Send a message using built-in cyclic transmit list functionality.""" if modifier_callback is None: diff --git a/can/interfaces/ixxat/canlib_vcinpl2.py b/can/interfaces/ixxat/canlib_vcinpl2.py index b7698277f..b6789885a 100644 --- a/can/interfaces/ixxat/canlib_vcinpl2.py +++ b/can/interfaces/ixxat/canlib_vcinpl2.py @@ -15,8 +15,7 @@ import sys import time import warnings -from collections.abc import Sequence -from typing import Callable, Optional, Union +from collections.abc import Callable, Sequence from can import ( BusABC, @@ -431,19 +430,19 @@ def __init__( channel: int, can_filters=None, receive_own_messages: int = False, - unique_hardware_id: Optional[int] = None, + unique_hardware_id: int | None = None, extended: bool = True, rx_fifo_size: int = 1024, tx_fifo_size: int = 128, bitrate: int = 500000, data_bitrate: int = 2000000, - sjw_abr: Optional[int] = None, - tseg1_abr: Optional[int] = None, - tseg2_abr: Optional[int] = None, - sjw_dbr: Optional[int] = None, - tseg1_dbr: Optional[int] = None, - tseg2_dbr: Optional[int] = None, - ssp_dbr: Optional[int] = None, + sjw_abr: int | None = None, + tseg1_abr: int | None = None, + tseg2_abr: int | None = None, + sjw_dbr: int | None = None, + tseg1_dbr: int | None = None, + tseg2_dbr: int | None = None, + ssp_dbr: int | None = None, **kwargs, ): """ @@ -902,7 +901,7 @@ def _recv_internal(self, timeout): return rx_msg, True - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """ Sends a message on the bus. The interface may buffer the message. @@ -947,11 +946,11 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: def _send_periodic_internal( self, - msgs: Union[Sequence[Message], Message], + msgs: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, + duration: float | None = None, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> CyclicSendTaskABC: """Send a message using built-in cyclic transmit list functionality.""" if modifier_callback is None: diff --git a/can/interfaces/kvaser/canlib.py b/can/interfaces/kvaser/canlib.py index a1dd03e58..4403b60ca 100644 --- a/can/interfaces/kvaser/canlib.py +++ b/can/interfaces/kvaser/canlib.py @@ -10,7 +10,6 @@ import logging import sys import time -from typing import Optional, Union from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message from can.exceptions import CanError, CanInitializationError, CanOperationError @@ -375,8 +374,8 @@ class KvaserBus(BusABC): def __init__( self, channel: int, - can_filters: Optional[CanFilters] = None, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, + can_filters: CanFilters | None = None, + timing: BitTiming | BitTimingFd | None = None, **kwargs, ): """ diff --git a/can/interfaces/nican.py b/can/interfaces/nican.py index 1abf0b35f..ba5b991c9 100644 --- a/can/interfaces/nican.py +++ b/can/interfaces/nican.py @@ -16,7 +16,6 @@ import ctypes import logging import sys -from typing import Optional import can.typechecking from can import ( @@ -187,8 +186,8 @@ class NicanBus(BusABC): def __init__( self, channel: str, - can_filters: Optional[can.typechecking.CanFilters] = None, - bitrate: Optional[int] = None, + can_filters: can.typechecking.CanFilters | None = None, + bitrate: int | None = None, log_errors: bool = True, **kwargs, ) -> None: @@ -279,9 +278,7 @@ def __init__( **kwargs, ) - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: """ Read a message from a NI-CAN bus. @@ -330,7 +327,7 @@ def _recv_internal( ) return msg, True - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """ Send a message to NI-CAN. diff --git a/can/interfaces/nixnet.py b/can/interfaces/nixnet.py index c723d1f52..ec303a364 100644 --- a/can/interfaces/nixnet.py +++ b/can/interfaces/nixnet.py @@ -14,7 +14,7 @@ import warnings from queue import SimpleQueue from types import ModuleType -from typing import Any, Optional, Union +from typing import Any import can.typechecking from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) -nixnet: Optional[ModuleType] = None +nixnet: ModuleType | None = None try: import nixnet # type: ignore import nixnet.constants # type: ignore @@ -52,12 +52,12 @@ def __init__( self, channel: str = "CAN1", bitrate: int = 500_000, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, - can_filters: Optional[can.typechecking.CanFilters] = None, + timing: BitTiming | BitTimingFd | None = None, + can_filters: can.typechecking.CanFilters | None = None, receive_own_messages: bool = False, can_termination: bool = False, fd: bool = False, - fd_bitrate: Optional[int] = None, + fd_bitrate: int | None = None, poll_interval: float = 0.001, **kwargs: Any, ) -> None: @@ -201,9 +201,7 @@ def fd(self) -> bool: ) return self._can_protocol is CanProtocol.CAN_FD - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: end_time = time.perf_counter() + timeout if timeout is not None else None while True: @@ -256,7 +254,7 @@ def _recv_internal( ) return msg, False - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """ Send a message using NI-XNET. diff --git a/can/interfaces/pcan/pcan.py b/can/interfaces/pcan/pcan.py index d63981580..a2f5f361f 100644 --- a/can/interfaces/pcan/pcan.py +++ b/can/interfaces/pcan/pcan.py @@ -6,7 +6,7 @@ import platform import time import warnings -from typing import Any, Optional, Union +from typing import Any from packaging import version @@ -120,9 +120,9 @@ class PcanBus(BusABC): def __init__( self, channel: str = "PCAN_USBBUS1", - device_id: Optional[int] = None, + device_id: int | None = None, state: BusState = BusState.ACTIVE, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, + timing: BitTiming | BitTimingFd | None = None, bitrate: int = 500000, receive_own_messages: bool = False, **kwargs: Any, @@ -500,9 +500,7 @@ def set_device_number(self, device_number): return False return True - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: end_time = time.time() + timeout if timeout is not None else None while True: @@ -523,7 +521,7 @@ def _recv_internal( # receive queue is empty, wait or return on timeout if end_time is None: - time_left: Optional[float] = None + time_left: float | None = None timed_out = False else: time_left = max(0.0, end_time - time.time()) @@ -793,7 +791,7 @@ def _detect_available_configs(): pass return channels - def status_string(self) -> Optional[str]: + def status_string(self) -> str | None: """ Query the PCAN bus status. diff --git a/can/interfaces/robotell.py b/can/interfaces/robotell.py index 16668bdda..b24543856 100644 --- a/can/interfaces/robotell.py +++ b/can/interfaces/robotell.py @@ -5,7 +5,6 @@ import io import logging import time -from typing import Optional from can import BusABC, CanProtocol, Message @@ -380,7 +379,7 @@ def fileno(self): except Exception as exception: raise CanOperationError("Cannot fetch fileno") from exception - def get_serial_number(self, timeout: Optional[int]) -> Optional[str]: + def get_serial_number(self, timeout: int | None) -> str | None: """Get serial number of the slcan interface. :param timeout: diff --git a/can/interfaces/serial/serial_can.py b/can/interfaces/serial/serial_can.py index 680cf10f6..efef6cf59 100644 --- a/can/interfaces/serial/serial_can.py +++ b/can/interfaces/serial/serial_can.py @@ -11,7 +11,7 @@ import logging import struct from collections.abc import Sequence -from typing import Any, Optional, cast +from typing import Any, cast from can import ( BusABC, @@ -109,7 +109,7 @@ def shutdown(self) -> None: super().shutdown() self._ser.close() - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """ Send a message over the serial device. @@ -161,9 +161,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: except serial.SerialTimeoutException as error: raise CanTimeoutError() from error - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: """ Read a message from the serial device. diff --git a/can/interfaces/slcan.py b/can/interfaces/slcan.py index d9eab5edf..086d9ed32 100644 --- a/can/interfaces/slcan.py +++ b/can/interfaces/slcan.py @@ -7,7 +7,7 @@ import time import warnings from queue import SimpleQueue -from typing import Any, Optional, Union, cast +from typing import Any, cast from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message, typechecking from can.exceptions import ( @@ -75,8 +75,8 @@ def __init__( self, channel: typechecking.ChannelStr, tty_baudrate: int = 115200, - bitrate: Optional[int] = None, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, + bitrate: int | None = None, + timing: BitTiming | BitTimingFd | None = None, sleep_after_open: float = _SLEEP_AFTER_SERIAL_OPEN, rtscts: bool = False, listen_only: bool = False, @@ -119,7 +119,7 @@ def __init__( if serial is None: raise CanInterfaceNotImplementedError("The serial module is not installed") - btr: Optional[str] = kwargs.get("btr", None) + btr: str | None = kwargs.get("btr", None) if btr is not None: warnings.warn( "The 'btr' argument is deprecated since python-can v4.5.0 " @@ -166,7 +166,7 @@ def __init__( super().__init__(channel, **kwargs) - def set_bitrate(self, bitrate: int, data_bitrate: Optional[int] = None) -> None: + def set_bitrate(self, bitrate: int, data_bitrate: int | None = None) -> None: """ :param bitrate: Bitrate in bit/s @@ -211,7 +211,7 @@ def _write(self, string: str) -> None: self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR) self.serialPortOrig.flush() - def _read(self, timeout: Optional[float]) -> Optional[str]: + def _read(self, timeout: float | None) -> str | None: _timeout = serial.Timeout(timeout) with error_check("Could not read from serial device"): @@ -250,9 +250,7 @@ def open(self) -> None: def close(self) -> None: self._write("C") - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: canId = None remote = False extended = False @@ -261,7 +259,7 @@ def _recv_internal( fdBrs = False if self._queue.qsize(): - string: Optional[str] = self._queue.get_nowait() + string: str | None = self._queue.get_nowait() else: string = self._read(timeout) @@ -335,7 +333,7 @@ def _recv_internal( return msg, False return None, False - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: if timeout != self.serialPortOrig.write_timeout: self.serialPortOrig.write_timeout = timeout if msg.is_remote_frame: @@ -381,9 +379,7 @@ def fileno(self) -> int: except Exception as exception: raise CanOperationError("Cannot fetch fileno") from exception - def get_version( - self, timeout: Optional[float] - ) -> tuple[Optional[int], Optional[int]]: + def get_version(self, timeout: float | None) -> tuple[int | None, int | None]: """Get HW and SW version of the slcan interface. :param timeout: @@ -411,7 +407,7 @@ def get_version( break return None, None - def get_serial_number(self, timeout: Optional[float]) -> Optional[str]: + def get_serial_number(self, timeout: float | None) -> str | None: """Get serial number of the slcan interface. :param timeout: diff --git a/can/interfaces/socketcan/socketcan.py b/can/interfaces/socketcan/socketcan.py index 30b75108a..6dc856cbf 100644 --- a/can/interfaces/socketcan/socketcan.py +++ b/can/interfaces/socketcan/socketcan.py @@ -15,8 +15,7 @@ import threading import time import warnings -from collections.abc import Sequence -from typing import Callable, Optional, Union +from collections.abc import Callable, Sequence import can from can import BusABC, CanProtocol, Message @@ -51,14 +50,12 @@ # Setup BCM struct def bcm_header_factory( - fields: list[tuple[str, Union[type[ctypes.c_uint32], type[ctypes.c_long]]]], + fields: list[tuple[str, type[ctypes.c_uint32] | type[ctypes.c_long]]], alignment: int = 8, ): curr_stride = 0 results: list[ - tuple[ - str, Union[type[ctypes.c_uint8], type[ctypes.c_uint32], type[ctypes.c_long]] - ] + tuple[str, type[ctypes.c_uint8] | type[ctypes.c_uint32] | type[ctypes.c_long]] ] = [] pad_index = 0 for field in fields: @@ -405,9 +402,9 @@ def __init__( self, bcm_socket: socket.socket, task_id: int, - messages: Union[Sequence[Message], Message], + messages: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, + duration: float | None = None, autostart: bool = True, ) -> None: """Construct and :meth:`~start` a task. @@ -507,7 +504,7 @@ def stop(self) -> None: stopframe = build_bcm_tx_delete_header(self.task_id, self.flags) send_bcm(self.bcm_socket, stopframe) - def modify_data(self, messages: Union[Sequence[Message], Message]) -> None: + def modify_data(self, messages: Sequence[Message] | Message) -> None: """Update the contents of the periodically sent CAN messages by sending TX_SETUP message to Linux kernel. @@ -605,9 +602,7 @@ def bind_socket(sock: socket.socket, channel: str = "can0") -> None: log.debug("Bound socket.") -def capture_message( - sock: socket.socket, get_channel: bool = False -) -> Optional[Message]: +def capture_message(sock: socket.socket, get_channel: bool = False) -> Message | None: """ Captures a message from given socket. @@ -702,7 +697,7 @@ def __init__( receive_own_messages: bool = False, local_loopback: bool = True, fd: bool = False, - can_filters: Optional[CanFilters] = None, + can_filters: CanFilters | None = None, ignore_rx_error_frames=False, **kwargs, ) -> None: @@ -818,9 +813,7 @@ def shutdown(self) -> None: log.debug("Closing raw can socket") self.socket.close() - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: try: # get all sockets that are ready (can be a list with a single value # being self.socket or an empty list if self.socket is not ready) @@ -842,7 +835,7 @@ def _recv_internal( # socket wasn't readable or timeout occurred return None, self._is_filtered - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: """Transmit a message to the CAN bus. :param msg: A message object. @@ -880,7 +873,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: raise can.CanOperationError("Transmit buffer full") - def _send_once(self, data: bytes, channel: Optional[str] = None) -> int: + def _send_once(self, data: bytes, channel: str | None = None) -> int: try: if self.channel == "" and channel: # Message must be addressed to a specific channel @@ -895,11 +888,11 @@ def _send_once(self, data: bytes, channel: Optional[str] = None) -> int: def _send_periodic_internal( self, - msgs: Union[Sequence[Message], Message], + msgs: Sequence[Message] | Message, period: float, - duration: Optional[float] = None, + duration: float | None = None, autostart: bool = True, - modifier_callback: Optional[Callable[[Message], None]] = None, + modifier_callback: Callable[[Message], None] | None = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -974,7 +967,7 @@ def _get_bcm_socket(self, channel: str) -> socket.socket: self._bcm_sockets[channel] = create_bcm_socket(self.channel) return self._bcm_sockets[channel] - def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None: + def _apply_filters(self, filters: can.typechecking.CanFilters | None) -> None: try: self.socket.setsockopt( constants.SOL_CAN_RAW, constants.CAN_RAW_FILTER, pack_filters(filters) diff --git a/can/interfaces/socketcan/utils.py b/can/interfaces/socketcan/utils.py index 1c096f66e..0740f769d 100644 --- a/can/interfaces/socketcan/utils.py +++ b/can/interfaces/socketcan/utils.py @@ -9,7 +9,6 @@ import struct import subprocess import sys -from typing import Optional from can import typechecking from can.interfaces.socketcan.constants import CAN_EFF_FLAG @@ -17,7 +16,7 @@ log = logging.getLogger(__name__) -def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes: +def pack_filters(can_filters: typechecking.CanFilters | None = None) -> bytes: if can_filters is None: # Pass all messages can_filters = [{"can_id": 0, "can_mask": 0}] @@ -72,7 +71,7 @@ def find_available_interfaces() -> list[str]: return interfaces -def error_code_to_str(code: Optional[int]) -> str: +def error_code_to_str(code: int | None) -> str: """ Converts a given error code (errno) to a useful and human readable string. diff --git a/can/interfaces/udp_multicast/bus.py b/can/interfaces/udp_multicast/bus.py index 9e0187ea2..87a0800fa 100644 --- a/can/interfaces/udp_multicast/bus.py +++ b/can/interfaces/udp_multicast/bus.py @@ -6,7 +6,7 @@ import struct import time import warnings -from typing import Any, Optional, Union +from typing import Any import can from can import BusABC, CanProtocol, Message @@ -24,7 +24,7 @@ # see socket.getaddrinfo() IPv4_ADDRESS_INFO = tuple[str, int] # address, port IPv6_ADDRESS_INFO = tuple[str, int, int, int] # address, port, flowinfo, scope_id -IP_ADDRESS_INFO = Union[IPv4_ADDRESS_INFO, IPv6_ADDRESS_INFO] +IP_ADDRESS_INFO = IPv4_ADDRESS_INFO | IPv6_ADDRESS_INFO # Additional constants for the interaction with Unix kernels SO_TIMESTAMPNS = 35 @@ -126,9 +126,7 @@ def is_fd(self) -> bool: ) return self._can_protocol is CanProtocol.CAN_FD - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: result = self._multicast.recv(timeout) if not result: return None, False @@ -148,7 +146,7 @@ def _recv_internal( return can_message, False - def send(self, msg: can.Message, timeout: Optional[float] = None) -> None: + def send(self, msg: can.Message, timeout: float | None = None) -> None: if self._can_protocol is not CanProtocol.CAN_FD and msg.is_fd: raise can.CanOperationError( "cannot send FD message over bus with CAN FD disabled" @@ -242,7 +240,7 @@ def __init__( # used by send() self._send_destination = (self.group, self.port) - self._last_send_timeout: Optional[float] = None + self._last_send_timeout: float | None = None def _create_socket(self, address_family: socket.AddressFamily) -> socket.socket: """Creates a new socket. This might fail and raise an exception! @@ -319,7 +317,7 @@ def _create_socket(self, address_family: socket.AddressFamily) -> socket.socket: "could not create or configure socket" ) from error - def send(self, data: bytes, timeout: Optional[float] = None) -> None: + def send(self, data: bytes, timeout: float | None = None) -> None: """Send data to all group members. This call blocks. :param timeout: the timeout in seconds after which an Exception is raised is sending has failed @@ -342,8 +340,8 @@ def send(self, data: bytes, timeout: Optional[float] = None) -> None: raise can.CanOperationError("failed to send via socket") from error def recv( - self, timeout: Optional[float] = None - ) -> Optional[tuple[bytes, IP_ADDRESS_INFO, float]]: + self, timeout: float | None = None + ) -> tuple[bytes, IP_ADDRESS_INFO, float] | None: """ Receive up to **max_buffer** bytes. diff --git a/can/interfaces/udp_multicast/utils.py b/can/interfaces/udp_multicast/utils.py index de39833a3..1e1d62c23 100644 --- a/can/interfaces/udp_multicast/utils.py +++ b/can/interfaces/udp_multicast/utils.py @@ -2,7 +2,7 @@ Defines common functions. """ -from typing import Any, Optional, cast +from typing import Any, cast from can import CanInterfaceNotImplementedError, Message from can.typechecking import ReadableBytesLike @@ -56,7 +56,7 @@ def pack_message(message: Message) -> bytes: def unpack_message( data: ReadableBytesLike, - replace: Optional[dict[str, Any]] = None, + replace: dict[str, Any] | None = None, check: bool = False, ) -> Message: """Unpack a can.Message from a msgpack byte blob. diff --git a/can/interfaces/usb2can/usb2canInterface.py b/can/interfaces/usb2can/usb2canInterface.py index adc16e8b3..66c171f4d 100644 --- a/can/interfaces/usb2can/usb2canInterface.py +++ b/can/interfaces/usb2can/usb2canInterface.py @@ -4,7 +4,6 @@ import logging from ctypes import byref -from typing import Optional, Union from can import ( BitTiming, @@ -110,12 +109,12 @@ class Usb2canBus(BusABC): def __init__( self, - channel: Optional[str] = None, + channel: str | None = None, dll: str = "usb2can.dll", flags: int = 0x00000008, bitrate: int = 500000, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, - serial: Optional[str] = None, + timing: BitTiming | BitTimingFd | None = None, + serial: str | None = None, **kwargs, ): self.can = Usb2CanAbstractionLayer(dll) @@ -207,7 +206,7 @@ def _detect_available_configs(): return Usb2canBus.detect_available_configs() @staticmethod - def detect_available_configs(serial_matcher: Optional[str] = None): + def detect_available_configs(serial_matcher: str | None = None): """ Uses the *Windows Management Instrumentation* to identify serial devices. diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index d15b89803..8bdd77b83 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -10,14 +10,11 @@ import os import time import warnings -from collections.abc import Iterator, Sequence +from collections.abc import Callable, Iterator, Sequence from types import ModuleType from typing import ( Any, - Callable, NamedTuple, - Optional, - Union, cast, ) @@ -45,14 +42,14 @@ LOG = logging.getLogger(__name__) # Import safely Vector API module for Travis tests -xldriver: Optional[ModuleType] = None +xldriver: ModuleType | None = None try: from . import xldriver except FileNotFoundError as exc: LOG.warning("Could not import vxlapi: %s", exc) -WaitForSingleObject: Optional[Callable[[int, int], int]] -INFINITE: Optional[int] +WaitForSingleObject: Callable[[int, int], int] | None +INFINITE: int | None try: # Try builtin Python 3 Windows API from _winapi import ( # type: ignore[attr-defined,no-redef,unused-ignore] @@ -83,24 +80,24 @@ class VectorBus(BusABC): ) def __init__( self, - channel: Union[int, Sequence[int], str], - can_filters: Optional[CanFilters] = None, + channel: int | Sequence[int] | str, + can_filters: CanFilters | None = None, poll_interval: float = 0.01, receive_own_messages: bool = False, - timing: Optional[Union[BitTiming, BitTimingFd]] = None, - bitrate: Optional[int] = None, + timing: BitTiming | BitTimingFd | None = None, + bitrate: int | None = None, rx_queue_size: int = 2**14, - app_name: Optional[str] = "CANalyzer", - serial: Optional[int] = None, + app_name: str | None = "CANalyzer", + serial: int | None = None, fd: bool = False, - data_bitrate: Optional[int] = None, + data_bitrate: int | None = None, sjw_abr: int = 2, tseg1_abr: int = 6, tseg2_abr: int = 3, sjw_dbr: int = 2, tseg1_dbr: int = 6, tseg2_dbr: int = 3, - listen_only: Optional[bool] = False, + listen_only: bool | None = False, **kwargs: Any, ) -> None: """ @@ -377,8 +374,8 @@ def fd(self) -> bool: def _find_global_channel_idx( self, channel: int, - serial: Optional[int], - app_name: Optional[str], + serial: int | None, + app_name: str | None, channel_configs: list["VectorChannelConfig"], ) -> int: if serial is not None: @@ -565,10 +562,10 @@ def _check_can_settings( self, channel_mask: int, bitrate: int, - sample_point: Optional[float] = None, + sample_point: float | None = None, fd: bool = False, - data_bitrate: Optional[int] = None, - data_sample_point: Optional[float] = None, + data_bitrate: int | None = None, + data_sample_point: float | None = None, ) -> None: """Compare requested CAN settings to active settings in driver.""" vcc_list = get_channel_configs() @@ -656,7 +653,7 @@ def _check_can_settings( f"These are the currently active settings: {settings_string}." ) - def _apply_filters(self, filters: Optional[CanFilters]) -> None: + def _apply_filters(self, filters: CanFilters | None) -> None: if filters: # Only up to one filter per ID type allowed if len(filters) == 1 or ( @@ -706,9 +703,7 @@ def _apply_filters(self, filters: Optional[CanFilters]) -> None: except VectorOperationError as exc: LOG.warning("Could not reset filters: %s", exc) - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: end_time = time.time() + timeout if timeout is not None else None while True: @@ -741,7 +736,7 @@ def _recv_internal( # Wait a short time until we try again time.sleep(self.poll_interval) - def _recv_canfd(self) -> Optional[Message]: + def _recv_canfd(self) -> Message | None: xl_can_rx_event = xlclass.XLcanRxEvent() self.xldriver.xlCanReceive(self.port_handle, xl_can_rx_event) @@ -786,7 +781,7 @@ def _recv_canfd(self) -> Optional[Message]: data=data_struct.data[:dlc], ) - def _recv_can(self) -> Optional[Message]: + def _recv_can(self) -> Message | None: xl_event = xlclass.XLevent() event_count = ctypes.c_uint(1) self.xldriver.xlReceive(self.port_handle, event_count, xl_event) @@ -842,7 +837,7 @@ def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None: `XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag. """ - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: self._send_sequence([msg]) def _send_sequence(self, msgs: Sequence[Message]) -> int: @@ -1030,7 +1025,7 @@ def popup_vector_hw_configuration(wait_for_finish: int = 0) -> None: @staticmethod def get_application_config( app_name: str, app_channel: int - ) -> tuple[Union[int, xldefine.XL_HardwareType], int, int]: + ) -> tuple[int | xldefine.XL_HardwareType, int, int]: """Retrieve information for an application in Vector Hardware Configuration. :param app_name: @@ -1076,7 +1071,7 @@ def get_application_config( def set_application_config( app_name: str, app_channel: int, - hw_type: Union[int, xldefine.XL_HardwareType], + hw_type: int | xldefine.XL_HardwareType, hw_index: int, hw_channel: int, **kwargs: Any, @@ -1170,7 +1165,7 @@ class VectorChannelConfig(NamedTuple): """NamedTuple which contains the channel properties from Vector XL API.""" name: str - hw_type: Union[int, xldefine.XL_HardwareType] + hw_type: int | xldefine.XL_HardwareType hw_index: int hw_channel: int channel_index: int @@ -1179,7 +1174,7 @@ class VectorChannelConfig(NamedTuple): channel_bus_capabilities: xldefine.XL_BusCapabilities is_on_bus: bool connected_bus_type: xldefine.XL_BusTypes - bus_params: Optional[VectorBusParams] + bus_params: VectorBusParams | None serial_number: int article_number: int transceiver_name: str @@ -1214,7 +1209,7 @@ def _get_xl_driver_config() -> xlclass.XLdriverConfig: def _read_bus_params_from_c_struct( bus_params: xlclass.XLbusParams, -) -> Optional[VectorBusParams]: +) -> VectorBusParams | None: bus_type = xldefine.XL_BusTypes(bus_params.busType) if bus_type is not xldefine.XL_BusTypes.XL_BUS_TYPE_CAN: return None @@ -1283,7 +1278,7 @@ def get_channel_configs() -> list[VectorChannelConfig]: return channel_list -def _hw_type(hw_type: int) -> Union[int, xldefine.XL_HardwareType]: +def _hw_type(hw_type: int) -> int | xldefine.XL_HardwareType: try: return xldefine.XL_HardwareType(hw_type) except ValueError: diff --git a/can/interfaces/vector/exceptions.py b/can/interfaces/vector/exceptions.py index b43df5e6c..779365893 100644 --- a/can/interfaces/vector/exceptions.py +++ b/can/interfaces/vector/exceptions.py @@ -1,13 +1,13 @@ """Exception/error declarations for the vector interface.""" -from typing import Any, Optional, Union +from typing import Any from can import CanError, CanInitializationError, CanOperationError class VectorError(CanError): def __init__( - self, error_code: Optional[int], error_string: str, function: str + self, error_code: int | None, error_string: str, function: str ) -> None: super().__init__( message=f"{function} failed ({error_string})", error_code=error_code @@ -16,7 +16,7 @@ def __init__( # keep reference to args for pickling self._args = error_code, error_string, function - def __reduce__(self) -> Union[str, tuple[Any, ...]]: + def __reduce__(self) -> str | tuple[Any, ...]: return type(self), self._args, {} diff --git a/can/interfaces/virtual.py b/can/interfaces/virtual.py index e4f68b0c4..ba33a6ea8 100644 --- a/can/interfaces/virtual.py +++ b/can/interfaces/virtual.py @@ -12,7 +12,7 @@ from copy import deepcopy from random import randint from threading import RLock -from typing import Any, Final, Optional +from typing import Any, Final from can import CanOperationError from can.bus import BusABC, CanProtocol @@ -118,9 +118,7 @@ def _check_if_open(self) -> None: if not self._open: raise CanOperationError("Cannot operate on a closed bus") - def _recv_internal( - self, timeout: Optional[float] - ) -> tuple[Optional[Message], bool]: + def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]: self._check_if_open() try: msg = self.queue.get(block=True, timeout=timeout) @@ -129,7 +127,7 @@ def _recv_internal( else: return msg, False - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: self._check_if_open() timestamp = msg.timestamp if self.preserve_timestamps else time.time() diff --git a/can/io/asc.py b/can/io/asc.py index e917953ff..2c80458c4 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -10,7 +10,7 @@ import re from collections.abc import Generator from datetime import datetime -from typing import Any, Final, Optional, TextIO, Union +from typing import Any, Final, TextIO from ..message import Message from ..typechecking import StringPathLike @@ -41,7 +41,7 @@ class ASCReader(TextIOMessageReader): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, base: str = "hex", relative_timestamp: bool = True, **kwargs: Any, @@ -64,10 +64,10 @@ def __init__( self.base = base self._converted_base = self._check_base(base) self.relative_timestamp = relative_timestamp - self.date: Optional[str] = None + self.date: str | None = None self.start_time = 0.0 # TODO - what is this used for? The ASC Writer only prints `absolute` - self.timestamps_format: Optional[str] = None + self.timestamps_format: str | None = None self.internal_events_logged = False def _extract_header(self) -> None: @@ -284,7 +284,7 @@ def __iter__(self) -> Generator[Message, None, None]: # J1939 message or some other unsupported event continue - msg_kwargs: dict[str, Union[float, bool, int]] = {} + msg_kwargs: dict[str, float | bool | int] = {} try: _timestamp, channel, rest_of_message = line.split(None, 2) timestamp = float(_timestamp) + self.start_time @@ -347,7 +347,7 @@ class ASCWriter(TextIOMessageWriter): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, channel: int = 1, **kwargs: Any, ) -> None: @@ -393,7 +393,7 @@ def stop(self) -> None: self.file.write("End TriggerBlock\n") super().stop() - def log_event(self, message: str, timestamp: Optional[float] = None) -> None: + def log_event(self, message: str, timestamp: float | None = None) -> None: """Add a message to the log file. :param message: an arbitrary message diff --git a/can/io/blf.py b/can/io/blf.py index 2c9050d54..77bd02fae 100644 --- a/can/io/blf.py +++ b/can/io/blf.py @@ -19,7 +19,7 @@ import zlib from collections.abc import Generator, Iterator from decimal import Decimal -from typing import Any, BinaryIO, Optional, Union, cast +from typing import Any, BinaryIO, cast from ..message import Message from ..typechecking import StringPathLike @@ -104,7 +104,7 @@ class BLFParseError(Exception): TIME_ONE_NANS_FACTOR = Decimal("1e-9") -def timestamp_to_systemtime(timestamp: Optional[float]) -> TSystemTime: +def timestamp_to_systemtime(timestamp: float | None) -> TSystemTime: if timestamp is None or timestamp < 631152000: # Probably not a Unix timestamp return 0, 0, 0, 0, 0, 0, 0, 0 @@ -148,7 +148,7 @@ class BLFReader(BinaryIOMessageReader): def __init__( self, - file: Union[StringPathLike, BinaryIO], + file: StringPathLike | BinaryIO, **kwargs: Any, ) -> None: """ @@ -386,7 +386,7 @@ class BLFWriter(BinaryIOMessageWriter): def __init__( self, - file: Union[StringPathLike, BinaryIO], + file: StringPathLike | BinaryIO, append: bool = False, channel: int = 1, compression_level: int = -1, @@ -430,10 +430,10 @@ def __init__( raise BLFParseError("Unexpected file format") self.uncompressed_size = header[11] self.object_count = header[12] - self.start_timestamp: Optional[float] = systemtime_to_timestamp( + self.start_timestamp: float | None = systemtime_to_timestamp( cast("TSystemTime", header[14:22]) ) - self.stop_timestamp: Optional[float] = systemtime_to_timestamp( + self.stop_timestamp: float | None = systemtime_to_timestamp( cast("TSystemTime", header[22:30]) ) # Jump to the end of the file @@ -508,7 +508,7 @@ def on_message_received(self, msg: Message) -> None: data = CAN_MSG_STRUCT.pack(channel, flags, msg.dlc, arb_id, can_data) self._add_object(CAN_MESSAGE, data, msg.timestamp) - def log_event(self, text: str, timestamp: Optional[float] = None) -> None: + def log_event(self, text: str, timestamp: float | None = None) -> None: """Add an arbitrary message to the log file as a global marker. :param str text: @@ -530,7 +530,7 @@ def log_event(self, text: str, timestamp: Optional[float] = None) -> None: self._add_object(GLOBAL_MARKER, data + encoded + marker + comment, timestamp) def _add_object( - self, obj_type: int, data: bytes, timestamp: Optional[float] = None + self, obj_type: int, data: bytes, timestamp: float | None = None ) -> None: if timestamp is None: timestamp = self.stop_timestamp or time.time() @@ -574,7 +574,7 @@ def _flush(self) -> None: self._buffer = [tail] self._buffer_size = len(tail) if not self.compression_level: - data: "Union[bytes, memoryview[int]]" = uncompressed_data # noqa: UP037 + data: "bytes | memoryview[int]" = uncompressed_data # noqa: UP037 method = NO_COMPRESSION else: data = zlib.compress(uncompressed_data, self.compression_level) diff --git a/can/io/canutils.py b/can/io/canutils.py index 78d081637..800125b73 100644 --- a/can/io/canutils.py +++ b/can/io/canutils.py @@ -6,7 +6,7 @@ import logging from collections.abc import Generator -from typing import Any, Optional, TextIO, Union +from typing import Any, TextIO from can.message import Message @@ -36,7 +36,7 @@ class CanutilsLogReader(TextIOMessageReader): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, **kwargs: Any, ) -> None: """ @@ -63,7 +63,7 @@ def __iter__(self) -> Generator[Message, None, None]: timestamp = float(timestamp_string[1:-1]) can_id_string, data = frame.split("#", maxsplit=1) - channel: Union[int, str] + channel: int | str if channel_string.isdigit(): channel = int(channel_string) else: @@ -132,7 +132,7 @@ class CanutilsLogWriter(TextIOMessageWriter): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, channel: str = "vcan0", append: bool = False, **kwargs: Any, @@ -149,7 +149,7 @@ def __init__( super().__init__(file, mode="a" if append else "w") self.channel = channel - self.last_timestamp: Optional[float] = None + self.last_timestamp: float | None = None def on_message_received(self, msg: Message) -> None: # this is the case for the very first message: diff --git a/can/io/csv.py b/can/io/csv.py index 865ef9af0..0c8ba02a4 100644 --- a/can/io/csv.py +++ b/can/io/csv.py @@ -11,7 +11,7 @@ from base64 import b64decode, b64encode from collections.abc import Generator -from typing import Any, TextIO, Union +from typing import Any, TextIO from can.message import Message @@ -30,7 +30,7 @@ class CSVReader(TextIOMessageReader): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, **kwargs: Any, ) -> None: """ @@ -89,7 +89,7 @@ class CSVWriter(TextIOMessageWriter): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, append: bool = False, **kwargs: Any, ) -> None: diff --git a/can/io/generic.py b/can/io/generic.py index 21fc3e8e8..bda4e1cce 100644 --- a/can/io/generic.py +++ b/can/io/generic.py @@ -20,10 +20,8 @@ BinaryIO, Generic, Literal, - Optional, TextIO, TypeVar, - Union, ) from typing_extensions import Self @@ -71,9 +69,9 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, ) -> Literal[False]: """Exit the context manager and ensure proper cleanup.""" self.stop() @@ -110,7 +108,7 @@ class FileIOMessageWriter(SizedMessageWriter, Generic[_IoTypeVar]): file: _IoTypeVar @abstractmethod - def __init__(self, file: Union[StringPathLike, _IoTypeVar], **kwargs: Any) -> None: + def __init__(self, file: StringPathLike | _IoTypeVar, **kwargs: Any) -> None: pass def stop(self) -> None: @@ -122,7 +120,7 @@ def file_size(self) -> int: return self.file.tell() -class TextIOMessageWriter(FileIOMessageWriter[Union[TextIO, TextIOWrapper]], ABC): +class TextIOMessageWriter(FileIOMessageWriter[TextIO | TextIOWrapper], ABC): """Text-based message writer implementation. :param file: Text file to write to @@ -132,8 +130,8 @@ class TextIOMessageWriter(FileIOMessageWriter[Union[TextIO, TextIOWrapper]], ABC def __init__( self, - file: Union[StringPathLike, TextIO, TextIOWrapper], - mode: "Union[OpenTextModeUpdating, OpenTextModeWriting]" = "w", + file: StringPathLike | TextIO | TextIOWrapper, + mode: "OpenTextModeUpdating | OpenTextModeWriting" = "w", **kwargs: Any, ) -> None: if isinstance(file, (str, os.PathLike)): @@ -144,7 +142,7 @@ def __init__( self.file = file -class BinaryIOMessageWriter(FileIOMessageWriter[Union[BinaryIO, BufferedIOBase]], ABC): +class BinaryIOMessageWriter(FileIOMessageWriter[BinaryIO | BufferedIOBase], ABC): """Binary file message writer implementation. :param file: Binary file to write to @@ -152,10 +150,10 @@ class BinaryIOMessageWriter(FileIOMessageWriter[Union[BinaryIO, BufferedIOBase]] :param kwargs: Additional implementation specific arguments """ - def __init__( + def __init__( # pylint: disable=unused-argument self, - file: Union[StringPathLike, BinaryIO, BufferedIOBase], - mode: "Union[OpenBinaryModeUpdating, OpenBinaryModeWriting]" = "wb", + file: StringPathLike | BinaryIO | BufferedIOBase, + mode: "OpenBinaryModeUpdating | OpenBinaryModeWriting" = "wb", **kwargs: Any, ) -> None: if isinstance(file, (str, os.PathLike)): @@ -188,9 +186,9 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, ) -> Literal[False]: self.stop() return False @@ -210,14 +208,14 @@ class FileIOMessageReader(MessageReader, Generic[_IoTypeVar]): file: _IoTypeVar @abstractmethod - def __init__(self, file: Union[StringPathLike, _IoTypeVar], **kwargs: Any) -> None: + def __init__(self, file: StringPathLike | _IoTypeVar, **kwargs: Any) -> None: pass def stop(self) -> None: self.file.close() -class TextIOMessageReader(FileIOMessageReader[Union[TextIO, TextIOWrapper]], ABC): +class TextIOMessageReader(FileIOMessageReader[TextIO | TextIOWrapper], ABC): """Text-based message reader implementation. :param file: Text file to read from @@ -227,7 +225,7 @@ class TextIOMessageReader(FileIOMessageReader[Union[TextIO, TextIOWrapper]], ABC def __init__( self, - file: Union[StringPathLike, TextIO, TextIOWrapper], + file: StringPathLike | TextIO | TextIOWrapper, mode: "OpenTextModeReading" = "r", **kwargs: Any, ) -> None: @@ -239,7 +237,7 @@ def __init__( self.file = file -class BinaryIOMessageReader(FileIOMessageReader[Union[BinaryIO, BufferedIOBase]], ABC): +class BinaryIOMessageReader(FileIOMessageReader[BinaryIO | BufferedIOBase], ABC): """Binary file message reader implementation. :param file: Binary file to read from @@ -247,9 +245,9 @@ class BinaryIOMessageReader(FileIOMessageReader[Union[BinaryIO, BufferedIOBase]] :param kwargs: Additional implementation specific arguments """ - def __init__( + def __init__( # pylint: disable=unused-argument self, - file: Union[StringPathLike, BinaryIO, BufferedIOBase], + file: StringPathLike | BinaryIO | BufferedIOBase, mode: "OpenBinaryModeReading" = "rb", **kwargs: Any, ) -> None: diff --git a/can/io/logger.py b/can/io/logger.py index 4d8ddc070..5009c9756 100644 --- a/can/io/logger.py +++ b/can/io/logger.py @@ -6,15 +6,14 @@ import os import pathlib from abc import ABC, abstractmethod +from collections.abc import Callable from datetime import datetime from types import TracebackType from typing import ( Any, - Callable, ClassVar, Final, Literal, - Optional, ) from typing_extensions import Self @@ -107,7 +106,7 @@ def _compress(filename: StringPathLike, **kwargs: Any) -> FileIOMessageWriter[An def Logger( # noqa: N802 - filename: Optional[StringPathLike], **kwargs: Any + filename: StringPathLike | None, **kwargs: Any ) -> MessageWriter: """Find and return the appropriate :class:`~can.io.generic.MessageWriter` instance for a given file suffix. @@ -177,12 +176,12 @@ class BaseRotatingLogger(MessageWriter, ABC): #: If this attribute is set to a callable, the :meth:`~BaseRotatingLogger.rotation_filename` #: method delegates to this callable. The parameters passed to the callable are #: those passed to :meth:`~BaseRotatingLogger.rotation_filename`. - namer: Optional[Callable[[StringPathLike], StringPathLike]] = None + namer: Callable[[StringPathLike], StringPathLike] | None = None #: If this attribute is set to a callable, the :meth:`~BaseRotatingLogger.rotate` method #: delegates to this callable. The parameters passed to the callable are those #: passed to :meth:`~BaseRotatingLogger.rotate`. - rotator: Optional[Callable[[StringPathLike, StringPathLike], None]] = None + rotator: Callable[[StringPathLike, StringPathLike], None] | None = None #: An integer counter to track the number of rollovers. rollover_count: int = 0 @@ -286,9 +285,9 @@ def __enter__(self) -> Self: def __exit__( self, - exc_type: Optional[type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, ) -> Literal[False]: self.stop() return False diff --git a/can/io/mf4.py b/can/io/mf4.py index bf594e3a5..fcde2e193 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -13,7 +13,7 @@ from hashlib import md5 from io import BufferedIOBase, BytesIO from pathlib import Path -from typing import Any, BinaryIO, Optional, Union, cast +from typing import Any, BinaryIO, cast from ..message import Message from ..typechecking import StringPathLike @@ -93,8 +93,8 @@ class MF4Writer(BinaryIOMessageWriter): def __init__( self, - file: Union[StringPathLike, BinaryIO], - database: Optional[StringPathLike] = None, + file: StringPathLike | BinaryIO, + database: StringPathLike | None = None, compression_level: int = 2, **kwargs: Any, ) -> None: @@ -458,7 +458,7 @@ def __iter__(self) -> Generator[Message, None, None]: def __init__( self, - file: Union[StringPathLike, BinaryIO], + file: StringPathLike | BinaryIO, **kwargs: Any, ) -> None: """ @@ -497,7 +497,7 @@ def __iter__(self) -> Iterator[Message]: # No data, skip continue - acquisition_source: Optional[Source] = channel_group.acq_source + acquisition_source: Source | None = channel_group.acq_source if acquisition_source is None: # No source information, skip diff --git a/can/io/printer.py b/can/io/printer.py index 786cb7261..c41a83691 100644 --- a/can/io/printer.py +++ b/can/io/printer.py @@ -5,7 +5,7 @@ import logging import sys from io import TextIOWrapper -from typing import Any, TextIO, Union +from typing import Any, TextIO from ..message import Message from ..typechecking import StringPathLike @@ -26,7 +26,7 @@ class Printer(TextIOMessageWriter): def __init__( self, - file: Union[StringPathLike, TextIO, TextIOWrapper] = sys.stdout, + file: StringPathLike | TextIO | TextIOWrapper = sys.stdout, append: bool = False, **kwargs: Any, ) -> None: diff --git a/can/io/sqlite.py b/can/io/sqlite.py index 73aa2961c..5f4885adb 100644 --- a/can/io/sqlite.py +++ b/can/io/sqlite.py @@ -9,9 +9,7 @@ import threading import time from collections.abc import Generator, Iterator -from typing import Any - -from typing_extensions import TypeAlias +from typing import Any, TypeAlias from can.listener import BufferedReader from can.message import Message diff --git a/can/io/trc.py b/can/io/trc.py index fa8ee88e7..c02bdcfe9 100644 --- a/can/io/trc.py +++ b/can/io/trc.py @@ -9,11 +9,11 @@ import logging import os -from collections.abc import Generator +from collections.abc import Callable, Generator from datetime import datetime, timedelta, timezone from enum import Enum from io import TextIOWrapper -from typing import Any, Callable, Optional, TextIO, Union +from typing import Any, TextIO from ..message import Message from ..typechecking import StringPathLike @@ -45,7 +45,7 @@ class TRCReader(TextIOMessageReader): def __init__( self, - file: Union[StringPathLike, TextIO], + file: StringPathLike | TextIO, **kwargs: Any, ) -> None: """ @@ -62,12 +62,10 @@ def __init__( if not self.file: raise ValueError("The given file cannot be None") - self._parse_cols: Callable[[tuple[str, ...]], Optional[Message]] = ( - lambda x: None - ) + self._parse_cols: Callable[[tuple[str, ...]], Message | None] = lambda x: None @property - def start_time(self) -> Optional[datetime]: + def start_time(self) -> datetime | None: if self._start_time: return datetime.fromtimestamp(self._start_time, timezone.utc) return None @@ -140,7 +138,7 @@ def _extract_header(self) -> str: return line - def _parse_msg_v1_0(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v1_0(self, cols: tuple[str, ...]) -> Message | None: arbit_id = cols[2] if arbit_id == "FFFFFFFF": logger.info("TRCReader: Dropping bus info line") @@ -158,7 +156,7 @@ def _parse_msg_v1_0(self, cols: tuple[str, ...]) -> Optional[Message]: msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)]) return msg - def _parse_msg_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v1_1(self, cols: tuple[str, ...]) -> Message | None: arbit_id = cols[3] msg = Message() @@ -174,7 +172,7 @@ def _parse_msg_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]: msg.is_rx = cols[2] == "Rx" return msg - def _parse_msg_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v1_3(self, cols: tuple[str, ...]) -> Message | None: arbit_id = cols[4] msg = Message() @@ -190,7 +188,7 @@ def _parse_msg_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]: msg.is_rx = cols[3] == "Rx" return msg - def _parse_msg_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_msg_v2_x(self, cols: tuple[str, ...]) -> Message | None: type_ = cols[self.columns["T"]] bus = self.columns.get("B", None) @@ -218,7 +216,7 @@ def _parse_msg_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]: return msg - def _parse_cols_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_cols_v1_1(self, cols: tuple[str, ...]) -> Message | None: dtype = cols[2] if dtype in ("Tx", "Rx"): return self._parse_msg_v1_1(cols) @@ -226,7 +224,7 @@ def _parse_cols_v1_1(self, cols: tuple[str, ...]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_cols_v1_3(self, cols: tuple[str, ...]) -> Message | None: dtype = cols[3] if dtype in ("Tx", "Rx"): return self._parse_msg_v1_3(cols) @@ -234,7 +232,7 @@ def _parse_cols_v1_3(self, cols: tuple[str, ...]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]: + def _parse_cols_v2_x(self, cols: tuple[str, ...]) -> Message | None: dtype = cols[self.columns["T"]] if dtype in {"DT", "FD", "FB", "FE", "BI", "RR"}: return self._parse_msg_v2_x(cols) @@ -242,7 +240,7 @@ def _parse_cols_v2_x(self, cols: tuple[str, ...]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_line(self, line: str) -> Optional[Message]: + def _parse_line(self, line: str) -> Message | None: logger.debug("TRCReader: Parse '%s'", line) try: cols = tuple(line.split(maxsplit=self._num_columns)) @@ -292,7 +290,7 @@ class TRCWriter(TextIOMessageWriter): def __init__( self, - file: Union[StringPathLike, TextIO, TextIOWrapper], + file: StringPathLike | TextIO | TextIOWrapper, channel: int = 1, **kwargs: Any, ) -> None: @@ -314,7 +312,7 @@ def __init__( self.filepath = os.path.abspath(self.file.name) self.header_written = False self.msgnr = 0 - self.first_timestamp: Optional[float] = None + self.first_timestamp: float | None = None self.file_version = TRCFileVersion.V2_1 self._msg_fmt_string = self.FORMAT_MESSAGE_V1_0 self._format_message = self._format_message_init diff --git a/can/listener.py b/can/listener.py index 7f8f436a0..1e289bea6 100644 --- a/can/listener.py +++ b/can/listener.py @@ -3,12 +3,11 @@ """ import asyncio -import sys import warnings from abc import ABC, abstractmethod from collections.abc import AsyncIterator from queue import Empty, SimpleQueue -from typing import Any, Optional +from typing import Any from can.bus import BusABC from can.message import Message @@ -99,7 +98,7 @@ def on_message_received(self, msg: Message) -> None: else: self.buffer.put(msg) - def get_message(self, timeout: float = 0.5) -> Optional[Message]: + def get_message(self, timeout: float = 0.5) -> Message | None: """ Attempts to retrieve the message that has been in the queue for the longest amount of time (FIFO). If no message is available, it blocks for given timeout or until a @@ -146,12 +145,6 @@ def __init__(self, **kwargs: Any) -> None: DeprecationWarning, stacklevel=2, ) - if sys.version_info < (3, 10): - self.buffer = asyncio.Queue( # pylint: disable=unexpected-keyword-arg - loop=kwargs["loop"] - ) - return - self.buffer = asyncio.Queue() def on_message_received(self, msg: Message) -> None: diff --git a/can/logger.py b/can/logger.py index 8274d6668..537356643 100644 --- a/can/logger.py +++ b/can/logger.py @@ -4,7 +4,6 @@ from datetime import datetime from typing import ( TYPE_CHECKING, - Union, ) from can import BusState, Logger, SizedRotatingLogger @@ -110,7 +109,7 @@ def main() -> None: print(f"Connected to {bus.__class__.__name__}: {bus.channel_info}") print(f"Can Logger (Started on {datetime.now()})") - logger: Union[MessageWriter, BaseRotatingLogger] + logger: MessageWriter | BaseRotatingLogger if results.file_size: logger = SizedRotatingLogger( base_filename=results.log_file, diff --git a/can/message.py b/can/message.py index c1fbffd21..3e60ca641 100644 --- a/can/message.py +++ b/can/message.py @@ -8,7 +8,7 @@ from copy import deepcopy from math import isinf, isnan -from typing import Any, Optional +from typing import Any from . import typechecking @@ -54,9 +54,9 @@ def __init__( # pylint: disable=too-many-locals, too-many-arguments is_extended_id: bool = True, is_remote_frame: bool = False, is_error_frame: bool = False, - channel: Optional[typechecking.Channel] = None, - dlc: Optional[int] = None, - data: Optional[typechecking.CanData] = None, + channel: typechecking.Channel | None = None, + dlc: int | None = None, + data: typechecking.CanData | None = None, is_fd: bool = False, is_rx: bool = True, bitrate_switch: bool = False, @@ -185,7 +185,7 @@ def __repr__(self) -> str: return f"can.Message({', '.join(args)})" - def __format__(self, format_spec: Optional[str]) -> str: + def __format__(self, format_spec: str | None) -> str: if not format_spec: return self.__str__() else: @@ -210,7 +210,7 @@ def __copy__(self) -> "Message": error_state_indicator=self.error_state_indicator, ) - def __deepcopy__(self, memo: Optional[dict[int, Any]]) -> "Message": + def __deepcopy__(self, memo: dict[int, Any] | None) -> "Message": return Message( timestamp=self.timestamp, arbitration_id=self.arbitration_id, @@ -289,7 +289,7 @@ def _check(self) -> None: def equals( self, other: "Message", - timestamp_delta: Optional[float] = 1.0e-6, + timestamp_delta: float | None = 1.0e-6, check_channel: bool = True, check_direction: bool = True, ) -> bool: diff --git a/can/notifier.py b/can/notifier.py index a2ee512fc..cb91cf7b4 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -7,16 +7,13 @@ import logging import threading import time -from collections.abc import Awaitable, Iterable +from collections.abc import Awaitable, Callable, Iterable from contextlib import AbstractContextManager from types import TracebackType from typing import ( Any, - Callable, Final, NamedTuple, - Optional, - Union, ) from can.bus import BusABC @@ -25,7 +22,7 @@ logger = logging.getLogger("can.Notifier") -MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]] +MessageRecipient = Listener | Callable[[Message], Awaitable[None] | None] class _BusNotifierPair(NamedTuple): @@ -109,10 +106,10 @@ class Notifier(AbstractContextManager["Notifier"]): def __init__( self, - bus: Union[BusABC, list[BusABC]], + bus: BusABC | list[BusABC], listeners: Iterable[MessageRecipient], timeout: float = 1.0, - loop: Optional[asyncio.AbstractEventLoop] = None, + loop: asyncio.AbstractEventLoop | None = None, ) -> None: """Manages the distribution of :class:`~can.Message` instances to listeners. @@ -142,19 +139,19 @@ def __init__( self._loop = loop #: Exception raised in thread - self.exception: Optional[Exception] = None + self.exception: Exception | None = None self._stopped = False self._lock = threading.Lock() - self._readers: list[Union[int, threading.Thread]] = [] + self._readers: list[int | threading.Thread] = [] self._tasks: set[asyncio.Task] = set() _bus_list: list[BusABC] = bus if isinstance(bus, list) else [bus] for each_bus in _bus_list: self.add_bus(each_bus) @property - def bus(self) -> Union[BusABC, tuple["BusABC", ...]]: + def bus(self) -> BusABC | tuple["BusABC", ...]: """Return the associated bus or a tuple of buses.""" if len(self._bus_list) == 1: return self._bus_list[0] @@ -322,9 +319,9 @@ def find_instances(bus: BusABC) -> tuple["Notifier", ...]: def __exit__( self, - exc_type: Optional[type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, ) -> None: if not self._stopped: self.stop() diff --git a/can/thread_safe_bus.py b/can/thread_safe_bus.py index 35a4f400c..71d6a5536 100644 --- a/can/thread_safe_bus.py +++ b/can/thread_safe_bus.py @@ -1,6 +1,6 @@ from contextlib import nullcontext from threading import RLock -from typing import Any, Optional +from typing import Any from can import typechecking from can.bus import BusABC, BusState, CanProtocol @@ -40,9 +40,9 @@ class ThreadSafeBus(ObjectProxy): # pylint: disable=abstract-method def __init__( self, - channel: Optional[typechecking.Channel] = None, - interface: Optional[str] = None, - config_context: Optional[str] = None, + channel: typechecking.Channel | None = None, + interface: str | None = None, + config_context: str | None = None, ignore_config: bool = False, **kwargs: Any, ) -> None: @@ -67,11 +67,11 @@ def __init__( self._lock_send = RLock() self._lock_recv = RLock() - def recv(self, timeout: Optional[float] = None) -> Optional[Message]: + def recv(self, timeout: float | None = None) -> Message | None: with self._lock_recv: return self.__wrapped__.recv(timeout=timeout) - def send(self, msg: Message, timeout: Optional[float] = None) -> None: + def send(self, msg: Message, timeout: float | None = None) -> None: with self._lock_send: return self.__wrapped__.send(msg=msg, timeout=timeout) @@ -79,16 +79,16 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None: # `send` method is already synchronized @property - def filters(self) -> Optional[typechecking.CanFilters]: + def filters(self) -> typechecking.CanFilters | None: with self._lock_recv: return self.__wrapped__.filters @filters.setter - def filters(self, filters: Optional[typechecking.CanFilters]) -> None: + def filters(self, filters: typechecking.CanFilters | None) -> None: with self._lock_recv: self.__wrapped__.filters = filters - def set_filters(self, filters: Optional[typechecking.CanFilters] = None) -> None: + def set_filters(self, filters: typechecking.CanFilters | None = None) -> None: with self._lock_recv: return self.__wrapped__.set_filters(filters=filters) diff --git a/can/typechecking.py b/can/typechecking.py index 8c25e8b57..56ac5927f 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -1,14 +1,10 @@ """Types for mypy type-checking""" import io +import os import sys from collections.abc import Iterable, Sequence -from typing import IO, TYPE_CHECKING, Any, NewType, Union - -if sys.version_info >= (3, 10): - from typing import TypeAlias -else: - from typing_extensions import TypeAlias +from typing import IO, TYPE_CHECKING, Any, NewType, TypeAlias if sys.version_info >= (3, 12): from typing import TypedDict @@ -17,7 +13,6 @@ if TYPE_CHECKING: - import os import struct @@ -37,24 +32,24 @@ class CanFilter(_CanFilterBase, total=False): # this should have the same typing info. # # See: https://github.com/python/typing/issues/593 -CanData = Union[bytes, bytearray, int, Iterable[int]] +CanData = bytes | bytearray | int | Iterable[int] # Used for the Abstract Base Class ChannelStr = str ChannelInt = int -Channel = Union[ChannelInt, ChannelStr, Sequence[ChannelInt]] +Channel = ChannelInt | ChannelStr | Sequence[ChannelInt] # Used by the IO module -FileLike = Union[IO[Any], io.TextIOWrapper, io.BufferedIOBase] -StringPathLike = Union[str, "os.PathLike[str]"] +FileLike = IO[Any] | io.TextIOWrapper | io.BufferedIOBase +StringPathLike = str | os.PathLike[str] BusConfig = NewType("BusConfig", dict[str, Any]) # Used by CLI scripts -TAdditionalCliArgs: TypeAlias = dict[str, Union[str, int, float, bool]] +TAdditionalCliArgs: TypeAlias = dict[str, str | int | float | bool] TDataStructs: TypeAlias = dict[ - Union[int, tuple[int, ...]], - "Union[struct.Struct, tuple[struct.Struct, *tuple[float, ...]]]", + int | tuple[int, ...], + "struct.Struct | tuple[struct.Struct, *tuple[float, ...]]", ] @@ -63,7 +58,7 @@ class AutoDetectedConfig(TypedDict): channel: Channel -ReadableBytesLike = Union[bytes, bytearray, memoryview] +ReadableBytesLike = bytes | bytearray | memoryview class BitTimingDict(TypedDict): diff --git a/can/util.py b/can/util.py index 584b7dfa9..4cbeec60e 100644 --- a/can/util.py +++ b/can/util.py @@ -12,15 +12,12 @@ import platform import re import warnings -from collections.abc import Iterable +from collections.abc import Callable, Iterable from configparser import ConfigParser from time import get_clock_info, perf_counter, time from typing import ( Any, - Callable, - Optional, TypeVar, - Union, cast, ) @@ -50,7 +47,7 @@ def load_file_config( - path: Optional[typechecking.StringPathLike] = None, section: str = "default" + path: typechecking.StringPathLike | None = None, section: str = "default" ) -> dict[str, str]: """ Loads configuration from file with following content:: @@ -83,7 +80,7 @@ def load_file_config( return _config -def load_environment_config(context: Optional[str] = None) -> dict[str, str]: +def load_environment_config(context: str | None = None) -> dict[str, str]: """ Loads config dict from environmental variables (if set): @@ -120,9 +117,9 @@ def load_environment_config(context: Optional[str] = None) -> dict[str, str]: def load_config( - path: Optional[typechecking.StringPathLike] = None, - config: Optional[dict[str, Any]] = None, - context: Optional[str] = None, + path: typechecking.StringPathLike | None = None, + config: dict[str, Any] | None = None, + context: str | None = None, ) -> typechecking.BusConfig: """ Returns a dict with configuration details which is loaded from (in this order): @@ -176,7 +173,7 @@ def load_config( # Use the given dict for default values config_sources = cast( - "Iterable[Union[dict[str, Any], Callable[[Any], dict[str, Any]]]]", + "Iterable[dict[str, Any] | Callable[[Any], dict[str, Any]]]", [ given_config, can.rc, @@ -258,7 +255,7 @@ def _create_bus_config(config: dict[str, Any]) -> typechecking.BusConfig: return cast("typechecking.BusConfig", config) -def _dict2timing(data: dict[str, Any]) -> Union[BitTiming, BitTimingFd, None]: +def _dict2timing(data: dict[str, Any]) -> BitTiming | BitTimingFd | None: """Try to instantiate a :class:`~can.BitTiming` or :class:`~can.BitTimingFd` from a dictionary. Return `None` if not possible.""" @@ -325,7 +322,7 @@ def dlc2len(dlc: int) -> int: return CAN_FD_DLC[dlc] if dlc <= 15 else 64 -def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]: +def channel2int(channel: typechecking.Channel | None) -> int | None: """Try to convert the channel to an integer. :param channel: @@ -348,8 +345,8 @@ def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]: def deprecated_args_alias( deprecation_start: str, - deprecation_end: Optional[str] = None, - **aliases: Optional[str], + deprecation_end: str | None = None, + **aliases: str | None, ) -> Callable[[Callable[P1, T1]], Callable[P1, T1]]: """Allows to rename/deprecate a function kwarg(s) and optionally have the deprecated kwarg(s) set as alias(es) @@ -399,9 +396,9 @@ def wrapper(*args: P1.args, **kwargs: P1.kwargs) -> T1: def _rename_kwargs( func_name: str, start: str, - end: Optional[str], + end: str | None, kwargs: dict[str, Any], - aliases: dict[str, Optional[str]], + aliases: dict[str, str | None], ) -> None: """Helper function for `deprecated_args_alias`""" for alias, new in aliases.items(): @@ -501,7 +498,7 @@ def time_perfcounter_correlation() -> tuple[float, float]: return t1, performance_counter -def cast_from_string(string_val: str) -> Union[str, int, float, bool]: +def cast_from_string(string_val: str) -> str | int | float | bool: """Perform trivial type conversion from :class:`str` values. :param string_val: diff --git a/can/viewer.py b/can/viewer.py index 97bda1676..8d9d228bb 100644 --- a/can/viewer.py +++ b/can/viewer.py @@ -173,7 +173,9 @@ def unpack_data(cmd: int, cmd_to_struct: TDataStructs, data: bytes) -> list[floa # The conversion from raw values to SI-units are given in the rest of the tuple values = [ d // val if isinstance(val, int) else float(d) / val - for d, val in zip(struct_t.unpack(data), value[1:]) + for d, val in zip( + struct_t.unpack(data), value[1:], strict=False + ) ] else: # No conversion from SI-units is needed diff --git a/doc/changelog.d/1996.removed.md b/doc/changelog.d/1996.removed.md new file mode 100644 index 000000000..77458ad75 --- /dev/null +++ b/doc/changelog.d/1996.removed.md @@ -0,0 +1 @@ +Remove support for end-of-life Python 3.9. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index bc526ce73..551a7f3bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ dependencies = [ "packaging >= 23.1", "typing_extensions>=3.10.0.0", ] -requires-python = ">=3.9" +requires-python = ">=3.10" license = "LGPL-3.0-only" classifiers = [ "Development Status :: 5 - Production/Stable", @@ -27,7 +27,6 @@ classifiers = [ "Operating System :: Microsoft :: Windows", "Operating System :: POSIX :: Linux", "Programming Language :: Python", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", @@ -89,9 +88,9 @@ docs = [ ] lint = [ "pylint==3.3.*", - "ruff==0.12.11", - "black==25.1.*", - "mypy==1.17.*", + "ruff==0.14.*", + "black==25.9.*", + "mypy==1.18.*", ] test = [ "pytest==8.4.*", @@ -100,7 +99,7 @@ test = [ "coveralls==4.0.*", "pytest-cov==6.2.*", "coverage==7.10.*", - "hypothesis>=6.136,<6.138", + "hypothesis==6.*", "parameterized==0.9.*", ] dev = [ @@ -132,6 +131,7 @@ disallow_incomplete_defs = true warn_redundant_casts = true warn_unused_ignores = true exclude = [ + "^build", "^doc/conf.py$", "^test", "^can/interfaces/etas", diff --git a/tox.ini b/tox.ini index 5f393cb93..4e96291ed 100644 --- a/tox.ini +++ b/tox.ini @@ -69,11 +69,11 @@ dependency_groups = lint extras = commands = - mypy --python-version 3.9 . mypy --python-version 3.10 . mypy --python-version 3.11 . mypy --python-version 3.12 . mypy --python-version 3.13 . + mypy --python-version 3.14 . [pytest] testpaths = test