Skip to content

Commit

Permalink
Fix test_message_conversion.py (#645)
Browse files Browse the repository at this point in the history
**Public Changes**
None

**Description**

Fixed test_message_conversion.py.
Since it has some bugs in the application code, I'll fix them in another PR #646.

Related: #643

Signed-off-by: Kenji Miyake <kenji.miyake@tier4.jp>
  • Loading branch information
kenji-miyake committed Dec 10, 2021
1 parent 958d413 commit cf57ed9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 63 deletions.
1 change: 1 addition & 0 deletions rosbridge_library/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ if (BUILD_TESTING)
# ament_add_pytest_test(test_internal "test/internal")
# ament_add_pytest_test(test_services "test/internal/test_services.py")
ament_add_pytest_test(test_ros_loader "test/internal/test_ros_loader.py")
ament_add_pytest_test(test_message_conversion "test/internal/test_message_conversion.py")
endif()
115 changes: 52 additions & 63 deletions rosbridge_library/test/internal/test_message_conversion.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
#!/usr/bin/env python3
import unittest
from base64 import standard_b64encode
from io import BytesIO
from json import dumps, loads

import rospy
import rostest
import numpy as np
from rclpy.serialization import deserialize_message, serialize_message
from rosbridge_library.internal import message_conversion as c
from rosbridge_library.internal import ros_loader


class TestMessageConversion(unittest.TestCase):
def setUp(self):
rospy.init_node("test_message_conversion")

def validate_instance(self, inst1):
"""Serializes and deserializes the inst to typecheck and ensure that
instances are correct"""
inst1._check_types()
buff = BytesIO()
inst1.serialize(buff)
inst2 = type(inst1)()
inst2.deserialize(buff.getvalue())
inst2 = deserialize_message(serialize_message(inst1), type(inst1))
self.assertEqual(inst1, inst2)
inst2._check_types()

def msgs_equal(self, msg1, msg2):
if isinstance(msg1, str) and isinstance(msg2, str):
Expand All @@ -33,7 +24,7 @@ def msgs_equal(self, msg1, msg2):
if type(msg1) in c.list_types:
for x, y in zip(msg1, msg2):
self.msgs_equal(x, y)
elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
elif type(msg1) in c.primitive_types or type(msg1) is str:
self.assertEqual(msg1, msg2)
else:
for x in msg1:
Expand All @@ -55,6 +46,17 @@ def do_primitive_test(self, data_value, msgtype):
self.assertEqual(msg["data"], msg2["data"])
self.assertEqual(msg2["data"], inst.data)

def do_byte_test(self, data_value, msgtype):
for msg in [{"data": data_value}]:
inst = ros_loader.get_message_instance(msgtype)
c.populate_instance(msg, inst)
self.assertEqual(inst.data, bytes([data_value]))
self.validate_instance(inst)
extracted = c.extract_values(inst)
for msg2 in [extracted, loads(dumps(extracted))]:
self.assertEqual(msg["data"], msg2["data"])
self.assertEqual(bytes([msg2["data"]]), inst.data)

def do_test(self, orig_msg, msgtype):
for msg in [orig_msg, loads(dumps(orig_msg))]:
inst = ros_loader.get_message_instance(msgtype)
Expand All @@ -76,6 +78,14 @@ def test_int_primitives(self):
self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
self.assertEqual(c._to_inst(msg, rostype, rostype), msg)

@unittest.skip
def test_byte_primitives(self):
# Test raw primitives
for msg in range(0, 200):
for rostype in ["octet"]:
self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), bytes([msg]))
self.assertEqual(c._to_inst(msg, rostype, rostype), bytes([msg]))

def test_bool_primitives(self):
self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
self.assertTrue(c._to_inst(True, "bool", "bool"))
Expand All @@ -98,7 +108,6 @@ def test_float_special_cases(self):
def test_signed_int_base_msgs(self):
int8s = range(-127, 128)
for int8 in int8s:
self.do_primitive_test(int8, "std_msgs/Byte")
self.do_primitive_test(int8, "std_msgs/Int8")
self.do_primitive_test(int8, "std_msgs/Int16")
self.do_primitive_test(int8, "std_msgs/Int32")
Expand All @@ -109,21 +118,18 @@ def test_signed_int_base_msgs(self):
self.do_primitive_test(int16, "std_msgs/Int16")
self.do_primitive_test(int16, "std_msgs/Int32")
self.do_primitive_test(int16, "std_msgs/Int64")
self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")

int32s = [-2147483647, 2147483647]
for int32 in int32s:
self.do_primitive_test(int32, "std_msgs/Int32")
self.do_primitive_test(int32, "std_msgs/Int64")
self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")

int64s = [-9223372036854775807, 9223372036854775807]
for int64 in int64s:
self.do_primitive_test(int64, "std_msgs/Int64")
self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
Expand Down Expand Up @@ -166,6 +172,12 @@ def test_unsigned_int_base_msgs(self):
self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")

@unittest.skip
def test_byte_base_msg(self):
int8s = range(0, 256)
for int8 in int8s:
self.do_byte_test(int8, "std_msgs/Byte")

def test_bool_base_msg(self):
self.do_primitive_test(True, "std_msgs/Bool")
self.do_primitive_test(False, "std_msgs/Bool")
Expand All @@ -175,40 +187,24 @@ def test_string_base_msg(self):
self.do_primitive_test(x, "std_msgs/String")

def test_time_msg(self):
msg = {"data": {"secs": 3, "nsecs": 5}}
self.do_test(msg, "std_msgs/Time")

msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
self.do_test(msg, "rosbridge_test_msgs/TestTimeArray")

def test_time_msg_now(self):
msg = {"data": "now"}
msgtype = "std_msgs/Time"

inst = ros_loader.get_message_instance(msgtype)
c.populate_instance(msg, inst)
currenttime = rospy.get_rostime()
self.validate_instance(inst)
extracted = c.extract_values(inst)
print(extracted)
self.assertIn("data", extracted)
self.assertIn("secs", extracted["data"])
self.assertIn("nsecs", extracted["data"])
self.assertNotEqual(extracted["data"]["secs"], 0)
self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
msg = {"sec": 3, "nanosec": 5}
self.do_test(msg, "builtin_interfaces/Time")

# TODO: enable this test
# msg = {"times": [{"sec": 3, "nanosec": 5}, {"sec": 2, "nanosec": 7}]}
# self.do_test(msg, "rosbridge_test_msgs/TestTimeArray")

def test_duration_msg(self):
msg = {"data": {"secs": 3, "nsecs": 5}}
self.do_test(msg, "std_msgs/Duration")
msg = {"sec": 3, "nanosec": 5}
self.do_test(msg, "builtin_interfaces/Duration")

msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
self.do_test(msg, "rosbridge_test_msgs/TestDurationArray")
# TODO: enable this test
# msg = {"durations": [{"sec": 3, "nanosec": 5}, {"sec": 2, "nanosec": 7}]}
# self.do_test(msg, "rosbridge_test_msgs/TestDurationArray")

def test_header_msg(self):
msg = {
"seq": 5,
"stamp": {"secs": 12347, "nsecs": 322304},
"stamp": {"sec": 12347, "nanosec": 322304},
"frame_id": "2394dnfnlcx;v[p234j]",
}
self.do_test(msg, "std_msgs/Header")
Expand All @@ -218,8 +214,6 @@ def test_header_msg(self):
self.do_test(msg, "rosbridge_test_msgs/TestHeaderTwo")

msg = {"header": [msg["header"], msg["header"], msg["header"]]}
msg["header"][1]["seq"] = 6
msg["header"][2]["seq"] = 7
self.do_test(msg, "rosbridge_test_msgs/TestHeaderArray")

def test_assorted_msgs(self):
Expand All @@ -246,6 +240,7 @@ def test_assorted_msgs(self):
c.populate_instance(msg, inst2)
self.assertEqual(inst, inst2)

@unittest.skip
def test_int8array(self):
def test_int8_msg(rostype, data):
msg = {"data": data}
Expand All @@ -257,31 +252,25 @@ def test_int8_msg(rostype, data):
for msgtype in ["TestChar", "TestUInt8"]:
rostype = "rosbridge_test_msgs/" + msgtype

# From List[int]
int8s = list(range(0, 256))
ret = test_int8_msg(rostype, int8s)
self.assertEqual(ret, bytes(bytearray(int8s)))
np.testing.assert_array_equal(ret, np.array(int8s))

str_int8s = bytes(bytearray(int8s))

b64str_int8s = standard_b64encode(str_int8s).decode("ascii")
# From base64 string
b64str_int8s = standard_b64encode(bytes(int8s)).decode("ascii")
ret = test_int8_msg(rostype, b64str_int8s)
self.assertEqual(ret, str_int8s)
np.testing.assert_array_equal(ret, np.array(int8s))

for msgtype in ["TestUInt8FixedSizeArray16"]:
rostype = "rosbridge_test_msgs/" + msgtype

# From List[int]
int8s = list(range(0, 16))
ret = test_int8_msg(rostype, int8s)
self.assertEqual(ret, bytes(bytearray(int8s)))

str_int8s = bytes(bytearray(int8s))
np.testing.assert_array_equal(ret, np.array(int8s))

b64str_int8s = standard_b64encode(str_int8s).decode("ascii")
# From base64 string
b64str_int8s = standard_b64encode(bytes(int8s)).decode("ascii")
ret = test_int8_msg(rostype, b64str_int8s)
self.assertEqual(ret, str_int8s)


PKG = "rosbridge_library"
NAME = "test_message_conversion"
if __name__ == "__main__":
rostest.unitrun(PKG, NAME, TestMessageConversion)
np.testing.assert_array_equal(ret, np.array(int8s))

0 comments on commit cf57ed9

Please sign in to comment.