From cc90be1702c91119b2cec8d523bd7fc1f1ca5f4e Mon Sep 17 00:00:00 2001 From: wbpcode/wangbaiping Date: Wed, 22 Apr 2026 02:17:06 +0000 Subject: [PATCH 1/6] tcp proxy: add new option to let tcp proxy check the drain close status Signed-off-by: wbpcode/wangbaiping --- .../network/tcp_proxy/v3/tcp_proxy.proto | 9 ++- changelogs/current.yaml | 6 ++ envoy/stream_info/stream_info.h | 1 + source/common/tcp_proxy/tcp_proxy.cc | 23 ++++++- source/common/tcp_proxy/tcp_proxy.h | 8 +++ test/common/tcp_proxy/tcp_proxy_test.cc | 68 +++++++++++++++++++ .../filters/network/tcp_proxy/config_test.cc | 13 ++++ test/mocks/server/factory_context.cc | 4 ++ test/mocks/server/factory_context.h | 2 + 9 files changed, 132 insertions(+), 2 deletions(-) diff --git a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto index 280baff908c4a..40939c8e75327 100644 --- a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto +++ b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto @@ -80,7 +80,7 @@ enum ProxyProtocolTlvMergePolicy { APPEND_IF_EXISTS_OR_ADD = 2; } -// [#next-free-field: 24] +// [#next-free-field: 25] message TcpProxy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.network.tcp_proxy.v2.TcpProxy"; @@ -390,4 +390,11 @@ message TcpProxy { // Use this carefully with server-first protocols. The upstream may send data before // receiving anything from downstream, which could fill the early data buffer. google.protobuf.UInt32Value max_early_data_bytes = 22 [(validate.rules).uint32 = {lte: 1048576}]; + + // If set to ``true``, the TCP proxy checks the factory context's drain decision after each + // downstream read and downstream write. When drain close is requested for the listener's + // traffic direction, the downstream connection is closed with ``FlushWrite``. + // + // This is disabled by default for backward compatibility. + bool check_drain_close = 24; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index f9751bc4b7263..adbacc51d2639 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -413,6 +413,12 @@ removed_config_or_runtime: and legacy code path. new_features: +- area: tcp_proxy + change: | + Added :ref:`check_drain_close + ` to the TCP proxy + filter to close downstream connections with ``FlushWrite`` when the drain manager requests drain close during + downstream read or write handling. - area: ext_authz change: | Added :ref:`shadow_mode diff --git a/envoy/stream_info/stream_info.h b/envoy/stream_info/stream_info.h index 5675997e9def8..09c2d077740c5 100644 --- a/envoy/stream_info/stream_info.h +++ b/envoy/stream_info/stream_info.h @@ -283,6 +283,7 @@ struct LocalCloseReasonValues { const std::string TransportSocketTimeout = "transport_socket_timeout"; const std::string TriggeredDelayedCloseTimeout = "triggered_delayed_close_timeout"; const std::string TcpProxyInitializationFailure = "tcp_initializion_failure:"; + const std::string TcpProxyDrainClose = "tcp_proxy_drain_close"; const std::string TcpSessionIdleTimeout = "tcp_session_idle_timeout"; const std::string MaxConnectionDurationReached = "max_connection_duration_reached"; const std::string ClosingUpstreamTcpDueToDownstreamRemoteClose = diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 9b4597b1fbc0d..c72d922b35d5b 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -218,7 +218,13 @@ Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProx upstream_drain_manager_slot_(context.serverFactoryContext().threadLocal().allocateSlot()), shared_config_(std::make_shared(config, context)), random_generator_(context.serverFactoryContext().api().randomGenerator()), - regex_engine_(context.serverFactoryContext().regexEngine()) { + regex_engine_(context.serverFactoryContext().regexEngine()), + drain_decision_(context.drainDecision()), + drain_close_scope_(context.listenerInfo().direction() == + envoy::config::core::v3::TrafficDirection::INBOUND + ? Network::DrainDirection::InboundOnly + : Network::DrainDirection::All), + check_drain_close_(config.check_drain_close()) { upstream_drain_manager_slot_->set([](Event::Dispatcher&) { ThreadLocal::ThreadLocalObjectSharedPtr drain_manager = std::make_shared(); @@ -1092,6 +1098,7 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) { // Before there is an upstream the connection should be readDisabled. If the upstream is // destroyed, there should be no further reads as well. ASSERT(0 == data.length()); + maybeCloseDownstreamForDrainClose(); return Network::FilterStatus::StopIteration; } @@ -1228,6 +1235,20 @@ void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) { read_callbacks_->connection().write(data, end_stream); ASSERT(0 == data.length()); resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? + maybeCloseDownstreamForDrainClose(); +} + +void Filter::maybeCloseDownstreamForDrainClose() { + if (!config_->checkDrainClose() || downstream_closed_ || + read_callbacks_->connection().state() != Network::Connection::State::Open || + !config_->drainDecision().drainClose(config_->drainCloseScope())) { + return; + } + + ENVOY_CONN_LOG(debug, "drain closing tcp_proxy connection", read_callbacks_->connection()); + config_->stats().downstream_cx_drain_close_.inc(); + read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose); } void Filter::onUpstreamEvent(Network::ConnectionEvent event) { diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index f6c82791b29a3..546f0c2ec5aba 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -59,6 +59,7 @@ constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_b * All tcp proxy stats. @see stats_macros.h */ #define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \ + COUNTER(downstream_cx_drain_close) \ COUNTER(downstream_cx_no_route) \ COUNTER(downstream_cx_rx_bytes_total) \ COUNTER(downstream_cx_total) \ @@ -377,6 +378,9 @@ class Config { } const absl::optional& maxEarlyDataBytes() const { return max_early_data_bytes_; } + bool checkDrainClose() const { return check_drain_close_; } + const Network::DrainDecision& drainDecision() const { return drain_decision_; } + Network::DrainDirection drainCloseScope() const { return drain_close_scope_; } private: struct SimpleRouteImpl : public Route { @@ -433,6 +437,9 @@ class Config { envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode upstream_connect_mode_{ envoy::extensions::filters::network::tcp_proxy::v3::IMMEDIATE}; absl::optional max_early_data_bytes_; + const Network::DrainDecision& drain_decision_; + const Network::DrainDirection drain_close_scope_{}; + const bool check_drain_close_{false}; }; using ConfigSharedPtr = std::shared_ptr; @@ -690,6 +697,7 @@ class Filter : public Network::ReadFilter, void onDownstreamEvent(Network::ConnectionEvent event); void onUpstreamData(Buffer::Instance& data, bool end_stream); void onUpstreamEvent(Network::ConnectionEvent event); + void maybeCloseDownstreamForDrainClose(); void onUpstreamConnection(); void onIdleTimeout(); void resetIdleTimer(); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index e11fdfbef36bd..8cc11cf303790 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -266,6 +266,74 @@ TEST_P(TcpProxyTest, HalfCloseProxy) { upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose); } +TEST_P(TcpProxyTest, DrainCloseIgnoredWhenFlagDisabled) { + setup(1); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)).Times(0); + EXPECT_CALL(filter_callbacks_.connection_, close(_, _)).Times(0); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); + filter_->onData(buffer, false); +} + +TEST_P(TcpProxyTest, DrainCloseAfterDownstreamRead) { + auto config = defaultConfig(); + config.set_check_drain_close(true); + setup(1, config); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)) + .WillOnce(Return(true)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose)); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); + filter_->onData(buffer, false); +} + +TEST_P(TcpProxyTest, DrainCloseUsesInboundOnlyScopeForInboundListeners) { + auto config = defaultConfig(); + config.set_check_drain_close(true); + EXPECT_CALL(factory_context_.listener_info_, direction()) + .WillRepeatedly(Return(envoy::config::core::v3::TrafficDirection::INBOUND)); + setup(1, config); + + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::InboundOnly)) + .WillOnce(Return(true)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose)); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false)); + filter_->onData(buffer, false); +} + +TEST_P(TcpProxyTest, DrainCloseAfterDownstreamWrite) { + auto config = defaultConfig(); + config.set_check_drain_close(true); + setup(1, config); + + raiseEventUpstreamConnected(0); + + Buffer::OwnedImpl buffer("world"); + EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)) + .WillOnce(Return(true)); + EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&buffer), false)); + EXPECT_CALL(filter_callbacks_.connection_, + close(Network::ConnectionCloseType::FlushWrite, + StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose)); + upstream_callbacks_->onUpstreamData(buffer, false); +} + // Test with an explicitly configured upstream. TEST_P(TcpProxyTest, ExplicitFactory) { // Explicitly configure an HTTP upstream, to test factory creation. diff --git a/test/extensions/filters/network/tcp_proxy/config_test.cc b/test/extensions/filters/network/tcp_proxy/config_test.cc index f1d52b1921a49..0be1ceb3a82c8 100644 --- a/test/extensions/filters/network/tcp_proxy/config_test.cc +++ b/test/extensions/filters/network/tcp_proxy/config_test.cc @@ -73,6 +73,19 @@ TEST(ConfigTest, ConfigTest) { cb(connection); } +TEST(ConfigTest, ConfigWithDrainCloseCheck) { + NiceMock context; + ConfigFactory factory; + envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config = + *dynamic_cast( + factory.createEmptyConfigProto().get()); + config.set_stat_prefix("prefix"); + config.set_cluster("cluster"); + config.set_check_drain_close(true); + + EXPECT_TRUE(factory.createFilterFactoryFromProto(config, context).ok()); +} + } // namespace TcpProxy } // namespace NetworkFilters } // namespace Extensions diff --git a/test/mocks/server/factory_context.cc b/test/mocks/server/factory_context.cc index c138d0f176a14..ca4ffda9597da 100644 --- a/test/mocks/server/factory_context.cc +++ b/test/mocks/server/factory_context.cc @@ -11,6 +11,7 @@ namespace Envoy { namespace Server { namespace Configuration { +using ::testing::Return; using ::testing::ReturnRef; MockFactoryContext::MockFactoryContext() { @@ -22,6 +23,9 @@ MockFactoryContext::MockFactoryContext() { ON_CALL(*this, drainDecision()).WillByDefault(ReturnRef(drain_manager_)); ON_CALL(*this, listenerScope()).WillByDefault(ReturnRef(*listener_store_.rootScope())); + ON_CALL(*this, listenerInfo()).WillByDefault(ReturnRef(listener_info_)); + ON_CALL(listener_info_, direction()) + .WillByDefault(Return(envoy::config::core::v3::TrafficDirection::UNSPECIFIED)); } MockFactoryContext::~MockFactoryContext() = default; diff --git a/test/mocks/server/factory_context.h b/test/mocks/server/factory_context.h index 9fe92ab4d0379..d163510ed89ac 100644 --- a/test/mocks/server/factory_context.h +++ b/test/mocks/server/factory_context.h @@ -5,6 +5,7 @@ #include "source/common/router/context_impl.h" #include "source/common/tls/context_manager_impl.h" +#include "test/mocks/network/mocks.h" #include "test/mocks/server/admin.h" #include "test/mocks/server/drain_manager.h" #include "test/mocks/server/instance.h" @@ -39,6 +40,7 @@ class MockFactoryContext : public virtual ListenerFactoryContext { Stats::IsolatedStoreImpl listener_store_; Stats::Scope& listener_scope_{*listener_store_.rootScope()}; testing::NiceMock drain_manager_; + testing::NiceMock listener_info_; }; class MockUpstreamFactoryContext : public UpstreamFactoryContext { From 89681fbfc892d8d77ed64fe54771c11f682352da Mon Sep 17 00:00:00 2001 From: wbpcode/wangbaiping Date: Wed, 29 Apr 2026 02:55:42 +0000 Subject: [PATCH 2/6] update comment Signed-off-by: wbpcode/wangbaiping --- .../network/tcp_proxy/v3/tcp_proxy.proto | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto index 40939c8e75327..6af9f55ceceb9 100644 --- a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto +++ b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto @@ -83,17 +83,17 @@ enum ProxyProtocolTlvMergePolicy { // [#next-free-field: 25] message TcpProxy { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy"; // Allows specification of multiple upstream clusters along with weights indicating the percentage of // traffic forwarded to each cluster. The cluster selection is based on these weights. message WeightedCluster { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster"; message ClusterWeight { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster.ClusterWeight"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster.ClusterWeight"; // Name of the upstream cluster. string name = 1 [(validate.rules).string = {min_len: 1}]; @@ -122,7 +122,7 @@ message TcpProxy { // [#next-free-field: 10] message TunnelingConfig { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.TunnelingConfig"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.TunnelingConfig"; // The hostname to send in the synthesized CONNECT headers to the upstream proxy. // This field evaluates command operators if present; otherwise, the value is used as-is. @@ -154,7 +154,7 @@ message TcpProxy { // // Neither ``:``-prefixed pseudo-headers like ``:path`` nor the ``host`` header can be overridden. repeated config.core.v3.HeaderValueOption headers_to_add = 3 - [(validate.rules).repeated = {max_items: 1000}]; + [(validate.rules).repeated = {max_items: 1000}]; // Save response headers to the downstream connection's filter state for consumption // by network filters. The filter state key is ``envoy.tcp_proxy.propagate_response_headers``. @@ -225,7 +225,7 @@ message TcpProxy { // specified interval. // The interval must be at least 1ms. google.protobuf.Duration access_log_flush_interval = 1 - [(validate.rules).duration = {gte {nanos: 1000000}}]; + [(validate.rules).duration = {gte {nanos: 1000000}}]; // If set to ``true``, the access log is flushed when the TCP proxy successfully establishes a // connection with the upstream. If the connection fails, the access log is not flushed. @@ -312,7 +312,7 @@ message TcpProxy { // established. If not set, there is no maximum duration. When ``max_downstream_connection_duration`` is // reached, the connection is closed. The duration must be at least ``1ms``. google.protobuf.Duration max_downstream_connection_duration = 13 - [(validate.rules).duration = {gte {nanos: 1000000}}]; + [(validate.rules).duration = {gte {nanos: 1000000}}]; // Percentage-based jitter for ``max_downstream_connection_duration``. The jitter increases the // ``max_downstream_connection_duration`` by a random duration up to the provided percentage. @@ -343,7 +343,7 @@ message TcpProxy { // :ref:`flush_access_log_on_connected // `. bool flush_access_log_on_connected = 16 - [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; // Additional access log options for the TCP proxy. TcpAccessLogOptions access_log_options = 17; @@ -363,7 +363,7 @@ message TcpProxy { // :ref:`ProxyProtocolTlvMergePolicy // `. ProxyProtocolTlvMergePolicy proxy_protocol_tlv_merge_policy = 23 - [(validate.rules).enum = {defined_only: true}]; + [(validate.rules).enum = {defined_only: true}]; // Specifies when to establish the upstream connection. // @@ -391,9 +391,9 @@ message TcpProxy { // receiving anything from downstream, which could fill the early data buffer. google.protobuf.UInt32Value max_early_data_bytes = 22 [(validate.rules).uint32 = {lte: 1048576}]; - // If set to ``true``, the TCP proxy checks the factory context's drain decision after each - // downstream read and downstream write. When drain close is requested for the listener's - // traffic direction, the downstream connection is closed with ``FlushWrite``. + // If set to ``true``, the TCP proxy checks if the downstream connection was marked as drained + // after each read or write. When drain close is requested for the listener's traffic direction, + // the downstream connection is closed with ``FlushWrite``. // // This is disabled by default for backward compatibility. bool check_drain_close = 24; From 801544d212a97a832612bf0d373a595d14d40c3e Mon Sep 17 00:00:00 2001 From: wbpcode/wangbaiping Date: Wed, 29 Apr 2026 02:57:27 +0000 Subject: [PATCH 3/6] add document of stats Signed-off-by: wbpcode/wangbaiping --- .../configuration/listeners/network_filters/tcp_proxy_filter.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst index afbf63c685808..223d03d8613db 100644 --- a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst @@ -225,3 +225,4 @@ The downstream statistics are rooted at *tcp..* with the following on_demand_cluster_timeout, Counter, Total number of connections closed due to on demand cluster lookup timeout upstream_flush_total, Counter, Total number of connections that continued to flush upstream data after the downstream connection was closed upstream_flush_active, Gauge, Total connections currently continuing to flush upstream data after the downstream connection was closed + downstream_cx_drain_close, Counter, Total number of connections closed due to drain close From e3fa7cb54845e7f500676eec997cb7e5290df7b3 Mon Sep 17 00:00:00 2001 From: wbpcode/wangbaiping Date: Wed, 29 Apr 2026 08:54:59 +0000 Subject: [PATCH 4/6] fix format Signed-off-by: wbpcode/wangbaiping --- .../network/tcp_proxy/v3/tcp_proxy.proto | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto index 6af9f55ceceb9..8b0e6a8f917aa 100644 --- a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto +++ b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto @@ -83,17 +83,17 @@ enum ProxyProtocolTlvMergePolicy { // [#next-free-field: 25] message TcpProxy { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy"; // Allows specification of multiple upstream clusters along with weights indicating the percentage of // traffic forwarded to each cluster. The cluster selection is based on these weights. message WeightedCluster { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster"; message ClusterWeight { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster.ClusterWeight"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.WeightedCluster.ClusterWeight"; // Name of the upstream cluster. string name = 1 [(validate.rules).string = {min_len: 1}]; @@ -122,7 +122,7 @@ message TcpProxy { // [#next-free-field: 10] message TunnelingConfig { option (udpa.annotations.versioning).previous_message_type = - "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.TunnelingConfig"; + "envoy.config.filter.network.tcp_proxy.v2.TcpProxy.TunnelingConfig"; // The hostname to send in the synthesized CONNECT headers to the upstream proxy. // This field evaluates command operators if present; otherwise, the value is used as-is. @@ -154,7 +154,7 @@ message TcpProxy { // // Neither ``:``-prefixed pseudo-headers like ``:path`` nor the ``host`` header can be overridden. repeated config.core.v3.HeaderValueOption headers_to_add = 3 - [(validate.rules).repeated = {max_items: 1000}]; + [(validate.rules).repeated = {max_items: 1000}]; // Save response headers to the downstream connection's filter state for consumption // by network filters. The filter state key is ``envoy.tcp_proxy.propagate_response_headers``. @@ -225,7 +225,7 @@ message TcpProxy { // specified interval. // The interval must be at least 1ms. google.protobuf.Duration access_log_flush_interval = 1 - [(validate.rules).duration = {gte {nanos: 1000000}}]; + [(validate.rules).duration = {gte {nanos: 1000000}}]; // If set to ``true``, the access log is flushed when the TCP proxy successfully establishes a // connection with the upstream. If the connection fails, the access log is not flushed. @@ -312,7 +312,7 @@ message TcpProxy { // established. If not set, there is no maximum duration. When ``max_downstream_connection_duration`` is // reached, the connection is closed. The duration must be at least ``1ms``. google.protobuf.Duration max_downstream_connection_duration = 13 - [(validate.rules).duration = {gte {nanos: 1000000}}]; + [(validate.rules).duration = {gte {nanos: 1000000}}]; // Percentage-based jitter for ``max_downstream_connection_duration``. The jitter increases the // ``max_downstream_connection_duration`` by a random duration up to the provided percentage. @@ -343,7 +343,7 @@ message TcpProxy { // :ref:`flush_access_log_on_connected // `. bool flush_access_log_on_connected = 16 - [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; // Additional access log options for the TCP proxy. TcpAccessLogOptions access_log_options = 17; @@ -363,7 +363,7 @@ message TcpProxy { // :ref:`ProxyProtocolTlvMergePolicy // `. ProxyProtocolTlvMergePolicy proxy_protocol_tlv_merge_policy = 23 - [(validate.rules).enum = {defined_only: true}]; + [(validate.rules).enum = {defined_only: true}]; // Specifies when to establish the upstream connection. // From 7f7c7e530575a9a14310698fca2206693f3a2047 Mon Sep 17 00:00:00 2001 From: wbpcode/wangbaiping Date: Mon, 4 May 2026 14:34:05 +0000 Subject: [PATCH 5/6] try improve coverage Signed-off-by: wbpcode/wangbaiping --- .../extraction_util/BUILD | 1 + .../extraction_util/extraction_util_test.cc | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/test/extensions/filters/http/proto_message_extraction/extraction_util/BUILD b/test/extensions/filters/http/proto_message_extraction/extraction_util/BUILD index 525c22c841824..411faf617f675 100644 --- a/test/extensions/filters/http/proto_message_extraction/extraction_util/BUILD +++ b/test/extensions/filters/http/proto_message_extraction/extraction_util/BUILD @@ -18,6 +18,7 @@ envoy_cc_test( rbe_pool = "6gig", deps = [ "//source/extensions/filters/http/proto_message_extraction/extraction_util", + "//source/extensions/filters/http/proto_message_extraction/extraction_util:proto_extractor", "//test/proto:extraction_proto_cc_proto", "//test/test_common:environment_lib", "//test/test_common:status_utility_lib", diff --git a/test/extensions/filters/http/proto_message_extraction/extraction_util/extraction_util_test.cc b/test/extensions/filters/http/proto_message_extraction/extraction_util/extraction_util_test.cc index aa2970e239bcc..a375b4c467993 100644 --- a/test/extensions/filters/http/proto_message_extraction/extraction_util/extraction_util_test.cc +++ b/test/extensions/filters/http/proto_message_extraction/extraction_util/extraction_util_test.cc @@ -6,6 +6,7 @@ #include #include "source/extensions/filters/http/proto_message_extraction/extraction_util/extraction_util.h" +#include "source/extensions/filters/http/proto_message_extraction/extraction_util/proto_extractor.h" #include "test/proto/extraction.pb.h" #include "test/test_common/environment.h" @@ -1081,6 +1082,61 @@ INSTANTIATE_TEST_SUITE_P( return info.param.test_name; }); +// When the extracted field value is non-empty, both target_resource (callback=false) and +// target_resource_callback (callback=true) should be populated. +TEST_F(ExtractionUtilTest, GetTargetResourceOrTargetResourceCallback_NonEmptyValue) { + FieldPathToExtractType field_policies = {{"bucket.name", {ExtractedMessageDirective::EXTRACT}}}; + auto extractor = ProtoExtractor::Create(ScrubberContext::kTestScrubbing, type_helper_.get(), + request_type_, field_policies); + + ExtractedMessageMetadata metadata = extractor->ExtractMessage(test_request_raw_proto_); + + // callback=false path: target_resource is set. + ASSERT_TRUE(metadata.target_resource.has_value()); + EXPECT_EQ(metadata.target_resource.value(), "test-bucket"); + + // callback=true path with non-empty value: target_resource_callback is set. + ASSERT_TRUE(metadata.target_resource_callback.has_value()); + EXPECT_EQ(metadata.target_resource_callback.value(), "test-bucket"); +} + +// When the extracted field value is empty, callback=false sets target_resource to "" and +// callback=true falls through to the else branch (also setting target_resource to ""), so +// target_resource_callback remains unset. +TEST_F(ExtractionUtilTest, GetTargetResourceOrTargetResourceCallback_EmptyValue) { + test_request_proto_.mutable_bucket()->clear_name(); + test_request_raw_proto_ = CordMessageData(test_request_proto_.SerializeAsCord()); + + FieldPathToExtractType field_policies = {{"bucket.name", {ExtractedMessageDirective::EXTRACT}}}; + auto extractor = ProtoExtractor::Create(ScrubberContext::kTestScrubbing, type_helper_.get(), + request_type_, field_policies); + + ExtractedMessageMetadata metadata = extractor->ExtractMessage(test_request_raw_proto_); + + // callback=false path: target_resource is set with an empty string. + ASSERT_TRUE(metadata.target_resource.has_value()); + EXPECT_EQ(metadata.target_resource.value(), ""); + + // callback=true path with empty value hits the else branch, so target_resource_callback is NOT + // set. + EXPECT_FALSE(metadata.target_resource_callback.has_value()); +} + +// When ExtractStringFieldValue fails (field is not a string type), the function returns early +// without setting either target_resource or target_resource_callback. +TEST_F(ExtractionUtilTest, GetTargetResourceOrTargetResourceCallback_ExtractionFailure) { + // "bucket.ratio" is a float field (not a string), so ExtractStringFieldValue will fail, + // but it is a valid field path for the scrubber so no crash occurs. + FieldPathToExtractType field_policies = {{"bucket.ratio", {ExtractedMessageDirective::EXTRACT}}}; + auto extractor = ProtoExtractor::Create(ScrubberContext::kTestScrubbing, type_helper_.get(), + request_type_, field_policies); + + ExtractedMessageMetadata metadata = extractor->ExtractMessage(test_request_raw_proto_); + + EXPECT_FALSE(metadata.target_resource.has_value()); + EXPECT_FALSE(metadata.target_resource_callback.has_value()); +} + } // namespace } // namespace ProtoMessageExtraction } // namespace HttpFilters From 07169e2cc1efc4d274b6de4471abb36e714cc1a1 Mon Sep 17 00:00:00 2001 From: wbpcode/wangbaiping Date: Thu, 7 May 2026 01:42:12 +0000 Subject: [PATCH 6/6] address some comments Signed-off-by: wbpcode/wangbaiping --- .../extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto | 2 +- .../listeners/network_filters/tcp_proxy_filter.rst | 2 +- source/common/tcp_proxy/tcp_proxy.cc | 2 +- test/common/tcp_proxy/tcp_proxy_test.cc | 6 +++--- test/extensions/filters/network/tcp_proxy/config_test.cc | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto index f2e0e04189aca..e6d0737829277 100644 --- a/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto +++ b/api/envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.proto @@ -403,5 +403,5 @@ message TcpProxy { // the downstream connection is closed with ``FlushWrite``. // // This is disabled by default for backward compatibility. - bool check_drain_close = 24; + google.protobuf.BoolValue check_drain_close = 24; } diff --git a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst index 223d03d8613db..662f324bb06b2 100644 --- a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst @@ -210,6 +210,7 @@ The downstream statistics are rooted at *tcp..* with the following downstream_cx_total, Counter, Total number of connections handled by the filter downstream_cx_no_route, Counter, Number of connections for which no matching route was found or the cluster for the route was not found + downstream_cx_drain_close, Counter, Total number of connections closed due to drain close downstream_cx_tx_bytes_total, Counter, Total bytes written to the downstream connection downstream_cx_tx_bytes_buffered, Gauge, Total bytes currently buffered to the downstream connection downstream_cx_rx_bytes_total, Counter, Total bytes read from the downstream connection @@ -225,4 +226,3 @@ The downstream statistics are rooted at *tcp..* with the following on_demand_cluster_timeout, Counter, Total number of connections closed due to on demand cluster lookup timeout upstream_flush_total, Counter, Total number of connections that continued to flush upstream data after the downstream connection was closed upstream_flush_active, Gauge, Total connections currently continuing to flush upstream data after the downstream connection was closed - downstream_cx_drain_close, Counter, Total number of connections closed due to drain close diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index e446c0a76aca7..e07abe5d9808f 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -226,7 +226,7 @@ Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProx envoy::config::core::v3::TrafficDirection::INBOUND ? Network::DrainDirection::InboundOnly : Network::DrainDirection::All), - check_drain_close_(config.check_drain_close()) { + check_drain_close_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, check_drain_close, false)) { upstream_drain_manager_slot_->set([](Event::Dispatcher&) { ThreadLocal::ThreadLocalObjectSharedPtr drain_manager = std::make_shared(); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index c17900c1f4f15..bed72243461bc 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -280,7 +280,7 @@ TEST_P(TcpProxyTest, DrainCloseIgnoredWhenFlagDisabled) { TEST_P(TcpProxyTest, DrainCloseAfterDownstreamRead) { auto config = defaultConfig(); - config.set_check_drain_close(true); + config.mutable_check_drain_close()->set_value(true); setup(1, config); EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)) @@ -298,7 +298,7 @@ TEST_P(TcpProxyTest, DrainCloseAfterDownstreamRead) { TEST_P(TcpProxyTest, DrainCloseUsesInboundOnlyScopeForInboundListeners) { auto config = defaultConfig(); - config.set_check_drain_close(true); + config.mutable_check_drain_close()->set_value(true); EXPECT_CALL(factory_context_.listener_info_, direction()) .WillRepeatedly(Return(envoy::config::core::v3::TrafficDirection::INBOUND)); setup(1, config); @@ -318,7 +318,7 @@ TEST_P(TcpProxyTest, DrainCloseUsesInboundOnlyScopeForInboundListeners) { TEST_P(TcpProxyTest, DrainCloseAfterDownstreamWrite) { auto config = defaultConfig(); - config.set_check_drain_close(true); + config.mutable_check_drain_close()->set_value(true); setup(1, config); raiseEventUpstreamConnected(0); diff --git a/test/extensions/filters/network/tcp_proxy/config_test.cc b/test/extensions/filters/network/tcp_proxy/config_test.cc index 9046247971bcc..32b3114ab1570 100644 --- a/test/extensions/filters/network/tcp_proxy/config_test.cc +++ b/test/extensions/filters/network/tcp_proxy/config_test.cc @@ -83,7 +83,7 @@ TEST(ConfigTest, ConfigWithDrainCloseCheck) { factory.createEmptyConfigProto().get()); config.set_stat_prefix("prefix"); config.set_cluster("cluster"); - config.set_check_drain_close(true); + config.mutable_check_drain_close()->set_value(true); EXPECT_TRUE(factory.createFilterFactoryFromProto(config, context).ok()); }