Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply asc format since format v8.1 #1641

Closed
wants to merge 6 commits into from
Closed
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
116 changes: 98 additions & 18 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional, TextIO, Union
from packaging.version import Version

from ..message import Message
from ..typechecking import StringPathLike
Expand Down Expand Up @@ -56,31 +57,59 @@ def __init__(
raise ValueError("The given file cannot be None")
self.base = base
self._converted_base = self._check_base(base)
self.asc_version = Version("0.0.0") # init asc format version
self._new_asc_version = Version("8.1.0")
# TODO: what is relative timestamp? Seems it should be timestamps_format
self.relative_timestamp = relative_timestamp
self.date: Optional[str] = None
self.start_time = 0.0
# TODO - what is this used for? The ASC Writer only prints `absolute`
self.timestamps_format: Optional[str] = None
self.internal_events_logged = False
self._extract_header()

def _extract_header(self) -> None:
for _line in self.file:
line = _line.strip()

# parse date
datetime_match = re.match(
r"date\s+\w+\s+(?P<datetime_string>.+)", line, re.IGNORECASE
)

# parse base
base_match = re.match(
r"base\s+(?P<base>hex|dec)(?:\s+timestamps\s+"
r"(?P<timestamp_format>absolute|relative))?",
line,
re.IGNORECASE,
)

# parse asc format version
asc_version_match = re.match(
r"// version (?P<version>.+)", line, re.IGNORECASE
)

comment_match = re.match(r"//.*", line)

events_match = re.match(
r"(?P<no_events>no)?\s*internal\s+events\s+logged", line, re.IGNORECASE
)

# parse start time
trigger_match = re.match(
r"begin\s+triggerblock\s+\w+\s+(?P<datetime_string>.+)",
line,
re.IGNORECASE,
)

# parse start time
trigger_match = re.match(
r"begin\s+triggerblock\s+\w+\s+(?P<datetime_string>.+)",
line,
re.IGNORECASE,
)

if datetime_match:
self.date = datetime_match.group("datetime_string")
self.start_time = (
Expand All @@ -98,14 +127,26 @@ def _extract_header(self) -> None:
self.timestamps_format = timestamp_format or "absolute"
continue

if asc_version_match:
asc_version = asc_version_match.group("version")
self.asc_version = Version(asc_version)
continue

if comment_match:
continue

if events_match:
self.internal_events_logged = events_match.group("no_events") is None
break
continue

break
if trigger_match:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the files can contain multiple triggers. Parsing it as part of the header is probably not sufficient.

datetime_str = trigger_match.group("datetime_string")
self.start_time = (
0.0
if self.timestamps_format == "relative"
else self._datetime_to_timestamp(datetime_str)
)
break

@staticmethod
def _datetime_to_timestamp(datetime_string: str) -> float:
Expand Down Expand Up @@ -252,26 +293,63 @@ def _process_fd_can_frame(self, line: str, msg_kwargs: Dict[str, Any]) -> Messag

return Message(**msg_kwargs)

def _process_fd_can_frame_2(self, line: str, msg_kwargs: Dict[str, Any]) -> Message:
channel, direction, rest_of_message = line.split(None, 2)
# See ASCWriter
msg_kwargs["channel"] = int(channel) - 1
msg_kwargs["is_rx"] = direction == "Rx"

# CAN FD error frame
if rest_of_message.strip()[:10].lower() == "errorframe":
# Error Frame
# TODO: maybe use regex to parse BRS, ESI, etc?
msg_kwargs["is_error_frame"] = True
else:
(
can_id_str,
symbolic_name,
frame_name_or_brs,
rest_of_message,
) = rest_of_message.split(None, 3)

if frame_name_or_brs.isdigit():
brs = frame_name_or_brs
esi, dlc_str, data_length_str, data = rest_of_message.split(None, 3)
else:
brs, esi, dlc_str, data_length_str, data = rest_of_message.split(
None, 4
)

self._extract_can_id(can_id_str, msg_kwargs)
msg_kwargs["bitrate_switch"] = brs == "1"
msg_kwargs["error_state_indicator"] = esi == "1"
dlc = int(dlc_str, self._converted_base)
data_length = int(data_length_str)
if data_length == 0:
# CAN remote Frame
msg_kwargs["is_remote_frame"] = True
msg_kwargs["dlc"] = dlc
else:
if dlc2len(dlc) != data_length:
logger.warning(
"DLC vs Data Length mismatch %d[%d] != %d",
dlc,
dlc2len(dlc),
data_length,
)
msg_kwargs["dlc"] = data_length

self._process_data_string(data, data_length, msg_kwargs)

return Message(**msg_kwargs)

def __iter__(self) -> Generator[Message, None, None]:
self._extract_header()
# extract head in initial
# self._extract_header()

for _line in self.file:
line = _line.strip()

trigger_match = re.match(
r"begin\s+triggerblock\s+\w+\s+(?P<datetime_string>.+)",
line,
re.IGNORECASE,
)
if trigger_match:
datetime_str = trigger_match.group("datetime_string")
self.start_time = (
0.0
if self.relative_timestamp
else self._datetime_to_timestamp(datetime_str)
)
continue

if not re.match(
r"\d+\.\d+\s+(\d+\s+(\w+\s+(Tx|Rx)|ErrorFrame)|CANFD)",
line,
Expand Down Expand Up @@ -300,8 +378,10 @@ def __iter__(self) -> Generator[Message, None, None]:

if "is_fd" not in msg_kwargs:
msg = self._process_classic_can_frame(rest_of_message, msg_kwargs)
else:
elif self.asc_version < self._new_asc_version:
msg = self._process_fd_can_frame(rest_of_message, msg_kwargs)
else:
msg = self._process_fd_can_frame_2(rest_of_message, msg_kwargs)
if msg is not None:
yield msg

Expand Down