diff --git a/.travis.yml b/.travis.yml index fe4253992..843bb9312 100644 --- a/.travis.yml +++ b/.travis.yml @@ -97,6 +97,7 @@ jobs: can/broadcastmanager.py can/bus.py can/interface.py + can/interfaces/slcan.py can/interfaces/socketcan/**.py can/interfaces/virtual.py can/listener.py diff --git a/can/interfaces/slcan.py b/can/interfaces/slcan.py index 5d7450987..a952fb7c4 100644 --- a/can/interfaces/slcan.py +++ b/can/interfaces/slcan.py @@ -7,6 +7,9 @@ """ +from typing import Any, Optional, Tuple +from can import typechecking + import time import logging @@ -52,38 +55,39 @@ class slcanBus(BusABC): def __init__( self, - channel, - ttyBaudrate=115200, - bitrate=None, - btr=None, - sleep_after_open=_SLEEP_AFTER_SERIAL_OPEN, - rtscts=False, - **kwargs - ): + channel: typechecking.ChannelStr, + ttyBaudrate: int = 115200, + bitrate: Optional[int] = None, + btr: Optional[str] = None, + sleep_after_open: float = _SLEEP_AFTER_SERIAL_OPEN, + rtscts: bool = False, + **kwargs: Any + ) -> None: """ :raise ValueError: if both *bitrate* and *btr* are set - :param str channel: + :param channel: port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...) Must not be empty. - :param int ttyBaudrate: + :param ttyBaudrate: baudrate of underlying serial or usb device - :param int bitrate: + :param bitrate: Bitrate in bit/s - :param str btr: + :param btr: BTR register value to set custom can speed - :param float poll_interval: + :param poll_interval: Poll interval in seconds when reading messages - :param float sleep_after_open: + :param sleep_after_open: Time to wait in seconds after opening serial connection - :param bool rtscts: + :param rtscts: turn hardware handshake (RTS/CTS) on and off """ if not channel: # if None or empty raise TypeError("Must specify a serial port.") if "@" in channel: - (channel, ttyBaudrate) = channel.split("@") + (channel, baudrate) = channel.split("@") + ttyBaudrate = int(baudrate) self.serialPortOrig = serial.serial_for_url( channel, baudrate=ttyBaudrate, rtscts=rtscts ) @@ -104,11 +108,11 @@ def __init__( channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs ) - def set_bitrate(self, bitrate): + def set_bitrate(self, bitrate: int) -> None: """ :raise ValueError: if both *bitrate* is not among the possible values - :param int bitrate: + :param bitrate: Bitrate in bit/s """ self.close() @@ -116,24 +120,26 @@ def set_bitrate(self, bitrate): self._write(self._BITRATES[bitrate]) else: raise ValueError( - "Invalid bitrate, choose one of " + (", ".join(self._BITRATES)) + "." + "Invalid bitrate, choose one of " + + (", ".join(str(k) for k in self._BITRATES.keys())) + + "." ) self.open() - def set_bitrate_reg(self, btr): + def set_bitrate_reg(self, btr: str) -> None: """ - :param str btr: + :param btr: BTR register value to set custom can speed """ self.close() self._write("s" + btr) self.open() - def _write(self, string): + def _write(self, string: str) -> None: self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR) self.serialPortOrig.flush() - def _read(self, timeout): + def _read(self, timeout: Optional[float]) -> Optional[str]: # first read what is already in receive buffer while self.serialPortOrig.in_waiting: @@ -165,18 +171,20 @@ def _read(self, timeout): break return string - def flush(self): + def flush(self) -> None: del self._buffer[:] while self.serialPortOrig.in_waiting: self.serialPortOrig.read() - def open(self): + def open(self) -> None: self._write("O") - def close(self): + def close(self) -> None: self._write("C") - def _recv_internal(self, timeout): + def _recv_internal( + self, timeout: Optional[float] + ) -> Tuple[Optional[Message], bool]: canId = None remote = False @@ -223,7 +231,7 @@ def _recv_internal(self, timeout): return msg, False return None, False - def send(self, msg, timeout=None): + def send(self, msg: Message, timeout: Optional[float] = None) -> None: if timeout != self.serialPortOrig.write_timeout: self.serialPortOrig.write_timeout = timeout if msg.is_remote_frame: @@ -239,20 +247,21 @@ def send(self, msg, timeout=None): sendStr += "".join(["%02X" % b for b in msg.data]) self._write(sendStr) - def shutdown(self): + def shutdown(self) -> None: self.close() self.serialPortOrig.close() - def fileno(self): + def fileno(self) -> int: if hasattr(self.serialPortOrig, "fileno"): return self.serialPortOrig.fileno() # Return an invalid file descriptor on Windows return -1 - def get_version(self, timeout): + def get_version( + self, timeout: Optional[float] + ) -> Tuple[Optional[int], Optional[int]]: """Get HW and SW version of the slcan interface. - :type timeout: int or None :param timeout: seconds to wait for version or None to wait indefinitely @@ -288,14 +297,12 @@ def get_version(self, timeout): else: return None, None - def get_serial_number(self, timeout): + def get_serial_number(self, timeout: Optional[float]) -> Optional[str]: """Get serial number of the slcan interface. - :type timeout: int or None :param timeout: seconds to wait for serial number or None to wait indefinitely - :rtype str or None :return: None on timeout or a str object. """ diff --git a/can/typechecking.py b/can/typechecking.py index 0327874f5..50070b01e 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -22,7 +22,9 @@ CanData = typing.Union[bytes, bytearray, int, typing.Iterable[int]] # Used for the Abstract Base Class -Channel = typing.Union[int, str] +ChannelStr = str +ChannelInt = int +Channel = typing.Union[ChannelInt, ChannelStr] # Used by the IO module FileLike = typing.IO[typing.Any]