Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,20 @@ def autojump_clock():
Custom autojump clock with a reasonable threshold for non-time I/O waits
"""
return trio.testing.MockClock(autojump_threshold=0.01)

@pytest.fixture
async def routed_ep(mctpd):
"""
Return a routable endpoint (using the default sysnet ep that already
exists), with routing established, but no mctpd registration.
"""
ep = mctpd.network.endpoints[0]
iface = mctpd.system.interfaces[0]

ep.eid = 9
route = mctpd.system.Route(ep.eid, 1, iface = iface)
neigh = mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
await mctpd.system.add_route(route)
await mctpd.system.add_neighbour(neigh)

return ep
73 changes: 14 additions & 59 deletions tests/test_mctpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,19 +406,13 @@ async def test_assign_endpoint_static_varies(dbus, mctpd):

""" Test that the mctpd control protocol responder support has support
for a basic Get Endpoint ID command"""
async def test_get_endpoint_id(dbus, mctpd):
async def test_get_endpoint_id(dbus, mctpd, routed_ep):
ep = routed_ep
iface = mctpd.system.interfaces[0]
dev = mctpd.network.endpoints[0]
mctp = await mctpd_mctp_iface_obj(dbus, iface)
dev.eid = 12

await mctpd.system.add_route(mctpd.system.Route(dev.eid, 0, iface = iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, dev.lladdr, dev.eid)
)

cmd = MCTPControlCommand(True, 0, 0x02)
rsp = await dev.send_control(mctpd.network.mctp_socket, cmd)
rsp = await ep.send_control(mctpd.network.mctp_socket, cmd)

# command code
assert rsp[1] == 0x02
Expand Down Expand Up @@ -1263,14 +1257,8 @@ async def test_bridge_pool_range_limited(dbus, sysnet, nursery):
res = await mctpd.stop_mctpd()
assert res == 0

async def test_get_message_types(dbus, mctpd):
ep = mctpd.network.endpoints[0]
ep.eid = 12
iface = mctpd.system.interfaces[0]
await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface = iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
)
async def test_get_message_types(dbus, mctpd, routed_ep):
ep = routed_ep

# Check default response when no responder registered
cmd = MCTPControlCommand(True, 0, 0x05, bytes([0x00]))
Expand All @@ -1292,30 +1280,17 @@ async def test_get_message_types(dbus, mctpd):
assert rsp.hex(' ') == '00 04 00 01 f4 f3 f2 f1'

""" Test RegisterVDMTypeSupport when no responders are registered """
async def test_register_vdm_type_support_empty(mctpd):
ep = mctpd.network.endpoints[0]
ep.eid = 12
iface = mctpd.system.interfaces[0]
await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
)
async def test_register_vdm_type_support_empty(mctpd, routed_ep):
ep = routed_ep

# Verify error response when no VDM is registered
cmd = MCTPControlCommand(True, 0, 0x06, bytes([0x00]))
rsp = await ep.send_control(mctpd.network.mctp_socket, cmd)
assert rsp.hex(' ') == '00 06 02'

""" Test RegisterVDMTypeSupport when a single PCIe VDM is registered """
async def test_register_vdm_type_support_pcie_only(dbus, mctpd):
ep = mctpd.network.endpoints[0]
ep.eid = 12
iface = mctpd.system.interfaces[0]
await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
)

async def test_register_vdm_type_support_pcie_only(dbus, mctpd, routed_ep):
ep = routed_ep
mctp = await mctpd_mctp_base_iface_obj(dbus)

# Register PCIe VDM: format=0x00, VID=0xABCD, command_set=0x0001
Expand All @@ -1333,15 +1308,8 @@ async def test_register_vdm_type_support_pcie_only(dbus, mctpd):
assert rsp.hex(' ') == '00 06 02'

""" Test RegisterVDMTypeSupport when a single IANA VDM is registered """
async def test_register_vdm_type_support_iana_only(dbus, mctpd):
ep = mctpd.network.endpoints[0]
ep.eid = 12
iface = mctpd.system.interfaces[0]
await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
)

async def test_register_vdm_type_support_iana_only(dbus, mctpd, routed_ep):
ep = routed_ep
mctp = await mctpd_mctp_base_iface_obj(dbus)

# Register IANA VDM: format=0x01, VID=0x1234ABCD, command_set=0x5678
Expand All @@ -1354,14 +1322,8 @@ async def test_register_vdm_type_support_iana_only(dbus, mctpd):
assert rsp.hex(' ') == '00 06 00 ff 01 12 34 ab cd 56 78'

""" Test RegisterVDMTypeSupport with dbus disconnect """
async def test_register_vdm_type_support_dbus_disconnect(mctpd):
ep = mctpd.network.endpoints[0]
ep.eid = 12
iface = mctpd.system.interfaces[0]
await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
)
async def test_register_vdm_type_support_dbus_disconnect(mctpd, routed_ep):
ep = routed_ep

# Verify error response when no VDM is registered
cmd = MCTPControlCommand(True, 0, 0x06, bytes([0x00]))
Expand Down Expand Up @@ -1390,15 +1352,8 @@ async def test_register_vdm_type_support_dbus_disconnect(mctpd):

""" Test RegisterVDMTypeSupport error handling """
async def test_register_vdm_type_support_errors(dbus, mctpd):
ep = mctpd.network.endpoints[0]
ep.eid = 12
iface = mctpd.system.interfaces[0]
await mctpd.system.add_route(mctpd.system.Route(ep.eid, 1, iface=iface))
await mctpd.system.add_neighbour(
mctpd.system.Neighbour(iface, ep.lladdr, ep.eid)
)

mctp = await mctpd_mctp_base_iface_obj(dbus)

# Verify DBus call fails with invalid format 0x02
v_type = asyncdbus.Variant('q', 0xABCD)
with pytest.raises(asyncdbus.errors.DBusError) as ex:
Expand Down