1111import os
1212from datetime import datetime , timedelta , timezone
1313from enum import Enum
14- from typing import Any , Callable , Dict , Generator , List , Optional , TextIO , Union
14+ from typing import Any , Callable , Dict , Generator , Optional , TextIO , Tuple , Union
1515
1616from ..message import Message
1717from ..typechecking import StringPathLike
18- from ..util import channel2int , dlc2len , len2dlc
19- from .generic import (
20- TextIOMessageReader ,
21- TextIOMessageWriter ,
22- )
18+ from ..util import channel2int , len2dlc
19+ from .generic import TextIOMessageReader , TextIOMessageWriter
2320
2421logger = logging .getLogger ("can.io.trc" )
2522
@@ -58,13 +55,22 @@ def __init__(
5855 """
5956 super ().__init__ (file , mode = "r" )
6057 self .file_version = TRCFileVersion .UNKNOWN
61- self .start_time : Optional [ datetime ] = None
58+ self ._start_time : float = 0
6259 self .columns : Dict [str , int ] = {}
60+ self ._num_columns = - 1
6361
6462 if not self .file :
6563 raise ValueError ("The given file cannot be None" )
6664
67- self ._parse_cols : Callable [[List [str ]], Optional [Message ]] = lambda x : None
65+ self ._parse_cols : Callable [[Tuple [str , ...]], Optional [Message ]] = (
66+ lambda x : None
67+ )
68+
69+ @property
70+ def start_time (self ) -> Optional [datetime ]:
71+ if self ._start_time :
72+ return datetime .fromtimestamp (self ._start_time , timezone .utc )
73+ return None
6874
6975 def _extract_header (self ):
7076 line = ""
@@ -89,16 +95,18 @@ def _extract_header(self):
8995 elif line .startswith (";$STARTTIME" ):
9096 logger .debug ("TRCReader: Found start time '%s'" , line )
9197 try :
92- self .start_time = datetime (
93- 1899 , 12 , 30 , tzinfo = timezone .utc
94- ) + timedelta (days = float (line .split ("=" )[1 ]))
98+ self ._start_time = (
99+ datetime (1899 , 12 , 30 , tzinfo = timezone .utc )
100+ + timedelta (days = float (line .split ("=" )[1 ]))
101+ ).timestamp ()
95102 except IndexError :
96103 logger .debug ("TRCReader: Failed to parse start time" )
97104 elif line .startswith (";$COLUMNS" ):
98105 logger .debug ("TRCReader: Found columns '%s'" , line )
99106 try :
100107 columns = line .split ("=" )[1 ].split ("," )
101108 self .columns = {column : columns .index (column ) for column in columns }
109+ self ._num_columns = len (columns ) - 1
102110 except IndexError :
103111 logger .debug ("TRCReader: Failed to parse columns" )
104112 elif line .startswith (";" ):
@@ -107,7 +115,7 @@ def _extract_header(self):
107115 break
108116
109117 if self .file_version >= TRCFileVersion .V1_1 :
110- if self .start_time is None :
118+ if self ._start_time is None :
111119 raise ValueError ("File has no start time information" )
112120
113121 if self .file_version >= TRCFileVersion .V2_0 :
@@ -132,7 +140,7 @@ def _extract_header(self):
132140
133141 return line
134142
135- def _parse_msg_v1_0 (self , cols : List [str ]) -> Optional [Message ]:
143+ def _parse_msg_v1_0 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
136144 arbit_id = cols [2 ]
137145 if arbit_id == "FFFFFFFF" :
138146 logger .info ("TRCReader: Dropping bus info line" )
@@ -147,16 +155,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
147155 msg .data = bytearray ([int (cols [i + 4 ], 16 ) for i in range (msg .dlc )])
148156 return msg
149157
150- def _parse_msg_v1_1 (self , cols : List [str ]) -> Optional [Message ]:
158+ def _parse_msg_v1_1 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
151159 arbit_id = cols [3 ]
152160
153161 msg = Message ()
154- if isinstance (self .start_time , datetime ):
155- msg .timestamp = (
156- self .start_time + timedelta (milliseconds = float (cols [1 ]))
157- ).timestamp ()
158- else :
159- msg .timestamp = float (cols [1 ]) / 1000
162+ msg .timestamp = float (cols [1 ]) / 1000 + self ._start_time
160163 msg .arbitration_id = int (arbit_id , 16 )
161164 msg .is_extended_id = len (arbit_id ) > 4
162165 msg .channel = 1
@@ -165,16 +168,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
165168 msg .is_rx = cols [2 ] == "Rx"
166169 return msg
167170
168- def _parse_msg_v1_3 (self , cols : List [str ]) -> Optional [Message ]:
171+ def _parse_msg_v1_3 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
169172 arbit_id = cols [4 ]
170173
171174 msg = Message ()
172- if isinstance (self .start_time , datetime ):
173- msg .timestamp = (
174- self .start_time + timedelta (milliseconds = float (cols [1 ]))
175- ).timestamp ()
176- else :
177- msg .timestamp = float (cols [1 ]) / 1000
175+ msg .timestamp = float (cols [1 ]) / 1000 + self ._start_time
178176 msg .arbitration_id = int (arbit_id , 16 )
179177 msg .is_extended_id = len (arbit_id ) > 4
180178 msg .channel = int (cols [2 ])
@@ -183,7 +181,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
183181 msg .is_rx = cols [3 ] == "Rx"
184182 return msg
185183
186- def _parse_msg_v2_x (self , cols : List [str ]) -> Optional [Message ]:
184+ def _parse_msg_v2_x (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
187185 type_ = cols [self .columns ["T" ]]
188186 bus = self .columns .get ("B" , None )
189187
@@ -192,50 +190,43 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
192190 dlc = len2dlc (length )
193191 elif "L" in self .columns :
194192 dlc = int (cols [self .columns ["L" ]])
195- length = dlc2len (dlc )
196193 else :
197194 raise ValueError ("No length/dlc columns present." )
198195
199196 msg = Message ()
200- if isinstance (self .start_time , datetime ):
201- msg .timestamp = (
202- self .start_time + timedelta (milliseconds = float (cols [self .columns ["O" ]]))
203- ).timestamp ()
204- else :
205- msg .timestamp = float (cols [1 ]) / 1000
197+ msg .timestamp = float (cols [self .columns ["O" ]]) / 1000 + self ._start_time
206198 msg .arbitration_id = int (cols [self .columns ["I" ]], 16 )
207199 msg .is_extended_id = len (cols [self .columns ["I" ]]) > 4
208200 msg .channel = int (cols [bus ]) if bus is not None else 1
209201 msg .dlc = dlc
210- msg .data = bytearray (
211- [int (cols [i + self .columns ["D" ]], 16 ) for i in range (length )]
212- )
202+ if dlc :
203+ msg .data = bytearray .fromhex (cols [self .columns ["D" ]])
213204 msg .is_rx = cols [self .columns ["d" ]] == "Rx"
214- msg .is_fd = type_ in [ "FD" , "FB" , "FE" , "BI" ]
215- msg .bitrate_switch = type_ in [ "FB" , " FE" ]
216- msg .error_state_indicator = type_ in [ "FE" , "BI" ]
205+ msg .is_fd = type_ in { "FD" , "FB" , "FE" , "BI" }
206+ msg .bitrate_switch = type_ in { "FB" , "FE" }
207+ msg .error_state_indicator = type_ in { "FE" , "BI" }
217208
218209 return msg
219210
220- def _parse_cols_v1_1 (self , cols : List [str ]) -> Optional [Message ]:
211+ def _parse_cols_v1_1 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
221212 dtype = cols [2 ]
222213 if dtype in ("Tx" , "Rx" ):
223214 return self ._parse_msg_v1_1 (cols )
224215 else :
225216 logger .info ("TRCReader: Unsupported type '%s'" , dtype )
226217 return None
227218
228- def _parse_cols_v1_3 (self , cols : List [str ]) -> Optional [Message ]:
219+ def _parse_cols_v1_3 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
229220 dtype = cols [3 ]
230221 if dtype in ("Tx" , "Rx" ):
231222 return self ._parse_msg_v1_3 (cols )
232223 else :
233224 logger .info ("TRCReader: Unsupported type '%s'" , dtype )
234225 return None
235226
236- def _parse_cols_v2_x (self , cols : List [str ]) -> Optional [Message ]:
227+ def _parse_cols_v2_x (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
237228 dtype = cols [self .columns ["T" ]]
238- if dtype in [ "DT" , "FD" , "FB" ] :
229+ if dtype in { "DT" , "FD" , "FB" , "FE" , "BI" } :
239230 return self ._parse_msg_v2_x (cols )
240231 else :
241232 logger .info ("TRCReader: Unsupported type '%s'" , dtype )
@@ -244,7 +235,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
244235 def _parse_line (self , line : str ) -> Optional [Message ]:
245236 logger .debug ("TRCReader: Parse '%s'" , line )
246237 try :
247- cols = line .split ()
238+ cols = tuple ( line .split (maxsplit = self . _num_columns ) )
248239 return self ._parse_cols (cols )
249240 except IndexError :
250241 logger .warning ("TRCReader: Failed to parse message '%s'" , line )
0 commit comments