Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
// create list of requested tables
bool reportTFN = false;
bool reportTFFileName = false;
bool reportMetadata = false;
header::DataHeader TFNumberHeader;
header::DataHeader TFFileNameHeader;
std::vector<OutputRoute> requestedTables;
Expand All @@ -179,6 +180,8 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
TFFileNameHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
reportTFFileName = true;
} else if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("AODM"))) {
reportMetadata = true;
} else {
requestedTables.emplace_back(route);
}
Expand All @@ -192,7 +195,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
fileCounter,
numTF,
watchdog,
didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
didir, reportTFN, reportTFFileName, reportMetadata](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
// Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
// the TF to read is numTF
assert(device.inputTimesliceId < device.maxInputTimeslices);
Expand Down Expand Up @@ -275,6 +278,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
}
outputs.make<std::string>(o2) = currentFilename;
}
if (reportMetadata) {
didir->readMetadata(outputs, dh);
}
}
first = false;
}
Expand Down Expand Up @@ -306,7 +312,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
control.readyToQuit(QuitRequest::Me);
return;
}
}
}
});
})};

Expand Down
50 changes: 32 additions & 18 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@ bool DataInputDescriptor::setFile(int counter)
}
mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);

// Get the metadata, if any
auto m = (TMap*)mcurrentFile->Get("metaData");
auto it = m->MakeIterator();
mMetadata.clear();

// Serialise metadata into a ; separated string with : separating key and value
bool first = true;
while (auto obj = it->Next()) {
if (first) {
LOGP(info, "Metadata for file \"{}\":", filename);
} else {
mMetadata += ";";
first = false;
}
auto objString = (TObjString*)m->GetValue(obj);
mMetadata += obj->GetName();
mMetadata += ":";
mMetadata += objString->String();
mMetadata += ";";
LOGP(info, "- {}: {}", obj->GetName(), objString->String());
}

// get the parent file map if exists
mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
if (mParentFileMap && !mParentFileReplacement.empty()) {
Expand Down Expand Up @@ -710,58 +732,50 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade
return result;
}

std::unique_ptr<TTreeReader> DataInputDirector::getTreeReader(header::DataHeader dh, int counter, int numTF, std::string treename)
FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
{
std::unique_ptr<TTreeReader> reader = nullptr;
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
if (!didesc) {
didesc = mdefaultDataInputDescriptor;
}

auto fileAndFolder = didesc->getFileFolder(counter, numTF);
if (fileAndFolder.file) {
treename = fileAndFolder.folderName + "/" + treename;
reader = std::make_unique<TTreeReader>(treename.c_str(), fileAndFolder.file);
if (!reader) {
throw std::runtime_error(fmt::format(R"(Couldn't create TTreeReader for tree "{}" in file "{}")", treename, fileAndFolder.file->GetName()));
}
}

return reader;
return didesc->getFileFolder(counter, numTF);
}

FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
if (!didesc) {
didesc = mdefaultDataInputDescriptor;
}

return didesc->getFileFolder(counter, numTF);
return didesc->getTimeFramesInFile(counter);
}

int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
if (!didesc) {
didesc = mdefaultDataInputDescriptor;
}

return didesc->getTimeFramesInFile(counter);
return didesc->getTimeFrameNumber(counter, numTF);
}

uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
// read metadata from file. Notice we must have read at least one table to get the metadata
void DataInputDirector::readMetadata(DataAllocator& outputs, header::DataHeader dh)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
if (!didesc) {
didesc = mdefaultDataInputDescriptor;
}

return didesc->getTimeFrameNumber(counter, numTF);
auto& m = outputs.make<std::string>(Output{"AODM", "METADATA", 0});
m = didesc->getMetadata();
}

bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
Expand Down
6 changes: 5 additions & 1 deletion Framework/AnalysisSupport/src/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class DataInputDescriptor
void addFileNameHolder(FileNameHolder* fn);
int fillInputfiles();
bool setFile(int counter);
[[nodiscard]] std::string_view const getMetadata() const { return mMetadata; }

// getters
std::string getInputfilesFilename();
Expand Down Expand Up @@ -96,6 +97,7 @@ class DataInputDescriptor
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
TFile* mcurrentFile = nullptr;
std::string mMetadata;
int mCurrentFileID = -1;
bool mAlienSupport = false;

Expand Down Expand Up @@ -138,8 +140,10 @@ class DataInputDirector
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }

std::unique_ptr<TTreeReader> getTreeReader(header::DataHeader dh, int counter, int numTF, std::string treeName);
bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
// Send the metadata for the current file
void readMetadata(DataAllocator& outputs, header::DataHeader dh);

uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
int getTimeFramesInFile(header::DataHeader dh, int counter);
Expand Down
46 changes: 46 additions & 0 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "Framework/Plugins.h"
#include "Framework/RootMessageContext.h"
#include "Framework/DeviceSpec.h"
#include "Framework/Metadata.h"

namespace o2::framework
{
Expand Down Expand Up @@ -198,6 +199,51 @@ struct ConditionManager<Condition<OBJ>> {
}
};

/// A manager which takes care of metadata queries.
/// It is applied to every member of an analysis task: members which are a
/// Metadata, or which derive from MetadataGroup, are handled; any other
/// member is ignored.
template <typename T>
struct MetadataManager {
  /// Tell whether the task member @a x subscribes to metadata.
  /// @return true for a Metadata or a MetadataGroup, false for anything else.
  template <typename ANY>
  static bool hasMetadata(ANY& x)
  {
    return std::is_base_of_v<MetadataGroup, ANY> || std::is_same_v<ANY, Metadata>;
  }

  /// Update the task member @a x with the metadata found in @a record.
  /// A MetadataGroup is visited member by member. For a Metadata the
  /// "metadata" input is parsed as a sequence of "key:value;" entries and
  /// the value matching the requested key is stored in x.value.
  /// @return true if @a x is a Metadata or a MetadataGroup, false otherwise.
  template <typename ANY>
  static bool newDataframe(InputRecord& record, ANY& x)
  {
    if constexpr (std::is_base_of_v<MetadataGroup, ANY>) {
      homogeneous_apply_refs<true>([&record](auto&& y) { return MetadataManager<std::decay_t<decltype(y)>>::newDataframe(record, y); }, x);
      return true;
    } else if constexpr (std::is_same_v<ANY, Metadata>) {
      auto metadata = record.get<std::string>("metadata");
      std::istringstream iss(metadata);
      std::string key, value;
      // A key which is not present results in an empty value.
      x.value.clear();
      // Scan the entries directly rather than building a temporary map:
      // only one key is of interest. When a key is repeated the last
      // occurrence wins.
      while (std::getline(std::getline(iss, key, ':'), value, ';')) {
        if (key == x.key) {
          x.value = value;
        }
      }
      return true;
    } else {
      return false;
    }
  }

  /// The input a task needs to subscribe to in order to receive metadata.
  static InputSpec metadataSpec()
  {
    return InputSpec{"metadata", "AODM", "METADATA", Lifetime::Timeframe};
  }
};

/// SFINAE placeholder
template <typename T>
struct OutputManager {
Expand Down
14 changes: 14 additions & 0 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,18 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
homogeneous_apply_refs([&options, &hash](auto& x) { return OptionManager<std::decay_t<decltype(x)>>::appendOption(options, x); }, *task.get());
/// extract conditions and append them as inputs
homogeneous_apply_refs([&inputs](auto& x) { return ConditionManager<std::decay_t<decltype(x)>>::appendCondition(inputs, x); }, *task.get());
/// extract metadata requests and append the metadata object as input
bool hasMetadata = false;
auto checkForMetadata = [&inputs, &hasMetadata](auto& x) -> bool {
bool result = MetadataManager<std::decay_t<decltype(x)>>::hasMetadata(x);
hasMetadata |= result;
return result;
};
homogeneous_apply_refs(checkForMetadata, *task.get());
if (hasMetadata) {
// Append the input spec for metadata if at any point we encountered it.
inputs.emplace_back(MetadataManager<std::void_t<>>::metadataSpec());
}

/// parse process functions defined by corresponding configurables
if constexpr (has_process_v<T>) {
Expand Down Expand Up @@ -686,6 +698,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
homogeneous_apply_refs([&pc](auto&& x) { return ConditionManager<std::decay_t<decltype(x)>>::newDataframe(pc.inputs(), x); }, *task.get());
// reset partitions once per dataframe
homogeneous_apply_refs([](auto&& x) { return PartitionManager<std::decay_t<decltype(x)>>::newDataframe(x); }, *task.get());
// get the metadata for the dataframe
homogeneous_apply_refs([&pc](auto&& x) { return MetadataManager<std::decay_t<decltype(x)>>::newDataframe(pc.inputs(), x); }, *task.get());
// reset selections for the next dataframe
for (auto& info : expressionInfos) {
info.resetSelection = true;
Expand Down
82 changes: 82 additions & 0 deletions Framework/Core/include/Framework/Metadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_METADATA_H_
#define O2_FRAMEWORK_METADATA_H_
#include <string>
#include <string_view>
#include <utility>

namespace o2::framework
{
/// Metadata for the analysis framework.
/// You need to declare it as a member of your AnalysisTask
/// like you do for configurable parameters. The constructor
/// allows you to subscribe to a given key, and the value
/// will be read from the data file and provided to you via
/// the get() method, the conversion to std::string_view or
/// the dereference / arrow operators.
/// Notice that for the moment metadata is provided as a string,
/// we might provide polymorphic access if people ask for it.
///
/// All the accessors refer to the storage of the `value` member:
/// the views and the pointer they return are invalidated as soon
/// as `value` is modified.
///
/// Example:
/// \code{.cpp}
/// struct MyTask : AnalysisTask {
///   Metadata someMetadata{"label"};
///   ...
///   void process(aod::Tracks const& tracks) {
///     std::cout << someMetadata.get() << std::endl;
///   }
/// };
/// \endcode
struct Metadata {
  /// The key this object is subscribed to.
  std::string key;
  /// The value associated to the key, empty until it gets filled.
  std::string value;

  /// Subscribe to the metadata identified by @a key.
  Metadata(std::string key)
    : key(std::move(key))
  {
  }

  /// @return a view of the current value.
  [[nodiscard]] std::string_view get() const
  {
    return this->value;
  }

  /// Implicit conversion to a view of the current value.
  operator std::string_view() const
  {
    return this->value;
  }

  /// @return a view of the current value.
  std::string_view operator*() const
  {
    return this->value;
  }

  /// Member access to the current value, e.g. `metadata->size()`.
  /// A pointer is returned because the arrow operator must yield either a
  /// pointer or an object which itself provides an arrow operator.
  std::string const* operator->() const
  {
    return &this->value;
  }
};

/// Can be used to group together a number of Metadata members.
/// Members deriving from MetadataGroup are recognised by the
/// MetadataManager, which then visits the content of the group.
/// NOTE(review): the original comment mentioned overcoming the limit of
/// 100 Configurables per task; presumably the same limit on the number
/// of task members is meant here - to be confirmed.
/// In order to do so you can do:
///
/// struct MyTask {
///   struct : MetadataGroup {
///     Metadata someMetadata{"label"};
///   } group;
/// };
///
/// and access it with
///
/// group.someMetadata;
struct MetadataGroup {
};

} // namespace o2::framework
#endif // O2_FRAMEWORK_METADATA_H_
2 changes: 1 addition & 1 deletion Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()

// remove unmatched outputs
auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(requestedAODs.begin(), requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && !DataSpecUtils::partialMatch(o, o2::header::DataOrigin{"AODM"}) && std::none_of(requestedAODs.begin(), requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
});
reader->outputs.erase(o_end, reader->outputs.end());
if (reader->outputs.empty()) {
Expand Down
12 changes: 12 additions & 0 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext

std::vector<InputSpec> requestedAODs;
std::vector<OutputSpec> providedAODs;
// We treat metadata separately. In principle
// we could even use this to allow non table based
// objects (e.g. histograms) to be created based
// on table inputs and be passed around.
std::vector<InputSpec> requestedAODMs;
std::vector<OutputSpec> providedAODMs;
std::vector<InputSpec> requestedDYNs;
std::vector<OutputSpec> providedDYNs;
std::vector<InputSpec> requestedIDXs;
Expand Down Expand Up @@ -427,6 +433,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"AOD"})) {
DataSpecUtils::updateInputList(requestedAODs, InputSpec{input});
}
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"AODM"})) {
DataSpecUtils::updateInputList(requestedAODMs, InputSpec{input});
}
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"DYN"})) {
DataSpecUtils::updateInputList(requestedDYNs, InputSpec{input});
}
Expand All @@ -440,6 +449,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
for (auto& output : processor.outputs) {
if (DataSpecUtils::partialMatch(output, header::DataOrigin{"AOD"})) {
providedAODs.emplace_back(output);
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"AODM"})) {
providedAODMs.emplace_back(output);
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"DYN"})) {
providedDYNs.emplace_back(output);
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) {
Expand Down Expand Up @@ -486,6 +497,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
addMissingOutputsToSpawner({}, spawnerInputs, requestedAODs, aodSpawner);

addMissingOutputsToReader(providedAODs, requestedAODs, aodReader);
addMissingOutputsToReader(providedAODMs, requestedAODMs, aodReader);
addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);

std::vector<DataProcessorSpec> extraSpecs;
Expand Down
Loading