Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alter configuration to execute thread pinning when desired #20

Merged
merged 1 commit into from Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 43 additions & 0 deletions python/fddaqconf/thread_pinning.py
@@ -0,0 +1,43 @@

def add_thread_pinning_to_boot(system_data, thread_pinning_file, path):
after = thread_pinning_file['after']
file = thread_pinning_file['file']
from pathlib import Path
from os.path import expandvars
resolved_thread_pinning_file = Path(expandvars(file)).expanduser()

if not resolved_thread_pinning_file.is_absolute():
resolved_thread_pinning_file = path / resolved_thread_pinning_file

if not resolved_thread_pinning_file.exists():
raise RuntimeError(f'Cannot find the file {file} ({resolved_thread_pinning_file})')

if not system_data['boot'].get('scripts'):
system_data['boot']['scripts'] = {}
key = "thread_pinning_0"
else:
numbers = [0]
for script in system_data['boot']['scripts'].keys():
if not script.startswith('thread_pinning_'):
continue
numbers += [int(script.split('_')[-1])]

numbers.sort()
number = numbers[-1]+1
key = f"thread_pinning_{number}"

for data in system_data['boot']['scripts'].values():
if data.get('after', '') == after:
raise RuntimeError(f'Already specified a pinning script for after \'{after}\'')

system_data['boot']['scripts'][key] = {
"after": after,
"cmd": [
"readout-affinity.py --pinfile ${DUNEDAQ_THREAD_PIN_FILE}"
],
"env": {
"DUNEDAQ_THREAD_PIN_FILE": resolved_thread_pinning_file.resolve().as_posix(),
"LD_LIBRARY_PATH": "getenv",
"PATH": "getenv"
}
}
15 changes: 11 additions & 4 deletions schema/fddaqconf/readoutgen.jsonnet
Expand Up @@ -31,7 +31,7 @@ local cs = {
], doc="Exception to the default NUMA ID for FELIX cards"),

numa_exceptions: s.sequence( "NUMAExceptions", self.numa_exception, doc="Exceptions to the default NUMA ID"),

numa_config: s.record("numa_config", [
s.field( "default_id", types.count, default=0, doc="Default NUMA ID for FELIX cards"),
s.field( "default_latency_numa_aware", types.flag, default=false, doc="Default for Latency Buffer NUMA awareness"),
Expand All @@ -51,14 +51,21 @@ local cs = {
s.field( "exceptions", self.dpdk_lcore_exceptions, default=[], doc="Exceptions to the default NUMA ID"),
]),


thread_pinning_file: s.record("ThreadPinningFile", [
s.field( "after", types.string, default="", doc="When to execute the thread pinning script with this file, for example specifying boot will execute the threadpinning after boot"),
s.field( "file", types.path, default="", doc="A thread pinning configuration file"),
]),
thread_pinning_files: s.sequence( "ThreadPinningFiles", self.thread_pinning_file, doc="A list of thread pinning files"),

readout: s.record("readout", [
s.field( "detector_readout_map_file", types.path, default='./DetectorReadoutMap.json', doc="File containing detector hardware map for configuration to run"),
s.field( "use_fake_data_producers", types.flag, default=false, doc="Use fake data producers that respond with empty fragments immediately instead of (fake) cards and DLHs"),
// s.field( "memory_limit_gb", types.count, default=64, doc="Application memory limit in GB")
// Fake cards
s.field( "use_fake_cards", types.flag, default=false, doc="Use fake cards"),
s.field( "generate_periodic_adc_pattern", types.flag, default=false, doc="Generate a periodic ADC pattern inside the input data. Only when FakeCard reader is used"),
s.field( "emulated_TP_rate_per_ch", types.float4, default=1.0, doc="Rate of TPs per channel when using a periodic ADC pattern generation. Values expresses as multiples of the expected rate of 100 Hz/ch"),
s.field( "generate_periodic_adc_pattern", types.flag, default=false, doc="Generate a periodic ADC pattern inside the input data. Only when FakeCard reader is used"),
s.field( "emulated_TP_rate_per_ch", types.float4, default=1.0, doc="Rate of TPs per channel when using a periodic ADC pattern generation. Values expresses as multiples of the expected rate of 100 Hz/ch"),
s.field( "emulated_data_times_start_with_now", types.flag, default=false, doc="If active, the timestamp of the first emulated data frame is set to the current wallclock time"),
s.field( "default_data_file", types.path, default='asset://?label=ProtoWIB&subsystem=readout', doc="File containing data frames to be replayed by the fake cards. Former -d. Uses the asset manager, can also be 'asset://?checksum=somelonghash', or 'file://somewhere/frames.bin' or 'frames.bin'"),
s.field( "data_files", self.data_files, default=[], doc="Files to use by detector type"),
Expand All @@ -71,7 +78,7 @@ local cs = {
s.field( "numa_config", self.numa_config, default=self.numa_config, doc='Configuration of FELIX NUMA IDs'),
// DLH
s.field( "emulator_mode", types.flag, default=false, doc="If active, timestamps of data frames are overwritten when processed by the readout. This is necessary if the felix card does not set correct timestamps. Former -e"),
s.field( "thread_pinning_file", types.path, default="", doc="A thread pinning configuration file that gets executed after conf."),
s.field( "thread_pinning_files", self.thread_pinning_files, default=[], doc="A list of thread pinning configuration files that gets executed when specified."),
s.field( "source_queue_timeout_ms", types.count, default=0, doc="The source queue timeout that will be used in the datalink handle when polling source queues"),
s.field( "source_queue_sleep_us", types.count, default=500, doc="The source queue seep that will be used in the datalink handle when polling source queues."),

Expand Down
35 changes: 9 additions & 26 deletions scripts/fddaqconf_gen
Expand Up @@ -28,7 +28,7 @@ import daqconf.detreadoutmap as dromap


def expand_conf(config_data, debug=False):
"""Expands the moo configuration record into sub-records,
"""Expands the moo configuration record into sub-records,
re-casting its members into the corresponding moo objects.

Args:
Expand Down Expand Up @@ -192,7 +192,7 @@ def cli(
# console.log("Commandline parsing completed")
if check_args_and_exit:
return

output_dir = Path(json_dir)
if output_dir.exists():
if dry_run:
Expand Down Expand Up @@ -240,7 +240,7 @@ def cli(
boot.base_command_port = base_command_port
console.log(f"boot.base_command_port set to {boot.base_command_port}")


if detector_readout_map_file is not None:
readout.detector_readout_map_file = detector_readout_map_file
console.log(f"readout.detector_readout_map_file set to {readout.detector_readout_map_file}")
Expand All @@ -259,7 +259,7 @@ def cli(
console.log("Loading dataflow config generator")
from daqconf.apps.dataflow_gen import get_dataflow_app
console.log("Loading readout config generator")
from fddaqconf.apps.readout_gen import FDReadoutAppGenerator
from fddaqconf.apps.readout_gen import FDReadoutAppGenerator
console.log("Loading trigger config generator")
from daqconf.apps.trigger_gen import get_trigger_app
console.log("Loading DFO config generator")
Expand Down Expand Up @@ -464,7 +464,7 @@ def cli(
#--------------------------------------------------------------------------
if readout.use_fake_data_producers == False:
the_system.apps[ru_name] = roapp_gen.generate(
RU_DESCRIPTOR=ru_desc,
RU_DESCRIPTOR=ru_desc,
SOURCEID_BROKER=sourceid_broker,
data_file_map=data_file_map,
data_timeout_requests=readout_data_request_timeout
Expand Down Expand Up @@ -621,27 +621,10 @@ def cli(
control_to_data_network=CDN,
)

from fddaqconf.thread_pinning import add_thread_pinning_to_boot

if readout.thread_pinning_file != "":
resolved_thread_pinning_file = Path(os.path.expandvars(readout.thread_pinning_file)).expanduser()
if not resolved_thread_pinning_file.is_absolute():
resolved_thread_pinning_file = config_file.parent / resolved_thread_pinning_file

if not resolved_thread_pinning_file.exists():
raise RuntimeError(f'Cannot find the file {readout.thread_pinning_file} ({resolved_thread_pinning_file})')

system_command_datas['boot']['scripts'] = {
"thread_pinning": {
"cmd": [
"readout-affinity.py --pinfile ${DUNEDAQ_THREAD_PIN_FILE}"
],
"env": {
"DUNEDAQ_THREAD_PIN_FILE": resolved_thread_pinning_file.resolve().as_posix(),
"LD_LIBRARY_PATH": "getenv",
"PATH": "getenv"
}
}
}
for tpf in readout.thread_pinning_files:
add_thread_pinning_to_boot(system_command_datas, tpf, config_file.parent)


if not dry_run:
Expand Down Expand Up @@ -682,4 +665,4 @@ if __name__ == '__main__':
console.print_exception()
raise SystemExit(-1)
# console.log("daqconf - finished")