Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
- under `test/data/logfile.asc`
"""

from typing import cast, Any, Generator, IO, List, Optional, Tuple, Union
from can import typechecking

from datetime import datetime
import time
import logging
Expand All @@ -32,7 +35,11 @@ class ASCReader(BaseIOHandler):
TODO: turn relative timestamps back to absolute form
"""

def __init__(self, file, base="hex"):
def __init__(
self,
file: Union[typechecking.FileLike, typechecking.StringPathLike],
base: str = "hex",
) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in text
Expand All @@ -42,10 +49,13 @@ def __init__(self, file, base="hex"):
this value will be overwritten. Default "hex".
"""
super().__init__(file, mode="r")

if not self.file:
raise ValueError("The given file cannot be None")
self.base = base

@staticmethod
def _extract_can_id(str_can_id, base):
def _extract_can_id(str_can_id: str, base: int) -> Tuple[int, bool]:
if str_can_id[-1:].lower() == "x":
is_extended = True
can_id = int(str_can_id[0:-1], base)
Expand All @@ -55,13 +65,15 @@ def _extract_can_id(str_can_id, base):
return can_id, is_extended

@staticmethod
def _check_base(base):
def _check_base(base: str) -> int:
if base not in ["hex", "dec"]:
raise ValueError('base should be either "hex" or "dec"')
return BASE_DEC if base == "dec" else BASE_HEX

def __iter__(self):
def __iter__(self) -> Generator[Message, None, None]:
base = self._check_base(self.base)
# This is guaranteed to not be None since we raise ValueError in __init__
self.file = cast(IO[Any], self.file)
for line in self.file:
# logger.debug("ASCReader: parsing line: '%s'", line.splitlines()[0])
if line.split(" ")[0] == "base":
Expand Down Expand Up @@ -194,7 +206,11 @@ class ASCWriter(BaseIOHandler, Listener):
FORMAT_DATE = "%a %b %d %I:%M:%S.{} %p %Y"
FORMAT_EVENT = "{timestamp: 9.6f} {message}\n"

def __init__(self, file, channel=1):
def __init__(
self,
file: Union[typechecking.FileLike, typechecking.StringPathLike],
channel: int = 1,
) -> None:
"""
:param file: a path-like object or as file-like object to write to
If this is a file-like object, is has to opened in text
Expand All @@ -203,6 +219,9 @@ def __init__(self, file, channel=1):
have a channel set
"""
super().__init__(file, mode="w")
if not self.file:
raise ValueError("The given file cannot be None")

self.channel = channel

# write start of file header
Expand All @@ -213,24 +232,29 @@ def __init__(self, file, channel=1):

# the last part is written with the timestamp of the first message
self.header_written = False
self.last_timestamp = None
self.started = None
self.last_timestamp = 0.0
self.started = 0.0

def stop(self):
def stop(self) -> None:
# This is guaranteed to not be None since we raise ValueError in __init__
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe it should be private?

self.file = cast(IO[Any], self.file)
if not self.file.closed:
self.file.write("End TriggerBlock\n")
super().stop()

def log_event(self, message, timestamp=None):
def log_event(self, message: str, timestamp: Optional[float] = None) -> None:
"""Add a message to the log file.

:param str message: an arbitrary message
:param float timestamp: the absolute timestamp of the event
:param message: an arbitrary message
:param timestamp: the absolute timestamp of the event
"""

if not message: # if empty or None
logger.debug("ASCWriter: ignoring empty message")
return
# This is guaranteed to not be None since we raise ValueError in __init__
self.file = cast(IO[Any], self.file)

# this is the case for the very first message:
if not self.header_written:
self.last_timestamp = timestamp or 0.0
Expand All @@ -251,14 +275,14 @@ def log_event(self, message, timestamp=None):
line = self.FORMAT_EVENT.format(timestamp=timestamp, message=message)
self.file.write(line)

def on_message_received(self, msg):
def on_message_received(self, msg: Message) -> None:

if msg.is_error_frame:
self.log_event("{} ErrorFrame".format(self.channel), msg.timestamp)
return
if msg.is_remote_frame:
dtype = "r"
data = []
data: List[str] = []
else:
dtype = "d {}".format(msg.dlc)
data = ["{:02X}".format(byte) for byte in msg.data]
Expand Down