diff --git a/messaging/tests/test_messaging.py b/messaging/tests/test_messaging.py new file mode 100755 index 00000000000000..89176fd2cbe4f9 --- /dev/null +++ b/messaging/tests/test_messaging.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +import os +import time +import random +import unittest + +from cereal import log +import cereal.messaging as messaging +from cereal.services import service_list + +events = [evt for evt in log.Event.schema.union_fields if evt in service_list.keys()] + +def random_sock(): + return random.choice(events) + +def random_socks(num_socks=10): + return list(set([random_sock() for _ in range(num_socks)])) + +def random_bytes(length=1000): + return bytes([random.randrange(0xFF) for _ in range(length)]) + +def zmq_sleep(t=1): + if "ZMQ" in os.environ: + time.sleep(t) + +# TODO: test both msgq and zmq + +class TestPubSubSockets(unittest.TestCase): + + def setUp(self): + # ZMQ pub socket takes too long to die + # sleep to prevent multiple publishers error between tests + zmq_sleep() + + def test_pub_sub(self): + sock = random_sock() + pub_sock = messaging.pub_sock(sock) + sub_sock = messaging.sub_sock(sock, conflate=False, timeout=None) + zmq_sleep(3) + + for _ in range(1000): + msg = random_bytes() + pub_sock.send(msg) + recvd = sub_sock.receive() + self.assertEqual(msg, recvd) + + def test_conflate(self): + sock = random_sock() + pub_sock = messaging.pub_sock(sock) + for conflate in [True, False]: + for _ in range(10): + num_msgs = random.randint(3, 10) + sub_sock = messaging.sub_sock(sock, conflate=conflate, timeout=None) + zmq_sleep() + + sent_msgs = [] + for __ in range(num_msgs): + msg = random_bytes() + pub_sock.send(msg) + sent_msgs.append(msg) + time.sleep(0.1) + recvd_msgs = messaging.drain_sock_raw(sub_sock) + if conflate: + self.assertEqual(len(recvd_msgs), 1) + else: + # TODO: compare actual data + self.assertEqual(len(recvd_msgs), len(sent_msgs)) + + def test_receive_timeout(self): + sock = random_sock() + for _ in range(10): + timeout = random.randrange(200) + sub_sock = messaging.sub_sock(sock, timeout=timeout) + zmq_sleep() + + start_time = time.monotonic() + recvd = sub_sock.receive() + self.assertLess(time.monotonic() - start_time, 0.2) + assert recvd is None + +if __name__ == "__main__": + unittest.main() diff --git a/messaging/tests/test_pub_sub_master.py b/messaging/tests/test_pub_sub_master.py new file mode 100755 index 00000000000000..4cd3db5fd51c4f --- /dev/null +++ b/messaging/tests/test_pub_sub_master.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +import numbers +import random +import time +import unittest + +from cereal import car +import cereal.messaging as messaging +from cereal.messaging.tests.test_messaging import events, random_sock, random_socks, \ + random_bytes, zmq_sleep + + +# TODO: this should take any capnp struct and returrn a msg with random populated data +def random_carstate(): + fields = ["vEgo", "aEgo", "gas", "steeringAngle"] + msg = messaging.new_message("carState") + cs = msg.carState + for f in fields: + setattr(cs, f, random.random() * 10) + return msg + +# TODO: this should compare any capnp structs +def assert_carstate(cs1, cs2): + for f in car.CarState.schema.non_union_fields: + # TODO: check all types + val1, val2 = getattr(cs1, f), getattr(cs2, f) + if isinstance(val1, numbers.Number): + assert val1 == val2, f"{f}: sent '{val1}' vs recvd '{val2}'" + + +class TestSubMaster(unittest.TestCase): + + def setUp(self): + # ZMQ pub socket takes too long to die + # sleep to prevent multiple publishers error between tests + zmq_sleep(3) + + def test_init(self): + sm = messaging.SubMaster(events) + for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive, + sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]: + self.assertEqual(len(p), len(events)) + + def test_init_state(self): + socks = random_socks() + sm = messaging.SubMaster(socks) + self.assertEqual(sm.frame, -1) + self.assertFalse(any(sm.updated.values())) + self.assertFalse(any(sm.alive.values())) + self.assertTrue(all(t == 0. for t in sm.rcv_time.values())) + self.assertTrue(all(f == 0 for f in sm.rcv_frame.values())) + self.assertTrue(all(t == 0 for t in sm.logMonoTime.values())) + + for p in [sm.updated, sm.rcv_time, sm.rcv_frame, sm.alive, + sm.sock, sm.freq, sm.data, sm.logMonoTime, sm.valid]: + self.assertEqual(len(p), len(socks)) + + def test_getitem(self): + sock = "carState" + pub_sock = messaging.pub_sock(sock) + sm = messaging.SubMaster([sock,]) + zmq_sleep() + + msg = random_carstate() + pub_sock.send(msg.to_bytes()) + sm.update(1000) + assert_carstate(msg.carState, sm[sock]) + + # TODO: break this test up to individually test SubMaster.update and SubMaster.update_msgs + def test_update(self): + sock = "carState" + pub_sock = messaging.pub_sock(sock) + sm = messaging.SubMaster([sock,]) + zmq_sleep() + + for i in range(10): + msg = messaging.new_message(sock) + pub_sock.send(msg.to_bytes()) + sm.update(1000) + self.assertEqual(sm.frame, i) + self.assertTrue(all(sm.updated.values())) + + def test_update_timeout(self): + sock = random_sock() + sm = messaging.SubMaster([sock,]) + for _ in range(5): + timeout = random.randrange(1000, 5000) + start_time = time.monotonic() + sm.update(timeout) + t = time.monotonic() - start_time + self.assertGreaterEqual(t, timeout/1000.) + self.assertLess(t, 5) + self.assertFalse(any(sm.updated.values())) + + def test_alive(self): + pass + + def test_ignore_alive(self): + pass + + def test_valid(self): + pass + + # SubMaster should always conflate + def test_conflate(self): + sock = "carState" + pub_sock = messaging.pub_sock(sock) + sm = messaging.SubMaster([sock,]) + + n = 10 + for i in range(n+1): + msg = messaging.new_message(sock) + msg.carState.vEgo = i + pub_sock.send(msg.to_bytes()) + time.sleep(0.01) + sm.update(1000) + self.assertEqual(sm[sock].vEgo, n) + + +class TestPubMaster(unittest.TestCase): + + def setUp(self): + # ZMQ pub socket takes too long to die + # sleep to prevent multiple publishers error between tests + zmq_sleep(3) + + def test_init(self): + messaging.PubMaster(events) + + def test_send(self): + socks = random_socks() + pm = messaging.PubMaster(socks) + sub_socks = {s: messaging.sub_sock(s, conflate=True, timeout=1000) for s in socks} + zmq_sleep() + + # PubMaster accepts either a capnp msg builder or bytes + for capnp in [True, False]: + for i in range(100): + sock = socks[i % len(socks)] + + if capnp: + try: + msg = messaging.new_message(sock) + except Exception: + msg = messaging.new_message(sock, random.randrange(50)) + else: + msg = random_bytes() + + pm.send(sock, msg) + recvd = sub_socks[sock].receive() + + if capnp: + msg.clear_write_flag() + msg = msg.to_bytes() + self.assertEqual(msg, recvd, i) + + +if __name__ == "__main__": + unittest.main()