Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL: allow decoupling from JAlien and retrieve JAlienFile, if available #5073

Merged
merged 3 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion Framework/AnalysisSupport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@

# Given GCC 7.3 does not provide std::filesystem we use Boost instead
# Drop this once we move to GCC 8.2+

# XRootD and JAliEn are optional: only link against them when BOTH imported
# targets exist. Checking XRootD::Client alone would make the configuration
# fail on systems where XRootD is available but JAliEn is not.
if(TARGET XRootD::Client AND TARGET JAliEn::JAliEn)
  set(EXTRA_TARGETS XRootD::Client JAliEn::JAliEn)
endif()

# EXTRA_TARGETS expands to nothing when it has not been set, in which case the
# library only links against O2::Framework.
o2_add_library(FrameworkAnalysisSupport
               SOURCES src/Plugin.cxx
                       src/AODJAlienReaderHelpers.cxx
               PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
               PUBLIC_LINK_LIBRARIES O2::Framework ${EXTRA_TARGETS})
329 changes: 329 additions & 0 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
#include "Framework/CallbackService.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/DeviceSpec.h"
#include "Framework/RawDeviceService.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DataInputDirector.h"
#include "Framework/SourceInfoHeader.h"
#include "Framework/ChannelInfo.h"
#include "Framework/Logger.h"

#include <Monitoring/Monitoring.h>

#include <ROOT/RDataFrame.hxx>
#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
#endif
#include <TGrid.h>
#include <TFile.h>
#include <TTreeCache.h>
#include <TTreePerfStats.h>

#include <arrow/ipc/reader.h>
#include <arrow/ipc/writer.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>

#include <thread>

using namespace o2;
using namespace o2::aod;

/// Keeps track of the wall-clock time used by the reader and tells whether one
/// more time frame can be processed without exceeding the run time limit.
///
/// Times are taken with uv_hrtime() and converted to seconds (division by 1e9).
struct RuntimeWatchdog {
  int numberTimeFrames;  ///< number of update() calls minus one (-1 before the first call)
  uint64_t startTime;    ///< uv_hrtime() value taken at construction
  uint64_t lastTime;     ///< uv_hrtime() value taken at the latest update()
  double runTime;        ///< accumulated time between consecutive update() calls, in seconds
  uint64_t runTimeLimit; ///< run time limit in seconds; 0 disables the check

  /// @param limit run time limit in seconds; values <= 0 disable the check
  RuntimeWatchdog(Long64_t limit)
  {
    numberTimeFrames = -1;
    startTime = uv_hrtime();
    lastTime = startTime;
    runTime = 0.;
    // A negative limit would wrap around in the unsigned member and show up as
    // a huge number in the logs; store it as 0, which means "no limit".
    runTimeLimit = limit > 0 ? static_cast<uint64_t>(limit) : 0;
  }

  /// Seconds elapsed between construction and the latest update().
  double elapsed() const
  {
    return static_cast<double>(lastTime - startTime) / 1.E9;
  }

  /// Average time per completed time frame in seconds; 0 while no time frame
  /// has been completed yet.
  double averageFrameTime() const
  {
    return numberTimeFrames > 0 ? runTime / numberTimeFrames : 0.;
  }

  /// Registers the start of the next time frame.
  /// @return true if the elapsed time plus the average time per time frame is
  ///         still below the limit (always true when the limit is disabled)
  bool update()
  {
    numberTimeFrames++;
    if (runTimeLimit == 0) {
      return true;
    }

    auto nowTime = uv_hrtime();

    // The interval since the previous update() is the time spent on the
    // previous time frame. At the first call no time frame has been processed,
    // so the start-up interval must not be counted as processing time.
    if (numberTimeFrames >= 1) {
      runTime += static_cast<double>(nowTime - lastTime) / 1.E9;
    }
    lastTime = nowTime;

    return (elapsed() + averageFrameTime()) < runTimeLimit;
  }

  /// Logs the current state of the watchdog.
  void printOut()
  {
    LOGP(INFO, "RuntimeWatchdog");
    LOGP(INFO, "  run time limit: {}", runTimeLimit);
    LOGP(INFO, "  number of time frames: {}", numberTimeFrames);
    LOGP(INFO, "  estimated run time per time frame: {}", averageFrameTime());
    LOGP(INFO, "  estimated total run time: {}", elapsed() + averageFrameTime());
  }
};

/// Collects the labels of all column types C of a pack into a vector of
/// strings, one entry per column and in pack order.
template <typename... C>
static constexpr auto columnNamesTrait(framework::pack<C...>)
{
  using Labels = std::vector<std::string>;
  return Labels{C::columnLabel()...};
}

/// Returns the names of the columns to read for the table described by dh.
///
/// Only the AOD tables TRACK:PAR, TRACK:PARCOV and TRACK:EXTRA have an explicit
/// column list (their persistent columns). For every other table an empty
/// vector is returned.
std::vector<std::string> getColumnNames(header::DataHeader dh)
{
  const std::string tableOrigin(dh.dataOrigin.str);
  const std::string tableDescription(dh.dataDescription.str);

  // AOD / RN2 are the only tables with a dedicated column selection
  if (tableOrigin != "AOD") {
    return {};
  }

  if (tableDescription == "TRACK:PAR") {
    return columnNamesTrait(typename StoredTracksMetadata::table_t::persistent_columns_t{});
  }
  if (tableDescription == "TRACK:PARCOV") {
    return columnNamesTrait(typename StoredTracksCovMetadata::table_t::persistent_columns_t{});
  }
  if (tableDescription == "TRACK:EXTRA") {
    return columnNamesTrait(typename TracksExtraMetadata::table_t::persistent_columns_t{});
  }

  // default: no explicit column names
  return {};
}

using o2::monitoring::Metric;
using o2::monitoring::Monitoring;
using o2::monitoring::tags::Key;
using o2::monitoring::tags::Value;

namespace o2::framework::readers
{
/// Registers an EndOfStream callback which signals end of stream through the
/// ControlService and requests this device to quit.
auto setEOSCallback(InitContext& ic)
{
  auto& callbackService = ic.services().get<CallbackService>();
  callbackService.set(CallbackService::Id::EndOfStream,
                      [](EndOfStreamContext& context) {
                        auto& controlService = context.services().get<ControlService>();
                        controlService.endOfStream();
                        controlService.readyToQuit(QuitRequest::Me);
                      });
}

/// Fetches the input bound to the table label of O as a TableConsumer and
/// wraps its arrow table into an object of type O.
template <typename O>
static inline auto extractTypedOriginal(ProcessingContext& pc)
{
  ///FIXME: this should be done in invokeProcess() as some of the originals may be compound tables
  auto&& consumer = pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel());
  return O{consumer->asArrowTable()};
}

/// Extracts every table type of the pack Os from the inputs of pc via
/// extractTypedOriginal and returns the results bundled in a tuple, in pack
/// order.
template <typename... Os>
static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
{
// one extractTypedOriginal call per type in the pack
return std::make_tuple(extractTypedOriginal<Os>(pc)...);
}

/// Creates the AlgorithmSpec of the AOD file reader.
///
/// The outer, stateful callback is the initialisation step. It
///  - publishes the arrow metrics with value 0,
///  - creates the DataInputDirector from the "aod-file" option (and the
///    optional "aod-reader-json" configuration),
///  - creates the run time watchdog from the "time-limit" option,
///  - splits the output routes into the time frame number (origin TFN) output
///    and the list of requested tables.
///
/// The returned processing callback reads, per invocation, one time frame of
/// every requested table and publishes it together with the time frame
/// number. It signals end of stream and asks to quit when either the run time
/// limit is reached or there are no input files left for this reader.
///
/// @throws std::runtime_error if no input file is configured, or if a
///         requested tree or branch can not be retrieved
AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
{
  auto callback = AlgorithmSpec{adaptStateful([](ConfigParamRegistry const& options,
                                                 DeviceSpec const& spec,
                                                 Monitoring& monitoring) {
    monitoring.send(Metric{(uint64_t)0, "arrow-bytes-created"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
    monitoring.send(Metric{(uint64_t)0, "arrow-messages-created"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
    monitoring.send(Metric{(uint64_t)0, "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
    monitoring.send(Metric{(uint64_t)0, "arrow-messages-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
    monitoring.flushBuffer();

    if (!options.isSet("aod-file")) {
      LOGP(ERROR, "No input file defined!");
      throw std::runtime_error("Processing is stopped!");
    }

    auto filename = options.get<std::string>("aod-file");

    // create a DataInputDirector
    auto didir = std::make_shared<DataInputDirector>(filename);
    if (options.isSet("aod-reader-json")) {
      auto jsonFile = options.get<std::string>("aod-reader-json");
      if (!didir->readJson(jsonFile)) {
        LOGP(ERROR, "Check the JSON document! Can not be properly parsed!");
      }
    }

    // get the run time watchdog; it is shared with the processing callback
    // below, which keeps it alive and releases it when the callback goes away
    auto watchdog = std::make_shared<RuntimeWatchdog>(options.get<int64_t>("time-limit"));

    // select the TFN output and
    // create list of requested tables
    header::DataHeader TFNumberHeader;
    std::vector<OutputRoute> requestedTables;
    for (const auto& route : spec.outputs) {
      if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
      } else {
        requestedTables.emplace_back(route);
      }
    }

    auto fileCounter = std::make_shared<int>(0);
    auto numTF = std::make_shared<int>(-1);
    return adaptStateless([TFNumberHeader,
                           requestedTables,
                           fileCounter,
                           numTF,
                           watchdog,
                           didir](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
      // check if RuntimeLimit is reached
      if (!watchdog->update()) {
        LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit);
        LOGP(INFO, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
        monitoring.flushBuffer();
        didir->closeInputFiles();
        control.endOfStream();
        control.readyToQuit(QuitRequest::Me);
        return;
      }

      // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
      // the TF to read is numTF
      assert(device.inputTimesliceId < device.maxInputTimeslices);
      uint64_t timeFrameNumber = 0;
      int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
      int ntf = *numTF + 1;
      monitoring.send(Metric{(uint64_t)ntf, "tf-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      static int currentFileCounter = -1;
      static int filesProcessed = 0;
      if (currentFileCounter != *fileCounter) {
        currentFileCounter = *fileCounter;
        monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      }

      // loop over requested tables
      TTree* tr = nullptr;
      bool first = true;
      static size_t totalSizeUncompressed = 0;
      static size_t totalSizeCompressed = 0;
      static size_t totalReadCalls = 0;

      for (const auto& route : requestedTables) {

        // create header
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);

        // create a TreeToTable object
        auto info = didir->getFileFolder(dh, fcnt, ntf);
        // Read calls of the file before this table is read: only the
        // difference caused by this table is added to the total further down.
        const size_t before = info.file ? info.file->GetReadCalls() : 0;
        tr = didir->getDataTree(dh, fcnt, ntf);
        if (!tr) {
          if (first) {
            // check if there is a next file to read
            fcnt += device.maxInputTimeslices;
            if (didir->atEnd(fcnt)) {
              LOGP(INFO, "No input files left to read for reader {}!", device.inputTimesliceId);
              didir->closeInputFiles();
              control.endOfStream();
              control.readyToQuit(QuitRequest::Me);
              return;
            }
            // get first folder of next file
            ntf = 0;
            tr = didir->getDataTree(dh, fcnt, ntf);
            if (!tr) {
              LOGP(FATAL, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin, fcnt, ntf);
              throw std::runtime_error("Processing is stopped!");
            }
          } else {
            LOGP(FATAL, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin, fcnt, ntf);
            throw std::runtime_error("Processing is stopped!");
          }
        }
        TTreePerfStats ps("ioperf", tr);

        if (first) {
          timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
          auto o = Output(TFNumberHeader);
          outputs.make<uint64_t>(o) = timeFrameNumber;
        }

        // create table output
        auto o = Output(dh);
        auto& t2t = outputs.make<TreeToTable>(o);

        // add branches to read
        // fill the table

        auto colnames = getColumnNames(dh);
        if (colnames.size() == 0) {
          totalSizeCompressed += tr->GetZipBytes();
          totalSizeUncompressed += tr->GetTotBytes();
          t2t.addAllColumns(tr);
        } else {
          for (auto& colname : colnames) {
            TBranch* branch = tr->GetBranch(colname.c_str());
            if (!branch) {
              // a requested column which is missing in the tree would otherwise
              // be dereferenced as a null pointer
              LOGP(FATAL, "Can not retrieve branch {} for table {}: fileCounter {}, timeFrame {}", colname, concrete.origin, fcnt, ntf);
              throw std::runtime_error("Processing is stopped!");
            }
            totalSizeCompressed += branch->GetZipBytes("*");
            totalSizeUncompressed += branch->GetTotBytes("*");
            t2t.addColumn(colname.c_str());
          }
        }
        t2t.fill(tr);
        if (info.file) {
          totalReadCalls += info.file->GetReadCalls() - before;
          static std::string currentFileRead = "";
          std::string nextFileRead = info.file->GetPath();
          if (currentFileRead != nextFileRead) {
            currentFileRead = nextFileRead;
#if __has_include(<TJAlienFile.h>)
            auto alienFile = dynamic_cast<TJAlienFile*>(info.file);
            if (alienFile) {
              /// FIXME: get the JAlien stats
            }
#endif
            monitoring.send(Metric{currentFileRead, "aod-file-read-path"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
          }
        }
        monitoring.send(Metric{(double)ps.GetReadCalls(), "aod-tree-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
        delete tr;

        first = false;
      }
      monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      monitoring.send(Metric{(uint64_t)totalReadCalls, "aod-total-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));

      // save file number and time frame
      *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
      *numTF = ntf;
    });
  })};

  return callback;
}

} // namespace o2::framework::readers
28 changes: 28 additions & 0 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_AODJALIENREADERHELPERS_H_
#define O2_FRAMEWORK_AODJALIENREADERHELPERS_H_

#include "Framework/TableBuilder.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/Logger.h"
#include <uv.h>

namespace o2::framework::readers
{

/// Groups the factory for the AOD reader algorithm of the AnalysisSupport
/// library.
struct AODJAlienReaderHelpers {
/// Returns the AlgorithmSpec of the reader which publishes the tables
/// requested by the device outputs from the configured AOD input files.
static AlgorithmSpec rootFileReaderCallback();
};

} // namespace o2::framework::readers

#endif // O2_FRAMEWORK_AODJALIENREADERHELPERS_H_
4 changes: 2 additions & 2 deletions Framework/AnalysisSupport/src/Plugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
// or submit itself to any jurisdiction.
#include "Framework/Plugins.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/AODReaderHelpers.h"
#include "AODJAlienReaderHelpers.h"

/// Algorithm plugin providing the ROOT file reader. The algorithm is created
/// by AODJAlienReaderHelpers.
struct ROOTFileReader : o2::framework::AlgorithmPlugin {
  /// Returns the AlgorithmSpec of the reader.
  o2::framework::AlgorithmSpec create() override
  {
    // The previous AODReaderHelpers-based return has been dropped: it came
    // first and made this return unreachable.
    return o2::framework::readers::AODJAlienReaderHelpers::rootFileReaderCallback();
  }
};

Expand Down