diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index effd89333ea5a..312fa8968ac86 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -36,6 +36,7 @@ o2_add_library(Framework src/CommonDataProcessors.cxx src/CompletionPolicy.cxx src/CompletionPolicyHelpers.cxx + src/ComputingResourceHelpers.cxx src/DispatchPolicy.cxx src/ConfigParamsHelper.cxx src/DDSConfigHelpers.cxx diff --git a/Framework/Core/include/Framework/ChannelSpec.h b/Framework/Core/include/Framework/ChannelSpec.h index 5fd9a558ce19d..276b9441ed108 100644 --- a/Framework/Core/include/Framework/ChannelSpec.h +++ b/Framework/Core/include/Framework/ChannelSpec.h @@ -40,6 +40,7 @@ struct InputChannelSpec { std::string name; enum ChannelType type; enum ChannelMethod method; + std::string hostname; unsigned short port; }; @@ -52,6 +53,7 @@ struct OutputChannelSpec { std::string name; enum ChannelType type; enum ChannelMethod method; + std::string hostname; unsigned short port; size_t listeners; }; diff --git a/Framework/Core/include/Framework/ComputingResource.h b/Framework/Core/include/Framework/ComputingResource.h new file mode 100644 index 0000000000000..27b54005ff315 --- /dev/null +++ b/Framework/Core/include/Framework/ComputingResource.h @@ -0,0 +1,49 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_COMPUTINGRESOURCE_H_ +#define O2_FRAMEWORK_COMPUTINGRESOURCE_H_ + +#include + +namespace o2::framework +{ + +struct ComputingOffer { + float cpu = 0; + float memory = 0; + std::string hostname = ""; + unsigned short startPort = 0; + unsigned short rangeSize = 0; +}; + +/// A computing resource which can be offered to run a device +struct ComputingResource { + ComputingResource() = default; + ComputingResource(ComputingOffer const& offer) + : cpu(offer.cpu), + memory(offer.memory), + hostname(offer.hostname), + startPort(offer.startPort), + lastPort(offer.startPort), + usedPorts(0) + { + } + + float cpu = 0; + float memory = 0; + std::string hostname = ""; + unsigned short startPort = 0; + unsigned short lastPort = 0; + unsigned short usedPorts = 0; +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_COMPUTINGRESOURCES_H_ diff --git a/Framework/Core/include/Framework/DeviceSpec.h b/Framework/Core/include/Framework/DeviceSpec.h index da9ff7081d541..bcfcf2ae61368 100644 --- a/Framework/Core/include/Framework/DeviceSpec.h +++ b/Framework/Core/include/Framework/DeviceSpec.h @@ -11,6 +11,7 @@ #define FRAMEWORK_DEVICESPEC_H #include "Framework/WorkflowSpec.h" +#include "Framework/ComputingResource.h" #include "Framework/DataProcessorSpec.h" #include "Framework/ChannelSpec.h" #include "Framework/ChannelInfo.h" @@ -54,6 +55,7 @@ struct DeviceSpec { /// The completion policy to use for this device. CompletionPolicy completionPolicy; DispatchPolicy dispatchPolicy; + ComputingResource resource; }; } // namespace framework diff --git a/Framework/Core/src/ChannelSpecHelpers.cxx b/Framework/Core/src/ChannelSpecHelpers.cxx index 29fe88ea98f1b..0e9939d2de5de 100644 --- a/Framework/Core/src/ChannelSpecHelpers.cxx +++ b/Framework/Core/src/ChannelSpecHelpers.cxx @@ -8,13 +8,12 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "ChannelSpecHelpers.h" +#include #include #include #include -namespace o2 -{ -namespace framework +namespace o2::framework { char const* ChannelSpecHelpers::typeAsString(enum ChannelType type) @@ -43,9 +42,16 @@ char const* ChannelSpecHelpers::methodAsString(enum ChannelMethod method) throw std::runtime_error("Unknown ChannelMethod"); } -char const* ChannelSpecHelpers::methodAsUrl(enum ChannelMethod method) +std::string ChannelSpecHelpers::channelUrl(OutputChannelSpec const& channel) +{ + return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) + : fmt::format("tcp://{}:{}", channel.hostname, channel.port); +} + +std::string ChannelSpecHelpers::channelUrl(InputChannelSpec const& channel) { - return (method == ChannelMethod::Bind ? "tcp://*:%d" : "tcp://127.0.0.1:%d"); + return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) + : fmt::format("tcp://{}:{}", channel.hostname, channel.port); } /// Stream operators so that we can use ChannelType with Boost.Test @@ -62,5 +68,4 @@ std::ostream& operator<<(std::ostream& s, ChannelMethod const& method) return s; } -} // namespace framework -} // namespace o2 +} // namespace o2::framework diff --git a/Framework/Core/src/ChannelSpecHelpers.h b/Framework/Core/src/ChannelSpecHelpers.h index 647563a679250..41276e50a2f90 100644 --- a/Framework/Core/src/ChannelSpecHelpers.h +++ b/Framework/Core/src/ChannelSpecHelpers.h @@ -26,10 +26,10 @@ struct ChannelSpecHelpers { static char const* typeAsString(enum ChannelType type); /// return a ChannelMethod as a lowercase string static char const* methodAsString(enum ChannelMethod method); - /// return a ChannelMethod as a connection url - /// FIXME: currently it only supports tcp://127.0.0.1 and tcp://*, we should - /// have the actual address customizable. - static char const* methodAsUrl(enum ChannelMethod method); + /// @return a url associated to an InputChannelSpec + static std::string channelUrl(InputChannelSpec const&); + /// @return a url associated to an OutputChannelSpec + static std::string channelUrl(OutputChannelSpec const&); }; /// Stream operators so that we can use ChannelType with Boost.Test diff --git a/Framework/Core/src/ComputingResourceHelpers.cxx b/Framework/Core/src/ComputingResourceHelpers.cxx new file mode 100644 index 0000000000000..b968d8b21c243 --- /dev/null +++ b/Framework/Core/src/ComputingResourceHelpers.cxx @@ -0,0 +1,34 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "ComputingResourceHelpers.h" +#include +#include + +namespace o2::framework +{ +long getTotalNumberOfBytes() +{ + long pages = sysconf(_SC_PHYS_PAGES); + long page_size = sysconf(_SC_PAGE_SIZE); + return pages * page_size; +}; + +ComputingResource ComputingResourceHelpers::getLocalhostResource(unsigned short startPort, unsigned short rangeSize) +{ + ComputingResource result; + result.cpu = std::thread::hardware_concurrency(), + result.memory = getTotalNumberOfBytes(); + result.hostname = "localhost"; + result.startPort = startPort; + result.lastPort = startPort + rangeSize; + result.usedPorts = 0; + return result; +} +} // namespace o2::framework diff --git a/Framework/Core/src/ComputingResource.h b/Framework/Core/src/ComputingResourceHelpers.h similarity index 56% rename from Framework/Core/src/ComputingResource.h rename to Framework/Core/src/ComputingResourceHelpers.h index 58afc18898ae9..eb9d00b162957 100644 --- a/Framework/Core/src/ComputingResource.h +++ b/Framework/Core/src/ComputingResourceHelpers.h @@ -7,25 +7,17 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_COMPUTINGRESOURCE_H -#define FRAMEWORK_COMPUTINGRESOURCE_H -#include +#ifndef O2_FRAMEWORK_COMPUTINGRESOURCEHELPERS_H_ +#define O2_FRAMEWORK_COMPUTINGRESOURCEHELPERS_H_ -namespace o2 -{ -namespace framework -{ +#include "Framework/ComputingResource.h" -/// A computing resource which can be used to run a device. -struct ComputingResource { - float cpu; - float memory; - std::string hostname; - unsigned short port; +namespace o2::framework +{ +struct ComputingResourceHelpers { + static ComputingResource getLocalhostResource(unsigned short startPort, unsigned short rangeSize); }; +} // namespace o2::framework -} // namespace framework -} // namespace o2 - -#endif // FRAMEWORK_COMPUTINGRESOURCES_H +#endif // O2_FRAMEWORK_COMPUTINGRESOURCEHELPERS_H_ diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 415222ee72e7c..65eb34424ac6e 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -26,9 +26,9 @@ #include "Framework/LifetimeHelpers.h" #include "Framework/OutputRoute.h" #include "Framework/WorkflowSpec.h" +#include "Framework/ComputingResource.h" #include "WorkflowHelpers.h" -#include "ComputingResource.h" namespace bpo = boost::program_options; @@ -165,13 +165,11 @@ struct ExpirationHandlerHelpers { std::string inputChannel2String(const InputChannelSpec& channel) { std::string result; - char buffer[32]; - auto addressFormat = ChannelSpecHelpers::methodAsUrl(channel.method); result += "name=" + channel.name; result += std::string(",type=") + ChannelSpecHelpers::typeAsString(channel.type); result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method); - result += std::string(",address=") + (snprintf(buffer, 32, addressFormat, channel.port), buffer); + result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel); result += std::string(",rateLogging=60"); return result; @@ -180,26 +178,26 @@ std::string inputChannel2String(const InputChannelSpec& channel) std::string outputChannel2String(const OutputChannelSpec& channel) { std::string result; - char buffer[32]; - auto addressFormat = ChannelSpecHelpers::methodAsUrl(channel.method); result += "name=" + channel.name; result += std::string(",type=") + ChannelSpecHelpers::typeAsString(channel.type); result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method); - result += std::string(",address=") + (snprintf(buffer, 32, addressFormat, channel.port), buffer); + result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel); result += std::string(",rateLogging=60"); return result; } -void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, std::vector& deviceIndex, +void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, + std::vector& deviceIndex, std::vector& connections, - std::vector& resources, + ResourceManager& resourceManager, const std::vector& outEdgeIndex, const std::vector& logicalEdges, const std::vector& actions, const WorkflowSpec& workflow, const std::vector& outputsMatchers, - const std::vector& channelPolicies) + const std::vector& channelPolicies, + ComputingOffer const& defaultOffer) { // The topology cannot be empty or not connected. If that is the case, than // something before this went wrong. @@ -208,7 +206,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, // Edges are navigated in order for each device, so the device associaited to // an edge is always the last one created. - auto deviceForEdge = [&actions, &workflow, &devices, &logicalEdges](size_t ei) { + auto deviceForEdge = [&actions, &workflow, &devices, &logicalEdges, &resourceManager, &defaultOffer](size_t ei, ComputingOffer& acceptedOffer) { auto& edge = logicalEdges[ei]; auto& action = actions[ei]; @@ -216,7 +214,27 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, assert(devices.empty() == false); return devices.size() - 1; } + if (acceptedOffer.hostname != "") { + resourceManager.notifyAcceptedOffer(acceptedOffer); + } + auto processor = workflow[edge.producer]; + + acceptedOffer.cpu = defaultOffer.cpu; + acceptedOffer.memory = defaultOffer.memory; + for (auto offer : resourceManager.getAvailableOffers()) { + if (offer.cpu < acceptedOffer.cpu) { + continue; + } + if (offer.memory < acceptedOffer.memory) { + continue; + } + acceptedOffer.hostname = offer.hostname; + acceptedOffer.startPort = offer.startPort; + acceptedOffer.rangeSize = 0; + break; + } + DeviceSpec device; device.name = processor.name; device.id = processor.name; @@ -228,12 +246,15 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, device.rank = processor.rank; device.nSlots = processor.nSlots; device.inputTimesliceId = edge.timeIndex; + device.resource = {acceptedOffer}; devices.push_back(device); return devices.size() - 1; }; - auto channelFromDeviceEdgeAndPort = [&workflow, &channelPolicies](const DeviceSpec& device, - const DeviceConnectionEdge& edge, short port) { + auto channelFromDeviceEdgeAndPort = [&connections, &workflow, &channelPolicies](const DeviceSpec& device, + ComputingResource& deviceResource, + ComputingOffer& acceptedOffer, + const DeviceConnectionEdge& edge) { OutputChannelSpec channel; auto& consumer = workflow[edge.consumer]; std::string consumerDeviceId = consumer.name; @@ -241,20 +262,20 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, consumerDeviceId += "_t" + std::to_string(edge.timeIndex); } channel.name = "from_" + device.id + "_to_" + consumerDeviceId; - channel.port = port; + channel.port = acceptedOffer.startPort + acceptedOffer.rangeSize; + channel.hostname = acceptedOffer.hostname; + deviceResource.usedPorts += 1; + acceptedOffer.rangeSize += 1; + for (auto& policy : channelPolicies) { if (policy.match(device.id, consumerDeviceId)) { policy.modifyOutput(channel); break; } } - return std::move(channel); - }; - - auto connectionIdFromEdgeAndPort = [&connections](const DeviceConnectionEdge& edge, size_t port) { - DeviceConnectionId id{edge.producer, edge.consumer, edge.timeIndex, edge.producerTimeIndex, port}; + DeviceConnectionId id{edge.producer, edge.consumer, edge.timeIndex, edge.producerTimeIndex, channel.port}; connections.push_back(id); - return connections.back(); + return std::move(channel); }; auto isDifferentDestinationDeviceReferredBy = [&actions](size_t ei) { return actions[ei].requiresNewChannel; }; @@ -264,17 +285,15 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, // alredy there) and create a new channel only if it connects two new // devices. Whether or not this is the case was previously computed // in the action.requiresNewChannel field. - auto createChannelForDeviceEdge = [&devices, &logicalEdges, &resources, &channelFromDeviceEdgeAndPort, - &connectionIdFromEdgeAndPort, &outputsMatchers, &deviceIndex, - &workflow](size_t di, size_t ei) { + auto createChannelForDeviceEdge = [&devices, &logicalEdges, &channelFromDeviceEdgeAndPort, + &outputsMatchers, &deviceIndex, + &workflow](size_t di, size_t ei, ComputingOffer& offer) { auto& device = devices[di]; auto& edge = logicalEdges[ei]; deviceIndex.emplace_back(DeviceId{edge.producer, edge.producerTimeIndex, di}); - OutputChannelSpec channel = channelFromDeviceEdgeAndPort(device, edge, resources.back().port); - const DeviceConnectionId& id = connectionIdFromEdgeAndPort(edge, resources.back().port); - resources.pop_back(); + OutputChannelSpec channel = channelFromDeviceEdgeAndPort(device, device.resource, offer, edge); device.outputChannels.push_back(channel); return device.outputChannels.size() - 1; @@ -331,28 +350,31 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, // here refers to the source device. This loop will therefore not create the // devices which acts as sink, which are done in the preocessInEdgeActions // function. + ComputingOffer acceptedOffer; for (auto edge : outEdgeIndex) { - auto device = deviceForEdge(edge); + auto device = deviceForEdge(edge, acceptedOffer); size_t channel = -1; if (isDifferentDestinationDeviceReferredBy(edge)) { - channel = createChannelForDeviceEdge(device, edge); + channel = createChannelForDeviceEdge(device, edge, acceptedOffer); } else { channel = lastChannelFor(device); } appendOutputRouteToSourceDeviceChannel(edge, device, channel); } + resourceManager.notifyAcceptedOffer(acceptedOffer); sortDeviceIndex(); } void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, std::vector& deviceIndex, - std::vector& resources, const std::vector& connections, + ResourceManager& resourceManager, const std::vector& inEdgeIndex, const std::vector& logicalEdges, const std::vector& actions, const WorkflowSpec& workflow, std::vector const& availableForwardsInfo, - std::vector const& channelPolicies) + std::vector const& channelPolicies, + ComputingOffer const& defaultOffer) { auto const& constDeviceIndex = deviceIndex; @@ -392,9 +414,30 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, return lastConsumerSearch->deviceIndex; }; - auto createNewDeviceForEdge = [&workflow, &logicalEdges, &devices, &deviceIndex](size_t ei) { + auto createNewDeviceForEdge = [&workflow, &logicalEdges, &devices, &deviceIndex, &resourceManager, &defaultOffer](size_t ei, ComputingOffer& acceptedOffer) { auto& edge = logicalEdges[ei]; + + if (acceptedOffer.hostname != "") { + resourceManager.notifyAcceptedOffer(acceptedOffer); + } + auto& processor = workflow[edge.consumer]; + + acceptedOffer.cpu = defaultOffer.cpu; + acceptedOffer.memory = defaultOffer.memory; + for (auto offer : resourceManager.getAvailableOffers()) { + if (offer.cpu < acceptedOffer.cpu) { + continue; + } + if (offer.memory < acceptedOffer.memory) { + continue; + } + acceptedOffer.hostname = offer.hostname; + acceptedOffer.startPort = offer.startPort; + acceptedOffer.rangeSize = 0; + break; + } + DeviceSpec device; device.name = processor.name; device.id = processor.name; @@ -406,6 +449,8 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, device.rank = processor.rank; device.nSlots = processor.nSlots; device.inputTimesliceId = edge.timeIndex; + device.resource = {acceptedOffer}; + // FIXME: maybe I should use an std::map in the end // but this is really not performance critical auto id = DeviceId{edge.consumer, edge.timeIndex, devices.size()}; @@ -442,11 +487,12 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, return true; }; auto appendInputChannelForConsumerDevice = [&devices, &connections, &checkNoDuplicatesFor, &channelPolicies]( - size_t pi, size_t ci, int16_t port) { + size_t pi, size_t ci, unsigned short port) { auto const& producerDevice = devices[pi]; auto& consumerDevice = devices[ci]; InputChannelSpec channel; channel.name = "from_" + producerDevice.id + "_to_" + consumerDevice.id; + channel.hostname = producerDevice.resource.hostname; channel.port = port; for (auto& policy : channelPolicies) { if (policy.match(producerDevice.id, consumerDevice.id)) { @@ -544,6 +590,7 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, // of the sink data processors. // New InputChannels need to refer to preexisting OutputChannels we create // previously. + ComputingOffer acceptedOffer; for (size_t edge : inEdgeIndex) { auto& action = actions[edge]; @@ -553,7 +600,7 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, if (hasConsumerForEdge(edge)) { consumerDevice = getConsumerForEdge(edge); } else { - consumerDevice = createNewDeviceForEdge(edge); + consumerDevice = createNewDeviceForEdge(edge, acceptedOffer); } } size_t producerDevice = findProducerForEdge(edge); @@ -567,6 +614,7 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector& devices, } appendInputRouteToDestDeviceChannel(edge, consumerDevice, channel); } + resourceManager.notifyAcceptedOffer(acceptedOffer); } // Construct the list of actual devices we want, given a workflow. @@ -577,7 +625,7 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(WorkflowSpec const& workf std::vector const& completionPolicies, std::vector const& dispatchPolicies, std::vector& devices, - std::vector& resources) + ResourceManager& resourceManager) { std::vector availableForwardsInfo; @@ -607,20 +655,36 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(WorkflowSpec const& workf std::vector outEdgeIndex; WorkflowHelpers::sortEdges(inEdgeIndex, outEdgeIndex, logicalEdges); - std::vector actions = WorkflowHelpers::computeOutEdgeActions(logicalEdges, outEdgeIndex); - - DeviceSpecHelpers::processOutEdgeActions(devices, deviceIndex, connections, resources, outEdgeIndex, logicalEdges, - actions, workflow, outputs, channelPolicies); - + std::vector outActions = WorkflowHelpers::computeOutEdgeActions(logicalEdges, outEdgeIndex); // Crete the connections on the inverse map for all of them // lookup for port and add as input of the current device. std::vector inActions = WorkflowHelpers::computeInEdgeActions(logicalEdges, inEdgeIndex); + size_t deviceCount = 0; + for (auto& action : outActions) { + deviceCount += action.requiresNewDevice ? 1 : 0; + } + for (auto& action : inActions) { + deviceCount += action.requiresNewDevice ? 1 : 0; + } + + ComputingOffer defaultOffer; + for (auto& offer : resourceManager.getAvailableOffers()) { + defaultOffer.cpu += offer.cpu; + defaultOffer.memory += offer.memory; + } + + /// For the moment lets play it safe and underestimate default needed resources. + defaultOffer.cpu /= deviceCount + 1; + defaultOffer.memory /= deviceCount + 1; + + processOutEdgeActions(devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges, + outActions, workflow, outputs, channelPolicies, defaultOffer); // FIXME: is this not the case??? std::sort(connections.begin(), connections.end()); - processInEdgeActions(devices, deviceIndex, resources, connections, inEdgeIndex, logicalEdges, inActions, workflow, - availableForwardsInfo, channelPolicies); + processInEdgeActions(devices, deviceIndex, connections, resourceManager, inEdgeIndex, logicalEdges, + inActions, workflow, availableForwardsInfo, channelPolicies, defaultOffer); // We apply the completion policies here since this is where we have all the // devices resolved. for (auto& device : devices) { diff --git a/Framework/Core/src/DeviceSpecHelpers.h b/Framework/Core/src/DeviceSpecHelpers.h index d442f3ca43bca..73df1eefb7aa9 100644 --- a/Framework/Core/src/DeviceSpecHelpers.h +++ b/Framework/Core/src/DeviceSpecHelpers.h @@ -22,7 +22,7 @@ #include "Framework/AlgorithmSpec.h" #include "Framework/ConfigParamSpec.h" #include "Framework/OutputRoute.h" -#include "ComputingResource.h" +#include "ResourceManager.h" #include "DataProcessorInfo.h" #include "WorkflowHelpers.h" #include @@ -45,14 +45,14 @@ struct DeviceSpecHelpers { std::vector const& completionPolicies, std::vector const& dispatchPolicies, std::vector& devices, - std::vector& resources); + ResourceManager& resourceManager); static void dataProcessorSpecs2DeviceSpecs( const WorkflowSpec& workflow, std::vector const& channelPolicies, std::vector const& completionPolicies, std::vector& devices, - std::vector& resources) + ResourceManager& resources) { std::vector dispatchPolicies = DispatchPolicy::createDefaultPolicies(); dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, dispatchPolicies, devices, resources); @@ -76,13 +76,14 @@ struct DeviceSpecHelpers { std::vector& devices, std::vector& deviceIndex, std::vector& connections, - std::vector& resources, + ResourceManager& resourceManager, const std::vector& outEdgeIndex, const std::vector& logicalEdges, const std::vector& actions, const WorkflowSpec& workflow, const std::vector& outputs, - std::vector const& channelPolicies); + std::vector const& channelPolicies, + ComputingOffer const& defaultOffer); /// This takes the list of preprocessed edges of a graph /// and creates Devices and Channels which are related @@ -91,14 +92,15 @@ struct DeviceSpecHelpers { static void processInEdgeActions( std::vector& devices, std::vector& deviceIndex, - std::vector& resources, const std::vector& connections, + ResourceManager& resourceManager, const std::vector& inEdgeIndex, const std::vector& logicalEdges, const std::vector& actions, const WorkflowSpec& workflow, const std::vector& availableForwardsInfo, - std::vector const& channelPolicies); + std::vector const& channelPolicies, + ComputingOffer const& defaultOffer); /// return a description of all options to be forwarded to the device /// by default diff --git a/Framework/Core/src/ResourceManager.h b/Framework/Core/src/ResourceManager.h index 4f05822673051..93ba16711a1cc 100644 --- a/Framework/Core/src/ResourceManager.h +++ b/Framework/Core/src/ResourceManager.h @@ -7,24 +7,22 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_RESOURCEMANAGER_H -#define FRAMEWORK_RESOURCEMANAGER_H +#ifndef O2_FRAMEWORK_RESOURCEMANAGER_H_ +#define O2_FRAMEWORK_RESOURCEMANAGER_H_ -#include "ComputingResource.h" +#include "Framework/ComputingResource.h" #include -namespace o2 -{ -namespace framework +namespace o2::framework { class ResourceManager { public: - virtual std::vector getAvailableResources() = 0; + virtual std::vector getAvailableOffers() = 0; + virtual void notifyAcceptedOffer(ComputingOffer const&) = 0; }; -} // namespace framework -} // namespace o2 +} // namespace o2::framework -#endif // FRAMEWORK_RESOURCEMANAGER_H +#endif // O2_FRAMEWORK_RESOURCEMANAGER_H_ diff --git a/Framework/Core/src/SimpleResourceManager.cxx b/Framework/Core/src/SimpleResourceManager.cxx index aedf9c5ad8b4a..99961dd41b4e5 100644 --- a/Framework/Core/src/SimpleResourceManager.cxx +++ b/Framework/Core/src/SimpleResourceManager.cxx @@ -8,7 +8,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "SimpleResourceManager.h" -#include "ComputingResource.h" +#include "Framework/ComputingResource.h" #include #include @@ -20,23 +20,55 @@ namespace framework /// The simplest implementation of this allocates mMaxPorts ports starting from /// the mInitialPort. For now we still consider everything running on a single /// machine. -std::vector SimpleResourceManager::getAvailableResources() +std::vector SimpleResourceManager::getAvailableOffers() { - std::vector result; - if ((mInitialPort < 1025) || ((mInitialPort + mMaxPorts) > 65535)) { - throw std::runtime_error("Invalid port number. Valid port range is 1025-65535"); + std::vector result; + + for (auto& resource : mResources) { + if (resource.cpu < 0.01) { + continue; + } + if (resource.memory < 0.01) { + continue; + } + if (resource.usedPorts == (resource.lastPort - resource.startPort + 1)) { + continue; + } + ComputingOffer offer; + offer.cpu = resource.cpu; + offer.memory = resource.memory; + offer.hostname = resource.hostname; + offer.startPort = resource.startPort + resource.usedPorts; + offer.rangeSize = (resource.lastPort - resource.startPort) - resource.usedPorts; + result.push_back(offer); } - // We insert them backwards for compatibility with the previous - // way of assigning them. - for (size_t i = mInitialPort + mMaxPorts - 1; i >= mInitialPort; --i) { - result.push_back(ComputingResource{ - 1.0, - 1.0, - "localhost", - static_cast(i)}); - }; return result; } +void SimpleResourceManager::notifyAcceptedOffer(ComputingOffer const& offer) +{ + bool resourceFound = false; + for (auto& resource : mResources) { + if (resource.hostname != offer.hostname) { + continue; + } + if (resource.startPort > offer.startPort) { + continue; + } + if (resource.lastPort < offer.startPort + offer.rangeSize) { + continue; + } + resourceFound = true; + resource.cpu -= offer.cpu; + resource.memory -= offer.memory; + resource.usedPorts += offer.rangeSize; + break; + } + + if (resourceFound == false) { + throw std::runtime_error("Could not match offer to original resource."); + } +} + } // namespace framework } // namespace o2 diff --git a/Framework/Core/src/SimpleResourceManager.h b/Framework/Core/src/SimpleResourceManager.h index b11aa7f7a415d..8f383ce31f96b 100644 --- a/Framework/Core/src/SimpleResourceManager.h +++ b/Framework/Core/src/SimpleResourceManager.h @@ -7,14 +7,12 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_SIMPLERESOURCEMANAGER_H -#define FRAMEWORK_SIMPLERESOURCEMANAGER_H +#ifndef O2_FRAMEWORK_SIMPLERESOURCEMANAGER_H_ +#define O2_FRAMEWORK_SIMPLERESOURCEMANAGER_H_ #include "ResourceManager.h" -namespace o2 -{ -namespace framework +namespace o2::framework { /// A resource manager with infinite resources at its disposal. @@ -23,23 +21,22 @@ namespace framework class SimpleResourceManager : public ResourceManager { public: - /// @a initialPort is the first port which can be used - /// by this trivial resource manager. - /// @a maxPorts is the maximum number of ports starting from - /// initialPort that this resource manager can allocate. - SimpleResourceManager(unsigned short initialPort, unsigned short maxPorts = 1000) - : mInitialPort{initialPort}, - mMaxPorts{maxPorts} + /// @a initialResources the precomputed list of available resources + SimpleResourceManager(std::vector intialResources) + : mResources{intialResources} { } - std::vector getAvailableResources() override; + /// Get the available resources for a device to run on + std::vector getAvailableOffers() override; + + /// Notify that we have accepted a given resource and that it + /// should not be reoffered + void notifyAcceptedOffer(ComputingOffer const& accepted) override; private: - int mInitialPort; - int mMaxPorts; + std::vector mResources; }; -} // namespace framework -} // namespace o2 +} // namespace o2::framework -#endif // FRAMEWORK_SIMPLERESOURCEMANAGER_H +#endif // O2_FRAMEWORK_SIMPLERESOURCEMANAGER_H_ diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 0fc5eb66fae4f..902d866cd6500 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -40,6 +40,7 @@ #include "Framework/CallbackService.h" #include "Framework/WorkflowSpec.h" +#include "ComputingResourceHelpers.h" #include "DataProcessingStatus.h" #include "DDSConfigHelpers.h" #include "O2ControlHelpers.h" @@ -730,7 +731,11 @@ int runStateMachine(DataProcessorSpecs const& workflow, DeviceControls controls; DeviceExecutions deviceExecutions; DataProcessorInfos dataProcessorInfos = previousDataProcessorInfos; - auto resourceManager = std::make_unique(driverInfo.startPort, driverInfo.portRange); + + std::vector resources{ + ComputingResourceHelpers::getLocalhostResource(driverInfo.startPort, driverInfo.portRange)}; + + auto resourceManager = std::make_unique(resources); void* window = nullptr; decltype(gui::getGUIDebugger(infos, deviceSpecs, dataProcessorInfos, metricsInfos, driverInfo, controls, driverControl)) debugGUICallback; @@ -838,13 +843,12 @@ int runStateMachine(DataProcessorSpecs const& workflow, break; case DriverState::MATERIALISE_WORKFLOW: try { - std::vector resources = resourceManager->getAvailableResources(); DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, driverInfo.channelPolicies, driverInfo.completionPolicies, driverInfo.dispatchPolicies, deviceSpecs, - resources); + *resourceManager); // This should expand nodes so that we can build a consistent DAG. } catch (std::runtime_error& e) { std::cerr << "Invalid workflow: " << e.what() << std::endl; diff --git a/Framework/Core/test/test_ChannelSpecHelpers.cxx b/Framework/Core/test/test_ChannelSpecHelpers.cxx index 73b096b62610b..1166c706bd083 100644 --- a/Framework/Core/test/test_ChannelSpecHelpers.cxx +++ b/Framework/Core/test/test_ChannelSpecHelpers.cxx @@ -25,12 +25,6 @@ BOOST_AUTO_TEST_CASE(TestChannelMethod) BOOST_REQUIRE_EQUAL(oss.str(), "bindconnect"); std::ostringstream oss2; - - oss2 << ChannelSpecHelpers::methodAsUrl(ChannelMethod::Bind) - << "\n" - << ChannelSpecHelpers::methodAsUrl(ChannelMethod::Connect); - - BOOST_REQUIRE_EQUAL(oss2.str(), "tcp://*:%d\ntcp://127.0.0.1:%d"); } BOOST_AUTO_TEST_CASE(TestChannelType) diff --git a/Framework/Core/test/test_DeviceSpec.cxx b/Framework/Core/test/test_DeviceSpec.cxx index 0d5ee26b6521e..c424f13c62af3 100644 --- a/Framework/Core/test/test_DeviceSpec.cxx +++ b/Framework/Core/test/test_DeviceSpec.cxx @@ -20,6 +20,7 @@ #include "Framework/WorkflowSpec.h" #include "Framework/DataSpecUtils.h" #include "../src/SimpleResourceManager.h" +#include "../src/ComputingResourceHelpers.h" #include "test_HelperMacros.h" using namespace o2::framework; @@ -44,10 +45,18 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1) BOOST_REQUIRE_EQUAL(channelPolicies.empty(), false); BOOST_REQUIRE_EQUAL(completionPolicies.empty(), false); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); - BOOST_CHECK_EQUAL(devices.size(), 2); + + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + BOOST_REQUIRE_EQUAL(resources.size(), 1); + BOOST_CHECK_EQUAL(resources[0].startPort, 22000); + SimpleResourceManager rm(resources); + auto offers = rm.getAvailableOffers(); + BOOST_REQUIRE_EQUAL(offers.size(), 1); + BOOST_CHECK_EQUAL(offers[0].startPort, 22000); + BOOST_CHECK_EQUAL(offers[0].rangeSize, 1000); + + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); + BOOST_REQUIRE_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].type, ChannelType::Push); @@ -55,13 +64,13 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1) BOOST_CHECK_EQUAL(devices[0].outputChannels[0].port, 22000); BOOST_CHECK_EQUAL(devices[0].outputs.size(), 1); - BOOST_CHECK_EQUAL(devices[1].inputChannels.size(), 1); + BOOST_REQUIRE_EQUAL(devices[1].inputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[1].inputChannels[0].method, ChannelMethod::Connect); BOOST_CHECK_EQUAL(devices[1].inputChannels[0].type, ChannelType::Pull); BOOST_CHECK_EQUAL(devices[1].inputChannels[0].name, "from_A_to_B"); BOOST_CHECK_EQUAL(devices[1].inputChannels[0].port, 22000); - BOOST_CHECK_EQUAL(devices[1].inputs.size(), 1); + BOOST_REQUIRE_EQUAL(devices[1].inputs.size(), 1); BOOST_CHECK_EQUAL(devices[1].inputs[0].sourceChannel, "from_A_to_B"); } @@ -79,9 +88,9 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec1PushPull) BOOST_REQUIRE_EQUAL(channelPolicies.empty(), false); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_CHECK_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -123,9 +132,9 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec2) auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_CHECK_EQUAL(devices.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -165,9 +174,9 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec3) auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_CHECK_EQUAL(devices.size(), 3); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -212,10 +221,10 @@ BOOST_AUTO_TEST_CASE(TestDeviceSpec4) auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_CHECK_EQUAL(devices.size(), 4); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 2); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -282,9 +291,9 @@ BOOST_AUTO_TEST_CASE(TestTopologyForwarding) auto completionPolicies = CompletionPolicy::createDefaultPolicies(); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_CHECK_EQUAL(devices.size(), 3); BOOST_CHECK_EQUAL(devices[0].outputChannels.size(), 1); BOOST_CHECK_EQUAL(devices[0].outputChannels[0].method, ChannelMethod::Bind); @@ -397,11 +406,14 @@ BOOST_AUTO_TEST_CASE(TestOutEdgeProcessingHelpers) WorkflowSpec workflow = defineDataProcessing7(); auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + ComputingOffer defaultOffer; + defaultOffer.cpu = 0.01; + defaultOffer.memory = 0.01; - DeviceSpecHelpers::processOutEdgeActions(devices, deviceIndex, connections, resources, edgeOutIndex, logicalEdges, - actions, workflow, globalOutputs, channelPolicies); + DeviceSpecHelpers::processOutEdgeActions(devices, deviceIndex, connections, rm, edgeOutIndex, logicalEdges, + actions, workflow, globalOutputs, channelPolicies, defaultOffer); std::vector expectedDeviceIndex = {{0, 0, 0}, {0, 0, 0}, {0, 0, 0}, {1, 0, 1}, {1, 0, 1}, {1, 1, 2}, {1, 1, 2}, {1, 2, 3}, {1, 2, 3}}; BOOST_REQUIRE_EQUAL(devices.size(), 4); // For producers @@ -427,8 +439,9 @@ BOOST_AUTO_TEST_CASE(TestOutEdgeProcessingHelpers) BOOST_REQUIRE_EQUAL(devices[2].outputs.size(), 2); BOOST_REQUIRE_EQUAL(devices[3].outputs.size(), 2); - // FIXME: check we have the right connections as well.. - BOOST_CHECK_EQUAL(resources.back().port, 22009); + auto offers = rm.getAvailableOffers(); + BOOST_REQUIRE_EQUAL(offers.size(), 1); + BOOST_CHECK_EQUAL(offers[0].startPort, 22009); // Not sure this is correct, but lets assume that's the case.. std::vector edgeInIndex{0, 1, 2, 3, 4, 5, 6, 7, 8}; @@ -447,8 +460,8 @@ BOOST_AUTO_TEST_CASE(TestOutEdgeProcessingHelpers) std::sort(connections.begin(), connections.end()); - DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, resources, connections, edgeInIndex, logicalEdges, - inActions, workflow, availableForwardsInfo, channelPolicies); + DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, connections, rm, edgeInIndex, logicalEdges, + inActions, workflow, availableForwardsInfo, channelPolicies, defaultOffer); // std::vector expectedDeviceIndexFinal = {{0, 0, 0}, {0, 0, 0}, {0, 0, 0}, {1, 0, 1}, {1, 0, 1}, {1, 1, 2}, {1, 1, 2}, {1, 2, 3}, {1, 2, 3}, {2, 0, 4}, {2, 1, 5}}; BOOST_REQUIRE_EQUAL(expectedDeviceIndexFinal.size(), deviceIndex.size()); @@ -555,9 +568,9 @@ BOOST_AUTO_TEST_CASE(TestTopologyLayeredTimePipeline) std::vector devices; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_CHECK_EQUAL(devices.size(), 6); BOOST_CHECK_EQUAL(devices[0].id, "A"); BOOST_CHECK_EQUAL(devices[1].id, "B_t0"); @@ -675,8 +688,8 @@ WorkflowSpec defineDataProcessing8() BOOST_AUTO_TEST_CASE(TestSimpleWildcard) { auto workflow = defineDataProcessing8(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); std::vector devices; @@ -711,8 +724,12 @@ BOOST_AUTO_TEST_CASE(TestSimpleWildcard) EdgeAction{false, false}, }; - DeviceSpecHelpers::processOutEdgeActions(devices, deviceIndex, connections, resources, edgeOutIndex, logicalEdges, - outActions, workflow, globalOutputs, channelPolicies); + ComputingOffer defaultOffer; + defaultOffer.cpu = 0.01; + defaultOffer.memory = 0.01; + + DeviceSpecHelpers::processOutEdgeActions(devices, deviceIndex, connections, rm, edgeOutIndex, logicalEdges, + outActions, workflow, globalOutputs, channelPolicies, defaultOffer); BOOST_REQUIRE_EQUAL(devices.size(), 2); // Two devices have outputs: A and Timer BOOST_CHECK_EQUAL(devices[0].name, "A"); @@ -727,8 +744,8 @@ BOOST_AUTO_TEST_CASE(TestSimpleWildcard) std::sort(connections.begin(), connections.end()); - DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, resources, connections, edgeInIndex, logicalEdges, - inActions, workflow, availableForwardsInfo, channelPolicies); + DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, connections, rm, edgeInIndex, logicalEdges, + inActions, workflow, availableForwardsInfo, channelPolicies, defaultOffer); BOOST_REQUIRE_EQUAL(devices.size(), 3); // Now we also have B BOOST_CHECK_EQUAL(devices[0].name, "A"); diff --git a/Framework/Core/test/test_DeviceSpecHelpers.cxx b/Framework/Core/test/test_DeviceSpecHelpers.cxx index 13b76a3084103..a36ee5d11a305 100644 --- a/Framework/Core/test/test_DeviceSpecHelpers.cxx +++ b/Framework/Core/test/test_DeviceSpecHelpers.cxx @@ -23,6 +23,7 @@ #include #include #include "../src/SimpleResourceManager.h" +#include "../src/ComputingResourceHelpers.h" namespace o2 { @@ -131,14 +132,14 @@ BOOST_AUTO_TEST_CASE(test_prepareArguments) std::vector deviceSpecs; - auto resourceManager = std::make_unique(42000, 100); - auto resources = resourceManager->getAvailableResources(); + std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + auto rm = std::make_unique(resources); DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, ChannelConfigurationPolicy::createDefaultPolicies(), CompletionPolicy::createDefaultPolicies(), deviceSpecs, - resources); + *rm); // Now doing the test cases CheckMatrix matrix; diff --git a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx index 512e647722d90..ac70194d326b0 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToDDS.cxx @@ -15,6 +15,7 @@ #include "../src/DDSConfigHelpers.h" #include "../src/DeviceSpecHelpers.h" #include "../src/SimpleResourceManager.h" +#include "../src/ComputingResourceHelpers.h" #include "Framework/DataAllocator.h" #include "Framework/DeviceControl.h" #include "Framework/DeviceSpec.h" @@ -69,10 +70,10 @@ BOOST_AUTO_TEST_CASE(TestGraphviz) std::ostringstream ss{""}; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); std::vector devices; - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); + std::vector resources{ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); std::vector controls; std::vector executions; controls.resize(devices.size()); diff --git a/Framework/Core/test/test_Graphviz.cxx b/Framework/Core/test/test_Graphviz.cxx index 8b0a21058d0f5..5c4910b66fad6 100644 --- a/Framework/Core/test/test_Graphviz.cxx +++ b/Framework/Core/test/test_Graphviz.cxx @@ -11,6 +11,7 @@ #define BOOST_TEST_MAIN #define BOOST_TEST_DYN_LINK +#include "../src/ComputingResourceHelpers.h" #include "../src/DeviceSpecHelpers.h" #include "../src/GraphvizHelpers.h" #include "../src/SimpleResourceManager.h" @@ -96,9 +97,9 @@ BOOST_AUTO_TEST_CASE(TestGraphviz) } auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); str.str(""); GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices); lineByLineComparision(str.str(), R"EXPECTED(digraph structs { @@ -134,9 +135,9 @@ BOOST_AUTO_TEST_CASE(TestGraphvizWithPipeline) } auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); str.str(""); GraphvizHelpers::dumpDeviceSpec2Graphviz(str, devices); lineByLineComparision(str.str(), R"EXPECTED(digraph structs { diff --git a/Framework/Core/test/test_TimeParallelPipelining.cxx b/Framework/Core/test/test_TimeParallelPipelining.cxx index d6d7b9b3b4b77..a2975f5403644 100644 --- a/Framework/Core/test/test_TimeParallelPipelining.cxx +++ b/Framework/Core/test/test_TimeParallelPipelining.cxx @@ -14,6 +14,7 @@ #include #include "../src/DeviceSpecHelpers.h" #include "../src/SimpleResourceManager.h" +#include "../src/ComputingResourceHelpers.h" #include "Framework/DeviceControl.h" #include "Framework/DeviceSpec.h" #include "Framework/WorkflowSpec.h" @@ -53,9 +54,9 @@ BOOST_AUTO_TEST_CASE(TimePipeliningSimple) std::vector devices; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_REQUIRE_EQUAL(devices.size(), 4); auto& producer = devices[0]; auto& layer0Consumer0 = devices[1]; @@ -105,9 +106,9 @@ BOOST_AUTO_TEST_CASE(TimePipeliningFull) std::vector devices; auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(); auto completionPolicies = CompletionPolicy::createDefaultPolicies(); - SimpleResourceManager rm(22000, 1000); - auto resources = rm.getAvailableResources(); - DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, resources); + std::vector resources = {ComputingResourceHelpers::getLocalhostResource(22000, 1000)}; + SimpleResourceManager rm(resources); + DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, devices, rm); BOOST_REQUIRE_EQUAL(devices.size(), 7); auto& producer = devices[0]; auto& layer0Consumer0 = devices[1];