Skip to content

Commit

Permalink
DPL Analysis: provide metadata to workflow construction
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed May 13, 2024
1 parent 221c1f0 commit 9d48607
Show file tree
Hide file tree
Showing 18 changed files with 404 additions and 14 deletions.
6 changes: 3 additions & 3 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});

if (!options.isSet("aod-file")) {
if (!options.isSet("aod-file-private")) {
LOGP(fatal, "No input file defined!");
throw std::runtime_error("Processing is stopped!");
}

auto filename = options.get<std::string>("aod-file");
auto filename = options.get<std::string>("aod-file-private");

std::string parentFileReplacement;
if (options.isSet("aod-parent-base-path-replacement")) {
Expand Down Expand Up @@ -306,7 +306,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
control.readyToQuit(QuitRequest::Me);
return;
}
}
}
});
})};

Expand Down
121 changes: 121 additions & 0 deletions Framework/AnalysisSupport/src/Plugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,17 @@
#include "Framework/AlgorithmSpec.h"
#include "Framework/ServiceSpec.h"
#include "Framework/ServiceMetricsInfo.h"
#include "Framework/ConfigParamDiscovery.h"
#include "Framework/Capability.h"
#include "Framework/Signpost.h"
#include "AODJAlienReaderHelpers.h"
#include <TFile.h>
#include <TMap.h>
#include <TObjString.h>
#include <TString.h>
#include <fmt/format.h>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <memory>
#include <string>
#include <vector>
#include <unistd.h>

O2_DECLARE_DYNAMIC_LOG(analysis_support);

struct ROOTFileReader : o2::framework::AlgorithmPlugin {
o2::framework::AlgorithmSpec create() override
Expand Down Expand Up @@ -54,7 +64,118 @@ struct RunSummary : o2::framework::ServicePlugin {
}
};

/// Decide whether metadata discovery should be triggered for the given options.
///
/// @param registry the options known so far
/// @return true only when "aod-file" is both declared and set, and no
///         "aod-metadata-run" option has been registered yet.
auto lookForAodFile = [](ConfigParamRegistry& registry) -> bool {
  const bool aodFileProvided = registry.hasOption("aod-file") && registry.isSet("aod-file");
  if (!aodFileProvided) {
    return false;
  }
  // If the metadata is already in the registry there is no need to load it
  // again; this avoids initialising ROOT too many times.
  return !registry.hasOption("aod-metadata-run");
};

/// Capability plugin which pairs the lookForAodFile check with the
/// "O2FrameworkAnalysisSupport:DiscoverMetadataInAOD" plugin name.
struct DiscoverMetadataInAODCapability : o2::framework::CapabilityPlugin {
  /// @return a newly allocated Capability; the plugin does not keep a reference to it.
  Capability* create() override
  {
    auto* capability = new Capability{
      .checkIfNeeded = lookForAodFile,
      .requiredPlugin = "O2FrameworkAnalysisSupport:DiscoverMetadataInAOD"};
    return capability;
  }
};

/// Capability plugin which pairs the lookForAodFile check with the
/// "O2FrameworkAnalysisSupport:DiscoverMetadataInEnvironment" plugin name.
struct DiscoverMetadataInEnvironmentCapability : o2::framework::CapabilityPlugin {
  /// @return a newly allocated Capability; the plugin does not keep a reference to it.
  Capability* create() override
  {
    auto* capability = new Capability{
      .checkIfNeeded = lookForAodFile,
      .requiredPlugin = "O2FrameworkAnalysisSupport:DiscoverMetadataInEnvironment"};
    return capability;
  }
};

/// Configuration discovery plugin which opens the file named by the "aod-file"
/// option, reads the TMap stored under the key "metaData" and turns every
/// entry into an extra string option called "aod-metadata-<key>".
///
/// If the option value starts with '@', the remainder is treated as the path of
/// a text file whose first line is the name of the file to open.
struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin {
  /// @return a newly allocated ConfigDiscovery whose discover callback returns
  ///         one ConfigParamSpec per metadata entry (empty if "aod-file" is
  ///         empty or the file carries no "metaData" map).
  ConfigDiscovery* create() override
  {
    return new ConfigDiscovery{
      .init = []() {},
      .discover = [](ConfigParamRegistry& registry) -> std::vector<ConfigParamSpec> {
        auto filename = registry.get<std::string>("aod-file");
        if (filename.empty()) {
          return {};
        }
        LOGP(info, "Loading metadata from file {} in PID {}", filename, getpid());
        std::vector<ConfigParamSpec> results;
        if (filename.at(0) == '@') {
          filename.erase(0, 1);
          // Read the text file and set filename to the contents of the first line.
          std::ifstream file(filename);
          if (!file.is_open()) {
            LOGP(fatal, "Couldn't open file \"{}\"!", filename);
            // NOTE(review): fatal is expected to terminate; the return only
            // protects against carrying on with a bogus filename if it does not.
            return results;
          }
          std::getline(file, filename);
        }

        // Owning the TFile guarantees it is closed and deleted on every exit path.
        std::unique_ptr<TFile> currentFile{TFile::Open(filename.c_str())};
        if (!currentFile) {
          LOGP(fatal, "Couldn't open file \"{}\"!", filename);
          // Avoid dereferencing a null file should the fatal log return.
          return results;
        }

        // Get the metadata, if any. The checked cast rejects an object called
        // "metaData" which is not actually a TMap.
        auto* m = dynamic_cast<TMap*>(currentFile->Get("metaData"));
        if (!m) {
          LOGP(warning, "No metadata found in file \"{}\"", filename);
          return results;
        }

        // Publish every key / value pair of the map as a string option.
        // TIter owns the underlying iterator and releases it when it goes out of scope.
        bool headerPrinted = false;
        TIter next(m);
        while (TObject* key = next()) {
          if (!headerPrinted) {
            LOGP(info, "Metadata for file \"{}\":", filename);
            headerPrinted = true;
          }
          auto* valueString = dynamic_cast<TObjString*>(m->GetValue(key));
          if (valueString == nullptr) {
            LOGP(warning, "Skipping metadata entry \"{}\": value is not a string", key->GetName());
            continue;
          }
          LOGP(info, "- {}: {}", key->GetName(), valueString->String().Data());
          std::string optionName = "aod-metadata-" + std::string(key->GetName());
          // The copy keeps the value valid after the file is closed.
          // NOTE(review): the copy is never freed; drop the strdup if the
          // option default already copies its string - confirm.
          char const* value = strdup(valueString->String());
          results.push_back(ConfigParamSpec{optionName, VariantType::String, value, {"Metadata in AOD"}});
        }
        return results;
      }};
  }
};

/// Configuration discovery plugin which derives extra options from the
/// environment: when DPL_ANALYSIS_RUN_TYPE is defined, its value becomes the
/// default of the "aod-metadata-run-type" string option.
struct DiscoverMetadataInEnvironment : o2::framework::ConfigDiscoveryPlugin {
  /// @return a newly allocated ConfigDiscovery with a no-op init and the
  ///         environment based discover callback.
  ConfigDiscovery* create() override
  {
    auto discoverFromEnvironment = [](ConfigParamRegistry& registry) -> std::vector<ConfigParamSpec> {
      O2_SIGNPOST_ID_GENERATE(sid, analysis_support);
      O2_SIGNPOST_EVENT_EMIT(analysis_support, sid, "DiscoverMetadataInEnvironment",
                             "Discovering metadata for analysis from well known environment variables.");
      std::vector<ConfigParamSpec> discovered;
      // Only publish the option when the variable is actually defined.
      if (char* runType = getenv("DPL_ANALYSIS_RUN_TYPE")) {
        O2_SIGNPOST_EVENT_EMIT(analysis_support, sid, "DiscoverMetadataInEnvironment",
                               "Found DPL_ANALYSIS_RUN_TYPE with value %{public}s.", runType);
        discovered.push_back(ConfigParamSpec{"aod-metadata-run-type", VariantType::String, runType, {"Metadata in environment"}});
      }
      return discovered;
    };
    return new ConfigDiscovery{
      .init = []() {},
      .discover = discoverFromEnvironment};
  }
};

DEFINE_DPL_PLUGINS_BEGIN
DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm);
DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAODCapability, Capability);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInEnvironmentCapability, Capability);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInEnvironment, ConfigDiscovery);
DEFINE_DPL_PLUGINS_END
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ o2_add_library(Framework
src/DefaultsHelpers.cxx
src/DomainInfoHeader.cxx
src/ProcessingPoliciesHelpers.cxx
src/ConfigParamDiscovery.cxx
src/ConfigParamStore.cxx
src/ConfigParamsHelper.cxx
src/ChannelParamSpec.cxx
Expand Down
42 changes: 42 additions & 0 deletions Framework/Core/include/Framework/Capability.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_CAPABILITY_H_
#define O2_FRAMEWORK_CAPABILITY_H_

#include <string_view>
#include <functional>

namespace o2::framework
{

struct ConfigParamRegistry;

/// A plugin which is able to discover more options than the ones
/// provided on the command line. The actual loading is in two parts:
/// the first one decides if the options are enough to actually perform
/// the discovery, the second part will do the discovery itself.
///
/// It's a good idea to have the Loader part in a standalone library to
/// minimize the dependency on unneeded third-party code, e.g. ROOT.
struct Capability {
// Whether or not this capability is required, judged from the options
// available in the registry passed to the callback.
std::function<bool(ConfigParamRegistry&)> checkIfNeeded;
// Name of the plugin associated with this capability.
// NOTE(review): presumably the plugin loaded when checkIfNeeded returns
// true - confirm against the code which consumes Capability.
char const* requiredPlugin;
};

/// Interface implemented by plugins which provide a Capability.
/// NOTE(review): there is no virtual destructor, so instances must not be
/// deleted through a CapabilityPlugin pointer.
struct CapabilityPlugin {
/// @return the Capability description provided by this plugin.
virtual Capability* create() = 0;
};

} // namespace o2::framework

#endif // O2_FRAMEWORK_CAPABILITY_H_
37 changes: 37 additions & 0 deletions Framework/Core/include/Framework/ConfigParamDiscovery.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_CONFIGPARAMDISCOVERY_H_
#define O2_FRAMEWORK_CONFIGPARAMDISCOVERY_H_

#include "Framework/ConfigParamRegistry.h"

namespace o2::framework
{
// A plugin which can be used to inject extra configuration
// options.
struct ConfigParamDiscovery {
/// Derive extra options starting from the ones available in the registry.
/// @return the additional ConfigParamSpecs which were discovered.
static std::vector<ConfigParamSpec> discover(ConfigParamRegistry&);
};

/// Pair of callbacks describing one way of discovering extra configuration.
struct ConfigDiscovery {
/// Callback taking no arguments.
/// NOTE(review): presumably invoked once before discover - confirm with the caller.
std::function<void()> init;
/// A function which uses the arguments available so far to discover more.
/// @return the extra ConfigParamSpecs it derives.
std::function<std::vector<ConfigParamSpec>(ConfigParamRegistry&)> discover;
};

/// Interface implemented by plugins which provide a ConfigDiscovery.
/// NOTE(review): there is no virtual destructor, so instances must not be
/// deleted through a ConfigDiscoveryPlugin pointer.
struct ConfigDiscoveryPlugin {
/// @return the ConfigDiscovery callbacks provided by this plugin.
virtual ConfigDiscovery* create() = 0;
};

} // namespace o2::framework
#endif // O2_FRAMEWORK_CONFIGPARAMDISCOVERY_H_
6 changes: 6 additions & 0 deletions Framework/Core/include/Framework/ConfigParamRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class ConfigParamRegistry
}
}

// Load extra parameters discovered while we process data.
// The specs are forwarded as they are to the load method of the underlying store.
void loadExtra(std::vector<ConfigParamSpec>& extras)
{
mStore->load(extras);
}

private:
std::unique_ptr<ConfigParamStore> mStore;
};
Expand Down
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/ConfigParamStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ class ConfigParamStore
/// the configuration.
void preload();

void load(std::vector<ConfigParamSpec>& specs);

/// Get the store
boost::property_tree::ptree& store() { return *mStore; };
boost::property_tree::ptree& provenanceTree() { return *mProvenance; };

/// Get the specs
std::vector<ConfigParamSpec> const& specs() const
[[nodiscard]] std::vector<ConfigParamSpec> const& specs() const
{
return mSpecs;
}
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/PluginManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct PluginManager {

struct LoadedPlugin {
std::string name;
ServicePlugin* factory;
PLUGIN* factory;
};
std::vector<LoadedDSO> loadedDSOs;
std::vector<LoadedPlugin> loadedPlugins;
Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/include/Framework/Plugins.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ enum struct DplPluginKind : int {
// A plugin which implements a custom Service. Needs to return
// an object of the kind o2::framework::ServiceSpec
CustomService,
// A plugin which implements a new way to discover extra configuration
// parameters. E.g. it can be used to read metadata from a file or a service
// if a certain parameter is available.
ConfigDiscovery,
// A capability plugin is a plugin used to discover other viable plugins.
// For example, if you find out that you have the --aod-file option
// set, you might want to load metadata from it and attach it to the
// configuration.
Capability,
// A plugin which was not initialised properly.
Unknown
};
Expand Down
5 changes: 0 additions & 5 deletions Framework/Core/include/Framework/ServiceSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,6 @@ struct ServicePlugin {
virtual ServiceSpec* create() = 0;
};

struct LoadableService {
std::string name;
std::string library;
};

} // namespace o2::framework

#endif // O2_FRAMEWORK_SERVICESPEC_H_
8 changes: 8 additions & 0 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "Framework/Logger.h"
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "Framework/ConfigParamDiscovery.h"

#include <vector>

Expand Down Expand Up @@ -192,6 +193,13 @@ int mainNoCatch(int argc, char** argv)
workflowOptionsStore->preload();
workflowOptionsStore->activate();
ConfigParamRegistry workflowOptionsRegistry(std::move(workflowOptionsStore));

auto extraOptions = o2::framework::ConfigParamDiscovery::discover(workflowOptionsRegistry);
for (auto& extra : extraOptions) {
workflowOptions.push_back(extra);
}
workflowOptionsRegistry.loadExtra(extraOptions);

ConfigContext configContext(workflowOptionsRegistry, argc, argv);
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
overrideCloning(configContext, specs);
Expand Down
Loading

0 comments on commit 9d48607

Please sign in to comment.