Skip to content

Commit

Permalink
Merge pull request #359 from DUNE-DAQ/kbiery/one_event_per_file_fixes
Browse files Browse the repository at this point in the history
Fixed the file_index Attribute inside long-readout-window HDF5 files when one-event-per-file is enabled
  • Loading branch information
wesketchum committed Jul 5, 2024
2 parents 1def8db + f461cfe commit a7da594
Showing 1 changed file with 45 additions and 28 deletions.
73 changes: 45 additions & 28 deletions plugins/HDF5DataStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class HDF5DataStore : public DataStore

m_file_index = 0;
m_recorded_size = 0;
m_current_record_number = std::numeric_limits<size_t>::max();

if (m_operation_mode != "one-event-per-file"
//&& m_operation_mode != "one-fragment-per-file"
Expand Down Expand Up @@ -162,7 +163,7 @@ class HDF5DataStore : public DataStore
virtual void write(const daqdataformats::TriggerRecord& tr)
{

// check if there is sufficient space for this data block
// check if there is sufficient space for this record
size_t current_free_space = get_free_space(m_path);
size_t tr_size = tr.get_total_size_bytes();
if (current_free_space < (m_free_space_safety_factor_for_write * tr_size)) {
Expand All @@ -178,12 +179,19 @@ class HDF5DataStore : public DataStore
throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
}

// check if a new file should be opened for this data block
increment_file_index_if_needed(tr_size);
// check if a new file should be opened for this record
if (! increment_file_index_if_needed(tr_size)) {
if (m_config_params.mode == "one-event-per-file") {
if (m_current_record_number != std::numeric_limits<size_t>::max() &&
tr.get_header_ref().get_trigger_number() != m_current_record_number) {
++m_file_index;
}
}
}
m_current_record_number = tr.get_header_ref().get_trigger_number();

// determine the filename from Storage Key + configuration parameters
std::string full_filename =
get_file_name(tr.get_header_ref().get_trigger_number(), tr.get_header_ref().get_run_number());
std::string full_filename = get_file_name(tr.get_header_ref().get_run_number());

try {
open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
Expand All @@ -194,7 +202,7 @@ class HDF5DataStore : public DataStore
throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
}

// write the data block
// write the record
m_file_handle->write(tr);
m_recorded_size = m_file_handle->get_recorded_size();
}
Expand All @@ -209,7 +217,7 @@ class HDF5DataStore : public DataStore
virtual void write(const daqdataformats::TimeSlice& ts)
{

// check if there is sufficient space for this data block
// check if there is sufficient space for this record
size_t current_free_space = get_free_space(m_path);
size_t ts_size = ts.get_total_size_bytes();
if (current_free_space < (m_free_space_safety_factor_for_write * ts_size)) {
Expand All @@ -225,11 +233,19 @@ class HDF5DataStore : public DataStore
throw RetryableDataStoreProblem(ERS_HERE, get_name(), msg, issue);
}

// check if a new file should be opened for this data block
increment_file_index_if_needed(ts_size);
// check if a new file should be opened for this record
if (! increment_file_index_if_needed(ts_size)) {
if (m_config_params.mode == "one-event-per-file") {
if (m_current_record_number != std::numeric_limits<size_t>::max() &&
ts.get_header().timeslice_number != m_current_record_number) {
++m_file_index;
}
}
}
m_current_record_number = ts.get_header().timeslice_number;

// determine the filename from Storage Key + configuration parameters
std::string full_filename = get_file_name(ts.get_header().timeslice_number, ts.get_header().run_number);
std::string full_filename = get_file_name(ts.get_header().run_number);

try {
open_file_if_needed(full_filename, HighFive::File::OpenOrCreate);
Expand All @@ -240,13 +256,13 @@ class HDF5DataStore : public DataStore
throw FileOperationProblem(ERS_HERE, get_name(), full_filename);
}

// write the data block
// write the record
m_file_handle->write(ts);
m_recorded_size = m_file_handle->get_recorded_size();
}
}

/**
* @brief Informs the HDF5DataStore that writes or reads of data blocks
* @brief Informs the HDF5DataStore that writes or reads of records
* associated with the specified run number will soon be requested.
* This allows the DataStore to test that the output file path is valid
* and any other checks that are useful in advance of the first data
Expand Down Expand Up @@ -280,10 +296,11 @@ class HDF5DataStore : public DataStore

m_file_index = 0;
m_recorded_size = 0;
m_current_record_number = std::numeric_limits<size_t>::max();
}

/**
* @brief Informs the HD5DataStore that writes or reads of data blocks
* @brief Informs the HD5DataStore that writes or reads of records
* associated with the specified run number have finished, for now.
* This allows the DataStore to close open files and do any other
* cleanup or shutdown operations that are useful once the writes or
Expand Down Expand Up @@ -324,8 +341,13 @@ class HDF5DataStore : public DataStore
// Total number of generated files
size_t m_file_index;

// Total size of data being written
size_t m_recorded_size;
// Size of data being written
size_t m_recorded_size; // per file

// Record number for the record that is currently being written out
// This is only useful for long-readout windows, in which there may
// be multiple calls to write()
size_t m_current_record_number;

// Configuration
hdf5datastore::ConfParams m_config_params;
Expand All @@ -340,8 +362,7 @@ class HDF5DataStore : public DataStore
/**
* @brief Translates the specified input parameters into the appropriate filename.
*/
std::string get_file_name(uint64_t record_number, // NOLINT(build/unsigned)
daqdataformats::run_number_t run_number)
std::string get_file_name(daqdataformats::run_number_t run_number)
{
std::ostringstream work_oss;
work_oss << m_config_params.directory_path;
Expand All @@ -356,28 +377,24 @@ class HDF5DataStore : public DataStore
work_oss << m_config_params.filename_parameters.run_number_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_run_number) << std::setfill('0') << run_number;
work_oss << "_";
if (m_config_params.mode == "one-event-per-file") {

work_oss << m_config_params.filename_parameters.trigger_number_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_trigger_number) << std::setfill('0')
<< record_number;
} else if (m_config_params.mode == "all-per-file") {
work_oss << m_config_params.filename_parameters.file_index_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
<< m_file_index;

work_oss << m_config_params.filename_parameters.file_index_prefix;
work_oss << std::setw(m_config_params.filename_parameters.digits_for_file_index) << std::setfill('0')
<< m_file_index;
}
work_oss << "_" << m_config_params.filename_parameters.writer_identifier;
work_oss << ".hdf5";
return work_oss.str();
}

void increment_file_index_if_needed(size_t size_of_next_write)
bool increment_file_index_if_needed(size_t size_of_next_write)
{
if ((m_recorded_size + size_of_next_write) > m_max_file_size && m_recorded_size > 0) {
++m_file_index;
m_recorded_size = 0;
return true;
}
return false;
}

void open_file_if_needed(const std::string& file_name, unsigned open_flags = HighFive::File::ReadOnly)
Expand Down

0 comments on commit a7da594

Please sign in to comment.