Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grid: cleaning up pools #34802

Merged
merged 5 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 39 additions & 50 deletions source/common/http/conn_pool_grid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ std::string getSni(const Network::TransportSocketOptionsConstSharedPtr& options,

ConnectivityGrid::WrapperCallbacks::WrapperCallbacks(ConnectivityGrid& grid,
Http::ResponseDecoder& decoder,
PoolIterator pool_it,
ConnectionPool::Callbacks& callbacks,
const Instance::StreamOptions& options)
: grid_(grid), decoder_(decoder), inner_callbacks_(&callbacks),
next_attempt_timer_(
grid_.dispatcher_.createTimer([this]() -> void { tryAnotherConnection(); })),
current_(pool_it), stream_options_(options) {
stream_options_(options) {
if (!stream_options_.can_use_http3_) {
// If alternate protocols are explicitly disabled, there must have been a failed request over
// HTTP/3 and the failure must be post-handshake. So disable HTTP/3 for this request.
Expand All @@ -45,8 +44,8 @@ ConnectivityGrid::WrapperCallbacks::WrapperCallbacks(ConnectivityGrid& grid,
}

ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::ConnectionAttemptCallbacks(
WrapperCallbacks& parent, PoolIterator it)
: parent_(parent), pool_it_(it) {}
WrapperCallbacks& parent, ConnectionPool::Instance& pool)
: parent_(parent), pool_(pool) {}

ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::~ConnectionAttemptCallbacks() {
if (cancellable_ != nullptr) {
Expand Down Expand Up @@ -116,10 +115,11 @@ void ConnectivityGrid::WrapperCallbacks::deleteThis() {
removeFromList(grid_.wrapped_callbacks_);
}

ConnectivityGrid::StreamCreationResult ConnectivityGrid::WrapperCallbacks::newStream() {
ENVOY_LOG(trace, "{} pool attempting to create a new stream to host '{}'.",
describePool(**current_), grid_.origin_.hostname_);
auto attempt = std::make_unique<ConnectionAttemptCallbacks>(*this, current_);
ConnectivityGrid::StreamCreationResult
ConnectivityGrid::WrapperCallbacks::newStream(ConnectionPool::Instance& pool) {
ENVOY_LOG(trace, "{} pool attempting to create a new stream to host '{}'.", describePool(pool),
grid_.origin_.hostname_);
auto attempt = std::make_unique<ConnectionAttemptCallbacks>(*this, pool);
LinkedList::moveIntoList(std::move(attempt), connection_attempts_);
if (!next_attempt_timer_->enabled()) {
next_attempt_timer_->enableTimer(grid_.next_attempt_duration_);
Expand Down Expand Up @@ -199,17 +199,17 @@ ConnectivityGrid::WrapperCallbacks::tryAnotherConnection() {
if (grid_.destroying_) {
return {};
}
absl::optional<PoolIterator> next_pool = grid_.nextPool(current_);
if (!next_pool.has_value()) {
if (has_attempted_http2_) {
// If there are no other pools to try, return an empty optional.
return {};
}
// Create a new connection attempt for the next pool. If we reach this point
// return true regardless of if newStream resulted in an immediate result or
// an async call, as either way the attempt will result in success/failure
// callbacks.
current_ = next_pool.value();
return newStream();
grid_.createNextPool(); // Make sure the HTTP/2 pool exists
has_attempted_http2_ = true;
return newStream(*grid_.http2_pool_);
}

ConnectivityGrid::ConnectivityGrid(
Expand Down Expand Up @@ -251,58 +251,57 @@ ConnectivityGrid::~ConnectivityGrid() {
wrapped_callbacks_.front()->signalFailureAndDeleteSelf(
ConnectionPool::PoolFailureReason::LocalConnectionFailure, "grid teardown", host_);
}
pools_.clear();
http2_pool_.reset();
http3_pool_.reset();
}

void ConnectivityGrid::deleteIsPending() {
deferred_deleting_ = true;

for (const auto& pool : pools_) {
pool->deleteIsPending();
}
}

absl::optional<ConnectivityGrid::PoolIterator> ConnectivityGrid::createNextPool() {
ConnectionPool::Instance* ConnectivityGrid::createNextPool() {
ASSERT(!deferred_deleting_);
// Pools are created by newStream, which should not be called during draining.
ASSERT(!draining_);
// Right now, only H3 and TCP are supported, so if there are 2 pools we're done.
if (pools_.size() == 2 || draining_) {
return absl::nullopt;
// If both pools exist we're done.
if ((http2_pool_ && http3_pool_) || draining_) {
return nullptr;
}

// HTTP/3 is hard-coded as higher priority, H2 as secondary.
ConnectionPool::InstancePtr pool;
if (pools_.empty()) {
pool = Http3::allocateConnPool(
if (!http3_pool_) {
http3_pool_ = Http3::allocateConnPool(
dispatcher_, random_generator_, host_, priority_, options_, transport_socket_options_,
state_, quic_stat_names_, *alternate_protocols_, scope_,
makeOptRefFromPtr<Http3::PoolConnectResultCallback>(this), quic_info_);
pools_.push_back(http3_pool_.get());
} else {
pool = std::make_unique<HttpConnPoolImplMixed>(dispatcher_, random_generator_, host_, priority_,
options_, transport_socket_options_, state_,
origin_, alternate_protocols_);
http2_pool_ = std::make_unique<HttpConnPoolImplMixed>(
dispatcher_, random_generator_, host_, priority_, options_, transport_socket_options_,
state_, origin_, alternate_protocols_);
pools_.push_back(http2_pool_.get());
}

setupPool(*pool);
pools_.push_back(std::move(pool));

return --pools_.end();
setupPool(*pools_.back());
return pools_.back();
}

void ConnectivityGrid::setupPool(ConnectionPool::Instance& pool) {
pool.addIdleCallback([this]() { onIdleReceived(); });
}

bool ConnectivityGrid::hasActiveConnections() const {
// This is O(n) but the function is constant and there are no plans for n > 8.
for (const auto& pool : pools_) {
if (pool->hasActiveConnections()) {
return true;
}
}
return false;
}

ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder& decoder,
ConnectionPool::Callbacks& callbacks,
const Instance::StreamOptions& options) {
Expand All @@ -311,10 +310,11 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder&
// New streams should not be created during draining.
ASSERT(!draining_);

if (pools_.empty()) {
createNextPool();
// Always start with the HTTP/3 pool if it exists.
ConnectionPool::Instance* pool = http3_pool_ ? http3_pool_.get() : http2_pool_.get();
if (!pool) {
pool = createNextPool();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having read through the rest of this PR, I think that we should eliminate createNextPool(). In this method, we have an explict if() down on line 320 which controls if we're doing h2 or h3. Can we simply inline the initialization of http2_pool_ and http3_pool_ there and eliminate createNextPool() since the next is a list operation which we really don't seem to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG. Would it be Ok to do this in a follow-up? I have another clean up PR Pending plus the actual underlying fix I want to do. added a TODO in the .h file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM!

}
PoolIterator pool = pools_.begin();
Instance::StreamOptions overriding_options = options;
bool delay_tcp_attempt = true;
if (shouldAttemptHttp3() && options.can_use_http3_) {
Expand All @@ -323,24 +323,23 @@ ConnectionPool::Cancellable* ConnectivityGrid::newStream(Http::ResponseDecoder&
delay_tcp_attempt = false;
}
} else {
// Before skipping to the next pool, make sure it has been created.
// Make sure the HTTP/2 pool is created.
createNextPool();
++pool;
pool = http2_pool_.get();
}
auto wrapped_callback =
std::make_unique<WrapperCallbacks>(*this, decoder, pool, callbacks, overriding_options);
ConnectionPool::Cancellable* ret = wrapped_callback.get();
std::make_unique<WrapperCallbacks>(*this, decoder, callbacks, overriding_options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OOC, why move pool from the constructor to newStream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a property of an attempt not of a wrapper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, heh. Sure, good point. Thanks!

WrapperCallbacks* ret = wrapped_callback.get();
LinkedList::moveIntoList(std::move(wrapped_callback), wrapped_callbacks_);
if (wrapped_callbacks_.front()->newStream() == StreamCreationResult::ImmediateResult) {
if (ret->newStream(*pool) == StreamCreationResult::ImmediateResult) {
// If newStream succeeds, return nullptr as the caller has received their
// callback and does not need a cancellable handle. At this point the
// WrappedCallbacks object has also been deleted.
return nullptr;
}
if (!delay_tcp_attempt) {
// Immediately start TCP attempt if HTTP/3 failed recently.
absl::optional<StreamCreationResult> result =
wrapped_callbacks_.front()->tryAnotherConnection();
absl::optional<StreamCreationResult> result = ret->tryAnotherConnection();
if (result.has_value() && result.value() == StreamCreationResult::ImmediateResult) {
// As above, if we have an immediate success, return nullptr.
return nullptr;
Expand All @@ -361,7 +360,6 @@ void ConnectivityGrid::drainConnections(Envoy::ConnectionPool::DrainBehavior dra
// as createNextPool fast-fails if `draining_` is true.
draining_ = true;
}

for (auto& pool : pools_) {
pool->drainConnections(drain_behavior);
}
Expand All @@ -373,16 +371,8 @@ bool ConnectivityGrid::maybePreconnect(float) {
return false; // Preconnect not yet supported for the grid.
}

absl::optional<ConnectivityGrid::PoolIterator> ConnectivityGrid::nextPool(PoolIterator pool_it) {
pool_it++;
if (pool_it != pools_.end()) {
return pool_it;
}
return createNextPool();
}

bool ConnectivityGrid::isPoolHttp3(const ConnectionPool::Instance& pool) {
return &pool == pools_.begin()->get();
return &pool == http3_pool_.get();
}

HttpServerPropertiesCache::Http3StatusTracker& ConnectivityGrid::getHttp3StatusTracker() const {
Expand All @@ -400,7 +390,6 @@ void ConnectivityGrid::markHttp3Broken() {
void ConnectivityGrid::markHttp3Confirmed() { getHttp3StatusTracker().markHttp3Confirmed(); }

bool ConnectivityGrid::isIdle() const {
// This is O(n) but the function is constant and there are no plans for n > 8.
bool idle = true;
for (const auto& pool : pools_) {
idle &= pool->isIdle();
Expand Down
51 changes: 28 additions & 23 deletions source/common/http/conn_pool_grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,22 @@
#include "source/common/quic/quic_stat_names.h"

#include "absl/container/flat_hash_map.h"
#include "absl/container/inlined_vector.h"

namespace Envoy {
namespace Http {

// An HTTP connection pool which will handle the connectivity grid of
// [WiFi / cellular] [ipv4 / ipv6] [QUIC / TCP].
// Currently only [QUIC / TCP are handled]
// An HTTP connection pool which handles HTTP/3 failing over to HTTP/2
//
// The ConnectivityGrid wraps an inner HTTP/3 and HTTP/2 pool.
//
// each newStream attempt to the grid creates a wrapper callback which attempts
// to hide from the caller that there may be serial or parallel newStream calls
// to the HTTP/3 and HTTP/2 pools.
//
// Any cancel call on the wrapper callbacks cancels either or both stream
// attempts to the wrapped pools. the wrapper callbacks will only pass up
// failure if both HTTP/3 and HTTP/2 attempts fail.
class ConnectivityGrid : public ConnectionPool::Instance,
public Http3::PoolConnectResultCallback,
protected Logger::Loggable<Logger::Id::pool> {
Expand All @@ -28,8 +37,6 @@ class ConnectivityGrid : public ConnectionPool::Instance,
StreamCreationPending,
};

using PoolIterator = std::list<ConnectionPool::InstancePtr>::iterator;

// This is a class which wraps a caller's connection pool callbacks to
// auto-retry pools in the case of connection failure.
//
Expand All @@ -38,14 +45,14 @@ class ConnectivityGrid : public ConnectionPool::Instance,
class WrapperCallbacks : public ConnectionPool::Cancellable,
public LinkedObject<WrapperCallbacks> {
public:
WrapperCallbacks(ConnectivityGrid& grid, Http::ResponseDecoder& decoder, PoolIterator pool_it,
WrapperCallbacks(ConnectivityGrid& grid, Http::ResponseDecoder& decoder,
ConnectionPool::Callbacks& callbacks, const Instance::StreamOptions& options);

// This holds state for a single connection attempt to a specific pool.
class ConnectionAttemptCallbacks : public ConnectionPool::Callbacks,
public LinkedObject<ConnectionAttemptCallbacks> {
public:
ConnectionAttemptCallbacks(WrapperCallbacks& parent, PoolIterator it);
ConnectionAttemptCallbacks(WrapperCallbacks& parent, ConnectionPool::Instance& pool);
~ConnectionAttemptCallbacks() override;

StreamCreationResult newStream();
Expand All @@ -58,15 +65,14 @@ class ConnectivityGrid : public ConnectionPool::Instance,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) override;

ConnectionPool::Instance& pool() { return **pool_it_; }
ConnectionPool::Instance& pool() { return pool_; }

void cancel(Envoy::ConnectionPool::CancelPolicy cancel_policy);

private:
// A pointer back up to the parent.
WrapperCallbacks& parent_;
// The pool for this connection attempt.
const PoolIterator pool_it_;
ConnectionPool::Instance& pool_;
// The handle to cancel this connection attempt.
// This is owned by the pool which created it.
Cancellable* cancellable_{nullptr};
Expand All @@ -76,8 +82,8 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// ConnectionPool::Cancellable
void cancel(Envoy::ConnectionPool::CancelPolicy cancel_policy) override;

// Attempt to create a new stream for pool().
StreamCreationResult newStream();
// Attempt to create a new stream for pool.
StreamCreationResult newStream(ConnectionPool::Instance& pool);

// Called on pool failure or timeout to kick off another connection attempt.
// Returns the StreamCreationResult if there is a failover pool and a
Expand Down Expand Up @@ -126,8 +132,8 @@ class ConnectivityGrid : public ConnectionPool::Instance,
ConnectionPool::Callbacks* inner_callbacks_;
// The timer which tracks when new connections should be attempted.
Event::TimerPtr next_attempt_timer_;
// The iterator to the last pool which had a connection attempt.
PoolIterator current_;
// Checks if http2 has been attempted.
bool has_attempted_http2_ = false;
// True if the HTTP/3 attempt failed.
bool http3_attempt_failed_{};
// True if the TCP attempt succeeded.
Expand Down Expand Up @@ -162,9 +168,6 @@ class ConnectivityGrid : public ConnectionPool::Instance,
bool maybePreconnect(float preconnect_ratio) override;
absl::string_view protocolDescription() const override { return "connection grid"; }

// Returns the next pool in the ordered priority list.
absl::optional<PoolIterator> nextPool(PoolIterator pool_it);

// Returns true if pool is the grid's HTTP/3 connection pool.
bool isPoolHttp3(const ConnectionPool::Instance& pool);

Expand Down Expand Up @@ -203,9 +206,9 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// that specifies HTTP/3 and HTTP/3 is not broken.
bool shouldAttemptHttp3();

// Creates the next pool in the priority list, or absl::nullopt if all pools
// have been created.
virtual absl::optional<PoolIterator> createNextPool();
// Creates the next pool in the priority list, or nullptr if all pools have been created.
// TODO(alyssawilk) replace this now we have explicit pools.
virtual ConnectionPool::Instance* createNextPool();

// This batch of member variables are latched objects required for pool creation.
Event::Dispatcher& dispatcher_;
Expand All @@ -221,9 +224,11 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// Tracks the callbacks to be called on drain completion.
std::list<Instance::IdleCb> idle_callbacks_;

// The connection pools to use to create new streams, ordered in the order of
// desired use.
std::list<ConnectionPool::InstancePtr> pools_;
// The connection pools to use to create new streams
ConnectionPool::InstancePtr http3_pool_;
ConnectionPool::InstancePtr http2_pool_;
// A convenience vector to allow taking actions on all pools.
absl::InlinedVector<ConnectionPool::Instance*, 2> pools_;

// Wrapped callbacks are stashed in the wrapped_callbacks_ for ownership.
std::list<WrapperCallbacksPtr> wrapped_callbacks_;
Expand Down
Loading
Loading