diff --git a/third_party/ipcz/src/ipcz/message.cc b/third_party/ipcz/src/ipcz/message.cc index f53727ec506d2e..5872d6b7870620 100644 --- a/third_party/ipcz/src/ipcz/message.cc +++ b/third_party/ipcz/src/ipcz/message.cc @@ -160,6 +160,10 @@ uint32_t Message::AllocateGenericArray(size_t element_size, } uint32_t Message::AppendDriverObject(DriverObject object) { + if (!object.is_valid()) { + return internal::kInvalidDriverObjectIndex; + } + const uint32_t index = checked_cast(driver_objects_.size()); driver_objects_.push_back(std::move(object)); return index; @@ -173,12 +177,17 @@ internal::DriverObjectArrayData Message::AppendDriverObjects( }; driver_objects_.reserve(driver_objects_.size() + objects.size()); for (auto& object : objects) { + ABSL_ASSERT(object.is_valid()); driver_objects_.push_back(std::move(object)); } return data; } DriverObject Message::TakeDriverObject(uint32_t index) { + if (index == internal::kInvalidDriverObjectIndex) { + return {}; + } + // Note that `index` has already been validated by now. ABSL_HARDENING_ASSERT(index < driver_objects_.size()); return std::move(driver_objects_[index]); @@ -344,10 +353,12 @@ bool Message::DeserializeFromTransport( switch (param.type) { case internal::ParamType::kDriverObject: { const uint32_t index = GetParamValueAt(param.offset); - if (is_object_claimed[index]) { - return false; + if (index != internal::kInvalidDriverObjectIndex) { + if (is_object_claimed[index]) { + return false; + } + is_object_claimed[index] = true; } - is_object_claimed[index] = true; break; } diff --git a/third_party/ipcz/src/ipcz/message.h b/third_party/ipcz/src/ipcz/message.h index a0b2d29cdbc05a..7aa50d225a840f 100644 --- a/third_party/ipcz/src/ipcz/message.h +++ b/third_party/ipcz/src/ipcz/message.h @@ -115,6 +115,10 @@ struct IPCZ_ALIGN(8) DriverObjectArrayData { uint32_t num_objects; }; +// Encodes an invalid driver object index. Any driver object field encoded as +// this value will deserialize to an invalid DriverObject. +constexpr uint32_t kInvalidDriverObjectIndex = 0xffffffff; + // End of wire structure definitions. Anything below this line is not meant to // be encoded into messages. #pragma pack(pop) diff --git a/third_party/ipcz/src/ipcz/node.cc b/third_party/ipcz/src/ipcz/node.cc index c21aa6f8aa5d78..5c4cf845d503b4 100644 --- a/third_party/ipcz/src/ipcz/node.cc +++ b/third_party/ipcz/src/ipcz/node.cc @@ -122,12 +122,25 @@ NodeName Node::GenerateRandomName() const { return name; } +void Node::SetAllocationDelegate(Ref link) { + absl::MutexLock lock(&mutex_); + ABSL_ASSERT(!allocation_delegate_link_); + allocation_delegate_link_ = std::move(link); +} + void Node::AllocateSharedMemory(size_t size, AllocateSharedMemoryCallback callback) { - // TODO: Implement delegated allocation when this Node is connected to another - // with the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag set. For now we - // assume all nodes can perform direct allocation. - callback(DriverMemory(driver_, size)); + Ref delegate; + { + absl::MutexLock lock(&mutex_); + delegate = allocation_delegate_link_; + } + + if (delegate) { + delegate->RequestMemory(size, std::move(callback)); + } else { + callback(DriverMemory(driver_, size)); + } } void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) { @@ -227,7 +240,7 @@ void Node::AcceptIntroduction(NodeLink& from_node_link, Ref new_link = NodeLink::CreateInactive( WrapRefCounted(this), side, local_name, name, Type::kNormal, - remote_protocol_version, transport, std::move(memory)); + remote_protocol_version, transport, memory); ABSL_ASSERT(new_link); std::vector callbacks; @@ -290,14 +303,20 @@ void Node::DropLink(const NodeName& name) { link = std::move(it->second); node_links_.erase(it); - DVLOG(4) << "Node " << link->local_node_name().ToString() << " dropping " + const NodeName& local_name = link->local_node_name(); + DVLOG(4) << "Node " << local_name.ToString() << " dropping " << " link to " << link->remote_node_name().ToString(); if (link == broker_link_) { - DVLOG(4) << "Node " << link->local_node_name().ToString() - << " has lost its broker link"; + DVLOG(4) << "Node " << local_name.ToString() << " lost its broker link"; broker_link_.reset(); lost_broker = true; } + + if (link == allocation_delegate_link_) { + DVLOG(4) << "Node " << local_name.ToString() + << " lost its allocation delegate"; + allocation_delegate_link_.reset(); + } } link->Deactivate(); @@ -313,6 +332,7 @@ void Node::ShutDown() { absl::MutexLock lock(&mutex_); std::swap(node_links_, node_links); broker_link_.reset(); + allocation_delegate_link_.reset(); } for (const auto& entry : node_links) { diff --git a/third_party/ipcz/src/ipcz/node.h b/third_party/ipcz/src/ipcz/node.h index 7c6f54291671cb..ded796d3f60b65 100644 --- a/third_party/ipcz/src/ipcz/node.h +++ b/third_party/ipcz/src/ipcz/node.h @@ -98,6 +98,13 @@ class Node : public APIObjectImpl { // randomness. NodeName GenerateRandomName() const; + // Sets a NodeLink to use for asynchronous shared memory allocation requests. + // This is configured when the ConnectNode() API is called with + // IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE. Typically this is combined with + // IPCZ_CONNECT_NODE_TO_BROKER when connecting from a sandboxed process which + // cannot allocate its own shared memory regions. + void SetAllocationDelegate(Ref link); + // Requests allocation of a new shared memory object of the given size. // `callback` is invoked with the new object when allocation is complete. // This operation is asynchronous if allocation is delegated to another node, @@ -168,6 +175,11 @@ class Node : public APIObjectImpl { // the node will lose all its other links too. Ref broker_link_ ABSL_GUARDED_BY(mutex_); + // A link over which all internal shared memory allocation is delegated. If + // null, this Node will always attempt to allocate shared memory directly + // through its ipcz driver. + Ref allocation_delegate_link_ ABSL_GUARDED_BY(mutex_); + // Lookup table of broker-assigned node names and links to those nodes. All of // these links and their associated names are received by the `broker_link_` // if this is a non-broker node. If this is a broker node, these links are diff --git a/third_party/ipcz/src/ipcz/node_connector.cc b/third_party/ipcz/src/ipcz/node_connector.cc index 0d3fddd2562188..d33b320827e8cd 100644 --- a/third_party/ipcz/src/ipcz/node_connector.cc +++ b/third_party/ipcz/src/ipcz/node_connector.cc @@ -132,9 +132,9 @@ class NodeConnectorForNonBrokerToBroker : public NodeConnector { NodeLinkMemory::Create(node_, buffer_memory.Map())); node_->SetAssignedName(connect.params().receiver_name); node_->SetBrokerLink(new_link); - - // TODO: Support delegated allocation of shared memory. - ABSL_ASSERT((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) == 0); + if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) { + node_->SetAllocationDelegate(new_link); + } AcceptConnection(std::move(new_link), LinkSide::kB, connect.params().num_initial_portals); diff --git a/third_party/ipcz/src/ipcz/node_link.cc b/third_party/ipcz/src/ipcz/node_link.cc index 51f05decc75032..f84033dfb49180 100644 --- a/third_party/ipcz/src/ipcz/node_link.cc +++ b/third_party/ipcz/src/ipcz/node_link.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "ipcz/box.h" @@ -28,6 +29,7 @@ #include "third_party/abseil-cpp/absl/base/macros.h" #include "util/log.h" #include "util/ref_counted.h" +#include "util/safe_math.h" namespace ipcz { @@ -225,6 +227,18 @@ void NodeLink::AcceptBypassLink( Transmit(accept); } +void NodeLink::RequestMemory(size_t size, RequestMemoryCallback callback) { + const uint32_t size32 = checked_cast(size); + { + absl::MutexLock lock(&mutex_); + pending_memory_requests_[size32].push_back(std::move(callback)); + } + + msg::RequestMemory request; + request.params().size = size32; + Transmit(request); +} + void NodeLink::Deactivate() { { absl::MutexLock lock(&mutex_); @@ -531,6 +545,39 @@ bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) { return true; } +bool NodeLink::OnRequestMemory(msg::RequestMemory& request) { + DriverMemory memory(node_->driver(), request.params().size); + msg::ProvideMemory provide; + provide.params().size = request.params().size; + provide.params().buffer = + provide.AppendDriverObject(memory.TakeDriverObject()); + Transmit(provide); + return true; +} + +bool NodeLink::OnProvideMemory(msg::ProvideMemory& provide) { + DriverMemory memory(provide.TakeDriverObject(provide.params().buffer)); + RequestMemoryCallback callback; + { + absl::MutexLock lock(&mutex_); + auto it = pending_memory_requests_.find(provide.params().size); + if (it == pending_memory_requests_.end()) { + return false; + } + + std::list& callbacks = it->second; + ABSL_ASSERT(!callbacks.empty()); + callback = std::move(callbacks.front()); + callbacks.pop_front(); + if (callbacks.empty()) { + pending_memory_requests_.erase(it); + } + } + + callback(std::move(memory)); + return true; +} + void NodeLink::OnTransportError() { SublinkMap sublinks; { diff --git a/third_party/ipcz/src/ipcz/node_link.h b/third_party/ipcz/src/ipcz/node_link.h index 11decaa1a5c3b5..76003bead58c28 100644 --- a/third_party/ipcz/src/ipcz/node_link.h +++ b/third_party/ipcz/src/ipcz/node_link.h @@ -160,6 +160,12 @@ class NodeLink : public msg::NodeMessageListener { SublinkId new_sublink, FragmentRef new_link_state); + // Sends a request to allocate a new shared memory region and invokes + // `callback` once the request succeeds or fails. On failure, `callback` is + // invoke with an invalid DriverMemory object. + using RequestMemoryCallback = std::function; + void RequestMemory(size_t size, RequestMemoryCallback callback); + // Permanently deactivates this NodeLink. Once this call returns the NodeLink // will no longer receive transport messages. It may still be used to transmit // outgoing messages, but it cannot be reactivated. Transmissions over a @@ -211,6 +217,8 @@ class NodeLink : public msg::NodeMessageListener { bool OnBypassPeerWithLink(msg::BypassPeerWithLink& bypass) override; bool OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) override; bool OnFlushRouter(msg::FlushRouter& flush) override; + bool OnRequestMemory(msg::RequestMemory& request) override; + bool OnProvideMemory(msg::ProvideMemory& provide) override; void OnTransportError() override; const Ref node_; @@ -234,6 +242,13 @@ class NodeLink : public msg::NodeMessageListener { using SublinkMap = absl::flat_hash_map; SublinkMap sublinks_ ABSL_GUARDED_BY(mutex_); + + // Pending memory allocation request callbacks. Keyed by request size, when + // an incoming ProvideMemory message is received, the front of the list for + // that size is removed from the map and invoked with the new memory object. + using MemoryRequestMap = + absl::flat_hash_map>; + MemoryRequestMap pending_memory_requests_ ABSL_GUARDED_BY(mutex_); }; } // namespace ipcz diff --git a/third_party/ipcz/src/ipcz/node_messages_generator.h b/third_party/ipcz/src/ipcz/node_messages_generator.h index c0a3b9741a939e..afdaa77b8e26b6 100644 --- a/third_party/ipcz/src/ipcz/node_messages_generator.h +++ b/third_party/ipcz/src/ipcz/node_messages_generator.h @@ -351,4 +351,21 @@ IPCZ_MSG_BEGIN(FlushRouter, IPCZ_MSG_ID(36), IPCZ_MSG_VERSION(0)) IPCZ_MSG_PARAM(SublinkId, sublink) IPCZ_MSG_END() +// Requests allocation of a shared memory region of a given size. If the +// recipient can comply, they will send back a corresponding ProvideMemory +// message with a serialized memory region. This message is only sent to a +// node's allocation delegate (usually the broker), which is established by +// providing the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag to ConnectNode(). +IPCZ_MSG_BEGIN(RequestMemory, IPCZ_MSG_ID(64), IPCZ_MSG_VERSION(0)) + IPCZ_MSG_PARAM(uint32_t, size) +IPCZ_MSG_END() + +// Provides a new shared buffer to the receiver, owned exclusively by the +// receiver. The receiver is free to duplicate this buffer and share it with +// other nodes. +IPCZ_MSG_BEGIN(ProvideMemory, IPCZ_MSG_ID(65), IPCZ_MSG_VERSION(0)) + IPCZ_MSG_PARAM(uint32_t, size) + IPCZ_MSG_PARAM_DRIVER_OBJECT(buffer) +IPCZ_MSG_END() + IPCZ_MSG_END_INTERFACE() diff --git a/third_party/ipcz/src/test/multinode_test.cc b/third_party/ipcz/src/test/multinode_test.cc index 56764a8c97b5cb..9455476ea2e1d2 100644 --- a/third_party/ipcz/src/test/multinode_test.cc +++ b/third_party/ipcz/src/test/multinode_test.cc @@ -118,6 +118,9 @@ const IpczDriver& TestNode::GetDriver() const { case DriverMode::kAsync: return reference_drivers::kAsyncReferenceDriver; + case DriverMode::kAsyncDelegatedAlloc: + return reference_drivers::kAsyncReferenceDriver; + #if BUILDFLAG(ENABLE_IPCZ_MULTIPROCESS_TESTS) case DriverMode::kMultiprocess: return reference_drivers::kMultiprocessReferenceDriver; @@ -142,12 +145,15 @@ void TestNode::Initialize(DriverMode driver_mode, } void TestNode::ConnectToBroker(absl::Span portals) { + uint32_t flags = IPCZ_CONNECT_NODE_TO_BROKER; + if (driver_mode_ == DriverMode::kAsyncDelegatedAlloc) { + flags |= IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE; + } IpczDriverHandle transport = std::exchange(transport_, IPCZ_INVALID_DRIVER_HANDLE); ABSL_ASSERT(transport != IPCZ_INVALID_DRIVER_HANDLE); - const IpczResult result = - ipcz().ConnectNode(node(), transport, portals.size(), - IPCZ_CONNECT_NODE_TO_BROKER, nullptr, portals.data()); + const IpczResult result = ipcz().ConnectNode( + node(), transport, portals.size(), flags, nullptr, portals.data()); ASSERT_EQ(IPCZ_RESULT_OK, result); } diff --git a/third_party/ipcz/src/test/multinode_test.h b/third_party/ipcz/src/test/multinode_test.h index 7f0b98b37edc43..6f0842317eba51 100644 --- a/third_party/ipcz/src/test/multinode_test.h +++ b/third_party/ipcz/src/test/multinode_test.h @@ -303,11 +303,12 @@ class MultinodeTest : public TestNodeType, #endif // TODO: Add other DriverMode enumerators here as support is landed. -#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \ - INSTANTIATE_TEST_SUITE_P( \ - , suite, \ - ::testing::Values( \ - ipcz::test::DriverMode::kSync, \ - ipcz::test::DriverMode::kAsync IPCZ_EXTRA_DRIVER_MODES)) +#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \ + INSTANTIATE_TEST_SUITE_P( \ + , suite, \ + ::testing::Values(ipcz::test::DriverMode::kSync, \ + ipcz::test::DriverMode::kAsync, \ + ipcz::test::DriverMode::kAsyncDelegatedAlloc \ + IPCZ_EXTRA_DRIVER_MODES)) #endif // IPCZ_SRC_TEST_MULTINODE_TEST_H_