/
DataWriter.hpp
141 lines (117 loc) · 5.29 KB
/
DataWriter.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/**
* @file DataWriter.hpp
*
* This is part of the DUNE DAQ Software Suite, copyright 2020.
* Licensing/copyright details are in the COPYING file that you should have
* received with this code.
*/
#ifndef DFMODULES_PLUGINS_DATAWRITER_HPP_
#define DFMODULES_PLUGINS_DATAWRITER_HPP_
#include "dfmodules/DataStore.hpp"
#include "appfwk/DAQModule.hpp"
#include "appdal/DataWriterConf.hpp"
#include "coredal/ReadoutMap.hpp"
#include "coredal/DetectorConfig.hpp"
#include "daqdataformats/TriggerRecord.hpp"
#include "dfmessages/TriggerDecisionToken.hpp"
#include "iomanager/Receiver.hpp"
#include "iomanager/Sender.hpp"
#include "utilities/WorkerThread.hpp"
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>
namespace dunedaq {
namespace dfmodules {
/**
 * @brief DataWriter is a DAQModule that declares a TriggerRecord receive callback,
 * a DataStore handle and a TriggerDecisionToken sender.
 *
 * Only the declaration lives in this header; the behaviour of the commands, the
 * callback and the worker is defined in the corresponding source file.
 * NOTE(review): judging by the member names, received TriggerRecords are presumably
 * written through the DataStore and acknowledged with a token - confirm against the
 * implementation.
 */
class DataWriter : public dunedaq::appfwk::DAQModule
{
public:
  /**
   * @brief DataWriter Constructor
   * @param name Instance name for this DataWriter instance
   */
  explicit DataWriter(const std::string& name);

  DataWriter(const DataWriter&) = delete;            ///< DataWriter is not copy-constructible
  DataWriter& operator=(const DataWriter&) = delete; ///< DataWriter is not copy-assignable
  DataWriter(DataWriter&&) = delete;                 ///< DataWriter is not move-constructible
  DataWriter& operator=(DataWriter&&) = delete;      ///< DataWriter is not move-assignable

  /**
   * @brief Override of DAQModule::init
   * @param mcfg Shared handle to the module configuration
   */
  void init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg) override;

  /**
   * @brief Override of DAQModule::get_info
   * @param ci Collector handed in by the caller
   * @param level Level argument forwarded by the caller
   */
  void get_info(opmonlib::InfoCollector& ci, int level) override;

private:
  // Commands
  void do_conf(const data_t&);
  void do_start(const data_t&);
  void do_stop(const data_t&);
  void do_scrap(const data_t&);

  // Callback
  // Takes the unique_ptr by non-const lvalue reference, so the callee is able to move from it.
  void receive_trigger_record(std::unique_ptr<daqdataformats::TriggerRecord>&);
  std::atomic<bool> m_running{ false };

  // Configuration
  // The raw pointers start out null and the scalars are value-initialised, so a read
  // that happens before they are assigned yields a well-defined value instead of an
  // indeterminate one. Constructor mem-initialisers, if any, still take precedence.
  const appdal::DataWriterConf* m_data_writer_conf = nullptr;
  const coredal::ReadoutMap* m_readout_map = nullptr;
  const coredal::DetectorConfig* m_detector_config = nullptr;
  // size_t m_sleep_msec_while_running;
  std::chrono::milliseconds m_queue_timeout{};
  bool m_data_storage_is_enabled{};
  int m_data_storage_prescale{};
  daqdataformats::run_number_t m_run_number{};
  size_t m_min_write_retry_time_usec{};
  size_t m_max_write_retry_time_usec{};
  int m_write_retry_time_increase_factor{};

  // Connections
  std::string m_trigger_record_connection;
  using tr_receiver_ct = iomanager::ReceiverConcept<std::unique_ptr<daqdataformats::TriggerRecord>>;
  std::shared_ptr<tr_receiver_ct> m_tr_receiver;
  using token_sender_t = iomanager::SenderConcept<dfmessages::TriggerDecisionToken>;
  std::shared_ptr<token_sender_t> m_token_output;
  std::string m_trigger_decision_connection;

  // Worker(s)
  dunedaq::utilities::WorkerThread m_thread;
  void do_work(std::atomic<bool>&);

  std::unique_ptr<DataStore> m_data_writer;

  // Metrics
  std::atomic<uint64_t> m_records_received = { 0 };     // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_records_received_tot = { 0 }; // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_records_written = { 0 };      // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_records_written_tot = { 0 };  // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_bytes_output = { 0 };         // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_bytes_output_tot = { 0 };     // NOLINT(build/unsigned)
  std::atomic<uint64_t> m_writing_ms = { 0 };           // NOLINT(build/unsigned)

  // Other
  std::map<daqdataformats::trigger_number_t, size_t> m_seqno_counts;

  /**
   * @brief Time elapsed between two steady_clock time points, in seconds
   * @param then Earlier time point
   * @param now Later time point; defaults to the current steady_clock time
   * @return now - then expressed in seconds, including the fractional part
   */
  inline double elapsed_seconds(std::chrono::steady_clock::time_point then,
                                std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now()) const
  {
    // duration<double> converts without loss; duration_cast<std::chrono::seconds> would
    // truncate to whole seconds before the value is widened to the double return type.
    return std::chrono::duration<double>(now - then).count();
  }
};
} // namespace dfmodules
// Declares the ERS issue InvalidDataWriter in namespace dfmodules, with
// appfwk::GeneralDAQModuleIssue as its base class.
// Macro arguments, in order: namespace, issue class name, base class, message,
// attributes forwarded to the base (name), and attributes owned by this issue
// (none, hence ERS_EMPTY).
ERS_DECLARE_ISSUE_BASE(dfmodules,
InvalidDataWriter,
appfwk::GeneralDAQModuleIssue,
"A valid dataWriter instance is not available so it will not be possible to write data. A "
"likely cause for this is a skipped or missed Configure transition.",
((std::string)name),
ERS_EMPTY)
// Declares the ERS issue DataWritingProblem in namespace dfmodules, with
// appfwk::GeneralDAQModuleIssue as its base class.
// The message is rendered as "<trnum>.<seqnum> in run <runnum>" after the fixed prefix.
// Base attribute: name (std::string).
// Own attributes: trnum, seqnum and runnum, all declared as size_t.
ERS_DECLARE_ISSUE_BASE(dfmodules,
DataWritingProblem,
appfwk::GeneralDAQModuleIssue,
"A problem was encountered when writing TriggerRecord number " << trnum << "." << seqnum
<< " in run " << runnum,
((std::string)name),
((size_t)trnum)((size_t)seqnum)((size_t)runnum))
// Declares the ERS issue InvalidRunNumber in namespace dfmodules, with
// appfwk::GeneralDAQModuleIssue as its base class.
// Base attribute: name (std::string).
// Own attributes: msg_type (std::string), then received, expected, trnum and seqnum
// (all declared as size_t).
// The literal after msg_type starts with a space so that the message type and the
// word "message" are not run together in the rendered text.
// NOTE(review): assumes callers pass msg_type without a trailing space - confirm at the call sites.
ERS_DECLARE_ISSUE_BASE(dfmodules,
                       InvalidRunNumber,
                       appfwk::GeneralDAQModuleIssue,
                       "An invalid run number was received in a "
                         << msg_type << " message, "
                         << "received=" << received << ", expected=" << expected << ", trig/seq_number=" << trnum << "."
                         << seqnum,
                       ((std::string)name),
                       ((std::string)msg_type)((size_t)received)((size_t)expected)((size_t)trnum)((size_t)seqnum))
} // namespace dunedaq
#endif // DFMODULES_PLUGINS_DATAWRITER_HPP_