diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 5f55b56c24064..62a8fba88a207 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -152,6 +152,7 @@ foreach(t CallbackRegistry ChannelSpecHelpers CompletionPolicy + ComputingResourceHelpers ConfigParamRegistry ContextRegistry DataDescriptorMatcher @@ -273,7 +274,7 @@ foreach(w PUBLIC_LINK_LIBRARIES O2::Framework TIMEOUT 30 NO_BOOST_TEST - COMMAND_LINE_ARGS ${DPL_WORKFLOW_TESTS_EXTRA_OPTIONS} --run) + COMMAND_LINE_ARGS ${DPL_WORKFLOW_TESTS_EXTRA_OPTIONS} --run --shm-segment-size 20000000) endforeach() # TODO: DanglingInput test not working for the moment [ERROR] Unable to relay diff --git a/Framework/Core/include/Framework/ChannelSpec.h b/Framework/Core/include/Framework/ChannelSpec.h index 276b9441ed108..7844342e0bdc9 100644 --- a/Framework/Core/include/Framework/ChannelSpec.h +++ b/Framework/Core/include/Framework/ChannelSpec.h @@ -32,6 +32,12 @@ enum struct ChannelType { Pull, }; +/// The kind of backend to use for the channels +enum struct ChannelProtocol { + Network, + IPC +}; + /// This describes an input channel. Since they are point to /// point connections, there is not much to say about them. /// Notice that this should be considered read only once it @@ -42,6 +48,7 @@ struct InputChannelSpec { enum ChannelMethod method; std::string hostname; unsigned short port; + ChannelProtocol protocol = ChannelProtocol::Network; }; /// This describes an output channel. Output channels are semantically @@ -56,6 +63,7 @@ struct OutputChannelSpec { std::string hostname; unsigned short port; size_t listeners; + ChannelProtocol protocol = ChannelProtocol::Network; }; } // namespace framework diff --git a/Framework/Core/src/ChannelSpecHelpers.cxx b/Framework/Core/src/ChannelSpecHelpers.cxx index 0e9939d2de5de..309bf516944c2 100644 --- a/Framework/Core/src/ChannelSpecHelpers.cxx +++ b/Framework/Core/src/ChannelSpecHelpers.cxx @@ -44,14 +44,24 @@ char const* ChannelSpecHelpers::methodAsString(enum ChannelMethod method) std::string ChannelSpecHelpers::channelUrl(OutputChannelSpec const& channel) { - return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) - : fmt::format("tcp://{}:{}", channel.hostname, channel.port); + switch (channel.protocol) { + case ChannelProtocol::IPC: + return fmt::format("ipc://{}_{}", channel.hostname, channel.port); + default: + return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) + : fmt::format("tcp://{}:{}", channel.hostname, channel.port); + } } std::string ChannelSpecHelpers::channelUrl(InputChannelSpec const& channel) { - return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) - : fmt::format("tcp://{}:{}", channel.hostname, channel.port); + switch (channel.protocol) { + case ChannelProtocol::IPC: + return fmt::format("ipc://{}_{}", channel.hostname, channel.port); + default: + return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) + : fmt::format("tcp://{}:{}", channel.hostname, channel.port); + } } /// Stream operators so that we can use ChannelType with Boost.Test diff --git a/Framework/Core/src/ComputingResourceHelpers.cxx b/Framework/Core/src/ComputingResourceHelpers.cxx index b968d8b21c243..551a98118f30c 100644 --- a/Framework/Core/src/ComputingResourceHelpers.cxx +++ b/Framework/Core/src/ComputingResourceHelpers.cxx @@ -10,6 +10,7 @@ #include "ComputingResourceHelpers.h" #include #include +#include namespace o2::framework { @@ -20,15 +21,34 @@ long getTotalNumberOfBytes() return pages * page_size; }; -ComputingResource ComputingResourceHelpers::getLocalhostResource(unsigned short startPort, unsigned short rangeSize) +ComputingResource ComputingResourceHelpers::getLocalhostResource() { ComputingResource result; result.cpu = std::thread::hardware_concurrency(), result.memory = getTotalNumberOfBytes(); result.hostname = "localhost"; - result.startPort = startPort; - result.lastPort = startPort + rangeSize; + result.startPort = 22000; + result.lastPort = 23000; result.usedPorts = 0; return result; } + +std::vector ComputingResourceHelpers::parseResources(std::string const& resourceString) +{ + std::vector resources; + std::istringstream str{resourceString}; + std::string result; + while (std::getline(str, result, ',')) { + std::istringstream in{result}; + char colon; + ComputingResource resource; + std::getline(in, resource.hostname, ':'); + in >> resource.cpu >> colon >> resource.memory >> colon >> resource.startPort >> colon >> resource.lastPort; + resource.memory = resource.memory * 1000000; + resource.usedPorts = 0; + resources.emplace_back(resource); + } + return resources; +} + } // namespace o2::framework diff --git a/Framework/Core/src/ComputingResourceHelpers.h b/Framework/Core/src/ComputingResourceHelpers.h index eb9d00b162957..deea2b9219e4e 100644 --- a/Framework/Core/src/ComputingResourceHelpers.h +++ b/Framework/Core/src/ComputingResourceHelpers.h @@ -13,10 +13,21 @@ #include "Framework/ComputingResource.h" +#include +#include + namespace o2::framework { struct ComputingResourceHelpers { - static ComputingResource getLocalhostResource(unsigned short startPort, unsigned short rangeSize); + /// This will create a ComputingResource which matches what offered by localhost. + /// Notice that the port range will always be [22000, 23000) since in any case we will + /// use ipc:// in place of tcp:// + static ComputingResource getLocalhostResource(); + + /// Parse a string which contains resources specified in the following format: + /// + /// :::: + static std::vector parseResources(std::string const& resourceString); }; } // namespace o2::framework diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 65eb34424ac6e..1af392b54e9b2 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -27,6 +27,7 @@ #include "Framework/OutputRoute.h" #include "Framework/WorkflowSpec.h" #include "Framework/ComputingResource.h" +#include "Framework/Logger.h" #include "WorkflowHelpers.h" @@ -625,7 +626,8 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(WorkflowSpec const& workf std::vector const& completionPolicies, std::vector const& dispatchPolicies, std::vector& devices, - ResourceManager& resourceManager) + ResourceManager& resourceManager, + std::string const& uniqueWorkflowId) { std::vector availableForwardsInfo; @@ -701,6 +703,40 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(WorkflowSpec const& workf } } } + + auto findDeviceIndex = [&deviceIndex](size_t processorIndex, size_t timeslice) { + for (auto& deviceEdge : deviceIndex) { + if (deviceEdge.processorIndex != processorIndex) { + continue; + } + if (deviceEdge.timeslice != timeslice) { + continue; + } + return deviceEdge.deviceIndex; + } + throw std::runtime_error("Unable to find device."); + }; + + // Optimize the topology when two devices are + // running on the same node. + for (auto& connection : connections) { + auto& device1 = devices[findDeviceIndex(connection.consumer, connection.timeIndex)]; + auto& device2 = devices[findDeviceIndex(connection.producer, connection.producerTimeIndex)]; + // No need to do anything if they are not on the same host + if (device1.resource.hostname != device2.resource.hostname) { + continue; + } + for (auto& input : device1.inputChannels) { + for (auto& output : device2.outputChannels) { + if (input.hostname == output.hostname && input.port == output.port) { + input.protocol = ChannelProtocol::IPC; + output.protocol = ChannelProtocol::IPC; + input.hostname += uniqueWorkflowId; + output.hostname += uniqueWorkflowId; + } + } + } + } } void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, @@ -756,8 +792,11 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, // FIXME: add some checksum in framework id. We could use this // to avoid redeploys when only a portion of the workflow is changed. // FIXME: this should probably be done in one go with char *, but I am lazy. - std::vector tmpArgs = {argv[0], "--id", spec.id.c_str(), "--control", "static", - "--log-color", "false", "--color", "false"}; + std::vector tmpArgs = {argv[0], + "--id", spec.id.c_str(), + "--control", "static", + "--log-color", "false", + "--color", "false"}; if (defaultStopped) { tmpArgs.push_back("-s"); } @@ -802,6 +841,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, bpo::options_description realOdesc = odesc; realOdesc.add_options()("child-driver", bpo::value()); realOdesc.add_options()("rate", bpo::value()); + realOdesc.add_options()("shm-segment-size", bpo::value()); filterArgsFct(expansions.we_wordc, expansions.we_wordv, realOdesc); wordfree(&expansions); return; @@ -871,6 +911,9 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, // FIXME: this should probably be reflected in the GUI std::ostringstream str; for (size_t ai = 0; ai < execution.args.size() - 1; ai++) { + if (execution.args[ai] == nullptr) { + LOG(ERROR) << "Bad argument for " << execution.args[ai - 1]; + } assert(execution.args[ai]); str << " " << execution.args[ai]; } @@ -889,6 +932,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("plugin-search-path,S", bpo::value(), "FairMQ plugins search path") // ("control-port", bpo::value(), "Utility port to be used by O2 Control") // ("rate", bpo::value(), "rate for a data source device (Hz)") // + ("shm-segment-size", bpo::value(), "size of the shared memory segment in bytes") // ("monitoring-backend", bpo::value(), "monitoring connection string") // ("infologger-mode", bpo::value(), "INFOLOGGER_MODE override") // ("infologger-severity", bpo::value(), "minimun FairLogger severity which goes to info logger") // diff --git a/Framework/Core/src/DeviceSpecHelpers.h b/Framework/Core/src/DeviceSpecHelpers.h index 73df1eefb7aa9..ad857a83f20fe 100644 --- a/Framework/Core/src/DeviceSpecHelpers.h +++ b/Framework/Core/src/DeviceSpecHelpers.h @@ -45,17 +45,19 @@ struct DeviceSpecHelpers { std::vector const& completionPolicies, std::vector const& dispatchPolicies, std::vector& devices, - ResourceManager& resourceManager); + ResourceManager& resourceManager, + std::string const& uniqueWorkflowId); static void dataProcessorSpecs2DeviceSpecs( const WorkflowSpec& workflow, std::vector const& channelPolicies, std::vector const& completionPolicies, std::vector& devices, - ResourceManager& resources) + ResourceManager& resourceManager, + std::string const& uniqueWorkflowId) { std::vector dispatchPolicies = DispatchPolicy::createDefaultPolicies(); - dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, dispatchPolicies, devices, resources); + dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, dispatchPolicies, devices, resourceManager, uniqueWorkflowId); } /// Helper to prepare the arguments which will be used to diff --git a/Framework/Core/src/DriverInfo.h b/Framework/Core/src/DriverInfo.h index 9edd071b035f0..b6bb3706b1f77 100644 --- a/Framework/Core/src/DriverInfo.h +++ b/Framework/Core/src/DriverInfo.h @@ -119,10 +119,13 @@ struct DriverInfo { /// The optional timeout after which the driver will request /// all the children to quit. double timeout; - /// The start port to use when looking for a free range - unsigned short startPort; - /// The size of the port range to consider allocated - unsigned short portRange; + /// The hostname which needs to be deployed by this instance of + /// the driver. By default it will be localhost + std::string deployHostname; + /// resources which are allocated for the whole workflow by + /// an external resource manager. If the value is an empty string + /// resources are obtained from the localhost. + std::string resources; /// The current set of metadata associated to each DataProcessor being /// executed. std::vector processorInfo; @@ -139,6 +142,8 @@ struct DriverInfo { float frameCost; /// The time between one frame and the other. float frameLatency; + /// The unique id used for ipc communications + std::string uniqueWorkflowId = ""; }; } // namespace framework diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index d41eb6a4a6d4f..4ec478bb6fd54 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -77,7 +77,7 @@ struct DeviceConnectionId { size_t consumer; size_t timeIndex; size_t producerTimeIndex; - size_t port; + uint16_t port; bool operator<(const DeviceConnectionId& rhs) const { diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 902d866cd6500..ca9564852fbe7 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -28,7 +28,6 @@ #include "Framework/DeviceSpec.h" #include "Framework/DeviceState.h" #include "Framework/FrameworkGUIDebugger.h" -#include "Framework/FreePortFinder.h" #include "Framework/LocalRootFileService.h" #include "Framework/LogParsingHelpers.h" #include "Framework/Logger.h" @@ -271,6 +270,31 @@ static void handle_sigint(int) static void handle_sigchld(int) { sigchld_requested = true; } +void spawnRemoteDevice(std::string const& forwardedStdin, + DeviceSpec const& spec, + std::map& socket2DeviceInfo, + DeviceControl& control, + DeviceExecution& execution, + std::vector& deviceInfos, + int& maxFd, fd_set& childFdset) +{ + LOG(INFO) << "Starting " << spec.id << " as remote device"; + DeviceInfo info; + // FIXME: we should make sure we do not sent a kill to pid 0. + info.pid = 0; + info.active = true; + info.readyToQuit = false; + info.historySize = 1000; + info.historyPos = 0; + info.maxLogLevel = LogParsingHelpers::LogLevel::Debug; + info.dataRelayerViewIndex = Metric2DViewIndex{"data_relayer", 0, 0, {}}; + info.variablesViewIndex = Metric2DViewIndex{"matcher_variables", 0, 0, {}}; + info.queriesViewIndex = Metric2DViewIndex{"data_queries", 0, 0, {}}; + + deviceInfos.emplace_back(info); + // Let's add also metrics information for the given device + gDeviceMetricsInfos.emplace_back(DeviceMetricsInfo{}); +} /// This will start a new device by forking and executing a /// new child void spawnDevice(std::string const& forwardedStdin, @@ -732,8 +756,13 @@ int runStateMachine(DataProcessorSpecs const& workflow, DeviceExecutions deviceExecutions; DataProcessorInfos dataProcessorInfos = previousDataProcessorInfos; - std::vector resources{ - ComputingResourceHelpers::getLocalhostResource(driverInfo.startPort, driverInfo.portRange)}; + std::vector resources; + + if (driverInfo.resources != "") { + resources = ComputingResourceHelpers::parseResources(driverInfo.resources); + } else { + resources = {ComputingResourceHelpers::getLocalhostResource()}; + } auto resourceManager = std::make_unique(resources); @@ -848,7 +877,8 @@ int runStateMachine(DataProcessorSpecs const& workflow, driverInfo.completionPolicies, driverInfo.dispatchPolicies, deviceSpecs, - *resourceManager); + *resourceManager, + driverInfo.uniqueWorkflowId); // This should expand nodes so that we can build a consistent DAG. } catch (std::runtime_error& e) { std::cerr << "Invalid workflow: " << e.what() << std::endl; @@ -907,12 +937,19 @@ int runStateMachine(DataProcessorSpecs const& workflow, dataProcessorInfos, deviceSpecs, deviceExecutions, controls); + std::ostringstream forwardedStdin; WorkflowSerializationHelpers::dump(forwardedStdin, workflow, dataProcessorInfos); for (size_t di = 0; di < deviceSpecs.size(); ++di) { - spawnDevice(forwardedStdin.str(), - deviceSpecs[di], driverInfo.socket2DeviceInfo, controls[di], deviceExecutions[di], infos, - driverInfo.maxFd, driverInfo.childFdset); + if (deviceSpecs[di].resource.hostname != driverInfo.deployHostname) { + spawnRemoteDevice(forwardedStdin.str(), + deviceSpecs[di], driverInfo.socket2DeviceInfo, controls[di], deviceExecutions[di], infos, + driverInfo.maxFd, driverInfo.childFdset); + } else { + spawnDevice(forwardedStdin.str(), + deviceSpecs[di], driverInfo.socket2DeviceInfo, controls[di], deviceExecutions[di], infos, + driverInfo.maxFd, driverInfo.childFdset); + } } driverInfo.maxFd += 1; assert(infos.empty() == false); @@ -1205,6 +1242,8 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, ("stop,s", bpo::value()->zero_tokens()->default_value(false), "stop before device start") // ("single-step", bpo::value()->zero_tokens()->default_value(false), "start in single step mode") // ("batch,b", bpo::value()->zero_tokens()->default_value(false), "batch processing mode") // + ("hostname", bpo::value()->default_value("localhost"), "hostname to deploy") // + ("resources", bpo::value()->default_value(""), "resources allocated for the workflow") // ("start-port,p", bpo::value()->default_value(22000), "start port to allocate") // ("port-range,pr", bpo::value()->default_value(1000), "ports in range") // ("completion-policy,c", bpo::value(&policy)->default_value(TerminationPolicy::QUIT), // @@ -1219,8 +1258,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, executorOptions.add(DeviceSpecHelpers::getForwardedDeviceOptions()); gHiddenDeviceOptions.add_options() // - ((std::string("id") + ",i").c_str(), bpo::value(), // - "device id for child spawning") // + ("id,i", bpo::value(), "device id for child spawning") // ("channel-config", bpo::value>(), "channel configuration") // ("control", "control plugin") // ("log-color", "logging color scheme")("color", "logging color scheme"); @@ -1316,8 +1354,9 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo.terminationPolicy = varmap["completion-policy"].as(); driverInfo.startTime = std::chrono::steady_clock::now(); driverInfo.timeout = varmap["timeout"].as(); - driverInfo.startPort = varmap["start-port"].as(); - driverInfo.portRange = varmap["port-range"].as(); + driverInfo.deployHostname = varmap["hostname"].as(); + driverInfo.resources = varmap["resources"].as(); + // FIXME: should use the whole dataProcessorInfos, actually... driverInfo.processorInfo = dataProcessorInfos; driverInfo.configContext = &configContext; @@ -1325,15 +1364,11 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, std::string frameworkId; // If the id is set, this means this is a device, // otherwise this is the driver. - FreePortFinder finder(driverInfo.startPort - 1, - 65535 - driverInfo.portRange, - driverInfo.portRange); if (varmap.count("id")) { frameworkId = varmap["id"].as(); + driverInfo.uniqueWorkflowId = fmt::format("{}", getppid()); } else { - finder.scan(); - driverInfo.startPort = finder.port(); - driverInfo.portRange = finder.range(); + driverInfo.uniqueWorkflowId = fmt::format("{}", getpid()); } return runStateMachine(physicalWorkflow, currentWorkflow, diff --git a/Framework/Core/test/test_ComputingResourceHelpers.cxx b/Framework/Core/test/test_ComputingResourceHelpers.cxx new file mode 100644 index 0000000000000..a1d8e2a3baae1 --- /dev/null +++ b/Framework/Core/test/test_ComputingResourceHelpers.cxx @@ -0,0 +1,47 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#define BOOST_TEST_MODULE Test Framework ComputingResourceHelpers +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK +#include + +#include "../src/ComputingResourceHelpers.h" +#include +#include + +using namespace o2::framework; + +BOOST_AUTO_TEST_CASE(TestResourceParsing) +{ + auto test1 = "foo:16:1000:22000:23000"; + auto test2 = "foo:16:1000:22000:23000,bar:8:500:22000:23000"; + + auto resources = ComputingResourceHelpers::parseResources(test1); + BOOST_REQUIRE_EQUAL(resources.size(), 1); + BOOST_CHECK_EQUAL(resources[0].cpu, 16); + BOOST_CHECK_EQUAL(resources[0].memory, 1000000000); + BOOST_CHECK_EQUAL(resources[0].hostname, "foo"); + BOOST_CHECK_EQUAL(resources[0].startPort, 22000); + BOOST_CHECK_EQUAL(resources[0].lastPort, 23000); + + resources = ComputingResourceHelpers::parseResources(test2); + BOOST_REQUIRE_EQUAL(resources.size(), 2); + BOOST_CHECK_EQUAL(resources[0].cpu, 16); + BOOST_CHECK_EQUAL(resources[0].memory, 1000000000); + BOOST_CHECK_EQUAL(resources[0].hostname, "foo"); + BOOST_CHECK_EQUAL(resources[0].startPort, 22000); + BOOST_CHECK_EQUAL(resources[0].lastPort, 23000); + BOOST_CHECK_EQUAL(resources[1].cpu, 8); + BOOST_CHECK_EQUAL(resources[1].memory, 500000000); + BOOST_CHECK_EQUAL(resources[1].hostname, "bar"); + BOOST_CHECK_EQUAL(resources[1].startPort, 22000); + BOOST_CHECK_EQUAL(resources[1].lastPort, 23000); +} diff --git a/Framework/Core/test/test_DeviceSpec.cxx b/Framework/Core/test/test_DeviceSpec.cxx index c424f13c62af3..fcf2f096e61d5 100644 --- a/Framework/Core/test/test_DeviceSpec.cxx +++ b/Framework/Core/test/test_DeviceSpec.cxx @@ -46,7 +46,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1) BOOST_REQUIRE_EQUAL(completionPolicies.empty(), false); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; BOOST_REQUIRE_EQUAL(resources.size(), 1); BOOST_CHECK_EQUAL(resources[0].startPort, 22000); SimpleResourceManager rm(resources); @@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1) BOOST_CHECK_EQUAL(offers[0].startPort, 22000); BOOST_CHECK_EQUAL(offers[0].rangeSize, 1000); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_REQUIRE_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -88,9 +88,9 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1PushPull) BOOST_REQUIRE_EQUAL(channelPolicies.empty(), false); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_CHECK_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -132,9 +132,9 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec2) auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_CHECK_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -174,9 +174,9 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec3) auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_CHECK_EQUAL(devices.size(), 3); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -221,10 +221,10 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec4) auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_CHECK_EQUAL(devices.size(), 4); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -291,9 +291,9 @@ BOOST_AUTO_TEST_CASE(TestTopologyForwarding) auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_CHECK_EQUAL(devices.size(), 3); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -406,7 +406,7 @@ BOOST_AUTO_TEST_CASE(TestOutEdgeProcessingHelpers) WorkflowSpec workflow = defineDataProcessing7(); auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); ComputingOffer defaultOffer; defaultOffer.cpu = 0.01; @@ -568,9 +568,9 @@ BOOST_AUTO_TEST_CASE(TestTopologyLayeredTimePipeline) std::vector devices; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_CHECK_EQUAL(devices.size(), 6); BOOST_CHECK_EQUAL(devices[0].id, "A"); BOOST_CHECK_EQUAL(devices[1].id, "B_t0"); @@ -688,7 +688,7 @@ WorkflowSpec defineDataProcessing8() BOOST_AUTO_TEST_CASE(TestSimpleWildcard) { auto workflow = defineDataProcessing8(); - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); diff --git a/Framework/Core/test/test_DeviceSpecHelpers.cxx b/Framework/Core/test/test_DeviceSpecHelpers.cxx index a36ee5d11a305..a584c51677d48 100644 --- a/Framework/Core/test/test_DeviceSpecHelpers.cxx +++ b/Framework/Core/test/test_DeviceSpecHelpers.cxx @@ -132,14 +132,14 @@ BOOST_AUTO_TEST_CASE(test_prepareArguments) std::vector deviceSpecs; - std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; auto rm = std::make_unique(resources); DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, ChannelConfigurationPolicy::createDefaultPolicies(), CompletionPolicy::createDefaultPolicies(), deviceSpecs, - *rm); + *rm, "workflow-id"); // Now doing the test cases CheckMatrix matrix; diff --git a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx index ac70194d326b0..8eac79fc008be 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx @@ -64,16 +64,16 @@ WorkflowSpec defineDataProcessing() }}}; } -BOOST_AUTO_TEST_CASE(TestGraphviz) +BOOST_AUTO_TEST_CASE(TestDDS) { auto workflow = defineDataProcessing(); std::ostringstream ss{""}; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); std::vector devices; - std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources{ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); std::vector controls; std::vector executions; controls.resize(devices.size()); diff --git a/Framework/Core/test/test_Graphviz.cxx b/Framework/Core/test/test_Graphviz.cxx index 5c4910b66fad6..262c104714900 100644 --- a/Framework/Core/test/test_Graphviz.cxx +++ b/Framework/Core/test/test_Graphviz.cxx @@ -97,9 +97,9 @@ BOOST_AUTO_TEST_CASE(TestGraphviz) } auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); str.str(""); GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices); lineByLineComparision(str.str(), R"EXPECTED(digraph structs { @@ -135,9 +135,9 @@ BOOST_AUTO_TEST_CASE(TestGraphvizWithPipeline) } auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); str.str(""); GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices); lineByLineComparision(str.str(), R"EXPECTED(digraph structs { diff --git a/Framework/Core/test/test_TimeParallelPipelining.cxx b/Framework/Core/test/test_TimeParallelPipelining.cxx index a2975f5403644..07b4113f5faa9 100644 --- a/Framework/Core/test/test_TimeParallelPipelining.cxx +++ b/Framework/Core/test/test_TimeParallelPipelining.cxx @@ -54,9 +54,9 @@ BOOST_AUTO_TEST_CASE(TimePipeliningSimple) std::vector devices; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_REQUIRE_EQUAL(devices.size(), 4); auto& producer = devices[0]; auto& layer0Consumer0 = devices[1]; @@ -106,9 +106,9 @@ BOOST_AUTO_TEST_CASE(TimePipeliningFull) std::vector devices; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + std::vector resources = {ComputingResourceHelpers::getLocalhostResource()}; SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm, "workflow-id"); BOOST_REQUIRE_EQUAL(devices.size(), 7); auto& producer = devices[0]; auto& layer0Consumer0 = devices[1];