Skip to content

Commit

Permalink
ipcz: Delegated shared memory allocation
Browse files Browse the repository at this point in the history
Implements support for IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE, a flag
for ConnectNode() which indicates that all shared memory allocation by
the local node must be delegated to the remote node.

A new DriverMode is added for parameterized multinode tests which uses
the existing async driver but which also always specifies
IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE when connecting a non-broker
test node to the broker test node.

Bug: 1299283
Change-Id: I257ffe37d85b1d25ca343a0c8f9ce1afef3c5787
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3785279
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1030722}
  • Loading branch information
krockot authored and Chromium LUCI CQ committed Aug 2, 2022
1 parent 27b58f3 commit 166bbe9
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 23 deletions.
17 changes: 14 additions & 3 deletions third_party/ipcz/src/ipcz/message.cc
Expand Up @@ -160,6 +160,10 @@ uint32_t Message::AllocateGenericArray(size_t element_size,
}

uint32_t Message::AppendDriverObject(DriverObject object) {
if (!object.is_valid()) {
return internal::kInvalidDriverObjectIndex;
}

const uint32_t index = checked_cast<uint32_t>(driver_objects_.size());
driver_objects_.push_back(std::move(object));
return index;
Expand All @@ -173,12 +177,17 @@ internal::DriverObjectArrayData Message::AppendDriverObjects(
};
driver_objects_.reserve(driver_objects_.size() + objects.size());
for (auto& object : objects) {
ABSL_ASSERT(object.is_valid());
driver_objects_.push_back(std::move(object));
}
return data;
}

DriverObject Message::TakeDriverObject(uint32_t index) {
if (index == internal::kInvalidDriverObjectIndex) {
return {};
}

// Note that `index` has already been validated by now.
ABSL_HARDENING_ASSERT(index < driver_objects_.size());
return std::move(driver_objects_[index]);
Expand Down Expand Up @@ -344,10 +353,12 @@ bool Message::DeserializeFromTransport(
switch (param.type) {
case internal::ParamType::kDriverObject: {
const uint32_t index = GetParamValueAt<uint32_t>(param.offset);
if (is_object_claimed[index]) {
return false;
if (index != internal::kInvalidDriverObjectIndex) {
if (is_object_claimed[index]) {
return false;
}
is_object_claimed[index] = true;
}
is_object_claimed[index] = true;
break;
}

Expand Down
4 changes: 4 additions & 0 deletions third_party/ipcz/src/ipcz/message.h
Expand Up @@ -115,6 +115,10 @@ struct IPCZ_ALIGN(8) DriverObjectArrayData {
uint32_t num_objects;
};

// Encodes an invalid driver object index. Any driver object field encoded as
// this value will deserialize to an invalid DriverObject.
constexpr uint32_t kInvalidDriverObjectIndex = 0xffffffff;

// End of wire structure definitions. Anything below this line is not meant to
// be encoded into messages.
#pragma pack(pop)
Expand Down
36 changes: 28 additions & 8 deletions third_party/ipcz/src/ipcz/node.cc
Expand Up @@ -122,12 +122,25 @@ NodeName Node::GenerateRandomName() const {
return name;
}

void Node::SetAllocationDelegate(Ref<NodeLink> link) {
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!allocation_delegate_link_);
allocation_delegate_link_ = std::move(link);
}

void Node::AllocateSharedMemory(size_t size,
AllocateSharedMemoryCallback callback) {
// TODO: Implement delegated allocation when this Node is connected to another
// with the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag set. For now we
// assume all nodes can perform direct allocation.
callback(DriverMemory(driver_, size));
Ref<NodeLink> delegate;
{
absl::MutexLock lock(&mutex_);
delegate = allocation_delegate_link_;
}

if (delegate) {
delegate->RequestMemory(size, std::move(callback));
} else {
callback(DriverMemory(driver_, size));
}
}

void Node::EstablishLink(const NodeName& name, EstablishLinkCallback callback) {
Expand Down Expand Up @@ -227,7 +240,7 @@ void Node::AcceptIntroduction(NodeLink& from_node_link,

Ref<NodeLink> new_link = NodeLink::CreateInactive(
WrapRefCounted(this), side, local_name, name, Type::kNormal,
remote_protocol_version, transport, std::move(memory));
remote_protocol_version, transport, memory);
ABSL_ASSERT(new_link);

std::vector<EstablishLinkCallback> callbacks;
Expand Down Expand Up @@ -290,14 +303,20 @@ void Node::DropLink(const NodeName& name) {
link = std::move(it->second);
node_links_.erase(it);

DVLOG(4) << "Node " << link->local_node_name().ToString() << " dropping "
const NodeName& local_name = link->local_node_name();
DVLOG(4) << "Node " << local_name.ToString() << " dropping "
<< " link to " << link->remote_node_name().ToString();
if (link == broker_link_) {
DVLOG(4) << "Node " << link->local_node_name().ToString()
<< " has lost its broker link";
DVLOG(4) << "Node " << local_name.ToString() << " lost its broker link";
broker_link_.reset();
lost_broker = true;
}

if (link == allocation_delegate_link_) {
DVLOG(4) << "Node " << local_name.ToString()
<< " lost its allocation delegate";
allocation_delegate_link_.reset();
}
}

link->Deactivate();
Expand All @@ -313,6 +332,7 @@ void Node::ShutDown() {
absl::MutexLock lock(&mutex_);
std::swap(node_links_, node_links);
broker_link_.reset();
allocation_delegate_link_.reset();
}

for (const auto& entry : node_links) {
Expand Down
12 changes: 12 additions & 0 deletions third_party/ipcz/src/ipcz/node.h
Expand Up @@ -98,6 +98,13 @@ class Node : public APIObjectImpl<Node, APIObject::kNode> {
// randomness.
NodeName GenerateRandomName() const;

// Sets a NodeLink to use for asynchronous shared memory allocation requests.
// This is configured when the ConnectNode() API is called with
// IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE. Typically this is combined with
// IPCZ_CONNECT_NODE_TO_BROKER when connecting from a sandboxed process which
// cannot allocate its own shared memory regions.
void SetAllocationDelegate(Ref<NodeLink> link);

// Requests allocation of a new shared memory object of the given size.
// `callback` is invoked with the new object when allocation is complete.
// This operation is asynchronous if allocation is delegated to another node,
Expand Down Expand Up @@ -168,6 +175,11 @@ class Node : public APIObjectImpl<Node, APIObject::kNode> {
// the node will lose all its other links too.
Ref<NodeLink> broker_link_ ABSL_GUARDED_BY(mutex_);

// A link over which all internal shared memory allocation is delegated. If
// null, this Node will always attempt to allocate shared memory directly
// through its ipcz driver.
Ref<NodeLink> allocation_delegate_link_ ABSL_GUARDED_BY(mutex_);

// Lookup table of broker-assigned node names and links to those nodes. All of
// these links and their associated names are received by the `broker_link_`
// if this is a non-broker node. If this is a broker node, these links are
Expand Down
6 changes: 3 additions & 3 deletions third_party/ipcz/src/ipcz/node_connector.cc
Expand Up @@ -132,9 +132,9 @@ class NodeConnectorForNonBrokerToBroker : public NodeConnector {
NodeLinkMemory::Create(node_, buffer_memory.Map()));
node_->SetAssignedName(connect.params().receiver_name);
node_->SetBrokerLink(new_link);

// TODO: Support delegated allocation of shared memory.
ABSL_ASSERT((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) == 0);
if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) {
node_->SetAllocationDelegate(new_link);
}

AcceptConnection(std::move(new_link), LinkSide::kB,
connect.params().num_initial_portals);
Expand Down
47 changes: 47 additions & 0 deletions third_party/ipcz/src/ipcz/node_link.cc
Expand Up @@ -8,6 +8,7 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <utility>

#include "ipcz/box.h"
Expand All @@ -28,6 +29,7 @@
#include "third_party/abseil-cpp/absl/base/macros.h"
#include "util/log.h"
#include "util/ref_counted.h"
#include "util/safe_math.h"

namespace ipcz {

Expand Down Expand Up @@ -225,6 +227,18 @@ void NodeLink::AcceptBypassLink(
Transmit(accept);
}

void NodeLink::RequestMemory(size_t size, RequestMemoryCallback callback) {
const uint32_t size32 = checked_cast<uint32_t>(size);
{
absl::MutexLock lock(&mutex_);
pending_memory_requests_[size32].push_back(std::move(callback));
}

msg::RequestMemory request;
request.params().size = size32;
Transmit(request);
}

void NodeLink::Deactivate() {
{
absl::MutexLock lock(&mutex_);
Expand Down Expand Up @@ -531,6 +545,39 @@ bool NodeLink::OnFlushRouter(msg::FlushRouter& flush) {
return true;
}

bool NodeLink::OnRequestMemory(msg::RequestMemory& request) {
DriverMemory memory(node_->driver(), request.params().size);
msg::ProvideMemory provide;
provide.params().size = request.params().size;
provide.params().buffer =
provide.AppendDriverObject(memory.TakeDriverObject());
Transmit(provide);
return true;
}

bool NodeLink::OnProvideMemory(msg::ProvideMemory& provide) {
DriverMemory memory(provide.TakeDriverObject(provide.params().buffer));
RequestMemoryCallback callback;
{
absl::MutexLock lock(&mutex_);
auto it = pending_memory_requests_.find(provide.params().size);
if (it == pending_memory_requests_.end()) {
return false;
}

std::list<RequestMemoryCallback>& callbacks = it->second;
ABSL_ASSERT(!callbacks.empty());
callback = std::move(callbacks.front());
callbacks.pop_front();
if (callbacks.empty()) {
pending_memory_requests_.erase(it);
}
}

callback(std::move(memory));
return true;
}

void NodeLink::OnTransportError() {
SublinkMap sublinks;
{
Expand Down
15 changes: 15 additions & 0 deletions third_party/ipcz/src/ipcz/node_link.h
Expand Up @@ -160,6 +160,12 @@ class NodeLink : public msg::NodeMessageListener {
SublinkId new_sublink,
FragmentRef<RouterLinkState> new_link_state);

// Sends a request to allocate a new shared memory region and invokes
// `callback` once the request succeeds or fails. On failure, `callback` is
// invoke with an invalid DriverMemory object.
using RequestMemoryCallback = std::function<void(DriverMemory)>;
void RequestMemory(size_t size, RequestMemoryCallback callback);

// Permanently deactivates this NodeLink. Once this call returns the NodeLink
// will no longer receive transport messages. It may still be used to transmit
// outgoing messages, but it cannot be reactivated. Transmissions over a
Expand Down Expand Up @@ -211,6 +217,8 @@ class NodeLink : public msg::NodeMessageListener {
bool OnBypassPeerWithLink(msg::BypassPeerWithLink& bypass) override;
bool OnStopProxyingToLocalPeer(msg::StopProxyingToLocalPeer& stop) override;
bool OnFlushRouter(msg::FlushRouter& flush) override;
bool OnRequestMemory(msg::RequestMemory& request) override;
bool OnProvideMemory(msg::ProvideMemory& provide) override;
void OnTransportError() override;

const Ref<Node> node_;
Expand All @@ -234,6 +242,13 @@ class NodeLink : public msg::NodeMessageListener {

using SublinkMap = absl::flat_hash_map<SublinkId, Sublink>;
SublinkMap sublinks_ ABSL_GUARDED_BY(mutex_);

// Pending memory allocation request callbacks. Keyed by request size, when
// an incoming ProvideMemory message is received, the front of the list for
// that size is removed from the map and invoked with the new memory object.
using MemoryRequestMap =
absl::flat_hash_map<uint32_t, std::list<RequestMemoryCallback>>;
MemoryRequestMap pending_memory_requests_ ABSL_GUARDED_BY(mutex_);
};

} // namespace ipcz
Expand Down
17 changes: 17 additions & 0 deletions third_party/ipcz/src/ipcz/node_messages_generator.h
Expand Up @@ -351,4 +351,21 @@ IPCZ_MSG_BEGIN(FlushRouter, IPCZ_MSG_ID(36), IPCZ_MSG_VERSION(0))
IPCZ_MSG_PARAM(SublinkId, sublink)
IPCZ_MSG_END()

// Requests allocation of a shared memory region of a given size. If the
// recipient can comply, they will send back a corresponding ProvideMemory
// message with a serialized memory region. This message is only sent to a
// node's allocation delegate (usually the broker), which is established by
// providing the IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE flag to ConnectNode().
IPCZ_MSG_BEGIN(RequestMemory, IPCZ_MSG_ID(64), IPCZ_MSG_VERSION(0))
IPCZ_MSG_PARAM(uint32_t, size)
IPCZ_MSG_END()

// Provides a new shared buffer to the receiver, owned exclusively by the
// receiver. The receiver is free to duplicate this buffer and share it with
// other nodes.
IPCZ_MSG_BEGIN(ProvideMemory, IPCZ_MSG_ID(65), IPCZ_MSG_VERSION(0))
IPCZ_MSG_PARAM(uint32_t, size)
IPCZ_MSG_PARAM_DRIVER_OBJECT(buffer)
IPCZ_MSG_END()

IPCZ_MSG_END_INTERFACE()
12 changes: 9 additions & 3 deletions third_party/ipcz/src/test/multinode_test.cc
Expand Up @@ -118,6 +118,9 @@ const IpczDriver& TestNode::GetDriver() const {
case DriverMode::kAsync:
return reference_drivers::kAsyncReferenceDriver;

case DriverMode::kAsyncDelegatedAlloc:
return reference_drivers::kAsyncReferenceDriver;

#if BUILDFLAG(ENABLE_IPCZ_MULTIPROCESS_TESTS)
case DriverMode::kMultiprocess:
return reference_drivers::kMultiprocessReferenceDriver;
Expand All @@ -142,12 +145,15 @@ void TestNode::Initialize(DriverMode driver_mode,
}

void TestNode::ConnectToBroker(absl::Span<IpczHandle> portals) {
uint32_t flags = IPCZ_CONNECT_NODE_TO_BROKER;
if (driver_mode_ == DriverMode::kAsyncDelegatedAlloc) {
flags |= IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE;
}
IpczDriverHandle transport =
std::exchange(transport_, IPCZ_INVALID_DRIVER_HANDLE);
ABSL_ASSERT(transport != IPCZ_INVALID_DRIVER_HANDLE);
const IpczResult result =
ipcz().ConnectNode(node(), transport, portals.size(),
IPCZ_CONNECT_NODE_TO_BROKER, nullptr, portals.data());
const IpczResult result = ipcz().ConnectNode(
node(), transport, portals.size(), flags, nullptr, portals.data());
ASSERT_EQ(IPCZ_RESULT_OK, result);
}

Expand Down
13 changes: 7 additions & 6 deletions third_party/ipcz/src/test/multinode_test.h
Expand Up @@ -303,11 +303,12 @@ class MultinodeTest : public TestNodeType,
#endif

// TODO: Add other DriverMode enumerators here as support is landed.
#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \
INSTANTIATE_TEST_SUITE_P( \
, suite, \
::testing::Values( \
ipcz::test::DriverMode::kSync, \
ipcz::test::DriverMode::kAsync IPCZ_EXTRA_DRIVER_MODES))
#define INSTANTIATE_MULTINODE_TEST_SUITE_P(suite) \
INSTANTIATE_TEST_SUITE_P( \
, suite, \
::testing::Values(ipcz::test::DriverMode::kSync, \
ipcz::test::DriverMode::kAsync, \
ipcz::test::DriverMode::kAsyncDelegatedAlloc \
IPCZ_EXTRA_DRIVER_MODES))

#endif // IPCZ_SRC_TEST_MULTINODE_TEST_H_

0 comments on commit 166bbe9

Please sign in to comment.