Patch/fddaq v4.4.x #364

Merged · 15 commits · Jul 10, 2024
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
- project(dfmodules VERSION 2.14.2)
+ project(dfmodules VERSION 2.14.3)

find_package(daq-cmake REQUIRED)
daq_setup_environment()
11 changes: 11 additions & 0 deletions include/dfmodules/DataStore.hpp
@@ -75,6 +75,17 @@ ERS_DECLARE_ISSUE(dfmodules,
((std::string)mod_name)((std::string)description))
/// @endcond LCOV_EXCL_STOP

+ /**
+  * @brief An ERS Issue for DataStore problems in which it is
+  * reasonable to skip any warning or error message.
+  * @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
+  */
+ ERS_DECLARE_ISSUE(dfmodules,
+ IgnorableDataStoreProblem,
+ "Module " << mod_name << ": A problem was encountered when " << description,
+ ((std::string)mod_name)((std::string)description))
+ /// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Issue for DataStore problems in which it is
* not clear whether retrying the operation might succeed or not.
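Note: the new IgnorableDataStoreProblem sits between the fatal and retryable DataStore issues, so a caller can swallow it without retrying or escalating. Below is a minimal standalone sketch of that three-way policy; the local exception types are illustrative stand-ins for the ERS-generated issue classes (the real ones come from ERS_DECLARE_ISSUE and derive from ers::Issue), not the dfmodules API.

```cpp
#include <cstdio>
#include <stdexcept>

// Illustrative stand-ins, not the real ERS-generated classes.
struct DataStoreProblem : std::runtime_error { using std::runtime_error::runtime_error; };
struct RetryableDataStoreProblem : DataStoreProblem { using DataStoreProblem::DataStoreProblem; };
struct IgnorableDataStoreProblem : DataStoreProblem { using DataStoreProblem::DataStoreProblem; };

void attempt_write() {
  try {
    // stand-in for data_store->write(record), which may report that the
    // record already exists in the output file
    throw IgnorableDataStoreProblem("time slice already exists in the file");
  } catch (const RetryableDataStoreProblem& e) {
    std::printf("transient problem, retry: %s\n", e.what());
  } catch (const IgnorableDataStoreProblem& e) {
    std::printf("skipping quietly: %s\n", e.what()); // no retry, no error
  } catch (const DataStoreProblem& e) {
    std::printf("reporting error: %s\n", e.what());
  }
}

int main() { attempt_write(); }
```

TPStreamWriter.cpp later in this diff uses exactly this pattern when a tardy time slice can no longer be stored.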
12 changes: 6 additions & 6 deletions integtest/disabled_output_test.py
@@ -61,12 +61,12 @@
triggeractivity_frag_params={"fragment_type_description": "Trigger Activity",
"fragment_type": "Trigger_Activity",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": 1,
"expected_fragment_count": 3,
"min_size_bytes": 72, "max_size_bytes": 216}
triggertp_frag_params={"fragment_type_description": "Trigger with TPs",
"fragment_type": "Trigger_Primitive",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": 2, # number of readout apps (1) times 2
"expected_fragment_count": 3, # number of readout apps (1) times 3, one per plane
"min_size_bytes": 72, "max_size_bytes": 16000}
hsi_frag_params ={"fragment_type_description": "HSI",
"fragment_type": "Hardware_Signal",
@@ -116,10 +116,10 @@

# The commands to run in nanorc, as a list
nanorc_command_list="integtest-partition boot conf".split()
nanorc_command_list+="start_run --disable-data-storage 101 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run 102 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run --disable-data-storage 103 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="start_run 104 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="start_run --disable-data-storage --wait 2 101 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run --wait 2 102 wait ".split() + [str(run_duration)] + "stop_run --wait 2 wait 2".split()
nanorc_command_list+="start_run --disable-data-storage --wait 2 103 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="start_run --wait 2 104 wait ".split() + [str(run_duration)] + "disable_triggers wait 2 stop_run wait 2".split()
nanorc_command_list+="scrap terminate".split()

# The tests themselves
6 changes: 3 additions & 3 deletions integtest/multi_output_file_test.py
@@ -55,12 +55,12 @@
triggeractivity_frag_params={"fragment_type_description": "Trigger Activity",
"fragment_type": "Trigger_Activity",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": number_of_readout_apps,
"expected_fragment_count": (3*number_of_readout_apps),
"min_size_bytes": 72, "max_size_bytes": 520}
triggertp_frag_params={"fragment_type_description": "Trigger with TPs",
"fragment_type": "Trigger_Primitive",
"hdf5_source_subsystem": "Trigger",
"expected_fragment_count": (2*number_of_readout_apps),
"expected_fragment_count": (3*number_of_readout_apps),
"min_size_bytes": 72, "max_size_bytes": 16000}
hsi_frag_params ={"fragment_type_description": "HSI",
"fragment_type": "Hardware_Signal",
@@ -132,7 +132,7 @@
}

# The commands to run in nanorc, as a list
nanorc_command_list="integtest-partition boot conf start_run 101 wait 180 disable_triggers wait 2 stop_run wait 21 start_run 102 wait 120 disable_triggers wait 2 stop_run wait 21 scrap terminate".split()
nanorc_command_list="integtest-partition boot conf start_run --wait 3 101 wait 180 disable_triggers wait 2 stop_run wait 21 start_run --wait 3 102 wait 120 disable_triggers wait 2 stop_run wait 21 scrap terminate".split()

# The tests themselves

80 changes: 51 additions & 29 deletions plugins/HDF5DataStore.hpp
@@ -135,6 +135,7 @@ class HDF5DataStore : public DataStore

m_file_index = 0;
m_recorded_size = 0;
+ m_current_record_number = std::numeric_limits<size_t>::max();

if (m_operation_mode != "one-event-per-file"
//&& m_operation_mode != "one-fragment-per-file"
@@ -162,7 +163,7 @@
virtual void write(const daqdataformats::TriggerRecord& tr)
{

- // check if there is sufficient space for this data block
+ // check if there is sufficient space for this record
size_t current_free_space = get_free_space(m_path);
size_t tr_size = tr.get_total_size_bytes();
if (current_free_space < (m_free_space_safety_factor_for_write * tr_size)) {
@@ -178,12 +179,19 @@
throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
}

- // check if a new file should be opened for this data block
- increment_file_index_if_needed(tr_size);
+ // check if a new file should be opened for this record
+ if (! increment_file_index_if_needed(tr_size)) {
+ if (m_config_params.mode == "one-event-per-file") {
+ if (m_current_record_number != std::numeric_limits<size_t>::max() &&
+ tr.get_header_ref().get_trigger_number() != m_current_record_number) {
+ ++m_file_index;
+ }
+ }
+ }
+ m_current_record_number = tr.get_header_ref().get_trigger_number();

// determine the filename from Storage Key + configuration parameters
- std::string full_filename =
- get_file_name(tr.get_header_ref().get_trigger_number(), tr.get_header_ref().get_run_number());
+ std::string full_filename = get_file_name(tr.get_header_ref().get_run_number());

try {
open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
@@ -194,7 +202,7 @@
throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
}

- // write the data block
+ // write the record
m_file_handle->write(tr);
m_recorded_size = m_file_handle->get_recorded_size();
}
@@ -209,7 +217,7 @@
virtual void write(const daqdataformats::TimeSlice& ts)
{

- // check if there is sufficient space for this data block
+ // check if there is sufficient space for this record
size_t current_free_space = get_free_space(m_path);
size_t ts_size = ts.get_total_size_bytes();
if (current_free_space < (m_free_space_safety_factor_for_write * ts_size)) {
@@ -225,11 +233,19 @@
throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
}

- // check if a new file should be opened for this data block
- increment_file_index_if_needed(ts_size);
+ // check if a new file should be opened for this record
+ if (! increment_file_index_if_needed(ts_size)) {
+ if (m_config_params.mode == "one-event-per-file") {
+ if (m_current_record_number != std::numeric_limits<size_t>::max() &&
+ ts.get_header().timeslice_number != m_current_record_number) {
+ ++m_file_index;
+ }
+ }
+ }
+ m_current_record_number = ts.get_header().timeslice_number;

// determine the filename from Storage Key + configuration parameters
- std::string full_filename = get_file_name(ts.get_header().timeslice_number, ts.get_header().run_number);
+ std::string full_filename = get_file_name(ts.get_header().run_number);

try {
open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
@@ -240,13 +256,18 @@
throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
}

- // write the data block
- m_file_handle->write(ts);
- m_recorded_size = m_file_handle->get_recorded_size();
+ // write the record
+ try {
+ m_file_handle->write(ts);
+ m_recorded_size = m_file_handle->get_recorded_size();
+ } catch (hdf5libs::TimeSliceAlreadyExists const& excpt) {
+ std::string msg = "writing a time slice to file " + m_file_handle->get_file_name();
+ throw IgnorableDataStoreProblem(ERS_HERE, get_name(), msg, excpt);
+ }
}

/**
- * @brief Informs the HDF5DataStore that writes or reads of data blocks
+ * @brief Informs the HDF5DataStore that writes or reads of records
* associated with the specified run number will soon be requested.
* This allows the DataStore to test that the output file path is valid
* and any other checks that are useful in advance of the first data
@@ -280,10 +301,11 @@

m_file_index = 0;
m_recorded_size = 0;
+ m_current_record_number = std::numeric_limits<size_t>::max();
}

/**
- * @brief Informs the HD5DataStore that writes or reads of data blocks
+ * @brief Informs the HD5DataStore that writes or reads of records
* associated with the specified run number have finished, for now.
* This allows the DataStore to close open files and do any other
* cleanup or shutdown operations that are useful once the writes or
@@ -324,8 +346,13 @@
// Total number of generated files
size_t m_file_index;

- // Total size of data being written
- size_t m_recorded_size;
+ // Size of data being written
+ size_t m_recorded_size; // per file
+
+ // Record number for the record that is currently being written out
+ // This is only useful for long-readout windows, in which there may
+ // be multiple calls to write()
+ size_t m_current_record_number;

// Configuration
hdf5datastore::ConfParams m_config_params;
@@ -340,8 +367,7 @@
/**
* @brief Translates the specified input parameters into the appropriate filename.
*/
- std::string get_file_name(uint64_t record_number, // NOLINT(build/unsigned)
- daqdataformats::run_number_t run_number)
+ std::string get_file_name(daqdataformats::run_number_t run_number)
{
std::ostringstream work_oss;
work_oss << m_config_params.directory_path;
@@ -356,28 +382,24 @@
work_oss << m_config_params.filename_parameters.run_number_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_run_number) << std::setfill('0') << run_number;
work_oss << "_";
- if (m_config_params.mode == "one-event-per-file") {
-
- work_oss << m_config_params.filename_parameters.trigger_number_prefix;
- work_oss << std::setw(m_config_params.filename_parameters.digits_for_trigger_number) << std::setfill('0')
- << record_number;
- } else if (m_config_params.mode == "all-per-file") {
+ work_oss << m_config_params.filename_parameters.file_index_prefix;
+ work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
+ << m_file_index;
+
- work_oss << m_config_params.filename_parameters.file_index_prefix;
- work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
- << m_file_index;
- }
work_oss << "_" << m_config_params.filename_parameters.writer_identifier;
work_oss << ".hdf5";
return work_oss.str();
}
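Note: with the trigger/timeslice number dropped from get_file_name(), file names in every mode are now built from the run number plus the monotonically increasing file index. A standalone sketch of the resulting pattern follows; the directory, prefixes, digit widths, and writer identifier are made-up stand-ins for the hdf5datastore ConfParams values, not the real defaults.

```cpp
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>

// Mirrors the shape of the new get_file_name(); all parameter values are hypothetical.
std::string file_name(uint32_t run_number, std::size_t file_index) {
  std::ostringstream oss;
  oss << "/data/"                                           // directory_path
      << "swtest"                                           // file_name_prefix
      << "_run" << std::setw(6) << std::setfill('0') << run_number
      << "_"    << std::setw(4) << std::setfill('0') << file_index
      << "_dataflow0_datawriter_0"                          // writer_identifier
      << ".hdf5";
  return oss.str();
}

int main() {
  std::cout << file_name(101, 0) << "\n"; // /data/swtest_run000101_0000_dataflow0_datawriter_0.hdf5
  std::cout << file_name(101, 1) << "\n"; // record numbers no longer appear in the name
}
```

The practical consequence: in one-event-per-file mode, separating records is now the job of the file index (bumped on record-number changes, as shown below) rather than of a trigger-number field in the file name.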

- void increment_file_index_if_needed(size_t size_of_next_write)
+ bool increment_file_index_if_needed(size_t size_of_next_write)
{
if ((m_recorded_size + size_of_next_write) > m_max_file_size && m_recorded_size > 0) {
++m_file_index;
m_recorded_size = 0;
+ return true;
}
+ return false;
}
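To summarize the new rollover logic: the size check still closes a file that would grow past the maximum size, and write() additionally bumps the index in one-event-per-file mode when the incoming record number differs from the last one seen, so the multiple write() calls of a long-readout window land in a single file. Below is a self-contained model of the combined decision, a sketch with hypothetical sizes (in the real class, the per-file byte count is refreshed from the file handle rather than zeroed by hand):

```cpp
#include <cstddef>
#include <cstdio>
#include <limits>

struct RolloverModel {
  std::size_t file_index = 0;
  std::size_t recorded_size = 0;      // per file, as in this PR
  std::size_t max_file_size = 1000;   // hypothetical
  std::size_t current_record = std::numeric_limits<std::size_t>::max(); // "none yet"

  bool increment_if_needed(std::size_t next_write) { // size-based rollover
    if (recorded_size + next_write > max_file_size && recorded_size > 0) {
      ++file_index;
      recorded_size = 0;
      return true;
    }
    return false;
  }

  void write(std::size_t record_number, std::size_t size) { // one-event-per-file mode
    if (!increment_if_needed(size)) {
      // new in this PR: also roll over when the record number changes
      if (current_record != std::numeric_limits<std::size_t>::max() &&
          record_number != current_record) {
        ++file_index;
        recorded_size = 0; // stand-in for starting a fresh file
      }
    }
    current_record = record_number;
    recorded_size += size;
    std::printf("record %zu (%zu bytes) -> file index %zu\n", record_number, size, file_index);
  }
};

int main() {
  RolloverModel m;
  m.write(5, 100); // record 5, first piece    -> file 0
  m.write(5, 100); // same record, later piece -> still file 0 (long-readout window)
  m.write(6, 100); // record number changed    -> file 1
}
```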

void open_file_if_needed(const std::string& file_name, unsigned open_flags = HighFive::File::ReadOnly)
54 changes: 49 additions & 5 deletions plugins/TPStreamWriter.cpp
@@ -60,9 +60,15 @@ TPStreamWriter::get_info(opmonlib::InfoCollector& ci, int /*level*/)
{
tpstreamwriterinfo::Info info;

- info.tpset_received = m_tpset_received.exchange(0);
- info.tpset_written = m_tpset_written.exchange(0);
+ info.heartbeat_tpsets_received = m_heartbeat_tpsets.exchange(0);
+ info.tpsets_with_tps_received = m_tpsets_with_tps.exchange(0);
+ info.tps_received = m_tps_received.exchange(0);
+ info.tps_written = m_tps_written.exchange(0);
+ info.timeslices_written = m_timeslices_written.exchange(0);
+ info.bytes_output = m_bytes_output.exchange(0);
+ info.tardy_timeslice_max_seconds = m_tardy_timeslice_max_seconds.exchange(0.0);
+ info.total_tps_received = m_total_tps_received.load();
+ info.total_tps_written = m_total_tps_written.load();

ci.add(info);
}
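Note: the reworked get_info() mixes two counter styles. The per-interval counters are drained with exchange(0), so each report covers only the time since the last one, while the total_* counters (reset in do_start()) are read with load(). A small standalone sketch of that pattern, with illustrative names rather than the opmon/tpstreamwriterinfo API:

```cpp
#include <atomic>
#include <cstdio>

std::atomic<unsigned long> tps_received{0};        // per reporting interval
std::atomic<unsigned long> total_tps_received{0};  // cumulative for the run

void on_tpset(unsigned long n_tps) {
  tps_received += n_tps;
  total_tps_received += n_tps;
}

void report() {
  // exchange(0) snapshots and resets in one atomic step, so no updates are lost
  std::printf("interval: %lu  total: %lu\n",
              tps_received.exchange(0), total_tps_received.load());
}

int main() {
  on_tpset(10); on_tpset(5);
  report(); // interval: 15  total: 15
  on_tpset(2);
  report(); // interval: 2   total: 17
}
```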
@@ -76,6 +82,8 @@
m_accumulation_inactivity_time_before_write =
std::chrono::milliseconds(static_cast<int>(1000*conf_params.tp_accumulation_inactivity_time_before_write_sec));
m_source_id = conf_params.source_id;
+ warn_user_when_tardy_tps_are_discarded = conf_params.warn_user_when_tardy_tps_are_discarded;
+ m_accumulation_interval_seconds = ((double) m_accumulation_interval_ticks) / 62500000.0;

// create the DataStore instance here
try {
@@ -98,6 +106,8 @@
TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method";
rcif::cmd::StartParams start_params = payload.get<rcif::cmd::StartParams>();
m_run_number = start_params.run;
+ m_total_tps_received.store(0);
+ m_total_tps_written.store(0);

// 06-Mar-2022, KAB: added this call to allow DataStore to prepare for the run.
// I've put this call fairly early in this method because it could throw an
@@ -159,15 +169,17 @@
TPBundleHandler tp_bundle_handler(m_accumulation_interval_ticks, m_run_number, m_accumulation_inactivity_time_before_write);

bool possible_pending_data = true;
+ size_t largest_timeslice_number = 0;
while (running_flag.load() || possible_pending_data) {
trigger::TPSet tpset;
try {
tpset = m_tpset_source->receive(m_queue_timeout);
++n_tpset_received;
- ++m_tpset_received;

- if (tpset.type == trigger::TPSet::Type::kHeartbeat)
+ if (tpset.type == trigger::TPSet::Type::kHeartbeat) {
+ ++m_heartbeat_tpsets;
continue;
+ }

TLOG_DEBUG(21) << "Number of TPs in TPSet is " << tpset.objects.size() << ", Source ID is " << tpset.origin
<< ", seqno is " << tpset.seqno << ", start timestamp is " << tpset.start_time << ", run number is "
@@ -181,8 +193,12 @@
<< m_run_number << "), Source ID is " << tpset.origin << ", seqno is " << tpset.seqno;
continue;
}
+ ++m_tpsets_with_tps;

+ size_t num_tps_in_tpset = tpset.objects.size();
tp_bundle_handler.add_tpset(std::move(tpset));
+ m_tps_received += num_tps_in_tpset;
+ m_total_tps_received += num_tps_in_tpset;
} catch (iomanager::TimeoutExpired&) {
if (running_flag.load()) {continue;}
}
@@ -194,6 +210,13 @@
list_of_timeslices = tp_bundle_handler.get_all_remaining_timeslices();
possible_pending_data = false;
}

+ // keep track of the largest timeslice number (for reporting on tardy ones)
+ for (auto& timeslice_ptr : list_of_timeslices) {
+ largest_timeslice_number = std::max(timeslice_ptr->get_header().timeslice_number, largest_timeslice_number);
+ }
+
+ // attempt to write out each TimeSlice
for (auto& timeslice_ptr : list_of_timeslices) {
daqdataformats::SourceID sid(daqdataformats::SourceID::Subsystem::kTRBuilder, m_source_id);
timeslice_ptr->set_element_id(sid);
@@ -205,8 +228,11 @@
should_retry = false;
try {
m_data_writer->write(*timeslice_ptr);
- ++m_tpset_written;
+ ++m_timeslices_written;
m_bytes_output += timeslice_ptr->get_total_size_bytes();
+ size_t number_of_tps_written = (timeslice_ptr->get_sum_of_fragment_payload_sizes() / sizeof(trgdataformats::TriggerPrimitive));
+ m_tps_written += number_of_tps_written;
+ m_total_tps_written += number_of_tps_written;
} catch (const RetryableDataStoreProblem& excpt) {
should_retry = true;
ers::error(DataWritingProblem(ERS_HERE,
@@ -219,6 +245,24 @@
}
usleep(retry_wait_usec);
retry_wait_usec *= 2;
+ } catch (const IgnorableDataStoreProblem& excpt) {
+ int timeslice_number_diff = largest_timeslice_number - timeslice_ptr->get_header().timeslice_number;
+ double seconds_too_late = m_accumulation_interval_seconds * timeslice_number_diff;
+ m_tardy_timeslice_max_seconds = std::max(m_tardy_timeslice_max_seconds.load(), seconds_too_late);
+ if (warn_user_when_tardy_tps_are_discarded) {
+ std::ostringstream sid_list;
+ bool first_frag = true;
+ for (auto const& frag_ptr : timeslice_ptr->get_fragments_ref()) {
+ if (first_frag) {first_frag = false;}
+ else {sid_list << ",";}
+ sid_list << frag_ptr->get_element_id().to_string();
+ }
+ ers::warning(TardyTPsDiscarded(ERS_HERE,
+ get_name(),
+ sid_list.str(),
+ timeslice_ptr->get_header().timeslice_number,
+ seconds_too_late));
+ }
} catch (const std::exception& excpt) {
ers::warning(DataWritingProblem(ERS_HERE,
get_name(),
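A worked example of the tardiness estimate in the IgnorableDataStoreProblem handler: do_conf() converts the accumulation interval from 62.5 MHz timestamp ticks to seconds, and the handler multiplies that by how many timeslice numbers the discarded slice lags behind the newest one seen. The values below are illustrative, not defaults.

```cpp
#include <cstdio>

int main() {
  const double clock_hz = 62500000.0;       // 62.5 MHz timestamp clock, as in do_conf()
  const double interval_ticks = 62500000.0; // hypothetical: 1-second accumulation windows
  const double interval_seconds = interval_ticks / clock_hz; // 1.0 s

  const long largest_timeslice_number = 42; // newest slice handed to the writer
  const long tardy_timeslice_number = 39;   // slice whose file has already been closed
  const double seconds_too_late =
      interval_seconds * (largest_timeslice_number - tardy_timeslice_number);

  std::printf("discarded TPs arrived ~%.1f s too late\n", seconds_too_late); // ~3.0 s
}
```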