diff --git a/can/io/asc.py b/can/io/asc.py index f039cda32..db596ca25 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -10,6 +10,7 @@ import time from datetime import datetime from typing import Any, Dict, Generator, List, Optional, TextIO, Union +from packaging.version import Version from ..message import Message from ..typechecking import StringPathLike @@ -56,31 +57,59 @@ def __init__( raise ValueError("The given file cannot be None") self.base = base self._converted_base = self._check_base(base) + self.asc_version = Version("0.0.0") # init asc format version + self._new_asc_version = Version("8.1.0") + # TODO: what is relative timestamp? Seems it should be timestamps_format self.relative_timestamp = relative_timestamp self.date: Optional[str] = None self.start_time = 0.0 # TODO - what is this used for? The ASC Writer only prints `absolute` self.timestamps_format: Optional[str] = None self.internal_events_logged = False + self._extract_header() def _extract_header(self) -> None: for _line in self.file: line = _line.strip() + # parse date datetime_match = re.match( r"date\s+\w+\s+(?P.+)", line, re.IGNORECASE ) + + # parse base base_match = re.match( r"base\s+(?Phex|dec)(?:\s+timestamps\s+" r"(?Pabsolute|relative))?", line, re.IGNORECASE, ) + + # parse asc format version + asc_version_match = re.match( + r"// version (?P.+)", line, re.IGNORECASE + ) + comment_match = re.match(r"//.*", line) + events_match = re.match( r"(?Pno)?\s*internal\s+events\s+logged", line, re.IGNORECASE ) + # parse start time + trigger_match = re.match( + r"begin\s+triggerblock\s+\w+\s+(?P.+)", + line, + re.IGNORECASE, + ) + + # parse start time + trigger_match = re.match( + r"begin\s+triggerblock\s+\w+\s+(?P.+)", + line, + re.IGNORECASE, + ) + if datetime_match: self.date = datetime_match.group("datetime_string") self.start_time = ( @@ -98,14 +127,26 @@ def _extract_header(self) -> None: self.timestamps_format = timestamp_format or "absolute" continue + if asc_version_match: + asc_version = asc_version_match.group("version") + self.asc_version = Version(asc_version) + continue + if comment_match: continue if events_match: self.internal_events_logged = events_match.group("no_events") is None - break + continue - break + if trigger_match: + datetime_str = trigger_match.group("datetime_string") + self.start_time = ( + 0.0 + if self.timestamps_format == "relative" + else self._datetime_to_timestamp(datetime_str) + ) + break @staticmethod def _datetime_to_timestamp(datetime_string: str) -> float: @@ -252,26 +293,63 @@ def _process_fd_can_frame(self, line: str, msg_kwargs: Dict[str, Any]) -> Messag return Message(**msg_kwargs) + def _process_fd_can_frame_2(self, line: str, msg_kwargs: Dict[str, Any]) -> Message: + channel, direction, rest_of_message = line.split(None, 2) + # See ASCWriter + msg_kwargs["channel"] = int(channel) - 1 + msg_kwargs["is_rx"] = direction == "Rx" + + # CAN FD error frame + if rest_of_message.strip()[:10].lower() == "errorframe": + # Error Frame + # TODO: maybe use regex to parse BRS, ESI, etc? + msg_kwargs["is_error_frame"] = True + else: + ( + can_id_str, + symbolic_name, + frame_name_or_brs, + rest_of_message, + ) = rest_of_message.split(None, 3) + + if frame_name_or_brs.isdigit(): + brs = frame_name_or_brs + esi, dlc_str, data_length_str, data = rest_of_message.split(None, 3) + else: + brs, esi, dlc_str, data_length_str, data = rest_of_message.split( + None, 4 + ) + + self._extract_can_id(can_id_str, msg_kwargs) + msg_kwargs["bitrate_switch"] = brs == "1" + msg_kwargs["error_state_indicator"] = esi == "1" + dlc = int(dlc_str, self._converted_base) + data_length = int(data_length_str) + if data_length == 0: + # CAN remote Frame + msg_kwargs["is_remote_frame"] = True + msg_kwargs["dlc"] = dlc + else: + if dlc2len(dlc) != data_length: + logger.warning( + "DLC vs Data Length mismatch %d[%d] != %d", + dlc, + dlc2len(dlc), + data_length, + ) + msg_kwargs["dlc"] = data_length + + self._process_data_string(data, data_length, msg_kwargs) + + return Message(**msg_kwargs) + def __iter__(self) -> Generator[Message, None, None]: - self._extract_header() + # extract head in initial + # self._extract_header() for _line in self.file: line = _line.strip() - trigger_match = re.match( - r"begin\s+triggerblock\s+\w+\s+(?P.+)", - line, - re.IGNORECASE, - ) - if trigger_match: - datetime_str = trigger_match.group("datetime_string") - self.start_time = ( - 0.0 - if self.relative_timestamp - else self._datetime_to_timestamp(datetime_str) - ) - continue - if not re.match( r"\d+\.\d+\s+(\d+\s+(\w+\s+(Tx|Rx)|ErrorFrame)|CANFD)", line, @@ -300,8 +378,15 @@ def __iter__(self) -> Generator[Message, None, None]: if "is_fd" not in msg_kwargs: msg = self._process_classic_can_frame(rest_of_message, msg_kwargs) - else: + elif self.asc_version < self._new_asc_version: msg = self._process_fd_can_frame(rest_of_message, msg_kwargs) + else: + if self.asc_version < self._new_asc_version: + logger.warning( + "ASC format is under 8.1 or unknown, process may not safe." + ) + else: + msg = self._process_fd_can_frame(rest_of_message, msg_kwargs) if msg is not None: yield msg