Skip to content

Commit

Permalink
generic conn pool: directly use thread local cluster (#14423)
Browse files Browse the repository at this point in the history
Further cleanup from previous PRs. In the happy path this further
eliminates map lookups and simplifies error handling.

Signed-off-by: Matt Klein <mklein@lyft.com>
  • Loading branch information
Matt Klein committed Dec 15, 2020
1 parent f752cff commit f19d025
Show file tree
Hide file tree
Showing 21 changed files with 76 additions and 88 deletions.
3 changes: 2 additions & 1 deletion include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Envoy {
namespace Upstream {
class ClusterManager;
class LoadBalancerContext;
class ThreadLocalCluster;
} // namespace Upstream

namespace Router {
Expand Down Expand Up @@ -1306,7 +1307,7 @@ class GenericConnPoolFactory : public Envoy::Config::TypedFactory {
* @return may be null
*/
virtual GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const RouteEntry& route_entry,
absl::optional<Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const PURE;
Expand Down
6 changes: 3 additions & 3 deletions include/envoy/tcp/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Envoy {

namespace Upstream {
class LoadBalancerContext;
class ThreadLocalCluster;
} // namespace Upstream

namespace TcpProxy {
Expand Down Expand Up @@ -116,15 +117,14 @@ class GenericConnPoolFactory : public Envoy::Config::TypedFactory {
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

/*
* @param cluster_name the name of the cluster to use
* @param cm the cluster manager to get the connection pool from
* @param thread_local_cluster the thread local cluster to use for conn pool creation.
* @param config the tunneling config, if doing connect tunneling.
* @param context the load balancing context for this connection.
* @param upstream_callbacks the callbacks to provide to the connection if successfully created.
* @return may be null if there is no cluster with the given name.
*/
virtual GenericConnPoolPtr
createGenericConnPool(const std::string& cluster_name, Upstream::ClusterManager& cm,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
const absl::optional<TunnelingConfig>& config,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const PURE;
Expand Down
16 changes: 12 additions & 4 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,

transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
*callbacks_->streamInfo().filterState());
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool();
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);

if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
Expand Down Expand Up @@ -595,7 +595,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
return Http::FilterHeadersStatus::StopIteration;
}

std::unique_ptr<GenericConnPool> Filter::createConnPool() {
std::unique_ptr<GenericConnPool>
Filter::createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster) {
GenericConnPoolFactory* factory = nullptr;
if (cluster_->upstreamConfig().has_value()) {
factory = &Envoy::Config::Utility::getAndCheckFactory<GenericConnPoolFactory>(
Expand All @@ -607,7 +608,7 @@ std::unique_ptr<GenericConnPool> Filter::createConnPool() {
const bool should_tcp_proxy =
route_entry_->connectConfig().has_value() &&
downstream_headers_->getMethodValue() == Http::Headers::get().MethodValues.Connect;
return factory->createGenericConnPool(config_.cm_, should_tcp_proxy, *route_entry_,
return factory->createGenericConnPool(thread_local_cluster, should_tcp_proxy, *route_entry_,
callbacks_->streamInfo().protocol(), this);
}

Expand Down Expand Up @@ -1533,7 +1534,14 @@ void Filter::doRetry() {
ASSERT(pending_retries_ > 0);
pending_retries_--;

std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool();
// Clusters can technically get removed by CDS during a retry. Make sure it still exists.
const auto cluster = config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
std::unique_ptr<GenericConnPool> generic_conn_pool;
if (cluster != nullptr) {
cluster_ = cluster->info();
generic_conn_pool = createConnPool(*cluster);
}

if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
cleanup();
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
Event::Dispatcher& dispatcher, TimeSource& time_source,
Upstream::ResourcePriority priority) PURE;

std::unique_ptr<GenericConnPool> createConnPool();
std::unique_ptr<GenericConnPool>
createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster);
UpstreamRequestPtr createUpstreamRequest();

void maybeDoShadowing();
Expand Down
8 changes: 4 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
downstreamConnection()->streamInfo().filterState());
}

if (!maybeTunnel(*thread_local_cluster, cluster_name)) {
if (!maybeTunnel(*thread_local_cluster)) {
// Either cluster is unknown or there are no healthy hosts. tcpConnPool() increments
// cluster->stats().upstream_cx_none_healthy in the latter case.
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream);
Expand All @@ -443,7 +443,7 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {
return Network::FilterStatus::StopIteration;
}

bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster, const std::string& cluster_name) {
bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster) {
GenericConnPoolFactory* factory = nullptr;
if (cluster.info()->upstreamConfig().has_value()) {
factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
Expand All @@ -456,8 +456,8 @@ bool Filter::maybeTunnel(Upstream::ThreadLocalCluster& cluster, const std::strin
return false;
}

generic_conn_pool_ = factory->createGenericConnPool(
cluster_name, cluster_manager_, config_->tunnelingConfig(), this, *upstream_callbacks_);
generic_conn_pool_ = factory->createGenericConnPool(cluster, config_->tunnelingConfig(), this,
*upstream_callbacks_);
if (generic_conn_pool_) {
connecting_ = true;
connect_attempts_++;
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class Filter : public Network::ReadFilter,

void initialize(Network::ReadFilterCallbacks& callbacks, bool set_connection_stats);
Network::FilterStatus initializeUpstreamConnection();
bool maybeTunnel(Upstream::ThreadLocalCluster& cluster, const std::string& cluster_name);
bool maybeTunnel(Upstream::ThreadLocalCluster& cluster);
void onConnectTimeout();
void onDownstreamEvent(Network::ConnectionEvent event);
void onUpstreamData(Buffer::Instance& data, bool end_stream);
Expand Down
23 changes: 5 additions & 18 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,11 @@ void HttpUpstream::doneWriting() {
}
}

TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
: upstream_callbacks_(upstream_callbacks) {
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cluster_manager.getThreadLocalCluster(cluster_name);
if (thread_local_cluster != nullptr) {
conn_pool_ = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, context);
}
conn_pool_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
}

TcpConnPool::~TcpConnPool() {
Expand Down Expand Up @@ -185,20 +179,13 @@ void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data
latched_data->connection().streamInfo().downstreamSslConnection());
}

HttpConnPool::HttpConnPool(const std::string& cluster_name,
Upstream::ClusterManager& cluster_manager,
HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, const TunnelingConfig& config,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
Http::CodecClient::Type type)
: hostname_(config.hostname()), type_(type), upstream_callbacks_(upstream_callbacks) {
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cluster_manager.getThreadLocalCluster(cluster_name);
if (thread_local_cluster != nullptr) {
conn_pool_ = thread_local_cluster->httpConnPool(Upstream::ResourcePriority::Default,
absl::nullopt, context);
}
conn_pool_ = thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, absl::nullopt,
context);
}

HttpConnPool::~HttpConnPool() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace TcpProxy {

class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks {
public:
TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks);
~TcpConnPool() override;
Expand Down Expand Up @@ -44,7 +44,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
using TunnelingConfig =
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

HttpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, const TunnelingConfig& config,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
Http::CodecClient::Type type);
Expand Down
11 changes: 6 additions & 5 deletions source/extensions/upstreams/http/generic/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ namespace Http {
namespace Generic {

Router::GenericConnPoolPtr GenericGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
if (is_connect) {
auto ret = std::make_unique<Upstreams::Http::Tcp::TcpConnPool>(cm, is_connect, route_entry,
downstream_protocol, ctx);
auto ret = std::make_unique<Upstreams::Http::Tcp::TcpConnPool>(
thread_local_cluster, is_connect, route_entry, downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}
auto ret = std::make_unique<Upstreams::Http::Http::HttpConnPool>(cm, is_connect, route_entry,
downstream_protocol, ctx);
auto ret = std::make_unique<Upstreams::Http::Http::HttpConnPool>(
thread_local_cluster, is_connect, route_entry, downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/generic/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class GenericGenericConnPoolFactory : public Router::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.http.generic"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/upstreams/http/http/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ namespace Http {
namespace Http {

Router::GenericConnPoolPtr HttpGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
auto ret = std::make_unique<HttpConnPool>(cm, is_connect, route_entry, downstream_protocol, ctx);
auto ret = std::make_unique<HttpConnPool>(thread_local_cluster, is_connect, route_entry,
downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/http/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class HttpGenericConnPoolFactory : public Router::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.http.http"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;
Expand Down
13 changes: 4 additions & 9 deletions source/extensions/upstreams/http/http/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@ namespace Http {
class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::ConnectionPool::Callbacks {
public:
// GenericConnPool
HttpConnPool(Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) {
ASSERT(!is_connect);
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cm.getThreadLocalCluster(route_entry.clusterName());
if (thread_local_cluster != nullptr) {
conn_pool_ =
thread_local_cluster->httpConnPool(route_entry.priority(), downstream_protocol, ctx);
}
conn_pool_ =
thread_local_cluster.httpConnPool(route_entry.priority(), downstream_protocol, ctx);
}
~HttpConnPool() override {
ASSERT(conn_pool_stream_handle_ == nullptr, "conn_pool_stream_handle not null");
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/upstreams/http/tcp/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ namespace Http {
namespace Tcp {

Router::GenericConnPoolPtr TcpGenericConnPoolFactory::createGenericConnPool(
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
auto ret = std::make_unique<TcpConnPool>(cm, is_connect, route_entry, downstream_protocol, ctx);
auto ret = std::make_unique<TcpConnPool>(thread_local_cluster, is_connect, route_entry,
downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/tcp/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TcpGenericConnPoolFactory : public Router::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.http.tcp"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
createGenericConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;
Expand Down
13 changes: 4 additions & 9 deletions source/extensions/upstreams/http/tcp/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,11 @@ namespace Tcp {

class TcpConnPool : public Router::GenericConnPool, public Envoy::Tcp::ConnectionPool::Callbacks {
public:
TcpConnPool(Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol>, Upstream::LoadBalancerContext* ctx) {
TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, bool is_connect,
const Router::RouteEntry& route_entry, absl::optional<Envoy::Http::Protocol>,
Upstream::LoadBalancerContext* ctx) {
ASSERT(is_connect);
// TODO(mattklein123): Pass thread local cluster into this function, removing an additional
// map lookup and moving the error handling closer to the source (where it is likely already
// done).
const auto thread_local_cluster = cm.getThreadLocalCluster(route_entry.clusterName());
if (thread_local_cluster != nullptr) {
conn_pool_ = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, ctx);
}
conn_pool_ = thread_local_cluster.tcpConnPool(route_entry.priority(), ctx);
}
void newStream(Router::GenericConnectionPoolCallbacks* callbacks) override {
callbacks_ = callbacks;
Expand Down
19 changes: 8 additions & 11 deletions source/extensions/upstreams/tcp/generic/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,20 @@ namespace Tcp {
namespace Generic {

TcpProxy::GenericConnPoolPtr GenericConnPoolFactory::createGenericConnPool(
const std::string& cluster_name, Upstream::ClusterManager& cluster_manager,
Upstream::ThreadLocalCluster& thread_local_cluster,
const absl::optional<TunnelingConfig>& config, Upstream::LoadBalancerContext* context,
Envoy::Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const {
if (config.has_value()) {
auto* cluster = cluster_manager.getThreadLocalCluster(cluster_name);
if (!cluster) {
return nullptr;
}
auto pool_type = ((cluster->info()->features() & Upstream::ClusterInfo::Features::HTTP2) != 0)
? Http::CodecClient::Type::HTTP2
: Http::CodecClient::Type::HTTP1;
auto pool_type =
((thread_local_cluster.info()->features() & Upstream::ClusterInfo::Features::HTTP2) != 0)
? Http::CodecClient::Type::HTTP2
: Http::CodecClient::Type::HTTP1;
auto ret = std::make_unique<TcpProxy::HttpConnPool>(
cluster_name, cluster_manager, context, config.value(), upstream_callbacks, pool_type);
thread_local_cluster, context, config.value(), upstream_callbacks, pool_type);
return (ret->valid() ? std::move(ret) : nullptr);
}
auto ret = std::make_unique<TcpProxy::TcpConnPool>(cluster_name, cluster_manager, context,
upstream_callbacks);
auto ret =
std::make_unique<TcpProxy::TcpConnPool>(thread_local_cluster, context, upstream_callbacks);
return (ret->valid() ? std::move(ret) : nullptr);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/tcp/generic/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class GenericConnPoolFactory : public TcpProxy::GenericConnPoolFactory {
std::string name() const override { return "envoy.filters.connection_pools.tcp.generic"; }
std::string category() const override { return "envoy.upstreams"; }
TcpProxy::GenericConnPoolPtr createGenericConnPool(
const std::string& cluster_name, Upstream::ClusterManager& cm,
Upstream::ThreadLocalCluster& thread_local_cluster,
const absl::optional<TunnelingConfig>& config, Upstream::LoadBalancerContext* context,
Envoy::Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) const override;

Expand Down

0 comments on commit f19d025

Please sign in to comment.