diff --git a/can/io/trc.py b/can/io/trc.py index ec08d1af1..d1ee2b72d 100644 --- a/can/io/trc.py +++ b/can/io/trc.py @@ -7,15 +7,15 @@ Version 1.1 will be implemented as it is most commonly used """ # noqa -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum import io import os import logging -from typing import Generator, Optional, Union, TextIO, Callable, List +from typing import Generator, Optional, Union, TextIO, Callable, List, Dict from ..message import Message -from ..util import channel2int +from ..util import channel2int, len2dlc, dlc2len from .generic import FileIOMessageWriter, MessageReader from ..typechecking import StringPathLike @@ -32,6 +32,11 @@ class TRCFileVersion(Enum): V2_0 = 200 V2_1 = 201 + def __ge__(self, other): + if self.__class__ is other.__class__: + return self.value >= other.value + return NotImplemented + class TRCReader(MessageReader): """ @@ -51,6 +56,8 @@ def __init__( """ super().__init__(file, mode="r") self.file_version = TRCFileVersion.UNKNOWN + self.start_time: Optional[datetime] = None + self.columns: Dict[str, int] = {} if not self.file: raise ValueError("The given file cannot be None") @@ -67,17 +74,42 @@ def _extract_header(self): file_version = line.split("=")[1] if file_version == "1.1": self.file_version = TRCFileVersion.V1_1 + elif file_version == "2.0": + self.file_version = TRCFileVersion.V2_0 elif file_version == "2.1": self.file_version = TRCFileVersion.V2_1 else: self.file_version = TRCFileVersion.UNKNOWN except IndexError: logger.debug("TRCReader: Failed to parse version") + elif line.startswith(";$STARTTIME"): + logger.debug("TRCReader: Found start time '%s'", line) + try: + self.start_time = datetime( + 1899, 12, 30, tzinfo=timezone.utc + ) + timedelta(days=float(line.split("=")[1])) + except IndexError: + logger.debug("TRCReader: Failed to parse start time") + elif line.startswith(";$COLUMNS"): + logger.debug("TRCReader: Found columns '%s'", line) + try: + columns = line.split("=")[1].split(",") + self.columns = {column: columns.index(column) for column in columns} + except IndexError: + logger.debug("TRCReader: Failed to parse columns") elif line.startswith(";"): continue else: break + if self.file_version >= TRCFileVersion.V1_1: + if self.start_time is None: + raise ValueError("File has no start time information") + + if self.file_version >= TRCFileVersion.V2_0: + if not self.columns: + raise ValueError("File has no column information") + if self.file_version == TRCFileVersion.UNKNOWN: logger.info( "TRCReader: No file version was found, so version 1.0 is assumed" @@ -87,8 +119,8 @@ def _extract_header(self): self._parse_cols = self._parse_msg_V1_0 elif self.file_version == TRCFileVersion.V1_1: self._parse_cols = self._parse_cols_V1_1 - elif self.file_version == TRCFileVersion.V2_1: - self._parse_cols = self._parse_cols_V2_1 + elif self.file_version in [TRCFileVersion.V2_0, TRCFileVersion.V2_1]: + self._parse_cols = self._parse_cols_V2_x else: raise NotImplementedError("File version not fully implemented for reading") @@ -113,7 +145,12 @@ def _parse_msg_V1_1(self, cols: List[str]) -> Optional[Message]: arbit_id = cols[3] msg = Message() - msg.timestamp = float(cols[1]) / 1000 + if isinstance(self.start_time, datetime): + msg.timestamp = ( + self.start_time + timedelta(milliseconds=float(cols[1])) + ).timestamp() + else: + msg.timestamp = float(cols[1]) / 1000 msg.arbitration_id = int(arbit_id, 16) msg.is_extended_id = len(arbit_id) > 4 msg.channel = 1 @@ -122,15 +159,38 @@ def _parse_msg_V1_1(self, cols: List[str]) -> Optional[Message]: msg.is_rx = cols[2] == "Rx" return msg - def _parse_msg_V2_1(self, cols: List[str]) -> Optional[Message]: + def _parse_msg_V2_x(self, cols: List[str]) -> Optional[Message]: + type_ = cols[self.columns["T"]] + bus = self.columns.get("B", None) + + if "l" in self.columns: + length = int(cols[self.columns["l"]]) + dlc = len2dlc(length) + elif "L" in self.columns: + dlc = int(cols[self.columns["L"]]) + length = dlc2len(dlc) + else: + raise ValueError("No length/dlc columns present.") + msg = Message() - msg.timestamp = float(cols[1]) / 1000 - msg.arbitration_id = int(cols[4], 16) - msg.is_extended_id = len(cols[4]) > 4 - msg.channel = int(cols[3]) - msg.dlc = int(cols[7]) - msg.data = bytearray([int(cols[i + 8], 16) for i in range(msg.dlc)]) - msg.is_rx = cols[5] == "Rx" + if isinstance(self.start_time, datetime): + msg.timestamp = ( + self.start_time + timedelta(milliseconds=float(cols[self.columns["O"]])) + ).timestamp() + else: + msg.timestamp = float(cols[1]) / 1000 + msg.arbitration_id = int(cols[self.columns["I"]], 16) + msg.is_extended_id = len(cols[self.columns["I"]]) > 4 + msg.channel = int(cols[bus]) if bus is not None else 1 + msg.dlc = dlc + msg.data = bytearray( + [int(cols[i + self.columns["D"]], 16) for i in range(length)] + ) + msg.is_rx = cols[self.columns["d"]] == "Rx" + msg.is_fd = type_ in ["FD", "FB", "FE", "BI"] + msg.bitrate_switch = type_ in ["FB", " FE"] + msg.error_state_indicator = type_ in ["FE", "BI"] + return msg def _parse_cols_V1_1(self, cols: List[str]) -> Optional[Message]: @@ -141,10 +201,10 @@ def _parse_cols_V1_1(self, cols: List[str]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_V2_1(self, cols: List[str]) -> Optional[Message]: - dtype = cols[2] - if dtype == "DT": - return self._parse_msg_V2_1(cols) + def _parse_cols_V2_x(self, cols: List[str]) -> Optional[Message]: + dtype = cols[self.columns["T"]] + if dtype in ["DT", "FD", "FB"]: + return self._parse_msg_V2_x(cols) else: logger.info("TRCReader: Unsupported type '%s'", dtype) return None @@ -228,7 +288,7 @@ def __init__( self._msg_fmt_string = self.FORMAT_MESSAGE_V1_0 self._format_message = self._format_message_init - def _write_header_V1_0(self, start_time: timedelta) -> None: + def _write_header_V1_0(self, start_time: datetime) -> None: lines = [ ";##########################################################################", f"; {self.filepath}", @@ -249,13 +309,11 @@ def _write_header_V1_0(self, start_time: timedelta) -> None: ] self.file.writelines(line + "\n" for line in lines) - def _write_header_V2_1(self, header_time: timedelta, start_time: datetime) -> None: - milliseconds = int( - (header_time.seconds * 1000) + (header_time.microseconds / 1000) - ) + def _write_header_V2_1(self, start_time: datetime) -> None: + header_time = start_time - datetime(year=1899, month=12, day=30) lines = [ ";$FILEVERSION=2.1", - f";$STARTTIME={header_time.days}.{milliseconds}", + f";$STARTTIME={header_time/timedelta(days=1)}", ";$COLUMNS=N,O,T,B,I,d,R,L,D", ";", f"; {self.filepath}", @@ -308,14 +366,12 @@ def _format_message_init(self, msg, channel): def write_header(self, timestamp: float) -> None: # write start of file header - ref_time = datetime(year=1899, month=12, day=30) - start_time = datetime.now() + timedelta(seconds=timestamp) - header_time = start_time - ref_time + start_time = datetime.utcfromtimestamp(timestamp) if self.file_version == TRCFileVersion.V1_0: - self._write_header_V1_0(header_time) + self._write_header_V1_0(start_time) elif self.file_version == TRCFileVersion.V2_1: - self._write_header_V2_1(header_time, start_time) + self._write_header_V2_1(start_time) else: raise NotImplementedError("File format is not supported") self.header_written = True diff --git a/test/data/test_CanMessage.trc b/test/data/test_CanMessage.trc index 215997b57..8b1361808 100644 --- a/test/data/test_CanMessage.trc +++ b/test/data/test_CanMessage.trc @@ -1,5 +1,5 @@ ;$FILEVERSION=2.1 -;$STARTTIME=0 +;$STARTTIME=43008.920986006946 ;$COLUMNS=N,O,T,B,I,d,R,L,D ; ; C:\Users\User\Desktop\python-can\test\data\test_CanMessage.trc diff --git a/test/logformats_test.py b/test/logformats_test.py index 05c8b986f..3486827a9 100644 --- a/test/logformats_test.py +++ b/test/logformats_test.py @@ -810,9 +810,10 @@ class TestTrcFileFormatGen(TestTrcFileFormatBase): """Generic tests for can.TRCWriter and can.TRCReader with different file versions""" def test_can_message(self): + start_time = 1506809173.191 # 30.09.2017 22:06:13.191.000 as timestamp expected_messages = [ can.Message( - timestamp=2.5010, + timestamp=start_time + 2.5010, arbitration_id=0xC8, is_extended_id=False, is_rx=False, @@ -821,7 +822,7 @@ def test_can_message(self): data=[9, 8, 7, 6, 5, 4, 3, 2], ), can.Message( - timestamp=17.876708, + timestamp=start_time + 17.876708, arbitration_id=0x6F9, is_extended_id=False, channel=0, @@ -841,10 +842,17 @@ def test_can_message(self): ) def test_can_message_versions(self, name, filename, is_rx_support): with self.subTest(name): + if name == "V1_0": + # Version 1.0 does not support start time + start_time = 0 + else: + start_time = ( + 1639837687.062001 # 18.12.2021 14:28:07.062.001 as timestamp + ) def msg_std(timestamp): msg = can.Message( - timestamp=timestamp, + timestamp=timestamp + start_time, arbitration_id=0x000, is_extended_id=False, channel=1, @@ -857,7 +865,7 @@ def msg_std(timestamp): def msg_ext(timestamp): msg = can.Message( - timestamp=timestamp, + timestamp=timestamp + start_time, arbitration_id=0x100, is_extended_id=True, channel=1,