Skip to content

Commit

Permalink
ipcz: Implement ConnectNode() API
Browse files Browse the repository at this point in the history
Split out from the ipcz uber-CL: https://crrev.com/c/3570271

Implements ConnectNode() support for connections between brokers and
non-brokers.

Also adds support to Router for outbound parcel transmission to retain
parcels in an outbound queue when no outward link is present.

Bug: 1299283
Change-Id: I871704bae0fcfc9754d20322cc41fa92900525ac
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3626220
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Alex Gough <ajgo@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1001295}
  • Loading branch information
krockot authored and Chromium LUCI CQ committed May 10, 2022
1 parent 651c4ab commit b9b9341
Show file tree
Hide file tree
Showing 17 changed files with 666 additions and 45 deletions.
3 changes: 3 additions & 0 deletions third_party/ipcz/src/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ ipcz_source_set("ipcz_tests_sources") {

sources = [
"api_test.cc",
"connect_test.cc",
"ipcz/driver_memory_test.cc",
"ipcz/driver_object_test.cc",
"ipcz/driver_transport_test.cc",
Expand All @@ -262,6 +263,8 @@ ipcz_source_set("ipcz_tests_sources") {
"reference_drivers/single_process_reference_driver_test.cc",
"test/mock_driver.cc",
"test/mock_driver.h",
"test/multinode_test.cc",
"test/multinode_test.h",
"test/test_base.cc",
"test/test_base.h",
"test/test_transport_listener.cc",
Expand Down
13 changes: 12 additions & 1 deletion third_party/ipcz/src/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ IpczResult ConnectNode(IpczHandle node_handle,
IpczConnectNodeFlags flags,
const void* options,
IpczHandle* initial_portals) {
return IPCZ_RESULT_UNIMPLEMENTED;
ipcz::Node* node = ipcz::Node::FromHandle(node_handle);
if (!node || driver_transport == IPCZ_INVALID_HANDLE) {
return IPCZ_RESULT_INVALID_ARGUMENT;
}

if (num_initial_portals == 0 || !initial_portals) {
return IPCZ_RESULT_INVALID_ARGUMENT;
}

return node->ConnectNode(
driver_transport, flags,
absl::Span<IpczHandle>(initial_portals, num_initial_portals));
}

IpczResult OpenPortals(IpczHandle node_handle,
Expand Down
48 changes: 45 additions & 3 deletions third_party/ipcz/src/api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ const IpczDriver& kDefaultDriver =
using APITest = test::TestBase;

TEST_F(APITest, Unimplemented) {
EXPECT_EQ(IPCZ_RESULT_UNIMPLEMENTED,
ipcz().ConnectNode(IPCZ_INVALID_HANDLE, IPCZ_INVALID_DRIVER_HANDLE,
0, IPCZ_NO_FLAGS, nullptr, nullptr));
EXPECT_EQ(IPCZ_RESULT_UNIMPLEMENTED,
ipcz().MergePortals(IPCZ_INVALID_HANDLE, IPCZ_INVALID_HANDLE,
IPCZ_NO_FLAGS, nullptr));
Expand Down Expand Up @@ -71,6 +68,51 @@ TEST_F(APITest, CreateNode) {
EXPECT_EQ(IPCZ_RESULT_OK, ipcz().Close(node, IPCZ_NO_FLAGS, nullptr));
}

TEST_F(APITest, ConnectNodeInvalid) {
IpczHandle node = CreateNode(kDefaultDriver);
IpczDriverHandle transport0, transport1;
ASSERT_EQ(IPCZ_RESULT_OK,
kDefaultDriver.CreateTransports(
IPCZ_INVALID_DRIVER_HANDLE, IPCZ_INVALID_DRIVER_HANDLE,
IPCZ_NO_FLAGS, nullptr, &transport0, &transport1));

IpczHandle portal;

// Invalid node handle
EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
ipcz().ConnectNode(IPCZ_INVALID_HANDLE, transport0, 1,
IPCZ_NO_FLAGS, nullptr, &portal));

// Non-node handle
auto [a, b] = OpenPortals(node);
EXPECT_EQ(
IPCZ_RESULT_INVALID_ARGUMENT,
ipcz().ConnectNode(a, transport0, 1, IPCZ_NO_FLAGS, nullptr, &portal));
CloseAll({a, b});

// Invalid transport
EXPECT_EQ(IPCZ_RESULT_INVALID_ARGUMENT,
ipcz().ConnectNode(node, IPCZ_INVALID_DRIVER_HANDLE, 1,
IPCZ_NO_FLAGS, nullptr, &portal));

// No initial portals
EXPECT_EQ(
IPCZ_RESULT_INVALID_ARGUMENT,
ipcz().ConnectNode(node, transport0, 0, IPCZ_NO_FLAGS, nullptr, &portal));

// Null portal storage
EXPECT_EQ(
IPCZ_RESULT_INVALID_ARGUMENT,
ipcz().ConnectNode(node, transport0, 0, IPCZ_NO_FLAGS, nullptr, nullptr));

EXPECT_EQ(IPCZ_RESULT_OK,
kDefaultDriver.Close(transport0, IPCZ_NO_FLAGS, nullptr));
EXPECT_EQ(IPCZ_RESULT_OK,
kDefaultDriver.Close(transport1, IPCZ_NO_FLAGS, nullptr));

Close(node);
}

TEST_F(APITest, OpenPortalsInvalid) {
IpczHandle node = CreateNode(kDefaultDriver);

Expand Down
184 changes: 184 additions & 0 deletions third_party/ipcz/src/connect_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <string>

#include "ipcz/ipcz.h"
#include "ipcz/node_messages.h"
#include "test/multinode_test.h"
#include "test/test_transport_listener.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace ipcz {
namespace {

using ConnectTest = test::MultinodeTestWithDriver;

TEST_P(ConnectTest, BrokerToNonBroker) {
IpczHandle broker = CreateBrokerNode();
IpczHandle non_broker = CreateNonBrokerNode();

IpczDriverHandle broker_transport;
IpczDriverHandle non_broker_transport;
CreateBrokerToNonBrokerTransports(&broker_transport, &non_broker_transport);

IpczHandle non_broker_portal;
ASSERT_EQ(IPCZ_RESULT_OK, ipcz().ConnectNode(non_broker, non_broker_transport,
1, IPCZ_CONNECT_NODE_TO_BROKER,
nullptr, &non_broker_portal));

IpczHandle broker_portal;
ASSERT_EQ(IPCZ_RESULT_OK,
ipcz().ConnectNode(broker, broker_transport, 1, IPCZ_NO_FLAGS,
nullptr, &broker_portal));

Close(broker_portal);
EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(non_broker_portal, IPCZ_TRAP_PEER_CLOSED));

CloseAll({non_broker_portal, non_broker, broker});
}

TEST_P(ConnectTest, SurplusPortals) {
IpczHandle broker = CreateBrokerNode();
IpczHandle non_broker = CreateNonBrokerNode();

IpczDriverHandle broker_transport;
IpczDriverHandle non_broker_transport;
CreateBrokerToNonBrokerTransports(&broker_transport, &non_broker_transport);

constexpr size_t kNumBrokerPortals = 2;
constexpr size_t kNumNonBrokerPortals = 5;
static_assert(kNumBrokerPortals < kNumNonBrokerPortals,
"Test requires fewer broker portals than non-broker portals");

IpczHandle broker_portals[kNumBrokerPortals];
ASSERT_EQ(
IPCZ_RESULT_OK,
ipcz().ConnectNode(broker, broker_transport, std::size(broker_portals),
IPCZ_NO_FLAGS, nullptr, broker_portals));

IpczHandle non_broker_portals[kNumNonBrokerPortals];
ASSERT_EQ(IPCZ_RESULT_OK, ipcz().ConnectNode(non_broker, non_broker_transport,
std::size(non_broker_portals),
IPCZ_CONNECT_NODE_TO_BROKER,
nullptr, non_broker_portals));

// All of the surplus broker portals should observe peer closure.
for (size_t i = kNumBrokerPortals; i < kNumNonBrokerPortals; ++i) {
EXPECT_EQ(IPCZ_RESULT_OK, WaitForConditionFlags(non_broker_portals[i],
IPCZ_TRAP_PEER_CLOSED));
}

for (IpczHandle portal : non_broker_portals) {
Close(portal);
}
for (IpczHandle portal : broker_portals) {
Close(portal);
}
CloseAll({non_broker, broker});
}

TEST_P(ConnectTest, DisconnectWithoutHandshake) {
IpczHandle broker = CreateBrokerNode();
IpczHandle non_broker = CreateNonBrokerNode();

// First fail to connect a broker.
IpczDriverHandle broker_transport, non_broker_transport;
CreateBrokerToNonBrokerTransports(&broker_transport, &non_broker_transport);

IpczHandle portal;
{
// This listener is scoped such that it closes the non-broker's transport
// after the broker issues its ConnectNode(). This should trigger a
// rejection and ultimately portal closure by the broker.
test::TestTransportListener non_broker_listener(non_broker,
non_broker_transport);
non_broker_listener.DiscardMessages<msg::ConnectFromBrokerToNonBroker>();

ASSERT_EQ(IPCZ_RESULT_OK,
ipcz().ConnectNode(broker, broker_transport, 1, IPCZ_NO_FLAGS,
nullptr, &portal));
}

EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(portal, IPCZ_TRAP_PEER_CLOSED));
Close(portal);

// Next fail to connect a non-broker.
CreateBrokerToNonBrokerTransports(&broker_transport, &non_broker_transport);

{
// This listener is scoped such that it closes the broker transport after
// the non-broker issues its ConnectNode(). This should trigger a rejection
// and ultimately portal closure by the non-broker.
test::TestTransportListener broker_listener(broker, broker_transport);
broker_listener.DiscardMessages<msg::ConnectFromNonBrokerToBroker>();

ASSERT_EQ(
IPCZ_RESULT_OK,
ipcz().ConnectNode(non_broker, non_broker_transport, 1,
IPCZ_CONNECT_NODE_TO_BROKER, nullptr, &portal));
}

EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(portal, IPCZ_TRAP_PEER_CLOSED));

CloseAll({portal, non_broker, broker});
}

TEST_P(ConnectTest, DisconnectOnBadMessage) {
IpczHandle broker = CreateBrokerNode();
IpczHandle non_broker = CreateNonBrokerNode();

IpczDriverHandle broker_transport, non_broker_transport;
CreateBrokerToNonBrokerTransports(&broker_transport, &non_broker_transport);

// First fail to connect a broker.
IpczHandle portal;
ASSERT_EQ(IPCZ_RESULT_OK,
ipcz().ConnectNode(broker, broker_transport, 1, IPCZ_NO_FLAGS,
nullptr, &portal));

test::TestTransportListener non_broker_listener(non_broker,
non_broker_transport);
non_broker_listener.DiscardMessages<msg::ConnectFromBrokerToNonBroker>();

const char kBadMessage[] = "this will never be a valid handshake message!";
EXPECT_EQ(IPCZ_RESULT_OK,
GetDriver().Transmit(non_broker_transport, kBadMessage,
std::size(kBadMessage), nullptr, 0,
IPCZ_NO_FLAGS, nullptr));

EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(portal, IPCZ_TRAP_PEER_CLOSED));

non_broker_listener.StopListening();
Close(portal);

// Next fail to connect a non-broker.
CreateBrokerToNonBrokerTransports(&broker_transport, &non_broker_transport);

test::TestTransportListener broker_listener(broker, broker_transport);
broker_listener.DiscardMessages<msg::ConnectFromNonBrokerToBroker>();

ASSERT_EQ(IPCZ_RESULT_OK,
ipcz().ConnectNode(non_broker, non_broker_transport, 1,
IPCZ_CONNECT_NODE_TO_BROKER, nullptr, &portal));
EXPECT_EQ(IPCZ_RESULT_OK,
GetDriver().Transmit(broker_transport, kBadMessage,
std::size(kBadMessage), nullptr, 0,
IPCZ_NO_FLAGS, nullptr));
EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(portal, IPCZ_TRAP_PEER_CLOSED));

broker_listener.StopListening();
CloseAll({portal, non_broker, broker});
}

INSTANTIATE_MULTINODE_TEST_SUITE_P(ConnectTest);

} // namespace
} // namespace ipcz
3 changes: 3 additions & 0 deletions third_party/ipcz/src/ipcz/local_router_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
namespace ipcz {

// Local link between two Routers on the same node. This class is thread-safe.
//
// NOTE: This implementation must take caution when calling into any Router. See
// note on RouterLink's own class documentation.
class LocalRouterLink : public RouterLink {
public:
// Creates a new pair of LocalRouterLinks linking the given pair of Routers
Expand Down
43 changes: 43 additions & 0 deletions third_party/ipcz/src/ipcz/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,49 @@ IpczResult Node::Close() {
return IPCZ_RESULT_OK;
}

void Node::ShutDown() {
absl::flat_hash_map<NodeName, Ref<NodeLink>> node_links;
{
absl::MutexLock lock(&mutex_);
std::swap(node_links_, node_links);
broker_link_.reset();
}

for (const auto& entry : node_links) {
entry.second->Deactivate();
}
}

IpczResult Node::ConnectNode(IpczDriverHandle driver_transport,
IpczConnectNodeFlags flags,
absl::Span<IpczHandle> initial_portals) {
std::vector<Ref<Portal>> portals(initial_portals.size());
for (size_t i = 0; i < initial_portals.size(); ++i) {
auto portal =
MakeRefCounted<Portal>(WrapRefCounted(this), MakeRefCounted<Router>());
portals[i] = portal;
initial_portals[i] = Portal::ReleaseAsHandle(std::move(portal));
}

auto transport = MakeRefCounted<DriverTransport>(
DriverObject(WrapRefCounted(this), driver_transport));
IpczResult result = NodeConnector::ConnectNode(WrapRefCounted(this),
transport, flags, portals);
if (result != IPCZ_RESULT_OK) {
// On failure the caller retains ownership of `driver_transport`. Release
// it here so it doesn't get closed when `transport` is destroyed.
transport->Release();

// Wipe out the initial portals we created, since they are invalid and
// effectively not returned to the caller on failure.
for (Ref<Portal>& portal : portals) {
Ref<Portal> doomed_portal = AdoptRef(portal.get());
}
return result;
}
return IPCZ_RESULT_OK;
}

NodeName Node::GetAssignedName() {
absl::MutexLock lock(&mutex_);
return assigned_name_;
Expand Down
12 changes: 12 additions & 0 deletions third_party/ipcz/src/ipcz/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ class Node : public APIObjectImpl<Node, APIObject::kNode> {
// APIObject:
IpczResult Close() override;

// Deactivates all NodeLinks and their underlying driver transports in
// preparation for this node's imminent destruction.
void ShutDown();

// Connects to another node using `driver_transport` for I/O to and from the
// other node. `initial_portals` is a collection of new portals who may
// immediately begin to route parcels over a link to the new node, assuming
// the link is established successfully.
IpczResult ConnectNode(IpczDriverHandle driver_transport,
IpczConnectNodeFlags flags,
absl::Span<IpczHandle> initial_portals);

// Retrieves the name assigned to this node, if any.
NodeName GetAssignedName();

Expand Down
3 changes: 3 additions & 0 deletions third_party/ipcz/src/ipcz/remote_router_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class NodeLink;
// LinkSide at construction time. This assignment is arbitrary but will always
// be the opposite of the LinkSide assigned to the RemoteRouteLink on the other
// end.
//
// NOTE: This implementation must take caution when calling into any Router. See
// note on RouterLink's own class documentation.
class RemoteRouterLink : public RouterLink {
public:
// Constructs a new RemoteRouterLink which sends messages over `node_link`
Expand Down

0 comments on commit b9b9341

Please sign in to comment.