Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/include/QualityControl/CheckConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct CheckConfig {
UpdatePolicyType policyType = UpdatePolicyType::OnAny;
std::vector<std::string> objectNames{}; // fixme: if object names are empty, allObjects are true, consider reducing to one var
bool allObjects = false;
bool allowBeautify = false;
framework::Inputs inputSpecs{};
framework::OutputSpec qoSpec{ "XXX", "INVALID" };
std::string conditionUrl{};
Expand Down
25 changes: 21 additions & 4 deletions Framework/include/QualityControl/CheckRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "QualityControl/CheckRunnerConfig.h"
#include "QualityControl/Check.h"
#include "QualityControl/MonitorObject.h"
#include "QualityControl/QualityObject.h"
#include "QualityControl/UpdatePolicyManager.h"

namespace o2::quality_control::core
Expand Down Expand Up @@ -78,6 +79,7 @@ namespace o2::quality_control::checker
class CheckRunner : public framework::Task
{
public:
/// Constructor
/**
* \brief CheckRunner constructor
*
Expand All @@ -86,9 +88,20 @@ class CheckRunner : public framework::Task
* Group check assumes that the input of the checks is the same!
*
* @param checkRunnerConfig configuration of CheckRunner
* @param checkConfigs configuration of all Checks that should run in this data processor
*/
CheckRunner(CheckRunnerConfig, const std::vector<CheckConfig>& checkConfigs);

/**
* \brief CheckRunner constructor
*
* Create a sink for the Input. It is expected to receive Monitor Object to store.
* It will not run any checks on a given input.
*
* @param checkRunnerConfig configuration of CheckRunner
* @param input Monitor Object input spec.
*/
CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs);
CheckRunner(CheckRunnerConfig, o2::framework::InputSpec input);

/// Destructor
~CheckRunner() override;
Expand All @@ -105,16 +118,19 @@ class CheckRunner : public framework::Task
framework::Inputs getInputs() { return mInputs; };
framework::Outputs getOutputs() { return mOutputs; };

void setTaskStoreSet(std::unordered_set<std::string> storeSet) { mInputStoreSet = storeSet; }
std::string getDeviceName() { return mDeviceName; };

static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; }
static std::string createCheckRunnerIdString() { return "qc-check"; };
static std::string createCheckRunnerName();
static std::string createCheckRunnerName(const std::vector<CheckConfig>& checks);
static std::string createSinkCheckRunnerName(o2::framework::InputSpec input);
static std::string createCheckRunnerFacility(std::string deviceName);

/// \brief Compute the detector name to be used for this checkrunner.
/// Compute the detector name to be used for this checkrunner.
/// If all checks belong to the same detector we use it, otherwise we use "MANY"
static std::string getDetectorName(const std::vector<CheckConfig>& checks);
static std::string getDetectorName(const std::vector<CheckConfig> checks);

private:
/**
Expand Down Expand Up @@ -205,7 +221,8 @@ class CheckRunner : public framework::Task
std::shared_ptr<Activity> mActivity; // shareable with the Checks
CheckRunnerConfig mConfig;
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
std::vector<std::shared_ptr<MonitorObject>> mToBeStored;
std::unordered_set<std::string> mInputStoreSet;
std::vector<std::shared_ptr<MonitorObject>> mMonitorObjectStoreVector;
UpdatePolicyManager updatePolicyManager;
bool mReceivedEOS = false;

Expand Down
12 changes: 11 additions & 1 deletion Framework/include/QualityControl/CheckRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,17 @@ class CheckRunnerFactory
CheckRunnerFactory() = default;
virtual ~CheckRunnerFactory() = default;

static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs);
static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector = {});

/*
* \brief Create a CheckRunner sink DPL device.
*
* The purpose of this device is to receive and store the MO from task.
*
* @param input InputSpec with the content to store
* @param configurationSource
*/
static framework::DataProcessorSpec createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input);

static CheckRunnerConfig extractConfig(const core::CommonSpec&);

Expand Down
11 changes: 11 additions & 0 deletions Framework/src/Check.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ QualityObjectsType Check::check(std::map<std::string, std::shared_ptr<MonitorObj

void Check::beautify(std::map<std::string, std::shared_ptr<MonitorObject>>& moMap, const Quality& quality)
{
if (!mCheckConfig.allowBeautify) {
return;
}

for (auto const& item : moMap) {
try {
mCheckInterface->beautify(item.second /*mo*/, quality);
Expand Down Expand Up @@ -232,6 +236,12 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
}
}

bool allowBeautify = checkSpec.dataSources.size() <= 1;
if (!allowBeautify) {
// See QC-299 for details
ILOG(Warning, Devel) << "Beautification disabled because more than one source is used in this Check (" << checkSpec.checkName << ")" << ENDM;
}

return {
checkSpec.checkName,
checkSpec.moduleName,
Expand All @@ -241,6 +251,7 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
updatePolicy,
std::move(objectNames),
checkAllObjects,
allowBeautify,
std::move(inputs),
createOutputSpec(checkSpec.checkName),
commonSpec.conditionDBUrl
Expand Down
89 changes: 76 additions & 13 deletions Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,56 @@ std::size_t CheckRunner::hash(const std::string& inputString)
return checksum;
}

std::string CheckRunner::createCheckRunnerName()
std::string CheckRunner::createCheckRunnerName(const std::vector<CheckConfig>& checks)
{
return CheckRunner::createCheckRunnerIdString();
static const std::string alphanumeric =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const int NAME_LEN = 4;
std::string name(CheckRunner::createCheckRunnerIdString() + "-" + getDetectorName(checks) + "-");

if (checks.size() == 1) {
// If single check, use the check name
name += checks[0].name;
} else {
std::string hash_string;
std::vector<std::string> names;
// Fill vector with check names
for (const auto& c : checks) {
names.push_back(c.name);
}
// Be sure that after configuration shuffle, the name will be the same
std::sort(names.begin(), names.end());

// Create a single string and hash it
for (auto& n : names) {
hash_string += n;
}
std::size_t num = hash(hash_string);

// Change numerical to alphanumeric hash representation
for (int i = 0; i < NAME_LEN; ++i) {
name += alphanumeric[num % alphanumeric.size()];
num = num / alphanumeric.size();
}
}
return name;
}

std::string CheckRunner::createCheckRunnerFacility(std::string deviceName)
{
// it starts with "check/" and is followed by the unique part of the device name truncated to a maximum of 32 characters.
string facilityName = "check/" + deviceName.substr(CheckRunner::createCheckRunnerIdString().length() + 1, string::npos);
facilityName = facilityName.substr(0, 32);
return facilityName;
}

std::string CheckRunner::createSinkCheckRunnerName(InputSpec input)
{
std::string name(CheckRunner::createCheckRunnerIdString() + "-sink-");
name += DataSpecUtils::label(input);
return name;
}

o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig>& checkConfigs)
Expand All @@ -83,12 +130,13 @@ o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig
return outputs;
}

CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs)
CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
: mDetectorName(getDetectorName(checkConfigs)),
mDeviceName(createCheckRunnerName()),
mDeviceName(createCheckRunnerName(checkConfigs)),
mConfig(std::move(checkRunnerConfig)),
mInputs{ inputs },
mOutputs{ CheckRunner::collectOutputs(checkConfigs) },
/* All checks have the same Input */
mInputs(checkConfigs.front().inputSpecs),
mOutputs(CheckRunner::collectOutputs(checkConfigs)),
mTotalNumberObjectsReceived(0),
mTotalNumberCheckExecuted(0),
mTotalNumberQOStored(0),
Expand All @@ -100,6 +148,19 @@ CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<
}
}

CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input)
: mDeviceName(createSinkCheckRunnerName(input)),
mConfig(std::move(checkRunnerConfig)),
mInputs{ input },
mOutputs{},
mTotalNumberObjectsReceived(0),
mTotalNumberCheckExecuted(0),
mTotalNumberQOStored(0),
mTotalNumberMOStored(0),
mTotalQOSent(0)
{
}

CheckRunner::~CheckRunner()
{
ILOG(Debug, Trace) << "CheckRunner destructor (" << this << ")" << ENDM;
Expand Down Expand Up @@ -151,7 +212,7 @@ void CheckRunner::refreshConfig(InitContext& iCtx)
void CheckRunner::init(framework::InitContext& iCtx)
{
try {
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, mDeviceName);
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, createCheckRunnerFacility(mDeviceName));
refreshConfig(iCtx);
Bookkeeping::getInstance().init(mConfig.bookkeepingUrl);
initDatabase();
Expand Down Expand Up @@ -197,7 +258,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx)

auto now = getCurrentTimestamp();
store(qualityObjects, now);
store(mToBeStored, now);
store(mMonitorObjectStoreVector, now);

send(qualityObjects, ctx.outputs());

Expand All @@ -209,7 +270,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx)

void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
{
mToBeStored.clear();
mMonitorObjectStoreVector.clear();

for (const auto& input : mInputs) {
auto dataRef = inputRecord.get(input.binding.c_str());
Expand Down Expand Up @@ -239,6 +300,7 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)

// for each item of the array, check whether it is a MonitorObject. If not, create one and encapsulate.
// Then, store the MonitorObject in the various maps and vectors we will use later.
bool store = mInputStoreSet.count(DataSpecUtils::label(input)) > 0; // Check if this CheckRunner stores this input
for (const auto tObject : *array) {
std::shared_ptr<MonitorObject> mo{ dynamic_cast<MonitorObject*>(tObject) };

Expand All @@ -256,8 +318,9 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
updatePolicyManager.updateObjectRevision(mo->getFullName());
mTotalNumberObjectsReceived++;

// Monitor Object will be stored later, after possible beautification
mToBeStored.push_back(mo);
if (store) { // Monitor Object will be stored later, after possible beautification
mMonitorObjectStoreVector.push_back(mo);
}
}
}
}
Expand Down Expand Up @@ -288,7 +351,7 @@ void CheckRunner::sendPeriodicMonitoring()

QualityObjectsType CheckRunner::check()
{
ILOG(Debug, Devel) << "check(): Trying " << mChecks.size() << " checks for " << mMonitorObjects.size() << " monitor objects"
ILOG(Debug, Devel) << "Trying " << mChecks.size() << " checks for " << mMonitorObjects.size() << " monitor objects"
<< ENDM;

QualityObjectsType allQOs;
Expand Down Expand Up @@ -491,7 +554,7 @@ void CheckRunner::reset()
mTotalQOSent = 0;
}

std::string CheckRunner::getDetectorName(const std::vector<CheckConfig>& checks)
std::string CheckRunner::getDetectorName(const std::vector<CheckConfig> checks)
{
std::string detectorName;
for (auto& check : checks) {
Expand Down
39 changes: 19 additions & 20 deletions Framework/src/CheckRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,17 @@
#include "QualityControl/CheckRunner.h"
#include "QualityControl/CheckRunnerFactory.h"
#include "QualityControl/CommonSpec.h"
#include <set>

namespace o2::quality_control::checker
{

using namespace o2::framework;

DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector)
{
auto options = checkRunnerConfig.options;

// concatenate all inputs
o2::framework::Inputs allInputs;
for (auto config : checkConfigs) {
allInputs.insert(allInputs.end(), config.inputSpecs.begin(), config.inputSpecs.end());
}

// We can end up with duplicated inputs that will later lead to circular dependencies on the checkRunner device.
o2::framework::Inputs allInputsNoDups;
std::set<std::string> alreadySeen;
for (auto input : allInputs) {
if (alreadySeen.count(input.binding) == 0) {
allInputsNoDups.push_back(input);
}
alreadySeen.insert(input.binding);
}

CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs, allInputsNoDups };
CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs };
qcCheckRunner.setTaskStoreSet({ storeVector.begin(), storeVector.end() });

DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
qcCheckRunner.getInputs(),
Expand All @@ -66,6 +49,22 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig
return newCheckRunner;
}

DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input)
{
CheckRunner qcCheckRunner{ checkRunnerConfig, input };
qcCheckRunner.setTaskStoreSet({ DataSpecUtils::label(input) });

DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
qcCheckRunner.getInputs(),
Outputs{ qcCheckRunner.getOutputs() },
adaptFromTask<CheckRunner>(std::move(qcCheckRunner)),
checkRunnerConfig.options,
{},
{ o2::framework::ecs::qcReconfigurable } };

return newCheckRunner;
}

void CheckRunnerFactory::customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies)
{
auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) {
Expand Down
Loading