diff --git a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto index b3dd0f63191c2..e6d0737829277 100644 --- a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto +++ b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto @@ -81,7 +81,7 @@ enum ProxyProtocolTlvMergePolicy { APPEND_IF_EXISTS_OR_ADD = 2; } -// [#next-free-field: 24] +// [#next-free-field: 25] message TcpProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.network.tcp_proxy.v2.TcpProxy"; @@ -397,4 +397,11 @@ message TcpProxy { // Use this carefully with server-first protocols. The upstream may send data before // receiving anything from downstream, which could fill the early data buffer. google.protobuf.UInt32Value max_early_data_bytes = 22 [(validate.rules).uint32 = {lte: 1048576}]; + + // If set to ``true``, the TCP proxy checks if the downstream connection was marked as drained + // after each read or write. When drain close is requested for the listener's traffic direction, + // the downstream connection is closed with ``FlushWrite``. + // + // This is disabled by default for backward compatibility. + google.protobuf.BoolValue check_drain_close = 24; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index caff0a0e7376a..e354271d2ed21 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -72,6 +72,12 @@ removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` new_features: +- area: tcp_proxy + change: | + Added :ref:`check_drain_close + ` to the TCP proxy + filter to close downstream connections with ``FlushWrite`` when the drain manager requests drain close during + downstream read or write handling. - area: stat_sinks change: | Added :ref:`max_data_points_per_request diff --git a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst index afbf63c685808..662f324bb06b2 100644 --- a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst @@ -210,6 +210,7 @@ The downstream statistics are rooted at *tcp..* with the following downstream_cx_total, Counter, Total number of connections handled by the filter downstream_cx_no_route, Counter, Number of connections for which no matching route was found or the cluster for the route was not found + downstream_cx_drain_close, Counter, Total number of connections closed due to drain close downstream_cx_tx_bytes_total, Counter, Total bytes written to the downstream connection downstream_cx_tx_bytes_buffered, Gauge, Total bytes currently buffered to the downstream connection downstream_cx_rx_bytes_total, Counter, Total bytes read from the downstream connection diff --git a/envoy/stream_info/stream_info.h b/envoy/stream_info/stream_info.h index 5675997e9def8..09c2d077740c5 100644 --- a/envoy/stream_info/stream_info.h +++ b/envoy/stream_info/stream_info.h @@ -283,6 +283,7 @@ struct LocalCloseReasonValues { const std::string TransportSocketTimeout = "transport_socket_timeout"; const std::string TriggeredDelayedCloseTimeout = "triggered_delayed_close_timeout"; const std::string TcpProxyInitializationFailure = "tcp_initializion_failure:"; + const std::string TcpProxyDrainClose = "tcp_proxy_drain_close"; const std::string TcpSessionIdleTimeout = "tcp_session_idle_timeout"; const std::string MaxConnectionDurationReached = "max_connection_duration_reached"; const std::string ClosingUpstreamTcpDueToDownstreamRemoteClose = diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index edc244c73acae..e07abe5d9808f 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -220,7 +220,13 @@ Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProx upstream_drain_manager_slot_(context.serverFactoryContext().threadLocal().allocateSlot()), shared_config_(std::make_shared(config, context)), random_generator_(context.serverFactoryContext().api().randomGenerator()), - regex_engine_(context.serverFactoryContext().regexEngine()) { + regex_engine_(context.serverFactoryContext().regexEngine()), + drain_decision_(context.drainDecision()), + drain_close_scope_(context.listenerInfo().direction() == + envoy::config::core::v3::TrafficDirection::INBOUND + ? Network::DrainDirection::InboundOnly + : Network::DrainDirection::All), + check_drain_close_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, check_drain_close, false)) { upstream_drain_manager_slot_->set([](Event::Dispatcher&) { ThreadLocal::ThreadLocalObjectSharedPtr drain_manager = std::make_shared(); @@ -1112,6 +1118,7 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) { // Before there is an upstream the connection should be readDisabled. If the upstream is // destroyed, there should be no further reads as well. ASSERT(0 == data.length()); + maybeCloseDownstreamForDrainClose(); return Network::FilterStatus::StopIteration; } @@ -1248,6 +1255,20 @@ void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) { read_callbacks_->connection().write(data, end_stream); ASSERT(0 == data.length()); resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? + maybeCloseDownstreamForDrainClose(); +} + +void Filter::maybeCloseDownstreamForDrainClose() { + if (!config_->checkDrainClose() || downstream_closed_ || + read_callbacks_->connection().state() != Network::Connection::State::Open || + !config_->drainDecision().drainClose(config_->drainCloseScope())) { + return; + } + + ENVOY_CONN_LOG(debug, "drain closing tcp_proxy connection", read_callbacks_->connection()); + config_->stats().downstream_cx_drain_close_.inc(); + read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose); } void Filter::onUpstreamEvent(Network::ConnectionEvent event) { diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index f6c82791b29a3..546f0c2ec5aba 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -59,6 +59,7 @@ constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_b * All tcp proxy stats. @see stats_macros.h */ #define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \ + COUNTER(downstream_cx_drain_close) \ COUNTER(downstream_cx_no_route) \ COUNTER(downstream_cx_rx_bytes_total) \ COUNTER(downstream_cx_total) \ @@ -377,6 +378,9 @@ class Config { } const absl::optional& maxEarlyDataBytes() const { return max_early_data_bytes_; } + bool checkDrainClose() const { return check_drain_close_; } + const Network::DrainDecision& drainDecision() const { return drain_decision_; } + Network::DrainDirection drainCloseScope() const { return drain_close_scope_; } private: struct SimpleRouteImpl : public Route { @@ -433,6 +437,9 @@ class Config { envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode upstream_connect_mode_{ envoy::extensions::filters::network::tcp_proxy::v3::IMMEDIATE}; absl::optional max_early_data_bytes_; + const Network::DrainDecision& drain_decision_; + const Network::DrainDirection drain_close_scope_{}; + const bool check_drain_close_{false}; }; using ConfigSharedPtr = std::shared_ptr; @@ -690,6 +697,7 @@ class Filter : public Network::ReadFilter, void onDownstreamEvent(Network::ConnectionEvent event); void onUpstreamData(Buffer::Instance& data, bool end_stream); void onUpstreamEvent(Network::ConnectionEvent event); + void maybeCloseDownstreamForDrainClose(); void onUpstreamConnection(); void onIdleTimeout(); void resetIdleTimer(); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 3a26636f44bf1..9ab2ebd03a053 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -265,6 +265,74 @@ TEST_P(TcpProxyTest, HalfCloseProxy) { upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); } +TEST_P(TcpProxyTest, DrainCloseIgnoredWhenFlagDisabled) { + setup(1); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)).Times(0); + EXPECT_CALL(filter_callbacks_.connection_, close(_, _)).Times(0); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); + filter_->onData(buffer, false); +} + +TEST_P(TcpProxyTest, DrainCloseAfterDownstreamRead) { + auto config = defaultConfig(); + config.mutable_check_drain_close()->set_value(true); + setup(1, config); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)) + .WillOnce(Return(true)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose)); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); + filter_->onData(buffer, false); +} + +TEST_P(TcpProxyTest, DrainCloseUsesInboundOnlyScopeForInboundListeners) { + auto config = defaultConfig(); + config.mutable_check_drain_close()->set_value(true); + EXPECT_CALL(factory_context_.listener_info_, direction()) + .WillRepeatedly(Return(envoy::config::core::v3::TrafficDirection::INBOUND)); + setup(1, config); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::InboundOnly)) + .WillOnce(Return(true)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose)); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); + filter_->onData(buffer, false); +} + +TEST_P(TcpProxyTest, DrainCloseAfterDownstreamWrite) { + auto config = defaultConfig(); + config.mutable_check_drain_close()->set_value(true); + setup(1, config); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("world"); + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)) + .WillOnce(Return(true)); + EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&buffer), false)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose)); + upstream_callbacks_->onUpstreamData(buffer, false); +} + // Test with an explicitly configured upstream. TEST_P(TcpProxyTest, ExplicitFactory) { // Explicitly configure an HTTP upstream, to test factory creation. diff --git a/test/extensions/filters/network/tcp_proxy/config_test.cc b/test/extensions/filters/network/tcp_proxy/config_test.cc index e42a52e9c86a6..32b3114ab1570 100644 --- a/test/extensions/filters/network/tcp_proxy/config_test.cc +++ b/test/extensions/filters/network/tcp_proxy/config_test.cc @@ -75,6 +75,19 @@ TEST(ConfigTest, ConfigTest) { cb(connection); } +TEST(ConfigTest, ConfigWithDrainCloseCheck) { + NiceMock context; + ConfigFactory factory; + envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = + *dynamic_cast( + factory.createEmptyConfigProto().get()); + config.set_stat_prefix("prefix"); + config.set_cluster("cluster"); + config.mutable_check_drain_close()->set_value(true); + + EXPECT_TRUE(factory.createFilterFactoryFromProto(config, context).ok()); +} + TEST(ConfigTest, TunnelingConfigWithFormatters) { Envoy::Formatter::TestCommandFactory test_factory; Registry::InjectFactory register_factory(test_factory); diff --git a/test/mocks/server/factory_context.cc b/test/mocks/server/factory_context.cc index c138d0f176a14..ca4ffda9597da 100644 --- a/test/mocks/server/factory_context.cc +++ b/test/mocks/server/factory_context.cc @@ -11,6 +11,7 @@ namespace Envoy { namespace Server { namespace Configuration { +using ::testing::Return; using ::testing::ReturnRef; MockFactoryContext::MockFactoryContext() { @@ -22,6 +23,9 @@ MockFactoryContext::MockFactoryContext() { ON_CALL(*this, drainDecision()).WillByDefault(ReturnRef(drain_manager_)); ON_CALL(*this, listenerScope()).WillByDefault(ReturnRef(*listener_store_.rootScope())); + ON_CALL(*this, listenerInfo()).WillByDefault(ReturnRef(listener_info_)); + ON_CALL(listener_info_, direction()) + .WillByDefault(Return(envoy::config::core::v3::TrafficDirection::UNSPECIFIED)); } MockFactoryContext::~MockFactoryContext() = default; diff --git a/test/mocks/server/factory_context.h b/test/mocks/server/factory_context.h index 9fe92ab4d0379..d163510ed89ac 100644 --- a/test/mocks/server/factory_context.h +++ b/test/mocks/server/factory_context.h @@ -5,6 +5,7 @@ #include "source/common/router/context_impl.h" #include "source/common/tls/context_manager_impl.h" +#include "test/mocks/network/mocks.h" #include "test/mocks/server/admin.h" #include "test/mocks/server/drain_manager.h" #include "test/mocks/server/instance.h" @@ -39,6 +40,7 @@ class MockFactoryContext : public virtual ListenerFactoryContext { Stats::IsolatedStoreImpl listener_store_; Stats::Scope& listener_scope_{*listener_store_.rootScope()}; testing::NiceMock drain_manager_; + testing::NiceMock listener_info_; }; class MockUpstreamFactoryContext : public UpstreamFactoryContext {