From 35d5993596e2b7063f8cd0a4e564c99b80ff59ec Mon Sep 17 00:00:00 2001 From: Jeremy Kerr Date: Thu, 4 Dec 2025 14:45:05 +0800 Subject: [PATCH] tests: use a fixture for setting up a routed endpoint We have a few cases of boilerplate in the test code, where we're setting up an endpoint to communicate with mctpd. Use a fixure - routed_ep - for this pattern, and replace the boilerplate. Signed-off-by: Jeremy Kerr --- tests/conftest.py | 17 +++++++++++ tests/test_mctpd.py | 73 +++++++++------------------------------------ 2 files changed, 31 insertions(+), 59 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 7fca8e8..1dc86f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,3 +43,20 @@ def autojump_clock(): Custom autojump clock with a reasonable threshold for non-time I/O waits """ return trio.testing.MockClock(autojump_threshold=0.01) + +@pytest.fixture +async def routed_ep(mctpd): + """ + Return a routable endpoint (using the default sysnet ep that already + exists), with routing established, but no mctpd registration. + """ + ep = mctpd.network.endpoints[0] + iface = mctpd.system.interfaces[0] + + ep.eid = 9 + route = mctpd.system.Route(ep.eid, 1, iface = iface) + neigh = mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) + await mctpd.system.add_route(route) + await mctpd.system.add_neighbour(neigh) + + return ep diff --git a/tests/test_mctpd.py b/tests/test_mctpd.py index a30ec57..04d19b8 100644 --- a/tests/test_mctpd.py +++ b/tests/test_mctpd.py @@ -406,19 +406,13 @@ async def test_assign_endpoint_static_varies(dbus, mctpd): """ Test that the mctpd control protocol responder support has support for a basic Get Endpoint ID command""" -async def test_get_endpoint_id(dbus, mctpd): +async def test_get_endpoint_id(dbus, mctpd, routed_ep): + ep = routed_ep iface = mctpd.system.interfaces[0] - dev = mctpd.network.endpoints[0] mctp = await mctpd_mctp_iface_obj(dbus, iface) - dev.eid = 12 - - await mctpd.system.add_route(mctpd.system.Route(dev.eid, 0, iface = iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, dev.lladdr, dev.eid) - ) cmd = MCTPControlCommand(True, 0, 0x02) - rsp = await dev.send_control(mctpd.network.mctp_socket, cmd) + rsp = await ep.send_control(mctpd.network.mctp_socket, cmd) # command code assert rsp[1] == 0x02 @@ -1263,14 +1257,8 @@ async def test_bridge_pool_range_limited(dbus, sysnet, nursery): res = await mctpd.stop_mctpd() assert res == 0 -async def test_get_message_types(dbus, mctpd): - ep = mctpd.network.endpoints[0] - ep.eid = 12 - iface = mctpd.system.interfaces[0] - await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface = iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) - ) +async def test_get_message_types(dbus, mctpd, routed_ep): + ep = routed_ep # Check default response when no responder registered cmd = MCTPControlCommand(True, 0, 0x05, bytes([0x00])) @@ -1292,14 +1280,8 @@ async def test_get_message_types(dbus, mctpd): assert rsp.hex(' ') == '00 04 00 01 f4 f3 f2 f1' """ Test RegisterVDMTypeSupport when no responders are registered """ -async def test_register_vdm_type_support_empty(mctpd): - ep = mctpd.network.endpoints[0] - ep.eid = 12 - iface = mctpd.system.interfaces[0] - await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) - ) +async def test_register_vdm_type_support_empty(mctpd, routed_ep): + ep = routed_ep # Verify error response when no VDM is registered cmd = MCTPControlCommand(True, 0, 0x06, bytes([0x00])) @@ -1307,15 +1289,8 @@ async def test_register_vdm_type_support_empty(mctpd): assert rsp.hex(' ') == '00 06 02' """ Test RegisterVDMTypeSupport when a single PCIe VDM is registered """ -async def test_register_vdm_type_support_pcie_only(dbus, mctpd): - ep = mctpd.network.endpoints[0] - ep.eid = 12 - iface = mctpd.system.interfaces[0] - await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) - ) - +async def test_register_vdm_type_support_pcie_only(dbus, mctpd, routed_ep): + ep = routed_ep mctp = await mctpd_mctp_base_iface_obj(dbus) # Register PCIe VDM: format=0x00, VID=0xABCD, command_set=0x0001 @@ -1333,15 +1308,8 @@ async def test_register_vdm_type_support_pcie_only(dbus, mctpd): assert rsp.hex(' ') == '00 06 02' """ Test RegisterVDMTypeSupport when a single IANA VDM is registered """ -async def test_register_vdm_type_support_iana_only(dbus, mctpd): - ep = mctpd.network.endpoints[0] - ep.eid = 12 - iface = mctpd.system.interfaces[0] - await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) - ) - +async def test_register_vdm_type_support_iana_only(dbus, mctpd, routed_ep): + ep = routed_ep mctp = await mctpd_mctp_base_iface_obj(dbus) # Register IANA VDM: format=0x01, VID=0x1234ABCD, command_set=0x5678 @@ -1354,14 +1322,8 @@ async def test_register_vdm_type_support_iana_only(dbus, mctpd): assert rsp.hex(' ') == '00 06 00 ff 01 12 34 ab cd 56 78' """ Test RegisterVDMTypeSupport with dbus disconnect """ -async def test_register_vdm_type_support_dbus_disconnect(mctpd): - ep = mctpd.network.endpoints[0] - ep.eid = 12 - iface = mctpd.system.interfaces[0] - await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) - ) +async def test_register_vdm_type_support_dbus_disconnect(mctpd, routed_ep): + ep = routed_ep # Verify error response when no VDM is registered cmd = MCTPControlCommand(True, 0, 0x06, bytes([0x00])) @@ -1390,15 +1352,8 @@ async def test_register_vdm_type_support_dbus_disconnect(mctpd): """ Test RegisterVDMTypeSupport error handling """ async def test_register_vdm_type_support_errors(dbus, mctpd): - ep = mctpd.network.endpoints[0] - ep.eid = 12 - iface = mctpd.system.interfaces[0] - await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface)) - await mctpd.system.add_neighbour( - mctpd.system.Neighbour(iface, ep.lladdr, ep.eid) - ) - mctp = await mctpd_mctp_base_iface_obj(dbus) + # Verify DBus call fails with invalid format 0x02 v_type = asyncdbus.Variant('q', 0xABCD) with pytest.raises(asyncdbus.errors.DBusError) as ex: