Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b56b64a
pushed config in taskrunner
Barthelemy Dec 2, 2021
61cca54
correct name
Dec 2, 2021
53cdd0b
more output
Barthelemy Dec 2, 2021
79c482e
with Giulio's magic line
Barthelemy Dec 2, 2021
60bf80d
fixfff
Barthelemy Dec 3, 2021
e51a0c5
error printing
Dec 6, 2021
01a1d1a
print the tree
Barthelemy Dec 8, 2021
3d23d4f
print the tree
Barthelemy Dec 8, 2021
6067d57
il
Barthelemy Dec 8, 2021
82f4088
working ?
Jan 13, 2022
11148fd
say if empty
Jan 13, 2022
c1482ae
Use the config and fix a wrong guard in the header.
Barthelemy Dec 13, 2021
0294888
handle the config
Barthelemy Jan 13, 2022
129688b
clean up and extraction
Barthelemy Jan 13, 2022
5daf7f9
Add the label qc-reconfigurable
Barthelemy Jan 13, 2022
a48795a
Rename loadTaskConfig to printTaskConfig
Barthelemy Jan 14, 2022
7a263bd
Extract infologger initialization
Barthelemy Jan 14, 2022
1a0f305
Extract infologger initialization
Barthelemy Jan 14, 2022
65907b9
Prepare aggregators for refresh
Barthelemy Jan 14, 2022
9439568
AggregatorRunner refresh
Barthelemy Jan 14, 2022
d68381d
format
Barthelemy Jan 14, 2022
fd7b0c8
remove cout
Barthelemy Jan 17, 2022
679a05e
format
Barthelemy Jan 17, 2022
6690a55
Update Framework/include/QualityControl/AggregatorRunner.h
Barthelemy Jan 17, 2022
8fb3571
remove useless inline
Barthelemy Jan 17, 2022
95e24ba
Pass only what we need and pass config
Barthelemy Jan 17, 2022
c3268be
Update Framework/include/QualityControl/runnerUtils.h
Barthelemy Jan 17, 2022
7a11774
proper label method names
Barthelemy Jan 17, 2022
1159a85
Better printing of ptree
Barthelemy Jan 17, 2022
fdf8889
remove unused headers
Barthelemy Jan 17, 2022
ea25cf6
Fix issue iwth resetAfterCycles
Barthelemy Jan 17, 2022
713d79f
format
Barthelemy Jan 17, 2022
dda49c1
format
Barthelemy Jan 17, 2022
a61ec09
trigger rebuild
Barthelemy Jan 18, 2022
71fd3f4
fix
Barthelemy Jan 18, 2022
d8e353f
Merge branch 'master' into push-config
Barthelemy Jan 24, 2022
00eac9d
fix
Barthelemy Jan 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions Framework/include/QualityControl/AggregatorRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,18 @@ class AggregatorRunner : public framework::Task
*/
void store(core::QualityObjectsType& qualityObjects);

inline void initDatabase();
inline void initMonitoring();
inline void initServiceDiscovery();
inline void initAggregators();
void refreshConfig(framework::InitContext& iCtx);

/**
* Prepare the inputs, remove the duplicates
*/
void prepareInputs();

void initInfoLogger(framework::InitContext& iCtx);
void initDatabase();
void initMonitoring();
void initServiceDiscovery();
void initAggregators();

/**
* Reorder the aggregators stored in mAggregators.
Expand Down Expand Up @@ -173,7 +181,7 @@ class AggregatorRunner : public framework::Task
std::vector<std::shared_ptr<Aggregator>> mAggregators;
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
AggregatorRunnerConfig mRunnerConfig;
std::vector<AggregatorConfig> mAggregatorsConfigs;
std::vector<AggregatorConfig> mAggregatorsConfig;
core::QualityObjectsMapType mQualityObjects; // where we cache the incoming quality objects and the output of the aggregators
UpdatePolicyManager updatePolicyManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct AggregatorRunnerConfig {
std::string fallbackPeriodName{};
std::string fallbackPassName{};
std::string fallbackProvenance{};
framework::Options options{};
};

} // namespace o2::quality_control::checker
Expand Down
8 changes: 6 additions & 2 deletions Framework/include/QualityControl/AggregatorRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "QualityControl/CommonSpec.h"
#include "QualityControl/AggregatorRunnerConfig.h"
#include "QualityControl/AggregatorConfig.h"
#include "QualityControl/AggregatorSpec.h"

#include <vector>

Expand All @@ -36,10 +37,13 @@ class AggregatorRunnerFactory
AggregatorRunnerFactory() = default;
virtual ~AggregatorRunnerFactory() = default;

static framework::DataProcessorSpec create(AggregatorRunnerConfig arc, std::vector<AggregatorConfig> acs);
static framework::DataProcessorSpec create(const core::CommonSpec& commonSpec,
const std::vector<AggregatorSpec>& aggregatorsSpec);
static void customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies);

static AggregatorRunnerConfig extractConfig(const core::CommonSpec&);
static AggregatorRunnerConfig extractRunnerConfig(const core::CommonSpec&);
static std::vector<AggregatorConfig> extractAggregatorsConfig(const core::CommonSpec& commonSpec,
const std::vector<AggregatorSpec>& aggregatorsSpec);
};

} // namespace o2::quality_control::checker
Expand Down
12 changes: 8 additions & 4 deletions Framework/include/QualityControl/CheckRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class CheckRunner : public framework::Task
void setTaskStoreSet(std::unordered_set<std::string> storeSet) { mInputStoreSet = storeSet; }
std::string getDeviceName() { return mDeviceName; };

static framework::DataProcessorLabel getLabel() { return { "qc-check" }; }
static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; }
static std::string createCheckRunnerIdString() { return "qc-check"; };
static std::string createCheckRunnerName(const std::vector<CheckConfig>& checks);
static std::string createSinkCheckRunnerName(o2::framework::InputSpec input);
Expand Down Expand Up @@ -164,9 +164,10 @@ class CheckRunner : public framework::Task
*/
static o2::framework::Outputs collectOutputs(const std::vector<CheckConfig>& checks);

inline void initDatabase();
inline void initMonitoring();
inline void initServiceDiscovery();
void initDatabase();
void initMonitoring();
void initServiceDiscovery();
void initInfologger(framework::InitContext& iCtx);

/**
* Update the list of objects this TaskRunner is sending out.
Expand Down Expand Up @@ -203,6 +204,9 @@ class CheckRunner : public framework::Task
/// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ)
void reset();

/// Refresh the configuration using the payload found in the fairmq options (if available)
void refreshConfig(framework::InitContext& iCtx);

// General state
std::string mDeviceName;
std::vector<Check> mChecks;
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/CheckRunnerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct CheckRunnerConfig {
std::string fallbackPeriodName{};
std::string fallbackPassName{};
std::string fallbackProvenance{};
framework::Options options{};
};

} // namespace o2::quality_control::checker
Expand Down
6 changes: 4 additions & 2 deletions Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class TaskRunner : public framework::Task
const framework::Options& getOptions() const { return mTaskConfig.options; };

/// \brief Data Processor Label to identify all Task Runners
static framework::DataProcessorLabel getLabel() { return { "qc-task" }; }
static framework::DataProcessorLabel getTaskRunnerLabel() { return { "qc-task" }; }
/// \brief ID string for all TaskRunner devices
static std::string createTaskRunnerIdString();
/// \brief Unified DataOrigin for Quality Control tasks
Expand All @@ -112,7 +112,9 @@ class TaskRunner : public framework::Task
void reset();

std::tuple<bool /*data ready*/, bool /*timer ready*/> validateInputs(const framework::InputRecord&);
void loadTaskConfig();
void refreshConfig(framework::InitContext& iCtx);
void initInfologger(framework::InitContext& iCtx);
void printTaskConfig();
void startOfActivity();
void endOfActivity();
void startCycle();
Expand Down
8 changes: 5 additions & 3 deletions Framework/include/QualityControl/TaskRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
/// \author Piotr Konopka
///

#ifndef QC_CORE_TASKFACTORY_H
#define QC_CORE_TASKFACTORY_H
#ifndef QC_CORE_TASKRUNNERFACTORY_H
#define QC_CORE_TASKRUNNERFACTORY_H

#include <string>
#include <vector>
Expand Down Expand Up @@ -49,6 +49,8 @@ class TaskRunnerFactory
/// \brief Knows how to create TaskConfig from Specs
static TaskRunnerConfig extractConfig(const CommonSpec&, const TaskSpec&, std::optional<int> id = std::nullopt, std::optional<int> resetAfterCycles = std::nullopt);

static bool computeResetAfterCycles(const TaskSpec& taskSpec);

/// \brief Provides necessary customization of the TaskRunners.
///
/// Provides necessary customization of the Completion Policies of the TaskRunners. This is necessary to make
Expand All @@ -59,4 +61,4 @@ class TaskRunnerFactory

} // namespace o2::quality_control::core

#endif // QC_CORE_TASKFACTORY_H
#endif // QC_CORE_TASKRUNNERFACTORY_H
16 changes: 15 additions & 1 deletion Framework/include/QualityControl/runnerUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <FairMQDevice.h>
#include <Framework/ConfigParamRegistry.h>
#include <QualityControl/QcInfoLogger.h>
#include <boost/property_tree/json_parser.hpp>

namespace o2::quality_control::core
{
Expand Down Expand Up @@ -119,6 +120,19 @@ inline std::string computeProvenance(const std::string& fallbackProvenance = "")
return provenance;
}

inline std::string indentTree(int level)
{
return std::string(level * 2, ' ');
}

inline void printTree(const boost::property_tree::ptree& pt, int level = 0)
{
std::stringstream ss;
boost::property_tree::json_parser::write_json(ss, pt);
for (std::string line; std::getline(ss, line, '\n');)
ILOG(Debug, Trace) << line << ENDM;
}

} // namespace o2::quality_control::core

#endif //QUALITYCONTROL_RUNNERUTILS_H
#endif // QUALITYCONTROL_RUNNERUTILS_H
96 changes: 73 additions & 23 deletions Framework/src/AggregatorRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@
#include <Framework/InputRecordWalker.h>

#include <utility>

#include <TSystem.h>

// QC
#include "QualityControl/DatabaseFactory.h"
#include "QualityControl/QcInfoLogger.h"
#include "QualityControl/ServiceDiscovery.h"
#include "QualityControl/Aggregator.h"
#include "QualityControl/runnerUtils.h"
#include "QualityControl/InfrastructureSpecReader.h"
#include "QualityControl/AggregatorRunnerFactory.h"

using namespace AliceO2::Common;
using namespace AliceO2::InfoLogger;
Expand All @@ -46,18 +51,64 @@ const auto current_diagnostic = boost::current_exception_diagnostic_information;
namespace o2::quality_control::checker
{

AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector<AggregatorConfig>& acs)
AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector<AggregatorConfig>& acs) //, const o2::quality_control::core::InfrastructureSpec& infrastructureSpec)
: mDeviceName(createAggregatorRunnerName()),
mRunnerConfig(std::move(arc)),
mAggregatorsConfigs(acs),
mAggregatorsConfig(std::move(acs)),
mTotalNumberObjectsReceived(0),
mTotalNumberAggregatorExecuted(0),
mTotalNumberObjectsProduced(0)
{
// Prepare the inputs, remove duplicates
prepareInputs();
}

AggregatorRunner::~AggregatorRunner()
{
if (mServiceDiscovery != nullptr) {
mServiceDiscovery->deregister();
}
}

void AggregatorRunner::refreshConfig(InitContext& iCtx)
{
try {
auto updatedTree = iCtx.options().get<boost::property_tree::ptree>("qcConfiguration");

if (updatedTree.empty()) {
ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM;
} else {
if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor
ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM;
printTree(updatedTree);
}

// read the config, prepare spec
auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree);

// replace the runner config
mRunnerConfig = AggregatorRunnerFactory::extractRunnerConfig(infrastructureSpec.common);

// replace the aggregators configs
mAggregatorsConfig.clear();
mAggregatorsConfig = AggregatorRunnerFactory::extractAggregatorsConfig(infrastructureSpec.common, infrastructureSpec.aggregators);

// replace the inputs
mInputs.clear();
prepareInputs();

ILOG(Debug, Devel) << "Configuration refreshed" << ENDM;
}
} catch (std::invalid_argument& error) {
// ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode
ILOG(Warning, Devel) << "Could not get updated config tree in TaskRunner::init() - `qcConfiguration` could not be retrieved" << ENDM;
}
}

void AggregatorRunner::prepareInputs()
{
std::set<std::string> alreadySeen;
int i = 0;
for (const auto& aggConfig : mAggregatorsConfigs) {
for (const auto& aggConfig : mAggregatorsConfig) {
for (auto input : aggConfig.inputSpecs) {
if (alreadySeen.count(input.binding) == 0) {
alreadySeen.insert(input.binding);
Expand All @@ -68,13 +119,6 @@ AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector
}
}

AggregatorRunner::~AggregatorRunner()
{
if (mServiceDiscovery != nullptr) {
mServiceDiscovery->deregister();
}
}

header::DataDescription AggregatorRunner::createAggregatorRunnerDataDescription(const std::string& aggregatorName)
{
if (aggregatorName.empty()) {
Expand All @@ -92,18 +136,12 @@ std::string AggregatorRunner::createAggregatorRunnerName()

void AggregatorRunner::init(framework::InitContext& iCtx)
{
InfoLoggerContext* ilContext = nullptr;
AliceO2::InfoLogger::InfoLogger* il = nullptr;
try {
ilContext = &iCtx.services().get<AliceO2::InfoLogger::InfoLoggerContext>();
il = &iCtx.services().get<AliceO2::InfoLogger::InfoLogger>();
} catch (const RuntimeErrorRef& err) {
ILOG(Error) << "Could not find the DPL InfoLogger." << ENDM;
}
initInfoLogger(iCtx);

refreshConfig(iCtx);
QcInfoLogger::setDetector(AggregatorRunner::getDetectorName(mAggregators));

try {
QcInfoLogger::init("aggregator", mRunnerConfig.infologgerFilterDiscardDebug, mRunnerConfig.infologgerDiscardLevel, il, ilContext);
QcInfoLogger::setDetector(AggregatorRunner::getDetectorName(mAggregators));
initDatabase();
initMonitoring();
initServiceDiscovery();
Expand Down Expand Up @@ -225,7 +263,7 @@ void AggregatorRunner::initAggregators()
ILOG(Info, Devel) << "Initialization of the aggregators" << ENDM;

// For every aggregator definition, create an Aggregator
for (const auto& aggregatorConfig : mAggregatorsConfigs) {
for (const auto& aggregatorConfig : mAggregatorsConfig) {
ILOG(Info, Devel) << ">> Aggregator name : " << aggregatorConfig.name << ENDM;
try {
auto aggregator = make_shared<Aggregator>(aggregatorConfig);
Expand All @@ -247,6 +285,19 @@ void AggregatorRunner::initAggregators()
reorderAggregators();
}

void AggregatorRunner::initInfoLogger(InitContext& iCtx)
{
InfoLoggerContext* ilContext = nullptr;
AliceO2::InfoLogger::InfoLogger* il = nullptr;
try {
ilContext = &iCtx.services().get<AliceO2::InfoLogger::InfoLoggerContext>();
il = &iCtx.services().get<AliceO2::InfoLogger::InfoLogger>();
} catch (const RuntimeErrorRef& err) {
ILOG(Error) << "Could not find the DPL InfoLogger." << ENDM;
}
QcInfoLogger::init("aggregator", mRunnerConfig.infologgerFilterDiscardDebug, mRunnerConfig.infologgerDiscardLevel, il, ilContext);
}

bool AggregatorRunner::areSourcesIn(const std::vector<AggregatorSource>& sources,
const std::vector<std::shared_ptr<Aggregator>>& aggregators)
{
Expand Down Expand Up @@ -336,7 +387,6 @@ void AggregatorRunner::start(const ServiceRegistry& services)
<< "\n - period: " << mActivity.mPeriodName << "\n - pass type: " << mActivity.mPassName << "\n - provenance: " << mActivity.mProvenance << ENDM;
}


void AggregatorRunner::stop()
{
ILOG(Info, Ops) << "Stopping run " << mActivity.mId << ENDM;
Expand Down
Loading