Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include "Framework/Signpost.h"
#include "Framework/DanglingEdgesContext.h"
#include "Framework/ConfigContext.h"
#include "Framework/ConfigContext.h"
#include "Framework/RunningWorkflowInfo.h"
#include "Framework/ConfigParamsHelper.h"
#include <arrow/array/builder_binary.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
Expand Down Expand Up @@ -72,6 +73,45 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
{
return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
auto& dec = ic.services().get<DanglingEdgesContext>();
auto& rwi = ic.services().get<RunningWorkflowInfo const>();

// Build a map from ccdb: option name -> URL, consulting the RunningWorkflow
// so that a Configurable<std::string>{"ccdb:fXxx", ...} declared in any
// analysis task is honoured. The CCDB device's own option (registered via
// ArrowSupport topology adjustment) takes precedence as the runtime override,
// falling back to the task device's declared value, then to the compile-time
// default stored in the InputSpec metadata.
std::unordered_map<std::string, std::string> ccdbUrls;
for (auto& input : dec.analysisCCDBInputs) {
for (auto& m : input.metadata) {
if (!m.name.starts_with("ccdb:") || ccdbUrls.count(m.name)) {
continue;
}
// Start with the compile-time default from the column declaration macro
std::string url = m.defaultValue.asString();
// Prefer the value from whichever task device has this option registered
// (i.e. declared a Configurable<std::string>{"ccdb:fXxx", ...})
for (auto& device : rwi.devices) {
if (device.name == spec.name) {
continue; // skip the CCDB device itself
}
for (auto& opt : device.options) {
if (opt.name == m.name) {
url = opt.defaultValue.asString();
break;
}
}
}
// Allow runtime override via the CCDB device's own registered option
// (set with --internal-dpl-aod-ccdb.ccdb:fXxx or JSON config)
if (ConfigParamsHelper::hasOption(spec.options, m.name)) {
url = options.get<std::string>(m.name.c_str());
}
LOGP(info, "CCDB path resolved for {}: {}", m.name, url);
ccdbUrls.emplace(m.name, std::move(url));
}
}

std::vector<std::shared_ptr<arrow::Schema>> schemas;
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();

Expand All @@ -92,9 +132,9 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
if (!m.name.starts_with("ccdb:")) {
continue;
}
// Create the schema of the output
// Create the schema of the output using the resolved URL
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
metadata->Append("url", m.defaultValue.asString());
metadata->Append("url", ccdbUrls.at(m.name));
auto columnName = m.name.substr(strlen("ccdb:"));
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
}
Expand Down
20 changes: 20 additions & 0 deletions Framework/Core/include/Framework/Configurable.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ struct Configurable : IP {
template <typename T, ConfigParamKind K = ConfigParamKind::kGeneric>
using MutableConfigurable = Configurable<T, K, ConfigurablePolicyMutable<T, K>>;

/// Convenience wrapper for overriding the CCDB path of a CCDB column declared
/// with DECLARE_SOA_CCDB_COLUMN / DECLARE_SOA_CCDB_COLUMN_FULL.
///
/// The option name, default value, and help string are all derived automatically
/// from the column type: name = "ccdb:" + Column::mLabel, default = Column::query.
///
/// Example:
/// struct MyTask {
/// ConfigurableCCDBPath<tofcalib::LHCphase> lhcPhasePath;
/// };
template <typename Column>
struct ConfigurableCCDBPath : Configurable<std::string> {
ConfigurableCCDBPath()
: Configurable<std::string>{std::string{"ccdb:"} + Column::mLabel,
std::string{Column::query},
std::string{"CCDB path for "} + Column::mLabel + " (default: " + Column::query + ")"}
{
}
};

template <typename T>
concept is_configurable = requires(T t) {
requires std::same_as<std::string, decltype(t.name)>;
Expand Down
30 changes: 30 additions & 0 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "Framework/ServiceRegistryHelpers.h"
#include "Framework/Signpost.h"
#include "Framework/DefaultsHelpers.h"
#include "Framework/ConfigParamsHelper.h"

#include "CommonMessageBackendsHelpers.h"
#include <Monitoring/Monitoring.h>
Expand Down Expand Up @@ -637,6 +638,35 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
analysisCCDB->outputs.clear();
analysisCCDB->inputs.clear();
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
// Register each ccdb: column path as an actual device option on the CCDB
// device so it can be read from ConfigParamRegistry at runtime.
// If any analysis task declared a Configurable<std::string> with the same
// "ccdb:fXxx" name, prefer its default over the compile-time ::query value.
// First encountered wins (addOptionIfMissing semantics); log a warning if
// two tasks declare conflicting defaults for the same path.
for (auto& input : dec.analysisCCDBInputs) {
for (auto& m : input.metadata) {
if (!m.name.starts_with("ccdb:")) {
continue;
}
ConfigParamSpec effective = m; // start with compile-time default
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
for (auto& opt : d.options) {
if (opt.name == m.name) {
if (opt.defaultValue.asString() != effective.defaultValue.asString()) {
LOGP(warn, "Task '{}' declares Configurable '{}' = '{}' which conflicts "
"with an earlier value '{}'; earlier value will be used.",
d.name, opt.name, opt.defaultValue.asString(),
effective.defaultValue.asString());
}
effective = opt; // task Configurable wins (first one found)
break;
}
}
}
ConfigParamsHelper::addOptionIfMissing(analysisCCDB->options, effective);
}
}
// load real AlgorithmSpec before deployment
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
}
Expand Down
5 changes: 4 additions & 1 deletion Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ struct DummyTimestampsTable {
};

struct SimpleCCDBConsumer {
ConfigurableCCDBPath<o2::aod::tofcalib::LHCphase> lhcPhasePath;

void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps)
{
LOGP(info, "LHCphase CCDB path configurable value: {}", lhcPhasePath.value);
LOGP(info, "Looking at all the LHCphases associated to the timestamps");
for (auto& object : ccdbObjectsForAllTimestamps) {
std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl;
Expand All @@ -64,6 +67,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
{
return WorkflowSpec{
adaptAnalysisTask<DummyTimestampsTable>(cfgc),
adaptAnalysisTask<SimpleCCDBConsumer>(cfgc, TaskName{"simple-ccdb-cunsumer"}),
adaptAnalysisTask<SimpleCCDBConsumer>(cfgc, TaskName{"simple-ccdb-consumer"}),
};
}
Loading