Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AliEn metrics #5177

Merged
merged 2 commits into from Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 39 additions & 43 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Expand Up @@ -25,8 +25,6 @@
#include "Framework/ChannelInfo.h"
#include "Framework/Logger.h"

#include <Monitoring/Monitoring.h>

#include <ROOT/RDataFrame.hxx>
#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
Expand Down Expand Up @@ -147,6 +145,18 @@ static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingConte
return std::make_tuple(extractTypedOriginal<Os>(pc)...);
}

/// Publish a one-line summary of the read activity on an input file.
///
/// The summary is a comma separated list of key=value pairs
/// (lfn, size, total_tf, read_tf, read_bytes, read_calls) built from the
/// accessors of @a currentFile and the two counters passed in. When the file
/// is a TJAlienFile (and that header is available at compile time) an
/// additional `se=` field is appended. The resulting string is sent as the
/// "aod-file-read-info" metric, tagged with the DPL subsystem.
///
/// @param monitoring   monitoring backend the metric is sent to
/// @param currentFile  file to report on; a null pointer is tolerated and
///                     results in no metric being sent
/// @param tfPerFile    value reported as `total_tf`
/// @param tfRead       value reported as `read_tf`
void AODJAlienReaderHelpers::dumpFileMetrics(Monitoring& monitoring, TFile* currentFile, int tfPerFile, int tfRead)
{
  // The caller-side file pointer starts out as nullptr and is only set after
  // a tree has been read successfully, so this function can be reached before
  // any file is known. There is nothing to report in that case, and every
  // accessor below would dereference the null pointer.
  if (currentFile == nullptr) {
    return;
  }

  std::string monitoringInfo(fmt::format("lfn={},size={},total_tf={},read_tf={},read_bytes={},read_calls={}", currentFile->GetPath(), currentFile->GetSize(), tfPerFile, tfRead, currentFile->GetBytesRead(), currentFile->GetReadCalls()));
#if __has_include(<TJAlienFile.h>)
  // Only files served through JAliEn can report a storage element.
  auto alienFile = dynamic_cast<TJAlienFile*>(currentFile);
  if (alienFile) {
    monitoringInfo += fmt::format(",se={}", alienFile->GetSE());
  }
#endif
  monitoring.send(Metric{monitoringInfo, "aod-file-read-info"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
}

AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
{
auto callback = AlgorithmSpec{adaptStateful([](ConfigParamRegistry const& options,
Expand All @@ -159,7 +169,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
monitoring.flushBuffer();

if (!options.isSet("aod-file")) {
LOGP(ERROR, "No input file defined!");
LOGP(FATAL, "No input file defined!");
throw std::runtime_error("Processing is stopped!");
}

Expand Down Expand Up @@ -199,24 +209,12 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
numTF,
watchdog,
didir](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
// check if RuntimeLimit is reached
if (!watchdog->update()) {
LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit);
LOGP(INFO, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
monitoring.flushBuffer();
didir->closeInputFiles();
control.endOfStream();
control.readyToQuit(QuitRequest::Me);
return;
}

// Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
// the TF to read is numTF
assert(device.inputTimesliceId < device.maxInputTimeslices);
uint64_t timeFrameNumber = 0;
int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
int ntf = *numTF + 1;
monitoring.send(Metric{(uint64_t)ntf, "tf-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
static int currentFileCounter = -1;
static int filesProcessed = 0;
if (currentFileCounter != *fileCounter) {
Expand All @@ -225,11 +223,25 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
}

// loop over requested tables
TTree* tr = nullptr;
bool first = true;
static size_t totalSizeUncompressed = 0;
static size_t totalSizeCompressed = 0;
static size_t totalReadCalls = 0;
static TFile* currentFile = nullptr;
static int tfCurrentFile = -1;

// check if RuntimeLimit is reached
if (!watchdog->update()) {
LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit);
LOGP(INFO, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
if (currentFile) {
dumpFileMetrics(monitoring, currentFile, tfCurrentFile, ntf);
}
monitoring.flushBuffer();
didir->closeInputFiles();
control.endOfStream();
control.readyToQuit(QuitRequest::Me);
return;
}

for (auto route : requestedTables) {

Expand All @@ -238,11 +250,12 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);

// create a TreeToTable object
auto info = didir->getFileFolder(dh, fcnt, ntf);
size_t before = 0;
tr = didir->getDataTree(dh, fcnt, ntf);
TTree* tr = didir->getDataTree(dh, fcnt, ntf);
if (!tr) {
if (first) {
// dump metrics of file which is done for reading
dumpFileMetrics(monitoring, currentFile, tfCurrentFile, ntf);

// check if there is a next file to read
fcnt += device.maxInputTimeslices;
if (didir->atEnd(fcnt)) {
Expand Down Expand Up @@ -278,7 +291,6 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()

// add branches to read
// fill the table

auto colnames = getColumnNames(dh);
if (colnames.size() == 0) {
totalSizeCompressed += tr->GetZipBytes();
Expand All @@ -293,35 +305,19 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
}
}
t2t.fill(tr);
if (info.file) {
totalReadCalls += info.file->GetReadCalls() - before;
static std::string currentFileRead = "";
std::string nextFileRead = info.file->GetPath();
if (currentFileRead != nextFileRead) {
currentFileRead = nextFileRead;
std::string monitoringInfo(currentFileRead);
monitoringInfo += ",";
monitoringInfo += std::to_string(info.file->GetSize());
#if __has_include(<TJAlienFile.h>)
auto alienFile = dynamic_cast<TJAlienFile*>(info.file);
if (alienFile) {
monitoringInfo += ",";
monitoringInfo += alienFile->GetSE();
}
#endif
monitoring.send(Metric{monitoringInfo, "aod-file-read-info"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
LOGP(INFO, "File read info: {}", monitoringInfo);
// TODO extend to publish at the end of the file (or on each TF?) the sizes read *per file*
}
}
monitoring.send(Metric{(double)ps.GetReadCalls(), "aod-tree-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
delete tr;

// needed for metrics dumping (upon next file read, or terminate due to watchdog)
auto info = didir->getFileFolder(dh, fcnt, ntf);
currentFile = info.file;
tfCurrentFile = didir->getTimeFramesInFile(dh, fcnt);

first = false;
}
monitoring.send(Metric{(uint64_t)ntf, "tf-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
monitoring.send(Metric{(uint64_t)totalReadCalls, "aod-total-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));

// save file number and time frame
*fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
Expand Down
2 changes: 2 additions & 0 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h
Expand Up @@ -14,13 +14,15 @@
#include "Framework/TableBuilder.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/Logger.h"
#include <Monitoring/Monitoring.h>
#include <uv.h>

namespace o2::framework::readers
{

/// Collection of static helpers used by the AOD reader that can access
/// files through JAliEn.
struct AODJAlienReaderHelpers {
/// Returns the AlgorithmSpec that implements reading of AOD input files.
static AlgorithmSpec rootFileReaderCallback();
/// Sends a summary of the read activity on @a currentFile as a monitoring
/// metric. @a tfPerFile and @a tfRead are reported alongside the file's own
/// size and read counters.
static void dumpFileMetrics(o2::monitoring::Monitoring& monitoring, TFile* currentFile, int tfPerFile, int tfRead);
};

} // namespace o2::framework::readers
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DataInputDirector.h
Expand Up @@ -69,6 +69,7 @@ struct DataInputDescriptor {

uint64_t getTimeFrameNumber(int counter, int numTF);
FileAndFolder getFileFolder(int counter, int numTF);
int getTimeFramesInFile(int counter);

void closeInputFile();
bool isAlienSupportOn() { return mAlienSupport; }
Expand Down Expand Up @@ -114,6 +115,7 @@ struct DataInputDirector {
TTree* getDataTree(header::DataHeader dh, int counter, int numTF);
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
int getTimeFramesInFile(header::DataHeader dh, int counter);

private:
std::string minputfilesFile;
Expand Down
16 changes: 16 additions & 0 deletions Framework/Core/src/DataInputDirector.cxx
Expand Up @@ -164,6 +164,11 @@ FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF)
return fileAndFolder;
}

/// Look up the number of time frames recorded for one input file.
///
/// @param counter  index into the list of input files; the lookup is
///                 bounds-checked through `at()`
/// @return the `numberOfTimeFrames` stored for that file entry
int DataInputDescriptor::getTimeFramesInFile(int counter)
{
  const auto& fileEntry = mfilenames.at(counter);
  return fileEntry->numberOfTimeFrames;
}

void DataInputDescriptor::closeInputFile()
{
if (mcurrentFile) {
Expand Down Expand Up @@ -526,6 +531,17 @@ FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counte
return didesc->getFileFolder(counter, numTF);
}

/// Number of time frames in input file @a counter for the table described
/// by @a dh.
///
/// The query is forwarded to the descriptor matching @a dh; when none
/// matches, the default descriptor answers instead.
int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
{
  auto matched = getDataInputDescriptor(dh);
  if (matched) {
    return matched->getTimeFramesInFile(counter);
  }

  // no dedicated descriptor for this table: fall back to the default one
  return mdefaultDataInputDescriptor->getTimeFramesInFile(counter);
}

uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
{
auto didesc = getDataInputDescriptor(dh);
Expand Down
3 changes: 1 addition & 2 deletions Framework/Core/src/runDataProcessing.cxx
Expand Up @@ -1324,8 +1324,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
performanceMetrics.push_back("arrow-bytes-delta");
performanceMetrics.push_back("aod-bytes-read-uncompressed");
performanceMetrics.push_back("aod-bytes-read-compressed");
performanceMetrics.push_back("aod-total-read-calls");
performanceMetrics.push_back("aod-file-read-path");
performanceMetrics.push_back("aod-file-read-info");
ResourcesMonitoringHelper::dumpMetricsToJSON(metricsInfos, driverInfo.metrics, deviceSpecs, performanceMetrics);
}
// This is a clean exit. Before we do so, if required,
Expand Down