Skip to content

Commit

Permalink
grid: changing to defered deletion (#34832)
Browse files Browse the repository at this point in the history
changing the wrapper callbacks and async callbacks in the grid to do deferred deletion.

currently we infer immediate response / async newStream from return codes.
This gets more complicated when we add in happy eyeballs, so instead we want to determine if we should return a cancelable handle based on if the caller has been informed of success or failure. This requires the objects to not be deleted under the stack of newStream hence the migration to defered delete.

Also doing a follow-up from the prior PR and creating connection pools explicitly.

Risk Level: high (object lifetime changes)
Testing: existing tests
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk committed Jun 24, 2024
1 parent 746cfff commit 243787f
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 85 deletions.
81 changes: 41 additions & 40 deletions source/common/http/conn_pool_grid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptFailed(
}
maybeMarkHttp3Broken();

auto delete_this_on_return = attempt->removeFromList(connection_attempts_);
grid_.dispatcher_.deferredDelete(attempt->removeFromList(connection_attempts_));

// If there is another connection attempt in flight then let that proceed.
if (!connection_attempts_.empty()) {
Expand Down Expand Up @@ -111,8 +111,8 @@ void ConnectivityGrid::WrapperCallbacks::signalFailureAndDeleteSelf(
}

void ConnectivityGrid::WrapperCallbacks::deleteThis() {
// By removing the entry from the list, it will be deleted.
removeFromList(grid_.wrapped_callbacks_);
// Set this to delete on the next dispatcher loop.
grid_.dispatcher_.deferredDelete(removeFromList(grid_.wrapped_callbacks_));
}

ConnectivityGrid::StreamCreationResult
Expand All @@ -139,7 +139,7 @@ void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptReady(
maybeMarkHttp3Broken();
}

auto delete_this_on_return = attempt->removeFromList(connection_attempts_);
grid_.dispatcher_.deferredDelete(attempt->removeFromList(connection_attempts_));
ConnectionPool::Callbacks* callbacks = inner_callbacks_;
inner_callbacks_ = nullptr;
// If an HTTP/3 connection attempts is in progress, let it complete so that if it succeeds
Expand Down Expand Up @@ -207,9 +207,8 @@ ConnectivityGrid::WrapperCallbacks::tryAnotherConnection() {
// return true regardless of if newStream resulted in an immediate result or
// an async call, as either way the attempt will result in success/failure
// callbacks.
grid_.createNextPool(); // Make sure the HTTP/2 pool exists
has_attempted_http2_ = true;
return newStream(*grid_.http2_pool_);
return newStream(*grid_.getOrCreateHttp2Pool());
}

ConnectivityGrid::ConnectivityGrid(
Expand Down Expand Up @@ -263,31 +262,40 @@ void ConnectivityGrid::deleteIsPending() {
}
}

ConnectionPool::Instance* ConnectivityGrid::createNextPool() {
ConnectionPool::Instance* ConnectivityGrid::getOrCreateHttp3Pool() {
ASSERT(!deferred_deleting_);
// Pools are created by newStream, which should not be called during draining.
ASSERT(!draining_);
// If both pools exist we're done.
if ((http2_pool_ && http3_pool_) || draining_) {
return nullptr;
if (http3_pool_ == nullptr) {
http3_pool_ = createHttp3Pool();
pools_.push_back(http3_pool_.get());
setupPool(*http3_pool_.get());
}
return http3_pool_.get();
}

// HTTP/3 is hard-coded as higher priority, H2 as secondary.
if (!http3_pool_) {
http3_pool_ = Http3::allocateConnPool(
dispatcher_, random_generator_, host_, priority_, options_, transport_socket_options_,
state_, quic_stat_names_, *alternate_protocols_, scope_,
makeOptRefFromPtr<Http3::PoolConnectResultCallback>(this), quic_info_);
pools_.push_back(http3_pool_.get());
} else {
http2_pool_ = std::make_unique<HttpConnPoolImplMixed>(
dispatcher_, random_generator_, host_, priority_, options_, transport_socket_options_,
state_, origin_, alternate_protocols_);
ConnectionPool::Instance* ConnectivityGrid::getOrCreateHttp2Pool() {
ASSERT(!deferred_deleting_);
ASSERT(!draining_);
if (http2_pool_ == nullptr) {
http2_pool_ = createHttp2Pool();
pools_.push_back(http2_pool_.get());
setupPool(*http2_pool_.get());
}

setupPool(*pools_.back());
return pools_.back();
return http2_pool_.get();
}

ConnectionPool::InstancePtr ConnectivityGrid::createHttp2Pool() {
return std::make_unique<HttpConnPoolImplMixed>(dispatcher_, random_generator_, host_, priority_,
options_, transport_socket_options_, state_,
origin_, alternate_protocols_);
}

ConnectionPool::InstancePtr ConnectivityGrid::createHttp3Pool() {
return Http3::allocateConnPool(
dispatcher_, random_generator_, host_, priority_, options_, transport_socket_options_, state_,
quic_stat_names_, *alternate_protocols_, scope_,
makeOptRefFromPtr<Http3::PoolConnectResultCallback>(this), quic_info_);
}

void ConnectivityGrid::setupPool(ConnectionPool::Instance& pool) {
Expand All @@ -311,10 +319,7 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder&
ASSERT(!draining_);

// Always start with the HTTP/3 pool if it exists.
ConnectionPool::Instance* pool = http3_pool_ ? http3_pool_.get() : http2_pool_.get();
if (!pool) {
pool = createNextPool();
}
ConnectionPool::Instance* pool = getOrCreateHttp3Pool();
Instance::StreamOptions overriding_options = options;
bool delay_tcp_attempt = true;
if (shouldAttemptHttp3() && options.can_use_http3_) {
Expand All @@ -323,9 +328,7 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder&
delay_tcp_attempt = false;
}
} else {
// Make sure the HTTP/2 pool is created.
createNextPool();
pool = http2_pool_.get();
pool = getOrCreateHttp2Pool();
}
auto wrapped_callback =
std::make_unique<WrapperCallbacks>(*this, decoder, callbacks, overriding_options);
Expand All @@ -334,18 +337,17 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder&
if (ret->newStream(*pool) == StreamCreationResult::ImmediateResult) {
// If newStream succeeds, return nullptr as the caller has received their
// callback and does not need a cancellable handle. At this point the
// WrappedCallbacks object has also been deleted.
// WrappedCallbacks object is queued to be deleted.
return nullptr;
}
if (!delay_tcp_attempt) {
// Immediately start TCP attempt if HTTP/3 failed recently.
absl::optional<StreamCreationResult> result = ret->tryAnotherConnection();
if (result.has_value() && result.value() == StreamCreationResult::ImmediateResult) {
// As above, if we have an immediate success, return nullptr.
return nullptr;
}
ret->tryAnotherConnection();
}
return ret;

// Return a handle if the caller hasn't yet been notified of success/failure.
// There may still be connection attempts, if the wrapper is waiting on a slow H3 connection.
return ret->hasNotifiedCaller() ? nullptr : ret;
}

void ConnectivityGrid::addIdleCallback(IdleCb cb) {
Expand All @@ -356,8 +358,7 @@ void ConnectivityGrid::addIdleCallback(IdleCb cb) {

void ConnectivityGrid::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
// Note that no new pools can be created from this point on
// as createNextPool fast-fails if `draining_` is true.
// Note that no new pools should be created from this point on.
draining_ = true;
}
for (auto& pool : pools_) {
Expand Down
22 changes: 17 additions & 5 deletions source/common/http/conn_pool_grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,26 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// It also relays cancellation calls between the original caller and the
// current connection attempts.
class WrapperCallbacks : public ConnectionPool::Cancellable,
public LinkedObject<WrapperCallbacks> {
public LinkedObject<WrapperCallbacks>,
public Event::DeferredDeletable {
public:
WrapperCallbacks(ConnectivityGrid& grid, Http::ResponseDecoder& decoder,
ConnectionPool::Callbacks& callbacks, const Instance::StreamOptions& options);

bool hasNotifiedCaller() { return inner_callbacks_ == nullptr; }

// Event::DeferredDeletable
// The wrapper is being deleted - cancel all alarms.
void deleteIsPending() override { next_attempt_timer_.reset(); }

// This holds state for a single connection attempt to a specific pool.
class ConnectionAttemptCallbacks : public ConnectionPool::Callbacks,
public LinkedObject<ConnectionAttemptCallbacks> {
public LinkedObject<ConnectionAttemptCallbacks>,
public Event::DeferredDeletable {
public:
ConnectionAttemptCallbacks(WrapperCallbacks& parent, ConnectionPool::Instance& pool);
~ConnectionAttemptCallbacks() override;
void deleteIsPending() override {}

StreamCreationResult newStream();

Expand Down Expand Up @@ -206,9 +215,12 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// that specifies HTTP/3 and HTTP/3 is not broken.
bool shouldAttemptHttp3();

// Creates the next pool in the priority list, or nullptr if all pools have been created.
// TODO(alyssawilk) replace this now we have explicit pools.
virtual ConnectionPool::Instance* createNextPool();
// Returns the specified pool, which will be created if necessary
ConnectionPool::Instance* getOrCreateHttp3Pool();
ConnectionPool::Instance* getOrCreateHttp2Pool();

virtual ConnectionPool::InstancePtr createHttp3Pool();
virtual ConnectionPool::InstancePtr createHttp2Pool();

// This batch of member variables are latched objects required for pool creation.
Event::Dispatcher& dispatcher_;
Expand Down
66 changes: 26 additions & 40 deletions test/common/http/conn_pool_grid_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,26 @@ namespace Http {
class ConnectivityGridForTest : public ConnectivityGrid {
public:
using ConnectivityGrid::ConnectivityGrid;
using ConnectivityGrid::getOrCreateHttp2Pool;
using ConnectivityGrid::getOrCreateHttp3Pool;

static bool hasHttp3FailedRecently(const ConnectivityGrid& grid) {
return grid.getHttp3StatusTracker().hasHttp3FailedRecently();
}

static ConnectionPool::Instance* forceCreateNextPool(ConnectivityGrid& grid) {
return grid.createNextPool();
// Helper method to expose getOrCreateHttp3Pool() for non-test grids
static ConnectionPool::Instance* forceGetOrCreateHttp3Pool(ConnectivityGrid& grid) {
return grid.getOrCreateHttp3Pool();
}
// Helper method to expose getOrCreateHttp2Pool() for non-test grids
static ConnectionPool::Instance* forceGetOrCreateHttp2Pool(ConnectivityGrid& grid) {
return grid.getOrCreateHttp2Pool();
}

ConnectionPool::Instance* createNextPool() override {
if (http2_pool_ && http3_pool_) {
return nullptr;
}
ConnectionPool::InstancePtr createHttp3Pool() override { return createMockPool("http3"); }
ConnectionPool::InstancePtr createHttp2Pool() override { return createMockPool("http2"); }
ConnectionPool::InstancePtr createMockPool(absl::string_view type) {
ConnectionPool::MockInstance* instance = new NiceMock<ConnectionPool::MockInstance>();
setupPool(*instance);
if (!http3_pool_) {
http3_pool_.reset(instance);
} else {
http2_pool_.reset(instance);
}
pools_.push_back(instance);
ON_CALL(*instance, newStream(_, _, _))
.WillByDefault(
Invoke([&, &grid = *this](Http::ResponseDecoder&, ConnectionPool::Callbacks& callbacks,
Expand All @@ -80,17 +79,8 @@ class ConnectivityGridForTest : public ConnectivityGrid {
callbacks_.push_back(&callbacks);
return cancel_;
}));
if (!http2_pool_) {
EXPECT_CALL(*http3Pool(), protocolDescription())
.Times(AnyNumber())
.WillRepeatedly(Return("http3"));

return instance;
}
EXPECT_CALL(*http2Pool(), protocolDescription())
.Times(AnyNumber())
.WillRepeatedly(Return("http2"));
return instance;
EXPECT_CALL(*instance, protocolDescription()).Times(AnyNumber()).WillRepeatedly(Return(type));
return absl::WrapUnique(instance);
}

ConnectionPool::MockInstance* http3Pool() {
Expand Down Expand Up @@ -181,15 +171,13 @@ class ConnectivityGridTest : public Event::TestUsingSimulatedTime, public testin
const Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
ConnectivityGrid::ConnectivityOptions options_;
Upstream::ClusterConnectivityState state_;
NiceMock<Event::MockDispatcher> dispatcher_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_{new NiceMock<Upstream::MockClusterInfo>()};
NiceMock<Random::MockRandomGenerator> random_;
HttpServerPropertiesCacheSharedPtr alternate_protocols_;
Stats::IsolatedStoreImpl store_;
Quic::QuicStatNames quic_stat_names_;
PersistentQuicInfoPtr quic_connection_persistent_info_;
NiceMock<Envoy::ConnectionPool::MockCancellable> cancel_;
std::unique_ptr<ConnectivityGridForTest> grid_;
Upstream::HostDescriptionConstSharedPtr host_;

NiceMock<ConnPoolCallbacks> callbacks_;
Expand All @@ -200,6 +188,8 @@ class ConnectivityGridTest : public Event::TestUsingSimulatedTime, public testin

NiceMock<Server::Configuration::MockTransportSocketFactoryContext> factory_context_;
testing::NiceMock<ThreadLocal::MockInstance> thread_local_;
NiceMock<Event::MockDispatcher> dispatcher_;
std::unique_ptr<ConnectivityGridForTest> grid_;
};

// Test the first pool successfully connecting.
Expand Down Expand Up @@ -589,14 +579,14 @@ TEST_F(ConnectivityGridTest, Drain) {
grid_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);

// Synthetically create a pool.
grid_->createNextPool();
grid_->getOrCreateHttp3Pool();
{
EXPECT_CALL(*grid_->http3Pool(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections));
grid_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections);
}

grid_->createNextPool();
grid_->getOrCreateHttp2Pool();
{
EXPECT_CALL(*grid_->http3Pool(),
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections));
Expand All @@ -611,8 +601,8 @@ TEST_F(ConnectivityGridTest, DrainCallbacks) {
initialize();
addHttp3AlternateProtocol();
// Synthetically create both pools.
grid_->createNextPool();
grid_->createNextPool();
grid_->getOrCreateHttp3Pool();
grid_->getOrCreateHttp2Pool();

bool drain_received = false;

Expand Down Expand Up @@ -659,8 +649,8 @@ TEST_F(ConnectivityGridTest, IdleCallbacks) {
initialize();
addHttp3AlternateProtocol();
// Synthetically create both pools.
grid_->createNextPool();
grid_->createNextPool();
grid_->getOrCreateHttp3Pool();
grid_->getOrCreateHttp2Pool();

bool idle_received = false;

Expand Down Expand Up @@ -692,7 +682,7 @@ TEST_F(ConnectivityGridTest, IdleCallbacks) {
TEST_F(ConnectivityGridTest, NoDrainOnTeardown) {
initialize();
addHttp3AlternateProtocol();
grid_->createNextPool();
grid_->getOrCreateHttp3Pool();

bool drain_received = false;

Expand Down Expand Up @@ -980,19 +970,15 @@ TEST_F(ConnectivityGridTest, RealGrid) {
EXPECT_FALSE(grid.hasActiveConnections());

// Create the HTTP/3 pool.
auto pool1 = ConnectivityGridForTest::forceCreateNextPool(grid);
auto pool1 = ConnectivityGridForTest::forceGetOrCreateHttp3Pool(grid);
ASSERT_TRUE(pool1 != nullptr);
EXPECT_EQ("HTTP/3", pool1->protocolDescription());
EXPECT_FALSE(grid.hasActiveConnections());

// Create the mixed pool.
auto pool2 = ConnectivityGridForTest::forceCreateNextPool(grid);
auto pool2 = ConnectivityGridForTest::forceGetOrCreateHttp2Pool(grid);
ASSERT_TRUE(pool2 != nullptr);
EXPECT_EQ("HTTP/1 HTTP/2 ALPN", pool2->protocolDescription());

// There is no third option currently.
auto pool3 = ConnectivityGridForTest::forceCreateNextPool(grid);
ASSERT_TRUE(pool3 == nullptr);
}

TEST_F(ConnectivityGridTest, ConnectionCloseDuringAysnConnect) {
Expand Down Expand Up @@ -1022,7 +1008,7 @@ TEST_F(ConnectivityGridTest, ConnectionCloseDuringAysnConnect) {
*quic_connection_persistent_info_);

// Create the HTTP/3 pool.
auto pool = ConnectivityGridForTest::forceCreateNextPool(grid);
auto pool = ConnectivityGridForTest::forceGetOrCreateHttp3Pool(grid);
ASSERT_TRUE(pool != nullptr);
EXPECT_EQ("HTTP/3", pool->protocolDescription());

Expand Down

0 comments on commit 243787f

Please sign in to comment.