From 85707a33506668977ba5d510a596fb632d950a19 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Wed, 24 Mar 2021 14:02:34 +0100 Subject: [PATCH] DPL: Reconstruct the full command and propagate it to workflow dump tools --- Framework/Core/CMakeLists.txt | 1 + .../Core/include/Framework/CommandInfo.h | 31 ++++++++++++ .../Core/include/Framework/DriverControl.h | 4 +- Framework/Core/src/CommandInfo.cxx | 48 +++++++++++++++++++ Framework/Core/src/O2ControlHelpers.cxx | 5 +- Framework/Core/src/O2ControlHelpers.h | 4 +- .../Core/src/WorkflowSerializationHelpers.cxx | 29 +++++++++-- .../Core/src/WorkflowSerializationHelpers.h | 7 ++- Framework/Core/src/runDataProcessing.cxx | 30 ++++++++---- .../Core/test/test_WorkflowSerialization.cxx | 10 ++-- 10 files changed, 145 insertions(+), 24 deletions(-) create mode 100644 Framework/Core/include/Framework/CommandInfo.h create mode 100644 Framework/Core/src/CommandInfo.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 8041006686860..d3cf509beaad3 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -24,6 +24,7 @@ o2_add_library(Framework src/ChannelMatching.cxx src/ChannelConfigurationPolicyHelpers.cxx src/ChannelSpecHelpers.cxx + src/CommandInfo.cxx src/CommonDataProcessors.cxx src/CommonServices.cxx src/CommonMessageBackends.cxx diff --git a/Framework/Core/include/Framework/CommandInfo.h b/Framework/Core/include/Framework/CommandInfo.h new file mode 100644 index 0000000000000..c14e1a6527658 --- /dev/null +++ b/Framework/Core/include/Framework/CommandInfo.h @@ -0,0 +1,31 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_COMMANDINFO_H_ +#define O2_FRAMEWORK_COMMANDINFO_H_ + +#include + +namespace o2::framework +{ + +struct CommandInfo { + CommandInfo() = default; + CommandInfo(std::string command) : command(std::move(command)) {} + CommandInfo(int argc, char* const* argv); + + void merge(CommandInfo const& other); + + std::string command; +}; + +} // namespace o2::framework + +#endif //O2_FRAMEWORK_COMMANDINFO_H_ \ No newline at end of file diff --git a/Framework/Core/include/Framework/DriverControl.h b/Framework/Core/include/Framework/DriverControl.h index b9ac9f303d41e..4112f2c5808a7 100644 --- a/Framework/Core/include/Framework/DriverControl.h +++ b/Framework/Core/include/Framework/DriverControl.h @@ -13,6 +13,7 @@ #include #include +#include "Framework/CommandInfo.h" #include "Framework/DriverInfo.h" #include "Framework/DataProcessorSpec.h" #include "Framework/DeviceSpec.h" @@ -37,7 +38,8 @@ struct DriverControl { using Callback = std::function const& workflow, std::vector const&, std::vector const&, - std::vector&)>; + std::vector&, + CommandInfo const&)>; /// States to be added to the stack on next iteration /// of the state machine processing. std::vector forcedTransitions; diff --git a/Framework/Core/src/CommandInfo.cxx b/Framework/Core/src/CommandInfo.cxx new file mode 100644 index 0000000000000..03128f126abc2 --- /dev/null +++ b/Framework/Core/src/CommandInfo.cxx @@ -0,0 +1,48 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/CommandInfo.h" + +#include +#include +#include + +namespace o2::framework +{ + +CommandInfo::CommandInfo(int argc, char* const* argv) +{ + assert(argc > 0); + + std::stringstream commandStream; + commandStream << argv[0]; + + for (size_t ai = 1; ai < argc; ++ai) { + const char* arg = argv[ai]; + if (strpbrk(arg, "\" ;@") != nullptr || arg[0] == 0) { + commandStream << " '" << arg << "'"; + } else if (strpbrk(arg, "'") != nullptr) { + commandStream << " \"" << arg << "\""; + } else { + commandStream << " " << arg; + } + } + command = commandStream.str(); +} + +void CommandInfo::merge(CommandInfo const& other) +{ + if (!command.empty()) { + command += " | "; + } + command += other.command; +} + +} // namespace o2::framework \ No newline at end of file diff --git a/Framework/Core/src/O2ControlHelpers.cxx b/Framework/Core/src/O2ControlHelpers.cxx index 89bc245bc1319..3792bdee4fe44 100644 --- a/Framework/Core/src/O2ControlHelpers.cxx +++ b/Framework/Core/src/O2ControlHelpers.cxx @@ -292,13 +292,14 @@ void dumpWorkflow(std::ostream& dumpOut, const std::vector& specs, c auto& execution = executions[di]; dumpRole(dumpOut, taskName(workflowName, spec.id), spec, specs, execution, indLevel + indScheme); } -}; +} } // namespace implementation void dumpDeviceSpec2O2Control(std::string workflowName, const std::vector& specs, - const std::vector& executions) + const std::vector& executions, + const CommandInfo&) { const char* tasksDirectory = "tasks"; const char* workflowsDirectory = "workflows"; diff --git a/Framework/Core/src/O2ControlHelpers.h b/Framework/Core/src/O2ControlHelpers.h index ac85093570376..5395938c07161 100644 --- a/Framework/Core/src/O2ControlHelpers.h +++ b/Framework/Core/src/O2ControlHelpers.h @@ -12,6 +12,7 @@ #include "Framework/DeviceSpec.h" #include "Framework/DeviceExecution.h" +#include "Framework/CommandInfo.h" #include #include @@ -40,7 +41,8 @@ namespace framework void dumpDeviceSpec2O2Control(std::string workflowName, std::vector const& specs, - std::vector const& executions); + std::vector const& executions, + CommandInfo const& commandInfo); } // namespace framework } // namespace o2 diff --git a/Framework/Core/src/WorkflowSerializationHelpers.cxx b/Framework/Core/src/WorkflowSerializationHelpers.cxx index c84e010812ad2..a99ea86bbddb9 100644 --- a/Framework/Core/src/WorkflowSerializationHelpers.cxx +++ b/Framework/Core/src/WorkflowSerializationHelpers.cxx @@ -35,6 +35,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, IN_EXECUTION, IN_WORKFLOW, IN_METADATA, + IN_COMMAND, IN_DATAPROCESSORS, IN_DATAPROCESSOR, IN_DATAPROCESSOR_NAME, @@ -87,6 +88,9 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, case State::IN_WORKFLOW: s << "IN_WORKFLOW"; break; + case State::IN_COMMAND: + s << "IN_COMMAND"; + break; case State::IN_DATAPROCESSORS: s << "IN_DATAPROCESSORS"; break; @@ -209,10 +213,12 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, } WorkflowImporter(std::vector& o, - std::vector& m) + std::vector& m, + CommandInfo& c) : states{}, dataProcessors{o}, - metadata{m} + metadata{m}, + command{c} { push(State::IN_START); } @@ -245,6 +251,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, metadata.push_back(DataProcessorInfo{}); } else if (in(State::IN_METADATUM)) { metadata.push_back(DataProcessorInfo{}); + } else if (in(State::IN_COMMAND)) { + command = CommandInfo{}; } return true; } @@ -469,6 +477,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, push(State::IN_WORKFLOW_OPTIONS); } else if (in(State::IN_METADATUM) && strncmp(str, "channels", length) == 0) { push(State::IN_METADATUM_CHANNELS); + } else if (in(State::IN_EXECUTION) && strncmp(str, "command", length) == 0) { + push(State::IN_COMMAND); } return true; } @@ -519,6 +529,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, // This is in an array, so we do not actually want to // exit from the state. push(State::IN_METADATUM_CHANNEL); + } else if (in(State::IN_COMMAND)) { + command.merge({s}); } pop(); return true; @@ -605,6 +617,7 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, std::string spec; std::vector& dataProcessors; std::vector& metadata; + CommandInfo& command; std::vector inputOptions; std::string binding; header::DataOrigin origin; @@ -623,7 +636,8 @@ struct WorkflowImporter : public rapidjson::BaseReaderHandler, void WorkflowSerializationHelpers::import(std::istream& s, std::vector& workflow, - std::vector& metadata) + std::vector& metadata, + CommandInfo& command) { // Skip any line which does not start with '{' // If we do not find a starting {, we simply assume that no workflow @@ -642,7 +656,7 @@ void WorkflowSerializationHelpers::import(std::istream& s, } rapidjson::Reader reader; rapidjson::IStreamWrapper isw(s); - WorkflowImporter importer{workflow, metadata}; + WorkflowImporter importer{workflow, metadata, command}; bool ok = reader.Parse(isw, importer); if (ok == false) { throw std::runtime_error("Error while parsing serialised workflow"); @@ -651,7 +665,8 @@ void WorkflowSerializationHelpers::import(std::istream& s, void WorkflowSerializationHelpers::dump(std::ostream& out, std::vector const& workflow, - std::vector const& metadata) + std::vector const& metadata, + CommandInfo const& commandInfo) { rapidjson::OStreamWrapper osw(out); rapidjson::PrettyWriter w(osw); @@ -841,6 +856,10 @@ void WorkflowSerializationHelpers::dump(std::ostream& out, w.EndObject(); } w.EndArray(); + + w.Key("command"); + w.String(commandInfo.command.c_str()); + w.EndObject(); } diff --git a/Framework/Core/src/WorkflowSerializationHelpers.h b/Framework/Core/src/WorkflowSerializationHelpers.h index 1d9bd7fa8f904..11585aec401fe 100644 --- a/Framework/Core/src/WorkflowSerializationHelpers.h +++ b/Framework/Core/src/WorkflowSerializationHelpers.h @@ -12,6 +12,7 @@ #include "Framework/DataProcessorSpec.h" #include "Framework/DataProcessorInfo.h" +#include "Framework/CommandInfo.h" #include #include @@ -22,10 +23,12 @@ namespace o2::framework struct WorkflowSerializationHelpers { static void import(std::istream& s, std::vector& workflow, - std::vector& metadata); + std::vector& metadata, + CommandInfo& command); static void dump(std::ostream& o, std::vector const& workflow, - std::vector const& metadata); + std::vector const& metadata, + CommandInfo const& commandInfo); }; } // namespace o2::framework diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index c73bce269100a..0d42ca701ff01 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -40,6 +40,7 @@ #include "Framework/DataProcessorInfo.h" #include "Framework/DriverInfo.h" #include "Framework/DriverControl.h" +#include "Framework/CommandInfo.h" #include "DriverServerContext.h" #include "ControlServiceHelpers.h" #include "HTTPParser.h" @@ -1097,6 +1098,7 @@ void single_step_callback(uv_timer_s* ctx) int runStateMachine(DataProcessorSpecs const& workflow, WorkflowInfo const& workflowInfo, DataProcessorInfos const& previousDataProcessorInfos, + CommandInfo const& commandInfo, DriverControl& driverControl, DriverInfo& driverInfo, std::vector& metricsInfos, @@ -1491,7 +1493,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, // restart the data processors which need to be restarted. LOG(INFO) << "Redeployment of configuration asked."; std::ostringstream forwardedStdin; - WorkflowSerializationHelpers::dump(forwardedStdin, workflow, dataProcessorInfos); + WorkflowSerializationHelpers::dump(forwardedStdin, workflow, dataProcessorInfos, commandInfo); infos.reserve(deviceSpecs.size()); // This is guaranteed to be a single CPU. @@ -1673,7 +1675,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, } case DriverState::PERFORM_CALLBACKS: for (auto& callback : driverControl.callbacks) { - callback(workflow, deviceSpecs, deviceExecutions, dataProcessorInfos); + callback(workflow, deviceSpecs, deviceExecutions, dataProcessorInfos, commandInfo); } driverControl.callbacks.clear(); break; @@ -1844,7 +1846,8 @@ void initialiseDriverControl(bpo::variables_map const& varmap, control.callbacks = {[](WorkflowSpec const& workflow, DeviceSpecs const& specs, DeviceExecutions const&, - DataProcessorInfos&) { + DataProcessorInfos&, + CommandInfo const&) { GraphvizHelpers::dumpDeviceSpec2Graphviz(std::cout, specs); }}; control.forcedTransitions = { @@ -1862,7 +1865,8 @@ void initialiseDriverControl(bpo::variables_map const& varmap, control.callbacks = {[](WorkflowSpec const& workflow, DeviceSpecs const& specs, DeviceExecutions const& executions, - DataProcessorInfos&) { + DataProcessorInfos&, + CommandInfo const&) { dumpDeviceSpec2DDS(std::cout, specs, executions); }}; control.forcedTransitions = { @@ -1877,8 +1881,9 @@ void initialiseDriverControl(bpo::variables_map const& varmap, (WorkflowSpec const& workflow, DeviceSpecs const& specs, DeviceExecutions const& executions, - DataProcessorInfos&) { - dumpDeviceSpec2O2Control(workflowName, specs, executions); + DataProcessorInfos&, + CommandInfo const& commandInfo) { + dumpDeviceSpec2O2Control(workflowName, specs, executions, commandInfo); }}; control.forcedTransitions = { DriverState::EXIT, // @@ -1903,14 +1908,15 @@ void initialiseDriverControl(bpo::variables_map const& varmap, control.callbacks = {[filename = varmap["dump-workflow-file"].as()](WorkflowSpec const& workflow, DeviceSpecs const devices, DeviceExecutions const&, - DataProcessorInfos& dataProcessorInfos) { + DataProcessorInfos& dataProcessorInfos, + CommandInfo const& commandInfo) { if (filename == "-") { - WorkflowSerializationHelpers::dump(std::cout, workflow, dataProcessorInfos); + WorkflowSerializationHelpers::dump(std::cout, workflow, dataProcessorInfos, commandInfo); // FIXME: this is to avoid trailing garbage.. exit(0); } else { std::ofstream output(filename); - WorkflowSerializationHelpers::dump(output, workflow, dataProcessorInfos); + WorkflowSerializationHelpers::dump(output, workflow, dataProcessorInfos, commandInfo); } }}; control.forcedTransitions = { @@ -2078,9 +2084,10 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, } std::vector dataProcessorInfos; + CommandInfo commandInfo{}; if (isatty(STDIN_FILENO) == false) { std::vector importedWorkflow; - WorkflowSerializationHelpers::import(std::cin, importedWorkflow, dataProcessorInfos); + WorkflowSerializationHelpers::import(std::cin, importedWorkflow, dataProcessorInfos, commandInfo); size_t workflowHashB = 0; for (auto& dp : importedWorkflow) { @@ -2260,6 +2267,8 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo.processorInfo = dataProcessorInfos; driverInfo.configContext = &configContext; + commandInfo.merge(CommandInfo(argc, argv)); + std::string frameworkId; // If the id is set, this means this is a device, // otherwise this is the driver. @@ -2274,6 +2283,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, return runStateMachine(physicalWorkflow, currentWorkflow, dataProcessorInfos, + commandInfo, driverControl, driverInfo, gDeviceMetricsInfos, diff --git a/Framework/Core/test/test_WorkflowSerialization.cxx b/Framework/Core/test/test_WorkflowSerialization.cxx index 4618b5e28813c..e3ea1d838f631 100644 --- a/Framework/Core/test/test_WorkflowSerialization.cxx +++ b/Framework/Core/test/test_WorkflowSerialization.cxx @@ -57,19 +57,23 @@ BOOST_AUTO_TEST_CASE(TestVerifyWorkflow) {"D", "test_Framework_test_SerializationWorkflow", {}}, }; + CommandInfo commandInfoOut{"o2-dpl-workflow -b --option 1 --option 2"}; + std::vector metadataIn{}; + CommandInfo commandInfoIn; std::ostringstream firstDump; - WorkflowSerializationHelpers::dump(firstDump, w0, metadataOut); + WorkflowSerializationHelpers::dump(firstDump, w0, metadataOut, commandInfoOut); std::istringstream is; is.str(firstDump.str()); WorkflowSpec w1; - WorkflowSerializationHelpers::import(is, w1, metadataIn); + WorkflowSerializationHelpers::import(is, w1, metadataIn, commandInfoIn); std::ostringstream secondDump; - WorkflowSerializationHelpers::dump(secondDump, w1, metadataIn); + WorkflowSerializationHelpers::dump(secondDump, w1, metadataIn, commandInfoIn); BOOST_REQUIRE_EQUAL(w0.size(), 4); BOOST_REQUIRE_EQUAL(w0.size(), w1.size()); BOOST_CHECK_EQUAL(firstDump.str(), secondDump.str()); + BOOST_CHECK_EQUAL(commandInfoIn.command, commandInfoOut.command); }