diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 659c3f7c5c..943865cb1f 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -75,7 +75,9 @@ add_library(O2QualityControl src/AdvancedWorkflow.cxx src/QualitiesToTRFCollectionConverter.cxx src/Calculators.cxx - src/DataSourceSpec.cxx) + src/DataSourceSpec.cxx + src/RootFileSink.cxx + src/RootFileSource.cxx) target_include_directories( O2QualityControl @@ -313,6 +315,14 @@ set_tests_properties(multinode_test set_property(TEST multinode_test PROPERTY LABELS slow) set_property(TEST multinode_test PROPERTY TIMEOUT 75) +# Batch processing test +string(RANDOM UNIQUE_ID) +configure_file(batch-test.json.in ${CMAKE_BINARY_DIR}/tests/batch-test.json) # substitute the unique id in the task name +add_test(NAME batch_test COMMAND o2-qc-batch-test.sh) +set_tests_properties(batch_test PROPERTIES ENVIRONMENT "JSON_DIR=${CMAKE_BINARY_DIR}/tests;UNIQUE_ID=${UNIQUE_ID}") +set_property(TEST batch_test PROPERTY LABELS slow) +set_property(TEST batch_test PROPERTY TIMEOUT 45) + # disable some tests in the CI to avoid un-expected failures. 
# to be removed when it is fixed / understood #set_property(TEST functional_test PROPERTY LABELS manual) @@ -395,6 +405,7 @@ install(FILES example-default.json install(PROGRAMS script/RepoCleaner/o2-qc-repo-cleaner script/o2-qc-functional-test.sh script/o2-qc-multinode-test.sh + script/o2-qc-batch-test.sh DESTINATION bin) install(FILES script/RepoCleaner/rules/1_per_hour.py diff --git a/Framework/batch-test.json.in b/Framework/batch-test.json.in new file mode 100644 index 0000000000..4fb11c6ff8 --- /dev/null +++ b/Framework/batch-test.json.in @@ -0,0 +1,64 @@ +{ + "qc": { + "config": { + "database": { + "implementation": "CCDB", + "host": "ccdb-test.cern.ch:8080", + "username": "not_applicable", + "password": "not_applicable", + "name": "not_applicable" + }, + "Activity": { + "number": "42", + "type": "2" + }, + "monitoring": { + "url": "infologger:///debug?qc" + }, + "consul": { + "url": "" + }, + "conditionDB": { + "url": "ccdb-test.cern.ch:8080" + } + }, + "tasks": { + "BatchTestTask@UNIQUE_ID@": { + "active": "true", + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "detectorName": "TST", + "cycleDurationSeconds": "5", + "maxNumberCycles": "-1", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "tst-raw" + } + } + }, + "checks": { + "BatchTestCheck@UNIQUE_ID@": { + "active": "true", + "className": "o2::quality_control_modules::skeleton::SkeletonCheck", + "moduleName": "QcSkeleton", + "policy": "OnAny", + "detectorName": "TST", + "dataSource": [{ + "type": "Task", + "name": "BatchTestTask@UNIQUE_ID@", + "MOs": ["example"] + }] + } + } + }, + "dataSamplingPolicies": [ + { + "id": "tst-raw", + "active": "true", + "machines": [], + "query": "random:TST/RAWDATA/0", + "samplingConditions": [], + "blocking": "false" + } + ] +} \ No newline at end of file diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h index 
9250cd1e65..3493c9f2b4 100644 --- a/Framework/include/QualityControl/InfrastructureGenerator.h +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -52,7 +52,7 @@ class InfrastructureGenerator /// Generates a full QC infrastructure from a configuration file. This function is aimed to use for standalone setups /// and local development. It will create both local and remote QC tasks, and CheckRunners running associated Checks. /// - /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") /// \return generated standalone QC workflow static framework::WorkflowSpec generateStandaloneInfrastructure(std::string configurationSource); @@ -62,7 +62,7 @@ class InfrastructureGenerator /// and local development. It will create both local and remote QC tasks, and CheckRunners running associated Checks. /// /// \param workflow - existing workflow where QC infrastructure should be placed - /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") static void generateStandaloneInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource); /// \brief Generates the local part of the QC infrastructure for a specified host. @@ -70,7 +70,7 @@ class InfrastructureGenerator /// Generates the local part of the QC infrastructure for a specified host - taskRunners which are declared in the /// configuration to be 'local'. /// - /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. 
"json://") /// \param targetHost - name of the machine /// \return generated local QC workflow static framework::WorkflowSpec generateLocalInfrastructure(std::string configurationSource, std::string targetHost); @@ -81,7 +81,7 @@ class InfrastructureGenerator /// configuration to be 'local'. /// /// \param workflow - existing workflow where QC infrastructure should be placed - /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") /// \param host - name of the machine /// \return generated local QC workflow static void generateLocalInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string host); @@ -91,7 +91,7 @@ class InfrastructureGenerator /// Generates the remote part of the QC infrastructure - mergers and checkers for 'local' tasks and full QC chain for /// 'remote' tasks. /// - /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") /// \return generated remote QC workflow static o2::framework::WorkflowSpec generateRemoteInfrastructure(std::string configurationSource); @@ -101,10 +101,45 @@ class InfrastructureGenerator /// 'remote' tasks. /// /// \param workflow - existing workflow where QC infrastructure should be placed - /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") - /// \return generated remote QC workflow + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") static void generateRemoteInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource); + /// \brief Generates the local batch part of the QC infrastructure. 
+ /// + /// Generates the local batch part of the QC infrastructure - tasks and a file sink/merger. + /// + /// \param workflow - existing workflow where QC infrastructure should be placed + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") + /// \param sinkFilePath - path to the output file + static void generateLocalBatchInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string sinkFilePath); + + /// \brief Generates the local batch part of the QC infrastructure. + /// + /// Generates the local batch part of the QC infrastructure - tasks and a file sink/merger. + /// + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") + /// \param sinkFilePath - path to the output file + /// \return generated local QC workflow + static framework::WorkflowSpec generateLocalBatchInfrastructure(std::string configurationSource, std::string sinkFilePath); + + /// \brief Generates the remote batch part of the QC infrastructure. + /// + /// Generates the remote batch part of the QC infrastructure - file reader, check runners, aggregator runners. + /// + /// \param workflow - existing workflow where QC infrastructure should be placed + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g. "json://") + /// \param sourceFilePath - path to the input file + static void generateRemoteBatchInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string sourceFilePath); + + /// \brief Generates the remote batch part of the QC infrastructure. + /// + /// Generates the remote batch part of the QC infrastructure - file reader, check runners, aggregator runners. + /// + /// \param configurationSource - full path to configuration file, preceded with the backend (e.g.
"json://") + /// \param sourceFilePath - path to the input file + /// \return generated remote batch QC workflow + static framework::WorkflowSpec generateRemoteBatchInfrastructure(std::string configurationSource, std::string sourceFilePath); + /// \brief Provides necessary customization of the QC infrastructure. /// /// Provides necessary customization of the Completion Policies of the QC infrastructure. This is necessary to make @@ -187,6 +222,26 @@ inline framework::WorkflowSpec generateRemoteInfrastructure(std::string configur return core::InfrastructureGenerator::generateRemoteInfrastructure(configurationSource); } +inline framework::WorkflowSpec generateLocalBatchInfrastructure(std::string configurationSource, std::string sinkFilePath) +{ + return core::InfrastructureGenerator::generateLocalBatchInfrastructure(configurationSource, std::move(sinkFilePath)); +} + +inline void generateLocalBatchInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string sinkFilePath) +{ + core::InfrastructureGenerator::generateLocalBatchInfrastructure(workflow, configurationSource, std::move(sinkFilePath)); +} + +inline framework::WorkflowSpec generateRemoteBatchInfrastructure(std::string configurationSource, std::string sourceFilePath) +{ + return core::InfrastructureGenerator::generateRemoteBatchInfrastructure(configurationSource, std::move(sourceFilePath)); +} + +inline void generateRemoteBatchInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string sourceFilePath) +{ + core::InfrastructureGenerator::generateRemoteBatchInfrastructure(workflow, configurationSource, std::move(sourceFilePath)); +} + inline void generateRemoteInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource) { core::InfrastructureGenerator::generateRemoteInfrastructure(workflow, configurationSource); diff --git a/Framework/include/QualityControl/RootFileSink.h b/Framework/include/QualityControl/RootFileSink.h 
new file mode 100644 index 0000000000..e83e32a983 --- /dev/null +++ b/Framework/include/QualityControl/RootFileSink.h @@ -0,0 +1,56 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileSink.h +/// \author Piotr Konopka +/// + +#ifndef QUALITYCONTROL_ROOTFILESINK_H +#define QUALITYCONTROL_ROOTFILESINK_H + +#include +#include +#include + +class TFile; + +namespace o2::quality_control::core +{ + +/// \brief A Data Processor which stores MonitorObjectCollections in a specified file +class RootFileSink : public framework::Task +{ + public: + explicit RootFileSink(std::string filePath); + ~RootFileSink() override; + + void init(framework::InitContext& ictx) override; + void run(framework::ProcessingContext& pctx) override; + + static framework::DataProcessorLabel getLabel() + { + return { "QC-ROOT-FILE-SINK" }; + } + + static void customizeInfrastructure(std::vector& policies); + + private: + void reset(); + + private: + std::string mFilePath; + TFile* mFile = nullptr; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_ROOTFILESINK_H \ No newline at end of file diff --git a/Framework/include/QualityControl/RootFileSource.h b/Framework/include/QualityControl/RootFileSource.h new file mode 100644 index 0000000000..2fb4c30483 --- /dev/null +++ b/Framework/include/QualityControl/RootFileSource.h @@ -0,0 +1,42 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. 
+// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileSource.h +/// \author Piotr Konopka +/// + +#ifndef QUALITYCONTROL_ROOTFILESOURCE_H +#define QUALITYCONTROL_ROOTFILESOURCE_H + +#include +#include + +namespace o2::quality_control::core +{ + +/// \brief A Data Processor which reads MonitorObjectCollections from a specified file +class RootFileSource : public framework::Task +{ + public: + explicit RootFileSource(std::string filePath); + ~RootFileSource() override = default; + + void init(framework::InitContext& ictx) override; + void run(framework::ProcessingContext& pctx) override; + + private: + std::string mFilePath; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_ROOTFILESOURCE_H diff --git a/Framework/script/o2-qc-batch-test.sh b/Framework/script/o2-qc-batch-test.sh new file mode 100755 index 0000000000..b1dc84e9e7 --- /dev/null +++ b/Framework/script/o2-qc-batch-test.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash +set -e +set -x +set -u +set -m +# Arguments or expected variables +# UNIQUE_ID must be set. +# JSON_DIR must be set and point to the directory containing batch-test.json.
+ +# this is to make sure that we do not leave child processes behind +# https://unix.stackexchange.com/questions/240723/exit-trap-in-dash-vs-ksh-and-bash/240736#240736 +cleanup() { + # kill all processes whose parent is this process + pkill -P $$ +} +for sig in INT QUIT HUP TERM; do + trap " + cleanup + trap - $sig EXIT + kill -s $sig "'"$$"' "$sig" +done +#trap cleanup EXIT + +function delete_data() { + curl -i -L ccdb-test.cern.ch:8080/truncate/qc/TST/MO/BatchTestTask${UNIQUE_ID}* + curl -i -L ccdb-test.cern.ch:8080/truncate/qc/TST/QO/BatchTestCheck${UNIQUE_ID}* + + rm -f /tmp/batch_test_merged${UNIQUE_ID}.root + rm -f /tmp/batch_test_obj${UNIQUE_ID}.root + rm -f /tmp/batch_test_check${UNIQUE_ID}.root +} + +if [ -z "$UNIQUE_ID" ] +then + echo "UNIQUE_ID must be set when calling o2-qc-batch-test.sh" + exit 1 +fi +if [ -z "$JSON_DIR" ] +then + echo "JSON_DIR must be set when calling o2-qc-batch-test.sh" + exit 1 +fi + +# make sure the CCDB is available otherwise we bail (no failure) +# we do not use ping because it will fail from outside CERN. +if curl --silent --connect-timeout 1 ccdb-test.cern.ch:8080 > /dev/null 2>&1 ; then + echo "CCDB is reachable." +else + echo "CCDB not reachable, batch test is cancelled." + exit 0 +fi + +delete_data + +# Run the Tasks 2 times, merge results into the file.
+o2-qc-run-producer --message-amount 100 --message-rate 100 | o2-qc --config json:/${JSON_DIR}/batch-test.json --local-batch /tmp/batch_test_merged${UNIQUE_ID}.root --run +o2-qc-run-producer --message-amount 100 --message-rate 100 | o2-qc --config json:/${JSON_DIR}/batch-test.json --local-batch /tmp/batch_test_merged${UNIQUE_ID}.root --run +# Run Checks and Aggregators, publish results to QCDB +o2-qc --config json:/${JSON_DIR}/batch-test.json --remote-batch /tmp/batch_test_merged${UNIQUE_ID}.root --run + +# check MonitorObject +# first the return code must be 200 +code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/MO/BatchTestTask${UNIQUE_ID}/example/`date +%s`999 --write-out %{http_code} --silent --output /tmp/batch_test_obj${UNIQUE_ID}.root) +if (( $code != 200 )); then + echo "Error, monitor object of the QC Task could not be found." + delete_data + exit 2 +fi +# try to check that we got a valid root object +root -b -l -q -e 'TFile f("/tmp/batch_test_obj${UNIQUE_ID}.root"); f.Print();' +if (( $? != 0 )); then + echo "Error, monitor object of the QC Task is invalid." + delete_data + exit 2 +fi +# try if it is a non empty histogram +entries=`root -b -l -q -e 'TFile f("/tmp/batch_test_obj${UNIQUE_ID}.root"); TH1F *h = (TH1F*)f.Get("ccdb_object"); cout << h->GetEntries() << endl;' | tail -n 1` +if [ $entries -ne 200 ] 2>/dev/null +then + echo "The histogram of the QC Task does not have the expected 200 samples." + delete_data + exit 2 +fi + +# check QualityObject +# first the return code must be 200 +code=$(curl -L ccdb-test.cern.ch:8080/qc/TST/QO/BatchTestCheck${UNIQUE_ID}/`date +%s`999 --write-out %{http_code} --silent --output /tmp/batch_test_check${UNIQUE_ID}.root) +if (( $code != 200 )); then + echo "Error, quality object of the QC Task could not be found." + delete_data + exit 2 +fi + +echo "Batch test was passed." 
+delete_data \ No newline at end of file diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 46cbb4473f..cb7bf961dd 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -28,6 +28,8 @@ #include "QualityControl/TaskSpec.h" #include "QualityControl/InfrastructureSpecReader.h" #include "QualityControl/InfrastructureSpec.h" +#include "QualityControl/RootFileSink.h" +#include "QualityControl/RootFileSource.h" #include #include @@ -255,12 +257,88 @@ void InfrastructureGenerator::generateRemoteInfrastructure(framework::WorkflowSp workflow.insert(std::end(workflow), std::begin(qcInfrastructure), std::end(qcInfrastructure)); } +framework::WorkflowSpec InfrastructureGenerator::generateLocalBatchInfrastructure(std::string configurationSource, std::string sinkFilePath) +{ + printVersion(); + + auto config = ConfigurationFactory::getConfiguration(configurationSource); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + std::vector fileSinkInputs; + + WorkflowSpec workflow; + + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (taskSpec.active) { + + // We will merge deltas, thus we need to reset after each cycle (resetAfterCycles==1) + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); + + fileSinkInputs.emplace_back(taskSpec.taskName, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskSpec.taskName)); + } + } + + if (fileSinkInputs.size() > 0) { + // todo: could be moved to a factory. 
+ workflow.push_back({ "qc-root-file-sink", + std::move(fileSinkInputs), + Outputs{}, + adaptFromTask(sinkFilePath), + Options{}, + CommonServices::defaultServices(), + { RootFileSink::getLabel() } }); + } + + return workflow; +} + +void InfrastructureGenerator::generateLocalBatchInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string sinkFilePath) +{ + auto qcInfrastructure = InfrastructureGenerator::generateLocalBatchInfrastructure(std::move(configurationSource), std::move(sinkFilePath)); + workflow.insert(std::end(workflow), std::begin(qcInfrastructure), std::end(qcInfrastructure)); +} + +framework::WorkflowSpec InfrastructureGenerator::generateRemoteBatchInfrastructure(std::string configurationSource, std::string sourceFilePath) +{ + printVersion(); + + auto config = ConfigurationFactory::getConfiguration(configurationSource); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + + WorkflowSpec workflow; + + std::vector fileSourceOutputs; + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (taskSpec.active) { + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1); + fileSourceOutputs.push_back(taskConfig.moSpec); + fileSourceOutputs.back().binding.value = taskSpec.taskName; + } + } + if (fileSourceOutputs.size() > 0) { + workflow.push_back({ "qc-root-file-source", {}, std::move(fileSourceOutputs), adaptFromTask(sourceFilePath) }); + } + + auto checkRunnerOutputs = generateCheckRunners(workflow, configurationSource); + generateAggregator(workflow, configurationSource, checkRunnerOutputs); + generatePostProcessing(workflow, configurationSource); + + return workflow; +} + +void InfrastructureGenerator::generateRemoteBatchInfrastructure(framework::WorkflowSpec& workflow, std::string configurationSource, std::string sourceFilePath) +{ + auto qcInfrastructure = 
InfrastructureGenerator::generateRemoteBatchInfrastructure(configurationSource, sourceFilePath); + workflow.insert(std::end(workflow), std::begin(qcInfrastructure), std::end(qcInfrastructure)); +} + void InfrastructureGenerator::customizeInfrastructure(std::vector& policies) { TaskRunnerFactory::customizeInfrastructure(policies); MergerBuilder::customizeInfrastructure(policies); CheckRunnerFactory::customizeInfrastructure(policies); AggregatorRunnerFactory::customizeInfrastructure(policies); + RootFileSink::customizeInfrastructure(policies); } void InfrastructureGenerator::printVersion() diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index 0a085d68fa..2397abe2da 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -37,6 +37,7 @@ ObjectsManager::ObjectsManager(std::string taskName, std::string detectorName, s { mMonitorObjects = std::make_unique(); mMonitorObjects->SetOwner(true); + mMonitorObjects->SetName(taskName.c_str()); // register with the discovery service if (!noDiscovery && !consulUrl.empty()) { diff --git a/Framework/src/RootFileSink.cxx b/Framework/src/RootFileSink.cxx new file mode 100644 index 0000000000..0a0e8a2f26 --- /dev/null +++ b/Framework/src/RootFileSink.cxx @@ -0,0 +1,129 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +/// +/// \file RootFileSink.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/RootFileSink.h" +#include "QualityControl/QcInfoLogger.h" +#include "QualityControl/MonitorObjectCollection.h" +#include +#include +#include +#include +#include + +using namespace o2::framework; + +namespace o2::quality_control::core +{ + +RootFileSink::RootFileSink(std::string filePath) + : mFilePath(std::move(filePath)) +{ +} + +RootFileSink::~RootFileSink() +{ + if (mFile != nullptr) { + if (mFile->IsOpen()) { + ILOG(Info) << "Closing file '" << mFilePath << "'." << ENDM; + mFile->Close(); + } + delete mFile; + } +} + +void RootFileSink::customizeInfrastructure(std::vector& policies) +{ + auto matcher = [label = RootFileSink::getLabel()](framework::DeviceSpec const& device) { + return std::find(device.labels.begin(), device.labels.end(), label) != device.labels.end(); + }; + auto callback = CompletionPolicyHelpers::consumeWhenAny().callback; + + policies.emplace_back("qcRootFileSinkCompletionPolicy", matcher, callback); +} + +void RootFileSink::init(framework::InitContext& ictx) +{ + ictx.services().get().set(CallbackService::Id::Reset, [this]() { reset(); }); + + if (mFile != nullptr) { + if (mFile->IsOpen()) { + ILOG(Info) << "Closing file '" << mFilePath << "'." << ENDM; + mFile->Close(); + } + delete mFile; + } + mFile = new TFile(mFilePath.c_str(), "UPDATE"); + if (mFile->IsZombie()) { + throw std::runtime_error("File '" + mFilePath + "' is zombie."); + } + if (!mFile->IsOpen()) { + throw std::runtime_error("Failed to open the file: " + mFilePath); + } + if (!mFile->IsWritable()) { + throw std::runtime_error("File '" + mFilePath + "' is not writable."); + } + ILOG(Info) << "Output file '" << mFilePath << "' successfully open." << ENDM; +} + +void RootFileSink::reset() +{ + if (mFile != nullptr) { + if (mFile->IsOpen()) { + ILOG(Info) << "Closing file '" << mFilePath << "'." 
<< ENDM; + mFile->Close(); + } + delete mFile; + mFile = nullptr; + } +} + +void RootFileSink::run(framework::ProcessingContext& pctx) +{ + for (const auto& input : InputRecordWalker(pctx.inputs())) { + auto moc = DataRefUtils::as(input).release(); + if (moc == nullptr) { + ILOG(Error) << "Could not cast the input to MonitorObjectCollection, skipping." << ENDM; + continue; + } + moc->SetOwner(); + + auto mocName = moc->GetName(); + if (*mocName == '\0') { + ILOG(Error) << "MonitorObjectCollection does not have a name, skipping." << ENDM; + continue; + } + + auto storedTObj = mFile->Get(mocName); + if (storedTObj != nullptr) { + auto storedMOC = dynamic_cast(storedTObj); + if (storedMOC == nullptr) { + ILOG(Error) << "Could not cast the stored object to MonitorObjectCollection, skipping." << ENDM; + delete storedTObj; + continue; + } + storedMOC->SetOwner(); + ILOG(Info) << "Merging object '" << moc->GetName() << "' with the existing one in the file." << ENDM; + moc->merge(storedMOC); + } + delete storedTObj; + + ILOG(Info) << "Object '" << moc->GetName() << "' has been stored in the file." << ENDM; + mFile->WriteObject(moc, moc->GetName(), "Overwrite"); + delete moc; + } +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/RootFileSource.cxx b/Framework/src/RootFileSource.cxx new file mode 100644 index 0000000000..a4d6dbd7d0 --- /dev/null +++ b/Framework/src/RootFileSource.cxx @@ -0,0 +1,76 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". 
+// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file RootFileSource.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/RootFileSource.h" +#include "QualityControl/QcInfoLogger.h" +#include "QualityControl/MonitorObjectCollection.h" + +#include +#include +#include + +using namespace o2::framework; + +namespace o2::quality_control::core +{ +RootFileSource::RootFileSource(std::string filePath) + : mFilePath(std::move(filePath)) +{ +} + +void RootFileSource::init(framework::InitContext&) +{ +} + +void RootFileSource::run(framework::ProcessingContext& ctx) +{ + auto* file = new TFile(mFilePath.c_str(), "READ"); + if (file->IsZombie()) { + throw std::runtime_error("File '" + mFilePath + "' is zombie."); + } + if (!file->IsOpen()) { + throw std::runtime_error("Failed to open the file: " + mFilePath); + } + ILOG(Info) << "Input file '" << mFilePath << "' successfully open." << ENDM; + + TIter next(file->GetListOfKeys()); + TKey* key; + while ((key = (TKey*)next())) { + auto storedTObj = file->Get(key->GetName()); + if (storedTObj != nullptr) { + auto storedMOC = dynamic_cast(storedTObj); + if (storedMOC == nullptr) { + ILOG(Error) << "Could not cast the stored object to MonitorObjectCollection, skipping." 
<< ENDM; + delete storedTObj; + continue; + } + + // snapshot does a shallow copy, so we cannot let it delete elements in MOC when it deletes the MOC + storedMOC->SetOwner(false); + ctx.outputs().snapshot(OutputRef{ storedMOC->GetName(), 0 }, *storedMOC); + storedMOC->SetOwner(true); + ILOG(Info) << "Read and published object '" << storedMOC->GetName() << "'" << ENDM; + } + delete storedTObj; + } + file->Close(); + delete file; + + ctx.services().get().endOfStream(); + ctx.services().get().readyToQuit(QuitRequest::Me); +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/runQC.cxx b/Framework/src/runQC.cxx index 8546a320a6..9d07e6a0d5 100644 --- a/Framework/src/runQC.cxx +++ b/Framework/src/runQC.cxx @@ -45,16 +45,24 @@ void customize(std::vector& workflowOptions) ConfigParamSpec{ "config", VariantType::String, "", { "Absolute path to QC and Data Sampling configuration file." } }); workflowOptions.push_back( - ConfigParamSpec{ "local", VariantType::Bool, false, { "Creates only the local part of the QC topology." } }); + ConfigParamSpec{ "local", VariantType::Bool, false, { "Runs only the local part of the QC workflow." } }); workflowOptions.push_back( - ConfigParamSpec{ "host", VariantType::String, "", { "Name of the host of the local part of the QC topology." - "Necessary to specify when creating topologies on multiple" + ConfigParamSpec{ "host", VariantType::String, "", { "Name of the host of the local part of the QC workflow. " + "Necessary to specify when creating workflows on multiple" " machines. If not specified, hostname of the current machine" " will be used" } }); workflowOptions.push_back( - ConfigParamSpec{ "remote", VariantType::Bool, false, { "Creates only the remote part of the QC topology." } }); + ConfigParamSpec{ "remote", VariantType::Bool, false, { "Runs only the remote part of the QC workflow." 
} }); workflowOptions.push_back( ConfigParamSpec{ "no-data-sampling", VariantType::Bool, false, { "Do not add Data Sampling infrastructure." } }); + + workflowOptions.push_back( + ConfigParamSpec{ "local-batch", VariantType::String, "", { "Runs the local part of the QC workflow and dumps results to a file. " + "Takes the file path as argument. If it exists, the results are merged. " + "Do not run many QC workflows on the same file at the same time." } }); + workflowOptions.push_back( + ConfigParamSpec{ "remote-batch", VariantType::String, "", { "Runs the remote part of the QC workflow reading the inputs from a file (files). " + "Takes the file path as argument." } }); } void customize(std::vector& policies) @@ -72,60 +80,134 @@ void customize(std::vector& policies) using namespace std::chrono; -WorkflowSpec defineDataProcessing(const ConfigContext& config) +bool validateArguments(const ConfigContext& config) { - WorkflowSpec specs; - const std::string qcConfigurationSource = config.options().get("config"); if (qcConfigurationSource.empty()) { ILOG(Warning, Support) << "No configuration path specified, returning an empty workflow." << ENDM; + return false; + } + + size_t exclusiveOptions = 0; + exclusiveOptions += config.options().get("local"); + exclusiveOptions += config.options().get("remote"); + exclusiveOptions += !config.options().get("local-batch").empty(); + exclusiveOptions += !config.options().get("remote-batch").empty(); + if (exclusiveOptions > 1) { + ILOG(Error, Support) << "More than one of the following options was specified: --local, --remote, --local-batch, --remote-batch. This is not allowed, returning an empty workflow."
<< ENDM; + return false; + } + + return true; +} + +enum class WorkflowType { + Standalone, + Local, + Remote, + LocalBatch, + RemoteBatch +}; + +WorkflowType getWorkflowType(const ConfigContext& config) +{ + if (config.options().get("local")) { + return WorkflowType::Local; + } else if (config.options().get("remote")) { + return WorkflowType::Remote; + } else if (!config.options().get("local-batch").empty()) { + return WorkflowType::LocalBatch; + } else if (!config.options().get("remote-batch").empty()) { + return WorkflowType::RemoteBatch; + } else { + return WorkflowType::Standalone; + } +} + +WorkflowSpec defineDataProcessing(const ConfigContext& config) +{ + WorkflowSpec specs; + + if (!validateArguments(config)) { return {}; } + + const std::string qcConfigurationSource = config.options().get("config"); ILOG(Info, Support) << "Using config file '" << qcConfigurationSource << "'" << ENDM; try { - - // The QC infrastructure is divided into two parts: + // The online QC infrastructure is divided into two parts: // - local - QC tasks which are on the same machines as the main processing. We also put Data Sampling there. // - remote - QC tasks, mergers and checkers that reside on QC servers // // The user can specify to create either one of these parts by selecting corresponding option, // or both of them, which is the default option (no flags needed). - - if (!config.options().get("local") && !config.options().get("remote")) { - ILOG(Info, Support) << "Creating a standalone QC topology." << ENDM; - - if (!config.options().get("no-data-sampling")) { - ILOG(Info, Support) << "Generating Data Sampling" << ENDM; - DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); - } else { - ILOG(Info, Support) << "Omitting Data Sampling" << ENDM; + // + // For file-based processing, there are also: + // - local-batch - QC tasks are run, the results are stored in the specified file. If the file exists, + // QC objects are merged. 
Multiple local-batch workflows should not run at the same time, + // as they would modify the same file. + // - remote-batch - Checks and Aggregators are run on the QC objects inside a file created by a local-batch workflow. + // The results are stored in the database specified in the config file. + + auto workflowType = getWorkflowType(config); + switch (workflowType) { + case WorkflowType::Standalone: { + ILOG(Info, Support) << "Creating a standalone QC workflow." << ENDM; + + if (!config.options().get("no-data-sampling")) { + ILOG(Info, Support) << "Generating Data Sampling" << ENDM; + DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); + } else { + ILOG(Info, Support) << "Omitting Data Sampling" << ENDM; + } + quality_control::generateStandaloneInfrastructure(specs, qcConfigurationSource); + break; } - quality_control::generateStandaloneInfrastructure(specs, qcConfigurationSource); - } - - if (config.options().get("local")) { - ILOG(Info, Support) << "Creating a local QC topology." << ENDM; - - if (!config.options().get("no-data-sampling")) { - ILOG(Info, Support) << "Generating Data Sampling" << ENDM; - DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); - } else { - ILOG(Info, Support) << "Omitting Data Sampling" << ENDM; + case WorkflowType::Local: { + ILOG(Info, Support) << "Creating a local QC topology." << ENDM; + + if (!config.options().get("no-data-sampling")) { + ILOG(Info, Support) << "Generating Data Sampling" << ENDM; + DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); + } else { + ILOG(Info, Support) << "Omitting Data Sampling" << ENDM; + } + + // Generation of the local QC topology (local QC tasks and their output proxies) + auto host = config.options().get("host").empty() + ? 
boost::asio::ip::host_name() + : config.options().get("host"); + quality_control::generateLocalInfrastructure(specs, qcConfigurationSource, host); + break; } + case WorkflowType::Remote: { + ILOG(Info, Support) << "Creating a remote QC workflow." << ENDM; - // Generation of the local QC topology (local QC tasks and their output proxies) - auto host = config.options().get("host").empty() - ? boost::asio::ip::host_name() - : config.options().get("host"); - quality_control::generateLocalInfrastructure(specs, qcConfigurationSource, host); - } - - if (config.options().get("remote")) { - ILOG(Info, Support) << "Creating a remote QC topology." << ENDM; - - // Generation of the remote QC topology (task for QC servers, input proxies, mergers and all check runners) - quality_control::generateRemoteInfrastructure(specs, qcConfigurationSource); + // Generation of the remote QC topology (task for QC servers, input proxies, mergers and all check runners) + quality_control::generateRemoteInfrastructure(specs, qcConfigurationSource); + break; + } + case WorkflowType::LocalBatch: { + ILOG(Info, Support) << "Creating a local batch QC workflow." 
<< ENDM; + if (!config.options().get("no-data-sampling")) { + ILOG(Info, Support) << "Generating Data Sampling" << ENDM; + DataSampling::GenerateInfrastructure(specs, qcConfigurationSource); + } else { + ILOG(Info, Support) << "Omitting Data Sampling" << ENDM; + } + + auto localBatchFilePath = config.options().get("local-batch"); + // Generation of the local batch QC workflow (QC tasks and file sink) + quality_control::generateLocalBatchInfrastructure(specs, qcConfigurationSource, localBatchFilePath); + break; + } + case WorkflowType::RemoteBatch: { + auto remoteBatchFilePath = config.options().get("remote-batch"); + // Creating the remote batch QC topology (file reader, check runners, aggregator runners, postprocessing) + quality_control::generateRemoteBatchInfrastructure(specs, qcConfigurationSource, remoteBatchFilePath); + break; + } } } catch (const std::runtime_error& re) { ILOG(Fatal, Ops) << "Failed to build the workflow: " << re.what() << ENDM; diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx index 38e8921f72..71fe70fe34 100644 --- a/Framework/test/testInfrastructureGenerator.cxx +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -227,4 +227,99 @@ BOOST_AUTO_TEST_CASE(qc_factory_empty_config) BOOST_REQUIRE_NO_THROW(InfrastructureGenerator::generateRemoteInfrastructure(workflow, configFilePath)); BOOST_CHECK_EQUAL(workflow.size(), 0); } + { + WorkflowSpec workflow; + BOOST_REQUIRE_NO_THROW(InfrastructureGenerator::generateLocalBatchInfrastructure(workflow, configFilePath, "file.root")); + BOOST_CHECK_EQUAL(workflow.size(), 0); + } + { + WorkflowSpec workflow; + BOOST_REQUIRE_NO_THROW(InfrastructureGenerator::generateRemoteBatchInfrastructure(workflow, configFilePath, "file.root")); + BOOST_CHECK_EQUAL(workflow.size(), 0); + } +} + +BOOST_AUTO_TEST_CASE(qc_infrastructure_local_batch_test) +{ + std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; 
+ std::cout << configFilePath << std::endl; + { + auto workflow = InfrastructureGenerator::generateLocalBatchInfrastructure(configFilePath, "file.root"); + + BOOST_REQUIRE_EQUAL(workflow.size(), 4); + + auto taskRunnerSkeleton = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "QC-TASK-RUNNER-skeletonTask" && + d.inputs.size() == 2 && + d.outputs.size() == 1; + }); + BOOST_CHECK(taskRunnerSkeleton != workflow.end()); + + auto taskRunnerAbcTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "QC-TASK-RUNNER-abcTask" && + d.inputs.size() == 2 && + d.outputs.size() == 1; + }); + BOOST_CHECK(taskRunnerAbcTask != workflow.end()); + + auto taskRunnerXyzTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "QC-TASK-RUNNER-xyzTask" && + d.inputs.size() == 2 && + d.outputs.size() == 1; + }); + BOOST_CHECK(taskRunnerXyzTask != workflow.end()); + + BOOST_CHECK_EQUAL(workflow[3].name, "qc-root-file-sink"); + BOOST_CHECK_EQUAL(workflow[3].inputs.size(), 3); + BOOST_CHECK_EQUAL(workflow[3].outputs.size(), 0); + } +} + +BOOST_AUTO_TEST_CASE(qc_infrastructure_remote_batch_test) +{ + std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; + auto workflow = InfrastructureGenerator::generateRemoteBatchInfrastructure(configFilePath, "file.root"); + + BOOST_REQUIRE_EQUAL(workflow.size(), 7); + + auto fileReader = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "qc-root-file-source" && + d.inputs.size() == 0 && + d.outputs.size() == 3; + }); + BOOST_CHECK(fileReader != workflow.end()); + + auto checkRunnerCount = std::count_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name.find("QC-CHECK-RUNNER") != std::string::npos && + d.inputs.size() == 1; + }); + 
BOOST_REQUIRE_EQUAL(checkRunnerCount, 4); + + auto postprocessingTask = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "PP-TASK-RUNNER-SkeletonPostProcessing" && + d.inputs.size() == 1 && + d.outputs.size() == 1; + }); + BOOST_CHECK(postprocessingTask != workflow.end()); + + auto aggregator = std::find_if( + workflow.begin(), workflow.end(), + [](const DataProcessorSpec& d) { + return d.name == "QC-AGGREGATOR-RUNNER" && + d.inputs.size() == 7 && + d.outputs.size() == 0; + }); + BOOST_CHECK(aggregator != workflow.end()); } \ No newline at end of file diff --git a/README.md b/README.md index 6611f1f28d..93efcecd12 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ For a general overview of our (O2) software, organization and processes, please * [Plugging the QC to an existing DPL workflow](doc/Advanced.md#plugging-the-qc-to-an-existing-dpl-workflow) * [Production of QC objects outside this framework](doc/Advanced.md#production-of-qc-objects-outside-this-framework) * [Multi-node setups](doc/Advanced.md#multi-node-setups) + * [Batch processing](doc/Advanced.md#batch-processing) * [Moving window](doc/Advanced.md#moving-window) * [Writing a DPL data producer](doc/Advanced.md#writing-a-dpl-data-producer) * [QC with DPL Analysis](doc/Advanced.md#qc-with-dpl-analysis) diff --git a/doc/Advanced.md b/doc/Advanced.md index c688df6d65..c93bd06734 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -13,6 +13,7 @@ Advanced topics * [Example 2: advanced](#example-2-advanced) * [Limitations](#limitations) * [Multi-node setups](#multi-node-setups) + * [Batch processing](#batch-processing) * [Moving window](#moving-window) * [Writing a DPL data producer](#writing-a-dpl-data-producer) * [QC with DPL Analysis](#qc-with-dpl-analysis) @@ -268,7 +269,34 @@ If there are no problems, on QCG you should see the `example` histogram updated and `qc/TST/MO/MultiNodeRemote`, and corresponding Checks under the path `qc/TST/QO/`. 
When using AliECS, one has to generate workflow templates and upload them to the corresponding repository. Please -contact the QC or AliECS developers to receive assistance or instruction on how to do that. +contact the QC or AliECS developers to receive assistance or instructions on how to do that. + +## Batch processing + +In certain cases, merging results of parallel QC Tasks cannot be performed in the form of message passing. +Examples of this are the simulation workflows, which exchange data between processing stages via files + and produce (and process) consecutive TimeFrames in different directories in parallel. +Then, one can run QC Tasks on incomplete data and save the results to a file. +If the file already exists, the new objects will be merged with those obtained so far. +At the end, one can run the rest of the processing chain (Checks, Aggregators) on the complete objects. + +Here is a simple example: +```bash +# Remove any existing results +rm results.root +# Run the Tasks 3 times, merge results into the file. +o2-qc-run-producer --message-amount 100 | o2-qc --config json:/${QUALITYCONTROL_ROOT}/etc/basic.json --local-batch results.root +o2-qc-run-producer --message-amount 100 | o2-qc --config json:/${QUALITYCONTROL_ROOT}/etc/basic.json --local-batch results.root +o2-qc-run-producer --message-amount 100 | o2-qc --config json:/${QUALITYCONTROL_ROOT}/etc/basic.json --local-batch results.root +# Run Checks and Aggregators, publish results to QCDB +o2-qc --config json:/${QUALITYCONTROL_ROOT}/etc/basic.json --remote-batch results.root +``` +Please note that multiple local batch QC workflows should not work on the same file at the same time. +A semaphore mechanism is required if there is a risk they might be executed in parallel. + +To be done: +- merging multiple files into one, to allow for cases when local batch workflows cannot access the same file. +- support for Post-Processing. ## Moving window