diff --git a/Framework/include/QualityControl/CheckConfig.h b/Framework/include/QualityControl/CheckConfig.h index 07292e5ad5..ee37e061b5 100644 --- a/Framework/include/QualityControl/CheckConfig.h +++ b/Framework/include/QualityControl/CheckConfig.h @@ -38,6 +38,7 @@ struct CheckConfig { UpdatePolicyType policyType = UpdatePolicyType::OnAny; std::vector objectNames{}; // fixme: if object names are empty, allObjects are true, consider reducing to one var bool allObjects = false; + bool allowBeautify = false; framework::Inputs inputSpecs{}; framework::OutputSpec qoSpec{ "XXX", "INVALID" }; std::string conditionUrl{}; diff --git a/Framework/include/QualityControl/CheckRunner.h b/Framework/include/QualityControl/CheckRunner.h index 3d14a8fc25..90a5f13f30 100644 --- a/Framework/include/QualityControl/CheckRunner.h +++ b/Framework/include/QualityControl/CheckRunner.h @@ -34,6 +34,7 @@ #include "QualityControl/CheckRunnerConfig.h" #include "QualityControl/Check.h" #include "QualityControl/MonitorObject.h" +#include "QualityControl/QualityObject.h" #include "QualityControl/UpdatePolicyManager.h" namespace o2::quality_control::core @@ -78,6 +79,7 @@ namespace o2::quality_control::checker class CheckRunner : public framework::Task { public: + /// Constructor /** * \brief CheckRunner constructor * @@ -86,9 +88,20 @@ class CheckRunner : public framework::Task * Group check assumes that the input of the checks is the same! * * @param checkRunnerConfig configuration of CheckRunner + * @param checkConfigs configuration of all Checks that should run in this data processor + */ + CheckRunner(CheckRunnerConfig, const std::vector& checkConfigs); + + /** + * \brief CheckRunner constructor + * + * Create a sink for the Input. It is expected to receive Monitor Object to store. + * It will not run any checks on a given input. + * + * @param checkRunnerConfig configuration of CheckRunner * @param input Monitor Object input spec. */ - CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, o2::framework::Inputs inputs); + CheckRunner(CheckRunnerConfig, o2::framework::InputSpec input); /// Destructor ~CheckRunner() override; @@ -105,16 +118,19 @@ class CheckRunner : public framework::Task framework::Inputs getInputs() { return mInputs; }; framework::Outputs getOutputs() { return mOutputs; }; + void setTaskStoreSet(std::unordered_set storeSet) { mInputStoreSet = storeSet; } std::string getDeviceName() { return mDeviceName; }; static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; } static std::string createCheckRunnerIdString() { return "qc-check"; }; - static std::string createCheckRunnerName(); + static std::string createCheckRunnerName(const std::vector& checks); + static std::string createSinkCheckRunnerName(o2::framework::InputSpec input); + static std::string createCheckRunnerFacility(std::string deviceName); /// \brief Compute the detector name to be used for this checkrunner. /// Compute the detector name to be used for this checkrunner. /// If all checks belong to the same detector we use it, otherwise we use "MANY" - static std::string getDetectorName(const std::vector& checks); + static std::string getDetectorName(const std::vector checks); private: /** @@ -205,7 +221,8 @@ class CheckRunner : public framework::Task std::shared_ptr mActivity; // shareable with the Checks CheckRunnerConfig mConfig; std::shared_ptr mDatabase; - std::vector> mToBeStored; + std::unordered_set mInputStoreSet; + std::vector> mMonitorObjectStoreVector; UpdatePolicyManager updatePolicyManager; bool mReceivedEOS = false; diff --git a/Framework/include/QualityControl/CheckRunnerFactory.h b/Framework/include/QualityControl/CheckRunnerFactory.h index c35a4b8557..ff3fba1859 100644 --- a/Framework/include/QualityControl/CheckRunnerFactory.h +++ b/Framework/include/QualityControl/CheckRunnerFactory.h @@ -43,7 +43,17 @@ class CheckRunnerFactory CheckRunnerFactory() = default; virtual ~CheckRunnerFactory() = default; - static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs); + static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, std::vector storeVector = {}); + + /* + * \brief Create a CheckRunner sink DPL device. + * + * The purpose of this device is to receive and store the MO from task. + * + * @param input InputSpec with the content to store + * @param configurationSource + */ + static framework::DataProcessorSpec createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input); static CheckRunnerConfig extractConfig(const core::CommonSpec&); diff --git a/Framework/src/Check.cxx b/Framework/src/Check.cxx index 875be9ffd0..0686c65d43 100644 --- a/Framework/src/Check.cxx +++ b/Framework/src/Check.cxx @@ -172,6 +172,10 @@ QualityObjectsType Check::check(std::map>& moMap, const Quality& quality) { + if (!mCheckConfig.allowBeautify) { + return; + } + for (auto const& item : moMap) { try { mCheckInterface->beautify(item.second /*mo*/, quality); @@ -232,6 +236,12 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec& } } + bool allowBeautify = checkSpec.dataSources.size() <= 1; + if (!allowBeautify) { + // See QC-299 for details + ILOG(Warning, Devel) << "Beautification disabled because more than one source is used in this Check (" << checkSpec.checkName << ")" << ENDM; + } + return { checkSpec.checkName, checkSpec.moduleName, @@ -241,6 +251,7 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec& updatePolicy, std::move(objectNames), checkAllObjects, + allowBeautify, std::move(inputs), createOutputSpec(checkSpec.checkName), commonSpec.conditionDBUrl diff --git a/Framework/src/CheckRunner.cxx b/Framework/src/CheckRunner.cxx index a58b13cf80..70eba1120b 100644 --- a/Framework/src/CheckRunner.cxx +++ b/Framework/src/CheckRunner.cxx @@ -69,9 +69,56 @@ std::size_t CheckRunner::hash(const std::string& inputString) return checksum; } -std::string CheckRunner::createCheckRunnerName() +std::string CheckRunner::createCheckRunnerName(const std::vector& checks) { - return CheckRunner::createCheckRunnerIdString(); + static const std::string alphanumeric = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + const int NAME_LEN = 4; + std::string name(CheckRunner::createCheckRunnerIdString() + "-" + getDetectorName(checks) + "-"); + + if (checks.size() == 1) { + // If single check, use the check name + name += checks[0].name; + } else { + std::string hash_string; + std::vector names; + // Fill vector with check names + for (const auto& c : checks) { + names.push_back(c.name); + } + // Be sure that after configuration shuffle, the name will be the same + std::sort(names.begin(), names.end()); + + // Create a single string and hash it + for (auto& n : names) { + hash_string += n; + } + std::size_t num = hash(hash_string); + + // Change numerical to alphanumeric hash representation + for (int i = 0; i < NAME_LEN; ++i) { + name += alphanumeric[num % alphanumeric.size()]; + num = num / alphanumeric.size(); + } + } + return name; +} + +std::string CheckRunner::createCheckRunnerFacility(std::string deviceName) +{ + // it starts with "check/" and is followed by the unique part of the device name truncated to a maximum of 32 characters. + string facilityName = "check/" + deviceName.substr(CheckRunner::createCheckRunnerIdString().length() + 1, string::npos); + facilityName = facilityName.substr(0, 32); + return facilityName; +} + +std::string CheckRunner::createSinkCheckRunnerName(InputSpec input) +{ + std::string name(CheckRunner::createCheckRunnerIdString() + "-sink-"); + name += DataSpecUtils::label(input); + return name; } o2::framework::Outputs CheckRunner::collectOutputs(const std::vector& checkConfigs) @@ -83,12 +130,13 @@ o2::framework::Outputs CheckRunner::collectOutputs(const std::vector& checkConfigs, o2::framework::Inputs inputs) +CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs) : mDetectorName(getDetectorName(checkConfigs)), - mDeviceName(createCheckRunnerName()), + mDeviceName(createCheckRunnerName(checkConfigs)), mConfig(std::move(checkRunnerConfig)), - mInputs{ inputs }, - mOutputs{ CheckRunner::collectOutputs(checkConfigs) }, + /* All checks have the same Input */ + mInputs(checkConfigs.front().inputSpecs), + mOutputs(CheckRunner::collectOutputs(checkConfigs)), mTotalNumberObjectsReceived(0), mTotalNumberCheckExecuted(0), mTotalNumberQOStored(0), @@ -100,6 +148,19 @@ CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector< } } +CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input) + : mDeviceName(createSinkCheckRunnerName(input)), + mConfig(std::move(checkRunnerConfig)), + mInputs{ input }, + mOutputs{}, + mTotalNumberObjectsReceived(0), + mTotalNumberCheckExecuted(0), + mTotalNumberQOStored(0), + mTotalNumberMOStored(0), + mTotalQOSent(0) +{ +} + CheckRunner::~CheckRunner() { ILOG(Debug, Trace) << "CheckRunner destructor (" << this << ")" << ENDM; @@ -151,7 +212,7 @@ void CheckRunner::refreshConfig(InitContext& iCtx) void CheckRunner::init(framework::InitContext& iCtx) { try { - core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, mDeviceName); + core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, createCheckRunnerFacility(mDeviceName)); refreshConfig(iCtx); Bookkeeping::getInstance().init(mConfig.bookkeepingUrl); initDatabase(); @@ -197,7 +258,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx) auto now = getCurrentTimestamp(); store(qualityObjects, now); - store(mToBeStored, now); + store(mMonitorObjectStoreVector, now); send(qualityObjects, ctx.outputs()); @@ -209,7 +270,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx) void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord) { - mToBeStored.clear(); + mMonitorObjectStoreVector.clear(); for (const auto& input : mInputs) { auto dataRef = inputRecord.get(input.binding.c_str()); @@ -239,6 +300,7 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord) // for each item of the array, check whether it is a MonitorObject. If not, create one and encapsulate. // Then, store the MonitorObject in the various maps and vectors we will use later. + bool store = mInputStoreSet.count(DataSpecUtils::label(input)) > 0; // Check if this CheckRunner stores this input for (const auto tObject : *array) { std::shared_ptr mo{ dynamic_cast(tObject) }; @@ -256,8 +318,9 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord) updatePolicyManager.updateObjectRevision(mo->getFullName()); mTotalNumberObjectsReceived++; - // Monitor Object will be stored later, after possible beautification - mToBeStored.push_back(mo); + if (store) { // Monitor Object will be stored later, after possible beautification + mMonitorObjectStoreVector.push_back(mo); + } } } } @@ -288,7 +351,7 @@ void CheckRunner::sendPeriodicMonitoring() QualityObjectsType CheckRunner::check() { - ILOG(Debug, Devel) << "check(): Trying " << mChecks.size() << " checks for " << mMonitorObjects.size() << " monitor objects" + ILOG(Debug, Devel) << "Trying " << mChecks.size() << " checks for " << mMonitorObjects.size() << " monitor objects" << ENDM; QualityObjectsType allQOs; @@ -491,7 +554,7 @@ void CheckRunner::reset() mTotalQOSent = 0; } -std::string CheckRunner::getDetectorName(const std::vector& checks) +std::string CheckRunner::getDetectorName(const std::vector checks) { std::string detectorName; for (auto& check : checks) { diff --git a/Framework/src/CheckRunnerFactory.cxx b/Framework/src/CheckRunnerFactory.cxx index be85fed96c..5b0a1e1597 100644 --- a/Framework/src/CheckRunnerFactory.cxx +++ b/Framework/src/CheckRunnerFactory.cxx @@ -26,34 +26,17 @@ #include "QualityControl/CheckRunner.h" #include "QualityControl/CheckRunnerFactory.h" #include "QualityControl/CommonSpec.h" -#include namespace o2::quality_control::checker { using namespace o2::framework; -DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs) +DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector& checkConfigs, std::vector storeVector) { auto options = checkRunnerConfig.options; - - // concatenate all inputs - o2::framework::Inputs allInputs; - for (auto config : checkConfigs) { - allInputs.insert(allInputs.end(), config.inputSpecs.begin(), config.inputSpecs.end()); - } - - // We can end up with duplicated inputs that will later lead to circular dependencies on the checkRunner device. - o2::framework::Inputs allInputsNoDups; - std::set alreadySeen; - for (auto input : allInputs) { - if (alreadySeen.count(input.binding) == 0) { - allInputsNoDups.push_back(input); - } - alreadySeen.insert(input.binding); - } - - CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs, allInputsNoDups }; + CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs }; + qcCheckRunner.setTaskStoreSet({ storeVector.begin(), storeVector.end() }); DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(), qcCheckRunner.getInputs(), @@ -66,6 +49,22 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig return newCheckRunner; } +DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input) +{ + CheckRunner qcCheckRunner{ checkRunnerConfig, input }; + qcCheckRunner.setTaskStoreSet({ DataSpecUtils::label(input) }); + + DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(), + qcCheckRunner.getInputs(), + Outputs{ qcCheckRunner.getOutputs() }, + adaptFromTask(std::move(qcCheckRunner)), + checkRunnerConfig.options, + {}, + { o2::framework::ecs::qcReconfigurable } }; + + return newCheckRunner; +} + void CheckRunnerFactory::customizeInfrastructure(std::vector& policies) { auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) { diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 5d4a946ed5..116be4ea70 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -626,24 +626,116 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec) { - if (infrastructureSpec.checks.empty()) { - ILOG(Debug, Devel) << "No \"checks\" structure found in the config file. If no check is expected, then it is completely fine." << ENDM; - return; + // todo have a look if this complex procedure can be simplified. + // todo also make well defined and scoped functions to make it more readable and clearer. + typedef std::vector InputNames; + typedef std::vector CheckConfigs; + std::map tasksOutputMap; // all active tasks' output, as inputs, keyed by their label + std::map checksMap; // all the Checks defined in the config mapped keyed by their sorted inputNames + std::map storeVectorMap; + + // todo: avoid code repetition + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (taskSpec.active) { + InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; + tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); + bool movingWindowsEnabled = !taskSpec.movingWindows.empty(); + bool synchronousRemote = taskSpec.location == TaskLocationSpec::Local && (infrastructureSpec.workflowType == WorkflowType::Remote || infrastructureSpec.workflowType == WorkflowType::FullChain); + bool asynchronousRemote = infrastructureSpec.workflowType == WorkflowType::RemoteBatch; + if (movingWindowsEnabled && (synchronousRemote || asynchronousRemote)) { + InputSpec taskMovingWindowOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName, true), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic }; + tasksOutputMap.insert({ DataSpecUtils::label(taskMovingWindowOutput), taskMovingWindowOutput }); + } + } } - typedef std::vector CheckConfigs; + for (const auto& ppTaskSpec : infrastructureSpec.postProcessingTasks) { + if (ppTaskSpec.active) { + InputSpec ppTaskOutput{ ppTaskSpec.taskName, + PostProcessingDevice::createPostProcessingDataOrigin(ppTaskSpec.detectorName), + PostProcessingDevice::createPostProcessingDataDescription(ppTaskSpec.taskName), + Lifetime::Sporadic }; + tasksOutputMap.insert({ DataSpecUtils::label(ppTaskOutput), ppTaskOutput }); + } + } + + for (const auto& externalTaskSpec : infrastructureSpec.externalTasks) { + if (externalTaskSpec.active) { + auto query = externalTaskSpec.query; + Inputs inputs = DataDescriptorQueryBuilder::parse(query.c_str()); + for (const auto& taskOutput : inputs) { + tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput }); + } + } + } - CheckConfigs checkConfigs; // all the checkConfigs + // Instantiate Checks based on the configuration and build a map of checks (keyed by their inputs names) for (const auto& checkSpec : infrastructureSpec.checks) { + ILOG(Debug, Devel) << ">> Check name : " << checkSpec.checkName << ENDM; if (checkSpec.active) { auto checkConfig = Check::extractConfig(infrastructureSpec.common, checkSpec); - checkConfigs.push_back(checkConfig); + InputNames inputNames; + + for (const auto& inputSpec : checkConfig.inputSpecs) { + inputNames.push_back(DataSpecUtils::label(inputSpec)); + } + // Create a grouping key - sorted vector of stringified InputSpecs //todo: consider std::set, which is sorted + std::sort(inputNames.begin(), inputNames.end()); + // Group checks + checksMap[inputNames].push_back(checkConfig); } } + // For every Task output, find a Check to store the MOs in the database. + // If none is found we create a sink device. + for (auto& [label, inputSpec] : tasksOutputMap) { // for each task output + (void)inputSpec; + bool isStored = false; + // Look for this task as input in the Checks' inputs, if we found it then we are done + for (auto& [inputNames, checks] : checksMap) { // for each set of inputs + (void)checks; + + if (std::find(inputNames.begin(), inputNames.end(), label) != inputNames.end() && inputNames.size() == 1) { + storeVectorMap[inputNames].push_back(label); + break; + } + } + if (!isStored) { // fixme: statement is always true + // If there is no Check for a given input, create a candidate for a sink device + InputNames singleEntry{ label }; + // Init empty Check vector to appear in the next step + checksMap[singleEntry]; + storeVectorMap[singleEntry].push_back(label); + } + } + + // Create CheckRunners: 1 per set of inputs + std::vector checkRunnerOutputs; auto checkRunnerConfig = CheckRunnerFactory::extractConfig(infrastructureSpec.common); - const DataProcessorSpec spec = CheckRunnerFactory::create(checkRunnerConfig, checkConfigs); - workflow.emplace_back(spec); + for (auto& [inputNames, checkConfigs] : checksMap) { + // Logging + ILOG(Debug, Devel) << ">> Inputs (" << inputNames.size() << "): "; + for (const auto& name : inputNames) + ILOG(Debug, Devel) << name << " "; + ILOG(Debug, Devel) << " ; Checks (" << checkConfigs.size() << "): "; + for (const auto& checkConfig : checkConfigs) + ILOG(Debug, Devel) << checkConfig.name << " "; + ILOG(Debug, Devel) << " ; Stores (" << storeVectorMap[inputNames].size() << "): "; + for (const auto& input : storeVectorMap[inputNames]) + ILOG(Debug, Devel) << input << " "; + ILOG(Debug, Devel) << ENDM; + + DataProcessorSpec spec = checkConfigs.empty() + ? CheckRunnerFactory::createSinkDevice(checkRunnerConfig, tasksOutputMap.find(inputNames[0])->second) + : CheckRunnerFactory::create(checkRunnerConfig, checkConfigs, storeVectorMap[inputNames]); + workflow.emplace_back(spec); + checkRunnerOutputs.insert(checkRunnerOutputs.end(), spec.outputs.begin(), spec.outputs.end()); + } + + ILOG(Debug, Devel) << ">> Outputs (" << checkRunnerOutputs.size() << "): "; + for (const auto& output : checkRunnerOutputs) + ILOG(Debug, Devel) << DataSpecUtils::describe(output) << " "; + ILOG(Debug, Devel) << ENDM; } void InfrastructureGenerator::throwIfAggNamesClashCheckNames(const InfrastructureSpec& infrastructureSpec) diff --git a/Framework/test/testCheckRunner.cxx b/Framework/test/testCheckRunner.cxx index 9157a0c6ed..b7b3857fa7 100644 --- a/Framework/test/testCheckRunner.cxx +++ b/Framework/test/testCheckRunner.cxx @@ -24,6 +24,14 @@ using namespace std; using namespace o2::framework; using namespace o2::header; +TEST_CASE("test_check_runner_static") +{ + // facility name + CHECK(CheckRunner::createCheckRunnerFacility(CheckRunner::createCheckRunnerIdString() + "-test") == "check/test"); + CHECK(CheckRunner::createCheckRunnerFacility(CheckRunner::createCheckRunnerIdString() + "-abcdefghijklmnopqrstuvwxyz") == "check/abcdefghijklmnopqrstuvwxyz"); + CHECK(CheckRunner::createCheckRunnerFacility(CheckRunner::createCheckRunnerIdString() + "-abcdefghijklmnopqrstuvwxyz123456789") == "check/abcdefghijklmnopqrstuvwxyz"); +} + TEST_CASE("test_checkRunner_getDetector") { CheckConfig config; diff --git a/Framework/test/testInfrastructureGenerator.cxx b/Framework/test/testInfrastructureGenerator.cxx index 7fcb1454bd..6625a7ff08 100644 --- a/Framework/test/testInfrastructureGenerator.cxx +++ b/Framework/test/testInfrastructureGenerator.cxx @@ -110,7 +110,7 @@ TEST_CASE("qc_factory_remote_test") // the infrastructure should consist of two proxies, mergers and checkers for 'skeletonTask' and 'recoTask' // (their taskRunner are declared to be local) and also taskRunner and checker for the 'abcTask' and 'xyzTask'. // Post processing adds one process for the task and one for checks. - REQUIRE(workflow.size() == 10); + REQUIRE(workflow.size() == 15); auto tcpclustProxy = std::find_if( workflow.begin(), workflow.end(), @@ -187,9 +187,9 @@ TEST_CASE("qc_factory_remote_test") workflow.begin(), workflow.end(), [](const DataProcessorSpec& d) { return d.name.find("qc-check") != std::string::npos && - d.inputs.size() == 4; + d.inputs.size() == 1; }); - REQUIRE(checkRunnerCount == 1); + REQUIRE(checkRunnerCount == 6); auto postprocessingTask = std::find_if( workflow.begin(), workflow.end(), @@ -217,8 +217,8 @@ TEST_CASE("qc_factory_standalone_test") auto configTree = configInterface->getRecursive(); auto workflow = InfrastructureGenerator::generateStandaloneInfrastructure(configTree); - // the infrastructure should consist of 4 TaskRunners, 1 PostProcessingRunner, 1 CheckRunner, 1 AggregatorRunner - REQUIRE(workflow.size() == 7); + // the infrastructure should consist of 4 TaskRunners, 1 PostProcessingRunner, 5 CheckRunners (including one for PP), 1 AggregatorRunner + REQUIRE(workflow.size() == 11); auto taskRunnerSkeleton = std::find_if( workflow.begin(), workflow.end(), @@ -260,9 +260,9 @@ TEST_CASE("qc_factory_standalone_test") workflow.begin(), workflow.end(), [](const DataProcessorSpec& d) { return d.name.find("qc-check") != std::string::npos && - d.inputs.size() == 4; + d.inputs.size() == 1; }); - REQUIRE(checkRunnerCount == 1); + REQUIRE(checkRunnerCount == 5); auto postprocessingTask = std::find_if( workflow.begin(), workflow.end(), @@ -367,6 +367,7 @@ TEST_CASE("qc_infrastructure_local_batch_test") CHECK(workflow[4].outputs.size() == 0); } } + TEST_CASE("qc_infrastructure_remote_batch_test") { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; @@ -374,7 +375,7 @@ TEST_CASE("qc_infrastructure_remote_batch_test") auto configTree = configInterface->getRecursive(); auto workflow = InfrastructureGenerator::generateRemoteBatchInfrastructure(configTree, "file.root"); - REQUIRE(workflow.size() == 4); + REQUIRE(workflow.size() == 9); auto fileReader = std::find_if( workflow.begin(), workflow.end(), @@ -389,9 +390,9 @@ TEST_CASE("qc_infrastructure_remote_batch_test") workflow.begin(), workflow.end(), [](const DataProcessorSpec& d) { return d.name.find("qc-check") != std::string::npos && - d.inputs.size() == 4; + d.inputs.size() == 1; }); - REQUIRE(checkRunnerCount == 1); + REQUIRE(checkRunnerCount == 6); auto postprocessingTask = std::find_if( workflow.begin(), workflow.end(),