diff --git a/detd/common.py b/detd/common.py index d677154..9065ffc 100644 --- a/detd/common.py +++ b/detd/common.py @@ -181,12 +181,3 @@ def __init__(self, template, params): data = template.substitute(params).replace('\n', '') super().__init__(data) - - -class Options: - """Methods to assign parameters to passs into one convenient Object. - Used for: Passing parameters in Python script for manual customization of the qdisc. """ - def __init__(self): - - self.qdiscmap = "nomap" - diff --git a/detd/devices/device.py b/detd/devices/device.py index 8bd01d5..74656ba 100644 --- a/detd/devices/device.py +++ b/detd/devices/device.py @@ -22,12 +22,11 @@ from ..logger import get_logger +from ..scheduler import TxCapability +from ..scheduler import DataPath logger = get_logger(__name__) - - - def from_pci_id(pci_id): # Retrieve all the subclasses inheriting from class Device @@ -42,16 +41,6 @@ def from_pci_id(pci_id): raise NameError("Unrecognized PCI ID: {}".format(pci_id)) - - - -class Capability(enum.Enum): - Qbv = 0 - Qbu = 1 - LTC = 2 - - - class Device: """ @@ -71,7 +60,11 @@ def __init__(self, num_tx_queues, num_rx_queues): self.systeminfo = SystemInformation() self.systemconf = SystemConfigurator() - self.capabilities = [] + self.tx_capabilities = TxCapability.default() + self.data_path_capabilities = DataPath.default() + self.launch_time_control = False + self.TxSelectionOffload = False + self.preemption = False self.num_tx_queues = num_tx_queues self.num_rx_queues = num_rx_queues @@ -85,11 +78,11 @@ def __init__(self, num_tx_queues, num_rx_queues): self.features = {} - def setup(self, interface, mapping, scheduler, stream): + def setup(self, interface, mapping, scheduler, stream, hints): '''Performs the configuration of the talker stream provided. ''' - self.systemconf.setup(interface, mapping, scheduler, stream) + self.systemconf.setup(interface, mapping, scheduler, stream, hints) def get_rate(self, interface): @@ -127,5 +120,21 @@ def supports_schedule(self, schedule): raise NotImplementedError("The handler class for the device must implement this function") - def supports_qbv(self): - return Capability.Qbv in self.capabilities + def get_tx_capabilities(self): + return self.tx_capabilities + + def supports_ltc(self): + """Returns True if the device supports Launch Time Control (LTC), + False otherwise.""" + return self.launch_time_control + + def supports_tx_selection_offload(self): + return self.TxSelectionOffload + + def get_data_path(self): + '''Returns True if the device supports the specified DataPath capability, + False otherwise.''' + return self.data_path_capabilities + + def supports_preemption(self): + return self.preemption diff --git a/detd/devices/intel_i210.py b/detd/devices/intel_i210.py index 036cfc5..0f2d91c 100644 --- a/detd/devices/intel_i210.py +++ b/detd/devices/intel_i210.py @@ -13,9 +13,10 @@ from ..logger import get_logger -from .device import Capability +from ..scheduler import TxCapability from .device import Device +from ..scheduler import DataPath logger = get_logger(__name__) @@ -27,8 +28,6 @@ class IntelI210(Device): NUM_TX_QUEUES = 4 NUM_RX_QUEUES = 4 - - CAPABILITIES = [Capability.LTC] PCI_IDS_VALID = ['8086:1533', '8086:1536', '8086:1537', '8086:1538', '8086:157B', '8086:157C', '8086:15F6'] @@ -44,7 +43,11 @@ def __init__(self, pci_id): super().__init__(IntelI210.NUM_TX_QUEUES, IntelI210.NUM_RX_QUEUES) - self.capabilities = [Capability.LTC] + self.launch_time_control = True + self.data_path_capabilities = DataPath.AF_PACKET + self.tx_capabilities = TxCapability.Qbv + self.TxSelectionOffload = False + self.preemption = False self.features['rxvlan'] = 'off' diff --git a/detd/devices/intel_i225.py b/detd/devices/intel_i225.py index fdb1efd..5178fd5 100644 --- a/detd/devices/intel_i225.py +++ b/detd/devices/intel_i225.py @@ -12,9 +12,10 @@ from ..logger import get_logger -from .device import Capability +from ..scheduler import TxCapability from .device import Device +from ..scheduler import DataPath logger = get_logger(__name__) @@ -26,8 +27,6 @@ class IntelI225(Device): NUM_TX_QUEUES = 4 NUM_RX_QUEUES = 4 - CAPABILITIES = [Capability.Qbv] - # Devices supporting TSN: i225-LM, i225-IT PCI_IDS_VALID = ['8086:0D9F', '8086:15F2'] @@ -52,7 +51,11 @@ def __init__(self, pci_id): if pci_id in IntelI225.PCI_IDS_UNPROGRAMMED: raise "The flash image in this i225 device is empty, or the NVM configuration loading failed." - self.capabilities = [Capability.Qbv] + self.launch_time_control = False + self.data_path_capabilities = DataPath.AF_PACKET + self.tx_capabilities = TxCapability.Qbv + self.TxSelectionOffload = True + self.preemption = False self.features['rxvlan'] = 'off' #self.features['hw-tc-offload'] = 'on' diff --git a/detd/devices/intel_mgbeehl.py b/detd/devices/intel_mgbeehl.py index c0e6e57..a9d12a6 100644 --- a/detd/devices/intel_mgbeehl.py +++ b/detd/devices/intel_mgbeehl.py @@ -13,9 +13,10 @@ from ..logger import get_logger -from .device import Capability +from ..scheduler import TxCapability from .device import Device +from ..scheduler import DataPath logger = get_logger(__name__) @@ -57,7 +58,11 @@ def __init__(self, pci_id): super().__init__(IntelMgbeEhl.NUM_TX_QUEUES, IntelMgbeEhl.NUM_RX_QUEUES) - self.capabilities = [Capability.Qbv] + self.launch_time_control = True + self.data_path_capabilities = DataPath.AF_PACKET + self.tx_capabilities = TxCapability.Qbv + self.TxSelectionOffload = False + self.preemption = False self.features['rxvlan'] = 'off' self.features['hw-tc-offload'] = 'on' diff --git a/detd/ipc.proto b/detd/ipc.proto index a0e8cb4..881f944 100644 --- a/detd/ipc.proto +++ b/detd/ipc.proto @@ -12,7 +12,11 @@ message StreamQosRequest { uint32 txmax = 8; bool setup_socket = 9; uint32 basetime = 10; - string qdiscmap = 11; + uint32 tx_selection = 11; + bool tx_selection_offload = 12; + uint32 data_path = 13; + bool preemption = 14; + bool launch_time_control = 15; } diff --git a/detd/manager.py b/detd/manager.py index a4e7bd9..6448652 100644 --- a/detd/manager.py +++ b/detd/manager.py @@ -23,8 +23,13 @@ from .scheduler import Scheduler from .scheduler import Traffic from .scheduler import TrafficType +from .scheduler import Hints +from .scheduler import TxCapability +from .scheduler import DataPath + from .systemconf import SystemInformation -from .mapping import Mapping +from .mapping import MappingFixed +from .mapping import MappingFlexible from .common import Check from .devices import device @@ -62,8 +67,8 @@ def rate(self): return self.device.get_rate(self) - def setup(self, mapping, scheduler, stream): - self.device.setup(self, mapping, scheduler, stream) + def setup(self, mapping, scheduler, stream, hints): + self.device.setup(self, mapping, scheduler, stream, hints) @@ -86,7 +91,7 @@ def add_talker(self, config): with self.lock: if not config.interface.name in self.talker_manager: - interface_manager = InterfaceManager(config.interface, config.options) + interface_manager = InterfaceManager(config) self.talker_manager[config.interface.name] = interface_manager return self.talker_manager[config.interface.name].add_talker(config) @@ -96,20 +101,20 @@ def add_talker(self, config): class InterfaceManager(): - def __init__(self, interface, options): + def __init__(self, config): logger.info(f"Initializing {__class__.__name__}") - self.interface = interface - self.options = options - - if self.options.qdiscmap == "nomap": - self.mapping = Mapping(self.interface) + self.interface = config.interface + self.hints = self._get_device_hints(config) + + if self.hints.tx_selection == TxCapability.Qbv: + self.mapping = MappingFixed(self.interface) else: - self.mapping = Mapping(self.interface, self.options) + self.mapping = MappingFlexible(self.interface) + self.scheduler = Scheduler(self.mapping) - def add_talker(self, config): ''' Performs the local configuration for the configuration provided @@ -174,7 +179,7 @@ def add_talker(self, config): # Configure the system try: - self.interface.setup(self.mapping, self.scheduler, config.stream) + self.interface.setup(self.mapping, self.scheduler, config.stream, self.hints) except: # Leave the internal structures in a consistent state logger.error("Error applying the configuration on the system") @@ -213,3 +218,45 @@ def update_base_time(self, config): safety_margin = multiple * period config.stream.base_time = (now + ns_until_next_cycle) + safety_margin + + def _get_device_hints(self, config): + + # Default config based on the device capabilies + tx_selection = config.interface.device.get_tx_capabilities() + tx_selection_offload = config.interface.device.supports_tx_selection_offload() + data_path = config.interface.device.get_data_path() + preemption = config.interface.device.supports_preemption() + launch_time_control = config.interface.device.supports_ltc() + + # Check if the device supports the requested features + if config.hints.data_path is not None and DataPath(config.hints.data_path) != self.interface.device.get_data_path(): + raise ValueError(f"Device does not support the requested DataPath feature." + f"Requested: {DataPath(config.hints.data_path)}, Device Capabilities: {self.interface.device.get_data_path()}") + else: + data_path = DataPath(config.hints.data_path) + + if config.hints.tx_selection is not None and TxCapability(config.hints.tx_selection) != self.interface.device.get_tx_capabilities(): + raise ValueError(f"Device does not support the requested Tx selection feature." + f"Requested: {TxCapability(config.hints.tx_selection)}, Device Capabilities: {self.interface.device.get_tx_capabilities()}") + else: + tx_selection = TxCapability(config.hints.tx_selection) + + if config.hints.launch_time_control is not None and config.hints.launch_time_control != self.interface.device.supports_ltc(): + raise ValueError(f"Device does not support the requested launch_time_control feature." + f"Requested: {config.hints.launch_time_control}, Device Capabilities: {self.interface.device.supports_ltc()}") + else: + launch_time_control = config.hints.launch_time_control + + if config.hints.tx_selection_offload is not None and config.hints.tx_selection_offload != self.interface.device.supports_tx_selection_offload(): + raise ValueError(f"Device does not support the requested tx_selection_offload feature." + f"Requested: {config.hints.tx_selection_offload}, Device Capabilities: {self.interface.device.supports_tx_selection_offload()}") + else: + tx_selection_offload = config.hints.tx_selection_offload + + if config.hints.preemption is not None and config.hints.preemption != self.interface.device.supports_preemption(): + raise ValueError(f"Device does not support the requested preemption feature." + f"Requested: {config.hints.preemption}, Device Capabilities: {self.interface.device.supports_preemption()}") + else: + preemption = config.hints.preemption + + return Hints(tx_selection, tx_selection_offload, data_path, preemption, launch_time_control) diff --git a/detd/mapping.py b/detd/mapping.py index 12a87c3..e5f2d85 100644 --- a/detd/mapping.py +++ b/detd/mapping.py @@ -28,9 +28,7 @@ - - -class MappingNaive(): +class MappingFlexible(): """ A class mapping the hardware and system resources (socket priorities, @@ -209,7 +207,7 @@ def unmap_and_free_queue(self, tc): -class Mapping(): +class MappingFixed(): """ A class mapping the hardware and system resources (socket priorities, @@ -245,15 +243,13 @@ class Mapping(): """ - def __init__(self, interface, options = None): + def __init__(self, interface): logger.info(f"Initializing {__class__.__name__}") # FIXME: make the number of Tx queues a parameter, so things are not hardcoded self.interface = interface - self.options = options - # Socket priorities @@ -357,10 +353,8 @@ def soprio_to_tc(self): for tc, soprio in enumerate(self.tc_to_soprio): mapping[soprio] = tc - if self.options is None: - return mapping - else: - return [int(x) for x in self.options.qdiscmap.split()] + + return mapping def assign_and_map(self, pcp, traffics): diff --git a/detd/proxy.py b/detd/proxy.py index 10a8123..fea92a2 100644 --- a/detd/proxy.py +++ b/detd/proxy.py @@ -94,7 +94,11 @@ def send_qos_request(self, configuration, setup_socket): request.txmin = configuration.stream.txoffset request.txmax = configuration.stream.txoffset request.setup_socket = setup_socket - request.qdiscmap = configuration.options.qdiscmap + request.tx_selection = configuration.hints.tx_selection.value + request.tx_selection_offload = configuration.hints.tx_selection_offload + request.data_path = configuration.hints.data_path.value + request.preemption = configuration.hints.preemption + request.launch_time_control = configuration.hints.launch_time_control message = request.SerializeToString() self.send(message) diff --git a/detd/scheduler.py b/detd/scheduler.py index eb2ade7..e81c9cf 100644 --- a/detd/scheduler.py +++ b/detd/scheduler.py @@ -22,7 +22,6 @@ import math from .common import Check -from .common import Options from .logger import get_logger @@ -37,7 +36,7 @@ class Configuration: - def __init__(self, interface, stream, traffic, options = None): + def __init__(self, interface, stream, traffic, hints = None): if stream.txoffset > traffic.interval: raise TypeError("Invalid TxOffset, it exceeds Interval") @@ -45,10 +44,12 @@ def __init__(self, interface, stream, traffic, options = None): self.interface = interface self.stream = stream self.traffic = traffic - if options is None: - self.options = Options() + + # Initialize hints to a default value if None is passed + if hints is None: + self.hints = Hints(TxCapability.Qbv, False, DataPath.AF_PACKET, False, True) else: - self.options = options + self.hints = hints class StreamConfiguration: @@ -389,3 +390,59 @@ def reschedule(self): self.schedule.add_best_effort_padding(self.traffics[0]) # FIXME: error handling + +class TxCapability(enum.Enum): + none = 0 + Qbv = 1 + Qbu = 2 + + @staticmethod + def default(): + return TxCapability.none + +class DataPath(enum.Enum): + none = 0 + AF_PACKET = 1 + AF_XDP_ZC = 2 + + @staticmethod + def default(): + return DataPath.none + +class Hints: + """ + A configuration class for managing traffic specifications and QoS (Quality of Service) + settings for network devices. + + Attributes: + tx_selection (str): Determines the transmission selection mechanism to use. + Possible values are: + - 'ENHANCEMENTS_FOR_SCHEDULED_TRAFFIC' (802.1Qbv) + - 'STRICT_PRIORITY' + tx_selection_offload (bool): Indicates whether a hardware offload for the + tx_selection mechanism is used. True means hardware offload is enabled, + false implies a software-based approach. + data_path (str): Specifies the data path technology used. Current options include: + - 'AF_PACKET' + - 'AF_XDP_ZC' + Future expansions may include other data paths like 'DPDK'. + preemption (bool): Enables or disables preemption in the data transmission. + launch_time_control (bool): Enables or disables launch time control for packets. + + """ + def __init__(self, tx_selection: TxCapability, tx_selection_offload: bool, data_path: DataPath, preemption: bool, launch_time_control: bool): + + self.tx_selection = tx_selection + self.tx_selection_offload = tx_selection_offload + self.data_path = data_path + self.preemption = preemption + self.launch_time_control = launch_time_control + + def __repr__(self): + return (f"Hints(tx_selection={self.tx_selection.name}, " + f"tx_selection_offload={self.tx_selection_offload}, " + f"data_path={self.data_path.name}, " + f"preemption={self.preemption}, " + f"launch_time_control={self.launch_time_control})") + + diff --git a/detd/service.py b/detd/service.py index c19452e..fbb505f 100644 --- a/detd/service.py +++ b/detd/service.py @@ -39,6 +39,7 @@ from .scheduler import Configuration from .scheduler import StreamConfiguration from .scheduler import TrafficSpecification +from .scheduler import Hints from .systemconf import Check from .systemconf import QdiscConfigurator @@ -46,8 +47,6 @@ from .systemconf import SystemInformation from .systemconf import CommandIp -from .common import Options - from .logger import setup_root_logger from .logger import get_logger @@ -249,14 +248,19 @@ def _add_talker(self, request): interval = request.period size = request.size interface_name = request.interface + tx_selection = request.tx_selection + tx_selection_offload = request.tx_selection_offload + data_path = request.data_path + preemption = request.preemption + launch_time_control = request.launch_time_control + - options = Options() - options.qdiscmap = request.qdiscmap interface = Interface(interface_name) stream = StreamConfiguration(addr, vid, pcp, txoffset) traffic = TrafficSpecification(interval, size) + hints = Hints(tx_selection, tx_selection_offload, data_path, preemption, launch_time_control) - config = Configuration(interface, stream, traffic, options) + config = Configuration(interface, stream, traffic, hints) vlan_interface, soprio = self.server.manager.add_talker(config) diff --git a/detd/systemconf.py b/detd/systemconf.py index 8596b08..496f095 100644 --- a/detd/systemconf.py +++ b/detd/systemconf.py @@ -61,6 +61,7 @@ from .logger import get_logger +from .scheduler import TxCapability logger = get_logger(__name__) @@ -87,7 +88,7 @@ def __init__(self): self.already_configured_vids = [] - def args_valid(self, interface, mapping, scheduler, stream): + def args_valid(self, interface, mapping, scheduler, stream, hints): if not Check.is_interface(interface.name): return False @@ -131,11 +132,11 @@ def args_valid(self, interface, mapping, scheduler, stream): return True - def setup(self, interface, mapping, scheduler, stream): + def setup(self, interface, mapping, scheduler, stream, hints): logger.info("Setting up platform and devices") - if not self.args_valid(interface, mapping, scheduler, stream): + if not self.args_valid(interface, mapping, scheduler, stream, hints): raise TypeError try: @@ -147,7 +148,7 @@ def setup(self, interface, mapping, scheduler, stream): # FIXME: consider other exceptions, e.g. TypeError try: # FIXME add qdisc reset - self.qdisc.setup(interface, mapping, scheduler, stream.base_time) + self.qdisc.setup(interface, mapping, scheduler, stream.base_time, hints) except subprocess.CalledProcessError: raise @@ -218,10 +219,10 @@ def __init__(self): pass - def setup(self, interface, mapping, scheduler, base_time): + def setup(self, interface, mapping, scheduler, base_time, hints): tc = CommandTc() - if interface.device.supports_qbv(): + if hints.tx_selection == TxCapability.Qbv: tc.set_taprio_offload(interface, mapping, scheduler, base_time) else: tc.set_taprio_software(interface, mapping, scheduler, base_time) diff --git a/tests/test_mapping.py b/tests/test_mapping.py index b447cf2..5fd7009 100644 --- a/tests/test_mapping.py +++ b/tests/test_mapping.py @@ -7,8 +7,7 @@ import unittest from detd import Interface -from detd import Mapping -from detd import MappingNaive +from detd import MappingFlexible import os @@ -39,7 +38,7 @@ def test_mappingnaive_assignqueueandmap(self): interface = Interface(interface_name) tc = None - mapping = MappingNaive(interface) + mapping = MappingFlexible(interface) expected_tc_to_hwq_mapping = [ {"offset":0, "num_queues":8} ] self.assertEqual(mapping.tc_to_hwq, expected_tc_to_hwq_mapping) @@ -118,7 +117,7 @@ def test_mappingnaive_unmapandfreequeue(self): interface = Interface(interface_name) tc = None - mapping = MappingNaive(interface) + mapping = MappingFlexible(interface) mapping.assign_queue_and_map(tc) mapping.assign_queue_and_map(tc) mapping.assign_queue_and_map(tc) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index d807dfc..7e8da97 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -7,7 +7,7 @@ import os import unittest -from detd import Mapping +from detd import MappingFixed from detd import StreamConfiguration from detd import TrafficSpecification from detd import Interface @@ -55,7 +55,7 @@ def test_add_single_scheduled_traffic_start_0(self): with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 0 * us_to_ns @@ -73,7 +73,7 @@ def test_add_remove_single_scheduled_traffic_start_0(self): with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 0 * us_to_ns @@ -95,7 +95,7 @@ def test_add_single_scheduled_traffic_start_non_0(self): with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 250 * us_to_ns @@ -112,7 +112,7 @@ def test_add_remove_single_scheduled_traffic_start_non_0(self): with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 250 * us_to_ns @@ -133,7 +133,7 @@ def test_add_two_scheduled_traffics_same_interval(self): with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 250 * us_to_ns @@ -162,7 +162,7 @@ def test_add_two_scheduled_traffics_different_interval(self): with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 250 * us_to_ns @@ -200,7 +200,7 @@ def test_add_remove_last_to_first_two_scheduled_traffics_different_interval(self with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 250 * us_to_ns @@ -252,7 +252,7 @@ def test_add_remove_first_to_last_two_scheduled_traffics_different_interval(self with RunContext(TestMode.HOST): interface = Interface("eth0") - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 250 * us_to_ns @@ -308,7 +308,7 @@ def test_schedule_conflictswithtraffic_matchfull(self): with RunContext(self.mode): interface_name = "eth0" interface = Interface(interface_name) - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 100 * us_to_ns @@ -330,7 +330,7 @@ def test_schedule_conflictswithtraffic_nomatch(self): with RunContext(self.mode): interface_name = "eth0" interface = Interface(interface_name) - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 100 * us_to_ns @@ -351,7 +351,7 @@ def test_schedule_conflictswithtraffic_leftmatch(self): with RunContext(self.mode): interface_name = "eth0" interface = Interface(interface_name) - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 100 * us_to_ns @@ -372,7 +372,7 @@ def test_schedule_conflictswithtraffic_rightmatch(self): with RunContext(self.mode): interface_name = "eth0" interface = Interface(interface_name) - mapping = Mapping(interface) + mapping = MappingFixed(interface) scheduler = Scheduler(mapping) txoffset = 100 * us_to_ns