Merge pull request #20 from DUNE-DAQ/plasorak/pin-thread-sequence

Alter configuration to execute thread pinning when desired

plasorak committed Feb 21, 2024
2 parents fc374ec + 5b57eff commit 8d14945
Showing 3 changed files with 63 additions and 30 deletions.
43 changes: 43 additions & 0 deletions python/fddaqconf/thread_pinning.py
@@ -0,0 +1,43 @@

def add_thread_pinning_to_boot(system_data, thread_pinning_file, path):
    """Register one thread-pinning boot script in system_data['boot']['scripts'].

    thread_pinning_file is a dict with an 'after' key (the command after which the
    script runs) and a 'file' key (the pin file, resolved against `path` when relative).
    """
    after = thread_pinning_file['after']
    file = thread_pinning_file['file']
    from pathlib import Path
    from os.path import expandvars
    resolved_thread_pinning_file = Path(expandvars(file)).expanduser()

    if not resolved_thread_pinning_file.is_absolute():
        resolved_thread_pinning_file = path / resolved_thread_pinning_file

    if not resolved_thread_pinning_file.exists():
        raise RuntimeError(f'Cannot find the file {file} ({resolved_thread_pinning_file})')

    # Pick the next free "thread_pinning_<n>" key.
    if not system_data['boot'].get('scripts'):
        system_data['boot']['scripts'] = {}
        key = "thread_pinning_0"
    else:
        numbers = [0]
        for script in system_data['boot']['scripts'].keys():
            if not script.startswith('thread_pinning_'):
                continue
            numbers += [int(script.split('_')[-1])]

        numbers.sort()
        number = numbers[-1] + 1
        key = f"thread_pinning_{number}"

    # Only one pinning script is allowed per 'after' stage.
    for data in system_data['boot']['scripts'].values():
        if data.get('after', '') == after:
            raise RuntimeError(f'Already specified a pinning script for after \'{after}\'')

    system_data['boot']['scripts'][key] = {
        "after": after,
        "cmd": [
            "readout-affinity.py --pinfile ${DUNEDAQ_THREAD_PIN_FILE}"
        ],
        "env": {
            "DUNEDAQ_THREAD_PIN_FILE": resolved_thread_pinning_file.resolve().as_posix(),
            "LD_LIBRARY_PATH": "getenv",
            "PATH": "getenv"
        }
    }
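
Usage sketch (not part of the commit): a minimal illustration of how add_thread_pinning_to_boot() fills boot['scripts']; the pin-file name, the 'after' stage and the temporary config directory are hypothetical stand-ins.

import json, tempfile
from pathlib import Path
from fddaqconf.thread_pinning import add_thread_pinning_to_boot

# Stand-in config directory and pin file, so the existence check passes.
config_dir = Path(tempfile.mkdtemp())
(config_dir / "cpupin-example.json").write_text(json.dumps({}))

system_data = {"boot": {}}
add_thread_pinning_to_boot(
    system_data,
    {"after": "conf", "file": "cpupin-example.json"},  # relative path, resolved against config_dir
    config_dir,
)

# system_data["boot"]["scripts"] now holds a "thread_pinning_0" entry with
# "after": "conf", the readout-affinity.py command, and DUNEDAQ_THREAD_PIN_FILE
# pointing at the resolved pin file. A second call with a different 'after'
# value would be stored as "thread_pinning_1"; reusing the same 'after' raises
# a RuntimeError.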
15 changes: 11 additions & 4 deletions schema/fddaqconf/readoutgen.jsonnet
@@ -31,7 +31,7 @@ local cs = {
], doc="Exception to the default NUMA ID for FELIX cards"),

numa_exceptions: s.sequence( "NUMAExceptions", self.numa_exception, doc="Exceptions to the default NUMA ID"),

numa_config: s.record("numa_config", [
s.field( "default_id", types.count, default=0, doc="Default NUMA ID for FELIX cards"),
s.field( "default_latency_numa_aware", types.flag, default=false, doc="Default for Latency Buffer NUMA awareness"),
@@ -51,14 +51,21 @@ local cs = {
s.field( "exceptions", self.dpdk_lcore_exceptions, default=[], doc="Exceptions to the default NUMA ID"),
]),


thread_pinning_file: s.record("ThreadPinningFile", [
s.field( "after", types.string, default="", doc="When to execute the thread pinning script with this file, for example specifying boot will execute the threadpinning after boot"),
s.field( "file", types.path, default="", doc="A thread pinning configuration file"),
]),
thread_pinning_files: s.sequence( "ThreadPinningFiles", self.thread_pinning_file, doc="A list of thread pinning files"),

readout: s.record("readout", [
s.field( "detector_readout_map_file", types.path, default='./DetectorReadoutMap.json', doc="File containing detector hardware map for configuration to run"),
s.field( "use_fake_data_producers", types.flag, default=false, doc="Use fake data producers that respond with empty fragments immediately instead of (fake) cards and DLHs"),
// s.field( "memory_limit_gb", types.count, default=64, doc="Application memory limit in GB")
// Fake cards
s.field( "use_fake_cards", types.flag, default=false, doc="Use fake cards"),
s.field( "generate_periodic_adc_pattern", types.flag, default=false, doc="Generate a periodic ADC pattern inside the input data. Only when FakeCard reader is used"),
s.field( "emulated_TP_rate_per_ch", types.float4, default=1.0, doc="Rate of TPs per channel when using a periodic ADC pattern generation. Values expresses as multiples of the expected rate of 100 Hz/ch"),
s.field( "generate_periodic_adc_pattern", types.flag, default=false, doc="Generate a periodic ADC pattern inside the input data. Only when FakeCard reader is used"),
s.field( "emulated_TP_rate_per_ch", types.float4, default=1.0, doc="Rate of TPs per channel when using a periodic ADC pattern generation. Values expresses as multiples of the expected rate of 100 Hz/ch"),
s.field( "emulated_data_times_start_with_now", types.flag, default=false, doc="If active, the timestamp of the first emulated data frame is set to the current wallclock time"),
s.field( "default_data_file", types.path, default='asset://?label=ProtoWIB&subsystem=readout', doc="File containing data frames to be replayed by the fake cards. Former -d. Uses the asset manager, can also be 'asset://?checksum=somelonghash', or 'file://somewhere/frames.bin' or 'frames.bin'"),
s.field( "data_files", self.data_files, default=[], doc="Files to use by detector type"),
@@ -71,7 +78,7 @@ local cs = {
s.field( "numa_config", self.numa_config, default=self.numa_config, doc='Configuration of FELIX NUMA IDs'),
// DLH
s.field( "emulator_mode", types.flag, default=false, doc="If active, timestamps of data frames are overwritten when processed by the readout. This is necessary if the felix card does not set correct timestamps. Former -e"),
s.field( "thread_pinning_file", types.path, default="", doc="A thread pinning configuration file that gets executed after conf."),
s.field( "thread_pinning_files", self.thread_pinning_files, default=[], doc="A list of thread pinning configuration files that gets executed when specified."),
s.field( "source_queue_timeout_ms", types.count, default=0, doc="The source queue timeout that will be used in the datalink handle when polling source queues"),
s.field( "source_queue_sleep_us", types.count, default=500, doc="The source queue seep that will be used in the datalink handle when polling source queues."),

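For orientation (not from the commit): a sketch of how the new readout option could be filled in, shown as a Python dict equivalent of the daqconf JSON fragment that fddaqconf_gen loads; the pin-file names and 'after' stages are hypothetical.

readout_fragment = {
    "readout": {
        "thread_pinning_files": [
            {"after": "boot", "file": "cpupin-boot.json"},  # hypothetical pin file, applied after boot
            {"after": "conf", "file": "cpupin-conf.json"},  # hypothetical pin file, applied after conf
        ],
    },
}
# At most one pinning file is accepted per 'after' value, and relative 'file'
# paths are resolved against the directory containing the configuration file.
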
35 changes: 9 additions & 26 deletions scripts/fddaqconf_gen
@@ -28,7 +28,7 @@ import daqconf.detreadoutmap as dromap


def expand_conf(config_data, debug=False):
"""Expands the moo configuration record into sub-records,
"""Expands the moo configuration record into sub-records,
re-casting its members into the corresponding moo objects.
Args:
@@ -192,7 +192,7 @@ def cli(
# console.log("Commandline parsing completed")
if check_args_and_exit:
return

output_dir = Path(json_dir)
if output_dir.exists():
if dry_run:
@@ -240,7 +240,7 @@ def cli(
boot.base_command_port = base_command_port
console.log(f"boot.base_command_port set to {boot.base_command_port}")


if detector_readout_map_file is not None:
readout.detector_readout_map_file = detector_readout_map_file
console.log(f"readout.detector_readout_map_file set to {readout.detector_readout_map_file}")
@@ -259,7 +259,7 @@ def cli(
console.log("Loading dataflow config generator")
from daqconf.apps.dataflow_gen import get_dataflow_app
console.log("Loading readout config generator")
from fddaqconf.apps.readout_gen import FDReadoutAppGenerator
console.log("Loading trigger config generator")
from daqconf.apps.trigger_gen import get_trigger_app
console.log("Loading DFO config generator")
@@ -464,7 +464,7 @@ def cli(
#--------------------------------------------------------------------------
if readout.use_fake_data_producers == False:
the_system.apps[ru_name] = roapp_gen.generate(
RU_DESCRIPTOR=ru_desc,
SOURCEID_BROKER=sourceid_broker,
data_file_map=data_file_map,
data_timeout_requests=readout_data_request_timeout
@@ -621,27 +621,10 @@ def cli(
control_to_data_network=CDN,
)

    from fddaqconf.thread_pinning import add_thread_pinning_to_boot

    if readout.thread_pinning_file != "":
        resolved_thread_pinning_file = Path(os.path.expandvars(readout.thread_pinning_file)).expanduser()
        if not resolved_thread_pinning_file.is_absolute():
            resolved_thread_pinning_file = config_file.parent / resolved_thread_pinning_file

        if not resolved_thread_pinning_file.exists():
            raise RuntimeError(f'Cannot find the file {readout.thread_pinning_file} ({resolved_thread_pinning_file})')

        system_command_datas['boot']['scripts'] = {
            "thread_pinning": {
                "cmd": [
                    "readout-affinity.py --pinfile ${DUNEDAQ_THREAD_PIN_FILE}"
                ],
                "env": {
                    "DUNEDAQ_THREAD_PIN_FILE": resolved_thread_pinning_file.resolve().as_posix(),
                    "LD_LIBRARY_PATH": "getenv",
                    "PATH": "getenv"
                }
            }
        }
    for tpf in readout.thread_pinning_files:
        add_thread_pinning_to_boot(system_command_datas, tpf, config_file.parent)


if not dry_run:
@@ -682,4 +665,4 @@ if __name__ == '__main__':
console.print_exception()
raise SystemExit(-1)
# console.log("daqconf - finished")
