Skip to content

Commit

Permalink
Refactor message type accessor, improve tests
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre R. Mai <pmai@pmsf.de>
  • Loading branch information
pmai committed Jun 10, 2024
1 parent 91cc877 commit f9f0bf0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
4 changes: 2 additions & 2 deletions osi3trace/osi2read.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
python3 osi2read.py -d trace.osi -o myreadableosifile
"""

from osi3trace.osi_trace import OSITrace, MESSAGES_TYPE
from osi3trace.osi_trace import OSITrace
import argparse
import pathlib

Expand All @@ -28,7 +28,7 @@ def command_line_arguments():
"--type",
"-t",
help="Name of the type used to serialize data.",
choices=list(MESSAGES_TYPE.keys()),
choices=OSITrace.message_types(),
default="SensorView",
type=str,
required=False,
Expand Down
5 changes: 5 additions & 0 deletions osi3trace/osi_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ def map_message_type(type_name):
"""Map the type name to the protobuf message type."""
return MESSAGES_TYPE[type_name]

@staticmethod
def message_types():
"""Message types that OSITrace supports."""
return list(MESSAGES_TYPE.keys())

def __init__(self, path=None, type_name="SensorView", cache_messages=False):
self.type = self.map_message_type(type_name)
self.file = None
Expand Down
27 changes: 27 additions & 0 deletions tests/test_osi_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ def test_osi_trace_sv(self):
trace = OSITrace(path_input)
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, SensorView)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -40,7 +43,10 @@ def test_osi_trace_gt(self):
trace = OSITrace(path_input, "GroundTruth")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, GroundTruth)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -54,7 +60,10 @@ def test_osi_trace_sd(self):
trace = OSITrace(path_input, "SensorData")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, SensorData)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -68,7 +77,10 @@ def test_osi_trace_svc(self):
trace = OSITrace(path_input, "SensorViewConfiguration")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, SensorViewConfiguration)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 1)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -82,7 +94,10 @@ def test_osi_trace_tu(self):
trace = OSITrace(path_input, "TrafficUpdate")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, TrafficUpdate)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -96,7 +111,10 @@ def test_osi_trace_tc(self):
trace = OSITrace(path_input, "TrafficCommand")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, TrafficCommand)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -110,7 +128,10 @@ def test_osi_trace_tcu(self):
trace = OSITrace(path_input, "TrafficCommandUpdate")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, TrafficCommandUpdate)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -124,7 +145,10 @@ def test_osi_trace_mr(self):
trace = OSITrace(path_input, "MotionRequest")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, MotionRequest)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand All @@ -138,7 +162,10 @@ def test_osi_trace_su(self):
trace = OSITrace(path_input, "StreamingUpdate")
with open(path_output, "wt") as f:
for message in trace:
self.assertIsInstance(message, StreamingUpdate)
f.write(str(message))

self.assertEqual(len(trace.retrieve_offsets()), 10)
trace.close()

self.assertTrue(os.path.exists(path_output))
Expand Down

0 comments on commit f9f0bf0

Please sign in to comment.