diff --git a/Framework/include/QualityControl/AggregatorRunner.h b/Framework/include/QualityControl/AggregatorRunner.h index 77b9e11dc1..3c59c86be9 100644 --- a/Framework/include/QualityControl/AggregatorRunner.h +++ b/Framework/include/QualityControl/AggregatorRunner.h @@ -135,10 +135,18 @@ class AggregatorRunner : public framework::Task */ void store(core::QualityObjectsType& qualityObjects); - inline void initDatabase(); - inline void initMonitoring(); - inline void initServiceDiscovery(); - inline void initAggregators(); + void refreshConfig(framework::InitContext& iCtx); + + /** + * Prepare the inputs, remove the duplicates + */ + void prepareInputs(); + + void initInfoLogger(framework::InitContext& iCtx); + void initDatabase(); + void initMonitoring(); + void initServiceDiscovery(); + void initAggregators(); /** * Reorder the aggregators stored in mAggregators. @@ -173,7 +181,7 @@ class AggregatorRunner : public framework::Task std::vector> mAggregators; std::shared_ptr mDatabase; AggregatorRunnerConfig mRunnerConfig; - std::vector mAggregatorsConfigs; + std::vector mAggregatorsConfig; core::QualityObjectsMapType mQualityObjects; // where we cache the incoming quality objects and the output of the aggregators UpdatePolicyManager updatePolicyManager; diff --git a/Framework/include/QualityControl/AggregatorRunnerConfig.h b/Framework/include/QualityControl/AggregatorRunnerConfig.h index 366ec98ca0..e88656e9aa 100644 --- a/Framework/include/QualityControl/AggregatorRunnerConfig.h +++ b/Framework/include/QualityControl/AggregatorRunnerConfig.h @@ -33,6 +33,7 @@ struct AggregatorRunnerConfig { std::string fallbackPeriodName{}; std::string fallbackPassName{}; std::string fallbackProvenance{}; + framework::Options options{}; }; } // namespace o2::quality_control::checker diff --git a/Framework/include/QualityControl/AggregatorRunnerFactory.h b/Framework/include/QualityControl/AggregatorRunnerFactory.h index d4a44cd5bc..b1e3ec9466 100644 --- a/Framework/include/QualityControl/AggregatorRunnerFactory.h +++ b/Framework/include/QualityControl/AggregatorRunnerFactory.h @@ -23,6 +23,7 @@ #include "QualityControl/CommonSpec.h" #include "QualityControl/AggregatorRunnerConfig.h" #include "QualityControl/AggregatorConfig.h" +#include "QualityControl/AggregatorSpec.h" #include @@ -36,10 +37,13 @@ class AggregatorRunnerFactory AggregatorRunnerFactory() = default; virtual ~AggregatorRunnerFactory() = default; - static framework::DataProcessorSpec create(AggregatorRunnerConfig arc, std::vector acs); + static framework::DataProcessorSpec create(const core::CommonSpec& commonSpec, + const std::vector& aggregatorsSpec); static void customizeInfrastructure(std::vector& policies); - static AggregatorRunnerConfig extractConfig(const core::CommonSpec&); + static AggregatorRunnerConfig extractRunnerConfig(const core::CommonSpec&); + static std::vector extractAggregatorsConfig(const core::CommonSpec& commonSpec, + const std::vector& aggregatorsSpec); }; } // namespace o2::quality_control::checker diff --git a/Framework/include/QualityControl/CheckRunner.h b/Framework/include/QualityControl/CheckRunner.h index 784449fc17..1ebe863e06 100644 --- a/Framework/include/QualityControl/CheckRunner.h +++ b/Framework/include/QualityControl/CheckRunner.h @@ -114,7 +114,7 @@ class CheckRunner : public framework::Task void setTaskStoreSet(std::unordered_set storeSet) { mInputStoreSet = storeSet; } std::string getDeviceName() { return mDeviceName; }; - static framework::DataProcessorLabel getLabel() { return { "qc-check" }; } + static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; } static std::string createCheckRunnerIdString() { return "qc-check"; }; static std::string createCheckRunnerName(const std::vector& checks); static std::string createSinkCheckRunnerName(o2::framework::InputSpec input); @@ -164,9 +164,10 @@ class CheckRunner : public framework::Task */ static o2::framework::Outputs collectOutputs(const std::vector& checks); - inline void initDatabase(); - inline void initMonitoring(); - inline void initServiceDiscovery(); + void initDatabase(); + void initMonitoring(); + void initServiceDiscovery(); + void initInfologger(framework::InitContext& iCtx); /** * Update the list of objects this TaskRunner is sending out. @@ -203,6 +204,9 @@ class CheckRunner : public framework::Task /// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ) void reset(); + /// Refresh the configuration using the payload found in the fairmq options (if available) + void refreshConfig(framework::InitContext& iCtx); + // General state std::string mDeviceName; std::vector mChecks; diff --git a/Framework/include/QualityControl/CheckRunnerConfig.h b/Framework/include/QualityControl/CheckRunnerConfig.h index 44b5e3ad15..20bac3c96c 100644 --- a/Framework/include/QualityControl/CheckRunnerConfig.h +++ b/Framework/include/QualityControl/CheckRunnerConfig.h @@ -32,6 +32,7 @@ struct CheckRunnerConfig { std::string fallbackPeriodName{}; std::string fallbackPassName{}; std::string fallbackProvenance{}; + framework::Options options{}; }; } // namespace o2::quality_control::checker diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 2b2fbe6462..658669420b 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -90,7 +90,7 @@ class TaskRunner : public framework::Task const framework::Options& getOptions() const { return mTaskConfig.options; }; /// \brief Data Processor Label to identify all Task Runners - static framework::DataProcessorLabel getLabel() { return { "qc-task" }; } + static framework::DataProcessorLabel getTaskRunnerLabel() { return { "qc-task" }; } /// \brief ID string for all TaskRunner devices static std::string createTaskRunnerIdString(); /// \brief Unified DataOrigin for Quality Control tasks @@ -112,7 +112,9 @@ class TaskRunner : public framework::Task void reset(); std::tuple validateInputs(const framework::InputRecord&); - void loadTaskConfig(); + void refreshConfig(framework::InitContext& iCtx); + void initInfologger(framework::InitContext& iCtx); + void printTaskConfig(); void startOfActivity(); void endOfActivity(); void startCycle(); diff --git a/Framework/include/QualityControl/TaskRunnerFactory.h b/Framework/include/QualityControl/TaskRunnerFactory.h index 326d50a671..7d7caaa5f2 100644 --- a/Framework/include/QualityControl/TaskRunnerFactory.h +++ b/Framework/include/QualityControl/TaskRunnerFactory.h @@ -14,8 +14,8 @@ /// \author Piotr Konopka /// -#ifndef QC_CORE_TASKFACTORY_H -#define QC_CORE_TASKFACTORY_H +#ifndef QC_CORE_TASKRUNNERFACTORY_H +#define QC_CORE_TASKRUNNERFACTORY_H #include #include @@ -49,6 +49,8 @@ class TaskRunnerFactory /// \brief Knows how to create TaskConfig from Specs static TaskRunnerConfig extractConfig(const CommonSpec&, const TaskSpec&, std::optional id = std::nullopt, std::optional resetAfterCycles = std::nullopt); + static bool computeResetAfterCycles(const TaskSpec& taskSpec); + /// \brief Provides necessary customization of the TaskRunners. /// /// Provides necessary customization of the Completion Policies of the TaskRunners. This is necessary to make @@ -59,4 +61,4 @@ class TaskRunnerFactory } // namespace o2::quality_control::core -#endif // QC_CORE_TASKFACTORY_H +#endif // QC_CORE_TASKRUNNERFACTORY_H diff --git a/Framework/include/QualityControl/runnerUtils.h b/Framework/include/QualityControl/runnerUtils.h index dd932301dd..ef673bf436 100644 --- a/Framework/include/QualityControl/runnerUtils.h +++ b/Framework/include/QualityControl/runnerUtils.h @@ -24,6 +24,7 @@ #include #include #include +#include namespace o2::quality_control::core { @@ -119,6 +120,19 @@ inline std::string computeProvenance(const std::string& fallbackProvenance = "") return provenance; } +inline std::string indentTree(int level) +{ + return std::string(level * 2, ' '); +} + +inline void printTree(const boost::property_tree::ptree& pt, int level = 0) +{ + std::stringstream ss; + boost::property_tree::json_parser::write_json(ss, pt); + for (std::string line; std::getline(ss, line, '\n');) + ILOG(Debug, Trace) << line << ENDM; +} + } // namespace o2::quality_control::core -#endif //QUALITYCONTROL_RUNNERUTILS_H \ No newline at end of file +#endif // QUALITYCONTROL_RUNNERUTILS_H diff --git a/Framework/src/AggregatorRunner.cxx b/Framework/src/AggregatorRunner.cxx index d9ab971196..eb40c938d5 100644 --- a/Framework/src/AggregatorRunner.cxx +++ b/Framework/src/AggregatorRunner.cxx @@ -25,12 +25,17 @@ #include #include + +#include + // QC #include "QualityControl/DatabaseFactory.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/ServiceDiscovery.h" #include "QualityControl/Aggregator.h" #include "QualityControl/runnerUtils.h" +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/AggregatorRunnerFactory.h" using namespace AliceO2::Common; using namespace AliceO2::InfoLogger; @@ -46,18 +51,64 @@ const auto current_diagnostic = boost::current_exception_diagnostic_information; namespace o2::quality_control::checker { -AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector& acs) +AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector& acs) //, const o2::quality_control::core::InfrastructureSpec& infrastructureSpec) : mDeviceName(createAggregatorRunnerName()), mRunnerConfig(std::move(arc)), - mAggregatorsConfigs(acs), + mAggregatorsConfig(std::move(acs)), mTotalNumberObjectsReceived(0), mTotalNumberAggregatorExecuted(0), mTotalNumberObjectsProduced(0) { - // Prepare the inputs, remove duplicates + prepareInputs(); +} + +AggregatorRunner::~AggregatorRunner() +{ + if (mServiceDiscovery != nullptr) { + mServiceDiscovery->deregister(); + } +} + +void AggregatorRunner::refreshConfig(InitContext& iCtx) +{ + try { + auto updatedTree = iCtx.options().get("qcConfiguration"); + + if (updatedTree.empty()) { + ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM; + } else { + if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor + ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM; + printTree(updatedTree); + } + + // read the config, prepare spec + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree); + + // replace the runner config + mRunnerConfig = AggregatorRunnerFactory::extractRunnerConfig(infrastructureSpec.common); + + // replace the aggregators configs + mAggregatorsConfig.clear(); + mAggregatorsConfig = AggregatorRunnerFactory::extractAggregatorsConfig(infrastructureSpec.common, infrastructureSpec.aggregators); + + // replace the inputs + mInputs.clear(); + prepareInputs(); + + ILOG(Debug, Devel) << "Configuration refreshed" << ENDM; + } + } catch (std::invalid_argument& error) { + // ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode + ILOG(Warning, Devel) << "Could not get updated config tree in TaskRunner::init() - `qcConfiguration` could not be retrieved" << ENDM; + } +} + +void AggregatorRunner::prepareInputs() +{ std::set alreadySeen; int i = 0; - for (const auto& aggConfig : mAggregatorsConfigs) { + for (const auto& aggConfig : mAggregatorsConfig) { for (auto input : aggConfig.inputSpecs) { if (alreadySeen.count(input.binding) == 0) { alreadySeen.insert(input.binding); @@ -68,13 +119,6 @@ AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector } } -AggregatorRunner::~AggregatorRunner() -{ - if (mServiceDiscovery != nullptr) { - mServiceDiscovery->deregister(); - } -} - header::DataDescription AggregatorRunner::createAggregatorRunnerDataDescription(const std::string& aggregatorName) { if (aggregatorName.empty()) { @@ -92,18 +136,12 @@ std::string AggregatorRunner::createAggregatorRunnerName() void AggregatorRunner::init(framework::InitContext& iCtx) { - InfoLoggerContext* ilContext = nullptr; - AliceO2::InfoLogger::InfoLogger* il = nullptr; - try { - ilContext = &iCtx.services().get(); - il = &iCtx.services().get(); - } catch (const RuntimeErrorRef& err) { - ILOG(Error) << "Could not find the DPL InfoLogger." << ENDM; - } + initInfoLogger(iCtx); + + refreshConfig(iCtx); + QcInfoLogger::setDetector(AggregatorRunner::getDetectorName(mAggregators)); try { - QcInfoLogger::init("aggregator", mRunnerConfig.infologgerFilterDiscardDebug, mRunnerConfig.infologgerDiscardLevel, il, ilContext); - QcInfoLogger::setDetector(AggregatorRunner::getDetectorName(mAggregators)); initDatabase(); initMonitoring(); initServiceDiscovery(); @@ -225,7 +263,7 @@ void AggregatorRunner::initAggregators() ILOG(Info, Devel) << "Initialization of the aggregators" << ENDM; // For every aggregator definition, create an Aggregator - for (const auto& aggregatorConfig : mAggregatorsConfigs) { + for (const auto& aggregatorConfig : mAggregatorsConfig) { ILOG(Info, Devel) << ">> Aggregator name : " << aggregatorConfig.name << ENDM; try { auto aggregator = make_shared(aggregatorConfig); @@ -247,6 +285,19 @@ void AggregatorRunner::initAggregators() reorderAggregators(); } +void AggregatorRunner::initInfoLogger(InitContext& iCtx) +{ + InfoLoggerContext* ilContext = nullptr; + AliceO2::InfoLogger::InfoLogger* il = nullptr; + try { + ilContext = &iCtx.services().get(); + il = &iCtx.services().get(); + } catch (const RuntimeErrorRef& err) { + ILOG(Error) << "Could not find the DPL InfoLogger." << ENDM; + } + QcInfoLogger::init("aggregator", mRunnerConfig.infologgerFilterDiscardDebug, mRunnerConfig.infologgerDiscardLevel, il, ilContext); +} + bool AggregatorRunner::areSourcesIn(const std::vector& sources, const std::vector>& aggregators) { @@ -336,7 +387,6 @@ void AggregatorRunner::start(const ServiceRegistry& services) << "\n - period: " << mActivity.mPeriodName << "\n - pass type: " << mActivity.mPassName << "\n - provenance: " << mActivity.mProvenance << ENDM; } - void AggregatorRunner::stop() { ILOG(Info, Ops) << "Stopping run " << mActivity.mId << ENDM; diff --git a/Framework/src/AggregatorRunnerFactory.cxx b/Framework/src/AggregatorRunnerFactory.cxx index 9311dfb42f..9e98a66c2d 100644 --- a/Framework/src/AggregatorRunnerFactory.cxx +++ b/Framework/src/AggregatorRunnerFactory.cxx @@ -19,11 +19,13 @@ #include #include -#include #include +#include #include "QualityControl/AggregatorRunner.h" +#include "QualityControl/Aggregator.h" #include "QualityControl/AggregatorRunnerFactory.h" +#include "QualityControl/QcInfoLogger.h" using namespace std; using namespace o2::framework; @@ -31,17 +33,21 @@ using namespace o2::framework; namespace o2::quality_control::checker { -DataProcessorSpec AggregatorRunnerFactory::create(AggregatorRunnerConfig arc, std::vector acs) +DataProcessorSpec AggregatorRunnerFactory::create(const core::CommonSpec& commonSpec, + const std::vector& aggregatorsSpec) { - AggregatorRunner aggregator{ std::move(arc), std::move(acs) }; + AggregatorRunnerConfig aggRunnerConfig = AggregatorRunnerFactory::extractRunnerConfig(commonSpec); + std::vector aggConfigs = AggregatorRunnerFactory::extractAggregatorsConfig(commonSpec, aggregatorsSpec); + AggregatorRunner aggregator{ aggRunnerConfig, aggConfigs }; DataProcessorSpec newAggregatorRunner{ aggregator.getDeviceName(), aggregator.getInputs(), Outputs{}, AlgorithmSpec{}, - Options{} + aggRunnerConfig.options }; + newAggregatorRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable); newAggregatorRunner.labels.emplace_back(AggregatorRunner::getLabel()); newAggregatorRunner.algorithm = adaptFromTask(std::move(aggregator)); return newAggregatorRunner; @@ -58,8 +64,13 @@ void AggregatorRunnerFactory::customizeInfrastructure(std::vector AggregatorRunnerFactory::extractAggregatorsConfig( + const core::CommonSpec& commonSpec, + const std::vector& aggregatorsSpec) +{ + std::vector aggConfigs; + for (const auto& aggregatorSpec : aggregatorsSpec) { + if (aggregatorSpec.active) { + ILOG(Debug, Devel) << ">> Aggregator name : " << aggregatorSpec.aggregatorName << ENDM; + aggConfigs.emplace_back(Aggregator::extractConfig(commonSpec, aggregatorSpec)); + } + } + return aggConfigs; +} + } // namespace o2::quality_control::checker diff --git a/Framework/src/CheckRunner.cxx b/Framework/src/CheckRunner.cxx index 64eadebcdf..5c02fa6cb9 100644 --- a/Framework/src/CheckRunner.cxx +++ b/Framework/src/CheckRunner.cxx @@ -29,6 +29,10 @@ #include "QualityControl/DatabaseFactory.h" #include "QualityControl/ServiceDiscovery.h" #include "QualityControl/runnerUtils.h" +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/CheckRunnerFactory.h" + +#include using namespace std::chrono; using namespace AliceO2::Common; @@ -160,24 +164,40 @@ CheckRunner::~CheckRunner() } } -void CheckRunner::init(framework::InitContext& iCtx) +void CheckRunner::refreshConfig(InitContext& iCtx) { - InfoLoggerContext* ilContext = nullptr; - AliceO2::InfoLogger::InfoLogger* il = nullptr; try { - ilContext = &iCtx.services().get(); - il = &iCtx.services().get(); - } catch (const RuntimeErrorRef& err) { - ILOG(Error) << "Could not find the DPL InfoLogger." << ENDM; + // get the tree + auto updatedTree = iCtx.options().get("qcConfiguration"); + + if (updatedTree.empty()) { + ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM; + } else { + if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor + ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM; + printTree(updatedTree); + } + + // prepare the information we need + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree); + + // TODO: use the config to reconfigure the check runner. + // TODO: in particular, reset mChecks and update it. + // TODO: Problem is that a lot of the logic is in the infrastructure generator. + // TODO: we should probably just preserve the checks list and update their state. + // TODO: also see if we should update the detector name + } + } catch (std::invalid_argument& error) { + // ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode + ILOG(Warning, Devel) << "Could not get updated config tree in TaskRunner::init() - `qcConfiguration` could not be retrieved" << ENDM; } +} +void CheckRunner::init(framework::InitContext& iCtx) +{ try { - QcInfoLogger::init(createCheckRunnerFacility(mDeviceName), - mConfig.infologgerFilterDiscardDebug, - mConfig.infologgerDiscardLevel, - il, - ilContext); - QcInfoLogger::setDetector(mDetectorName); + initInfologger(iCtx); + refreshConfig(iCtx); initDatabase(); initMonitoring(); initServiceDiscovery(); @@ -418,6 +438,23 @@ void CheckRunner::initServiceDiscovery() ILOG(Info, Support) << "ServiceDiscovery initialized" << ENDM; } +void CheckRunner::initInfologger(framework::InitContext& iCtx) +{ + InfoLoggerContext* ilContext = nullptr; + AliceO2::InfoLogger::InfoLogger* il = nullptr; + try { + ilContext = &iCtx.services().get(); + il = &iCtx.services().get(); + } catch (const RuntimeErrorRef& err) { + ILOG(Error) << "Could not find the DPL InfoLogger." << ENDM; + } + QcInfoLogger::init(createCheckRunnerFacility(mDeviceName), + mConfig.infologgerFilterDiscardDebug, + mConfig.infologgerDiscardLevel, + il, + ilContext); +} + void CheckRunner::start(const ServiceRegistry& services) { mActivity.mId = computeRunNumber(services, mConfig.fallbackRunNumber); diff --git a/Framework/src/CheckRunnerFactory.cxx b/Framework/src/CheckRunnerFactory.cxx index 630c1c4701..9cfbfaf76b 100644 --- a/Framework/src/CheckRunnerFactory.cxx +++ b/Framework/src/CheckRunnerFactory.cxx @@ -21,6 +21,7 @@ #include #include #include +#include #include "QualityControl/CheckRunner.h" #include "QualityControl/CheckRunnerFactory.h" @@ -39,8 +40,9 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig qcCheckRunner.getInputs(), Outputs{ qcCheckRunner.getOutputs() }, AlgorithmSpec{}, - Options{} }; - newCheckRunner.labels.emplace_back(CheckRunner::getLabel()); + checkRunnerConfig.options }; + newCheckRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable); + newCheckRunner.labels.emplace_back(CheckRunner::getCheckRunnerLabel()); newCheckRunner.algorithm = adaptFromTask(std::move(qcCheckRunner)); return newCheckRunner; } @@ -54,7 +56,7 @@ DataProcessorSpec CheckRunnerFactory::createSinkDevice(CheckRunnerConfig checkRu qcCheckRunner.getInputs(), Outputs{ qcCheckRunner.getOutputs() }, adaptFromTask(std::move(qcCheckRunner)), - Options{}, + checkRunnerConfig.options, {}, std::vector{} }; @@ -63,7 +65,7 @@ DataProcessorSpec CheckRunnerFactory::createSinkDevice(CheckRunnerConfig checkRu void CheckRunnerFactory::customizeInfrastructure(std::vector& policies) { - auto matcher = [label = CheckRunner::getLabel()](framework::DeviceSpec const& device) { + auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) { return std::find(device.labels.begin(), device.labels.end(), label) != device.labels.end(); }; @@ -74,6 +76,11 @@ void CheckRunnerFactory::customizeInfrastructure(std::vector checkRunnerOutputs; auto checkRunnerConfig = CheckRunnerFactory::extractConfig(infrastructureSpec.common); for (auto& [inputNames, checkConfigs] : checksMap) { - //Logging + // Logging ILOG(Info, Devel) << ">> Inputs (" << inputNames.size() << "): "; for (const auto& name : inputNames) ILOG(Info, Devel) << name << " "; @@ -631,15 +630,7 @@ void InfrastructureGenerator::generateAggregator(WorkflowSpec& workflow, const I return; } - std::vector aggregatorConfigs; - for (const auto& aggregatorSpec : infrastructureSpec.aggregators) { - if (aggregatorSpec.active) { - ILOG(Debug, Devel) << ">> Aggregator name : " << aggregatorSpec.aggregatorName << ENDM; - aggregatorConfigs.emplace_back(Aggregator::extractConfig(infrastructureSpec.common, aggregatorSpec)); - } - } - - DataProcessorSpec spec = AggregatorRunnerFactory::create(AggregatorRunnerFactory::extractConfig(infrastructureSpec.common), aggregatorConfigs); + DataProcessorSpec spec = AggregatorRunnerFactory::create(infrastructureSpec.common, infrastructureSpec.aggregators); workflow.emplace_back(spec); } diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 40a133472c..3fe1b13c8a 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -34,9 +34,13 @@ #include "QualityControl/QcInfoLogger.h" #include "QualityControl/TaskFactory.h" #include "QualityControl/runnerUtils.h" +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/TaskRunnerFactory.h" #include #include +#include +#include using namespace std; @@ -59,7 +63,41 @@ TaskRunner::TaskRunner(const TaskRunnerConfig& config) { } -void TaskRunner::init(InitContext& iCtx) +void TaskRunner::refreshConfig(InitContext& iCtx) +{ + try { + // get the tree + auto updatedTree = iCtx.options().get("qcConfiguration"); + + if (updatedTree.empty()) { + ILOG(Warning, Devel) << "Templated config tree is empty, we continue with the original one" << ENDM; + } else { + if (gSystem->Getenv("O2_QC_DEBUG_CONFIG_TREE")) { // until we are sure it works, keep a backdoor + ILOG(Debug, Devel) << "We print the tree we got from the ECS via DPL : " << ENDM; + printTree(updatedTree); + } + + // prepare the information we need + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(updatedTree); + // find the correct taskSpec + auto taskSpecIter = find_if(infrastructureSpec.tasks.begin(), + infrastructureSpec.tasks.end(), + [this](const TaskSpec& ts) { return ts.taskName == mTaskConfig.taskName; }); + if (taskSpecIter != infrastructureSpec.tasks.end()) { + int resetAfterCycles = TaskRunnerFactory::computeResetAfterCycles(*taskSpecIter); + mTaskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, *taskSpecIter, mTaskConfig.parallelTaskID, resetAfterCycles); + ILOG(Debug, Devel) << "Configuration refreshed" << ENDM; + } else { + ILOG(Error, Support) << "Could not find the task " << mTaskConfig.taskName << " in the templated config provided by ECS, we continue with the original config" << ENDM; + } + } + } catch (std::invalid_argument& error) { + // ignore the error, we just skip the update of the config file. It can be legit, e.g. in command line mode + ILOG(Warning, Devel) << "Could not get updated config tree in TaskRunner::init() - `qcConfiguration` could not be retrieved" << ENDM; + } +} + +void TaskRunner::initInfologger(InitContext& iCtx) { AliceO2::InfoLogger::InfoLoggerContext* ilContext = nullptr; AliceO2::InfoLogger::InfoLogger* il = nullptr; @@ -74,21 +112,25 @@ void TaskRunner::init(InitContext& iCtx) mTaskConfig.infologgerDiscardLevel, il, ilContext); + QcInfoLogger::setDetector(mTaskConfig.detectorName); +} +void TaskRunner::init(InitContext& iCtx) +{ + initInfologger(iCtx); ILOG(Info, Support) << "Initializing TaskRunner" << ENDM; - try { - loadTaskConfig(); - } catch (...) { - // catch the configuration exception and print it to avoid losing it - ILOG(Fatal, Ops) << "Unexpected exception during configuration:\n" - << current_diagnostic(true) << ENDM; - throw; - } + + refreshConfig(iCtx); + printTaskConfig(); // registering state machine callbacks - iCtx.services().get().set(CallbackService::Id::Start, [this, &services = iCtx.services()]() { start(services); }); - iCtx.services().get().set(CallbackService::Id::Reset, [this]() { reset(); }); - iCtx.services().get().set(CallbackService::Id::Stop, [this]() { stop(); }); + try { + iCtx.services().get().set(CallbackService::Id::Start, [this, &services = iCtx.services()]() { start(services); }); + iCtx.services().get().set(CallbackService::Id::Reset, [this]() { reset(); }); + iCtx.services().get().set(CallbackService::Id::Stop, [this]() { stop(); }); + } catch (o2::framework::RuntimeErrorRef& ref) { + ILOG(Error) << "Error during initialization: " << o2::framework::error_from_ref(ref).what << ENDM; + } // setup monitoring mCollector = MonitoringFactory::Get(mTaskConfig.monitoringUrl); @@ -99,8 +141,8 @@ void TaskRunner::init(InitContext& iCtx) mObjectsManager = std::make_shared(mTaskConfig.taskName, mTaskConfig.className, mTaskConfig.detectorName, mTaskConfig.consulUrl, mTaskConfig.parallelTaskID); // setup user's task - TaskFactory f; - mTask.reset(f.create(mTaskConfig, mObjectsManager)); + TaskFactory factory; + mTask.reset(factory.create(mTaskConfig, mObjectsManager)); mTask->setMonitoring(mCollector); // init user's task @@ -308,12 +350,8 @@ std::tuple TaskRunner::validateInputs return { dataReady, timerReady }; } -void TaskRunner::loadTaskConfig() // todo consider renaming +void TaskRunner::printTaskConfig() { - ILOG(Info, Support) << "Loading configuration" << ENDM; - - QcInfoLogger::setDetector(mTaskConfig.detectorName); - ILOG(Info, Support) << "Configuration loaded : " << ENDM; ILOG(Info, Support) << ">> Task name : " << mTaskConfig.taskName << ENDM; ILOG(Info, Support) << ">> Module name : " << mTaskConfig.moduleName << ENDM; diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index a993d60e1c..4d67c92b8a 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -22,14 +22,19 @@ #include #include #include +#include +#include +#include +#include +#include +#include namespace o2::quality_control::core { using namespace o2::framework; -o2::framework::DataProcessorSpec - TaskRunnerFactory::create(const TaskRunnerConfig& taskConfig) +o2::framework::DataProcessorSpec TaskRunnerFactory::create(const TaskRunnerConfig& taskConfig) { TaskRunner qcTask{ taskConfig }; @@ -40,7 +45,8 @@ o2::framework::DataProcessorSpec adaptFromTask(std::move(qcTask)), taskConfig.options }; - newTask.labels.emplace_back(TaskRunner::getLabel()); + newTask.labels.emplace_back(o2::framework::ecs::qcReconfigurable); + newTask.labels.emplace_back(TaskRunner::getTaskRunnerLabel()); return newTask; } @@ -75,7 +81,8 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig Options options{ { "period-timer-cycle", framework::VariantType::Int, static_cast(taskSpec.cycleDurationSeconds * 1000000), { "timer period" } }, - { "runNumber", framework::VariantType::String, { "Run number" } } + { "runNumber", framework::VariantType::String, { "Run number" } }, + { "qcConfiguration", VariantType::Dict, emptyDict(), { "Some dictionary configuration" } } }; return { @@ -108,7 +115,7 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig void TaskRunnerFactory::customizeInfrastructure(std::vector& policies) { - auto matcher = [label = TaskRunner::getLabel()](framework::DeviceSpec const& device) { + auto matcher = [label = TaskRunner::getTaskRunnerLabel()](framework::DeviceSpec const& device) { return std::find(device.labels.begin(), device.labels.end(), label) != device.labels.end(); }; auto callback = TaskRunner::completionPolicyCallback; @@ -117,4 +124,9 @@ void TaskRunnerFactory::customizeInfrastructure(std::vector> getAggregatorCo aggregatorConfigs.emplace_back(Aggregator::extractConfig(infrastructureSpec.common, aggregatorSpec)); } } - auto aggregatorRunnerConfig = AggregatorRunnerFactory::extractConfig(infrastructureSpec.common); + auto aggregatorRunnerConfig = AggregatorRunnerFactory::extractRunnerConfig(infrastructureSpec.common); return { aggregatorRunnerConfig, aggregatorConfigs }; } @@ -68,7 +68,12 @@ BOOST_AUTO_TEST_CASE(test_aggregator_runner) auto [aggregatorRunnerConfig, aggregatorConfigs] = getAggregatorConfigs(configFilePath); AggregatorRunner aggregatorRunner{ aggregatorRunnerConfig, aggregatorConfigs }; - std::unique_ptr store; + Options options{ + { "runNumber", VariantType::String, { "Run number" } }, + { "qcConfiguration", VariantType::Dict, emptyDict(), { "Some dictionary configuration" } } + }; + std::vector> retr; + std::unique_ptr store = make_unique(move(options), move(retr)); ConfigParamRegistry cfReg(std::move(store)); ServiceRegistry sReg; InitContext initContext{ cfReg, sReg }; diff --git a/Framework/test/testTaskRunner.cxx b/Framework/test/testTaskRunner.cxx index 2d95385652..a8f244b4ca 100644 --- a/Framework/test/testTaskRunner.cxx +++ b/Framework/test/testTaskRunner.cxx @@ -22,6 +22,9 @@ #include "QualityControl/InfrastructureSpecReader.h" #include "Configuration/ConfigurationFactory.h" #include "Configuration/ConfigurationInterface.h" +#include +#include +#include #define BOOST_TEST_MODULE TaskRunner test #define BOOST_TEST_MAIN @@ -69,7 +72,7 @@ BOOST_AUTO_TEST_CASE(test_factory) BOOST_CHECK(taskRunner.algorithm.onInit != nullptr); - BOOST_REQUIRE_EQUAL(taskRunner.options.size(), 2); + BOOST_REQUIRE_EQUAL(taskRunner.options.size(), 3); BOOST_CHECK_EQUAL(taskRunner.options[0].name, "period-timer-cycle"); } @@ -96,11 +99,23 @@ BOOST_AUTO_TEST_CASE(test_task_runner) BOOST_CHECK_EQUAL(qcTask.getOutputSpec(), (OutputSpec{ { "mo" }, "QC", "abcTask", 0, Lifetime::Sporadic })); - BOOST_REQUIRE_EQUAL(qcTask.getOptions().size(), 2); + BOOST_REQUIRE_EQUAL(qcTask.getOptions().size(), 3); BOOST_CHECK_EQUAL(qcTask.getOptions()[0].name, "period-timer-cycle"); // This is maximum that we can do until we are able to test the DPL algorithms in isolation. // TODO: When it is possible, we should try calling run() and init() + + // Attempt for init: + Options options{ + { "runNumber", VariantType::String, { "Run number" } }, + { "qcConfiguration", VariantType::Dict, emptyDict(), { "Some dictionary configuration" } } + }; + std::vector> retr; + std::unique_ptr store = make_unique(move(options), move(retr)); + ConfigParamRegistry cfReg(std::move(store)); + ServiceRegistry sReg; + InitContext initContext{ cfReg, sReg }; + qcTask.init(initContext); } BOOST_AUTO_TEST_CASE(test_task_wrong_detector_name)