Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ enum ProxyProtocolTlvMergePolicy {
APPEND_IF_EXISTS_OR_ADD = 2;
}

// [#next-free-field: 24]
// [#next-free-field: 25]
message TcpProxy {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.network.tcp_proxy.v2.TcpProxy";
Expand Down Expand Up @@ -397,4 +397,11 @@ message TcpProxy {
// Use this carefully with server-first protocols. The upstream may send data before
// receiving anything from downstream, which could fill the early data buffer.
google.protobuf.UInt32Value max_early_data_bytes = 22 [(validate.rules).uint32 = {lte: 1048576}];

// If set to ``true``, the TCP proxy checks if the downstream connection was marked as drained
// after each read or write. When drain close is requested for the listener's traffic direction,
// the downstream connection is closed with ``FlushWrite``.
//
// This is disabled by default for backward compatibility.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we enable by default and leave a runtime setting to temporarily change the default. It seems like much better behavior to have this enabled.

Copy link
Copy Markdown
Member Author

@wbpcode wbpcode May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if there is a case that the users want to keep the existing TCP connections as long time as possible by setting a long drain duration. Different with HTTP, we have no way to close the connection gracefully.
So, I slightly inclined to keep an option to control it. But I can change the bool to wrapped value so it's would be easier to change the default value in the future. WDYT?

Copy link
Copy Markdown
Member Author

@wbpcode wbpcode May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you strongly prefer to use runtime guard and enable it by default, I am also fine to that. Feel free to let me know.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly, it was just a thought. I'm ok leaving it as is.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you used a message here, you could have put a timer-based check as well in the future. I don't think we see TCP keep-alives in Envoy, unlike HTTP, so a connection can be stuck without drain or data without a timer.

google.protobuf.BoolValue check_drain_close = 24;
}
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`

new_features:
- area: tcp_proxy
change: |
Added :ref:`check_drain_close
<envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.check_drain_close>` to the TCP proxy
filter to close downstream connections with ``FlushWrite`` when the drain manager requests drain close during
downstream read or write handling.
Comment thread
wbpcode marked this conversation as resolved.
- area: stat_sinks
change: |
Added :ref:`max_data_points_per_request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ The downstream statistics are rooted at *tcp.<stat_prefix>.* with the following

downstream_cx_total, Counter, Total number of connections handled by the filter
downstream_cx_no_route, Counter, Number of connections for which no matching route was found or the cluster for the route was not found
downstream_cx_drain_close, Counter, Total number of connections closed due to drain close
downstream_cx_tx_bytes_total, Counter, Total bytes written to the downstream connection
downstream_cx_tx_bytes_buffered, Gauge, Total bytes currently buffered to the downstream connection
downstream_cx_rx_bytes_total, Counter, Total bytes read from the downstream connection
Expand Down
1 change: 1 addition & 0 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ struct LocalCloseReasonValues {
const std::string TransportSocketTimeout = "transport_socket_timeout";
const std::string TriggeredDelayedCloseTimeout = "triggered_delayed_close_timeout";
const std::string TcpProxyInitializationFailure = "tcp_initializion_failure:";
const std::string TcpProxyDrainClose = "tcp_proxy_drain_close";
const std::string TcpSessionIdleTimeout = "tcp_session_idle_timeout";
const std::string MaxConnectionDurationReached = "max_connection_duration_reached";
const std::string ClosingUpstreamTcpDueToDownstreamRemoteClose =
Expand Down
23 changes: 22 additions & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ Config::Config(const envoy::extensions::filters::network::tcp_proxy::v3::TcpProx
upstream_drain_manager_slot_(context.serverFactoryContext().threadLocal().allocateSlot()),
shared_config_(std::make_shared<SharedConfig>(config, context)),
random_generator_(context.serverFactoryContext().api().randomGenerator()),
regex_engine_(context.serverFactoryContext().regexEngine()) {
regex_engine_(context.serverFactoryContext().regexEngine()),
drain_decision_(context.drainDecision()),
drain_close_scope_(context.listenerInfo().direction() ==
envoy::config::core::v3::TrafficDirection::INBOUND
? Network::DrainDirection::InboundOnly
: Network::DrainDirection::All),
check_drain_close_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, check_drain_close, false)) {
upstream_drain_manager_slot_->set([](Event::Dispatcher&) {
ThreadLocal::ThreadLocalObjectSharedPtr drain_manager =
std::make_shared<UpstreamDrainManager>();
Expand Down Expand Up @@ -1112,6 +1118,7 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) {
// Before there is an upstream the connection should be readDisabled. If the upstream is
// destroyed, there should be no further reads as well.
ASSERT(0 == data.length());
maybeCloseDownstreamForDrainClose();
Comment thread
wbpcode marked this conversation as resolved.
Comment thread
wbpcode marked this conversation as resolved.
return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -1248,6 +1255,20 @@ void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
read_callbacks_->connection().write(data, end_stream);
ASSERT(0 == data.length());
resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive?
maybeCloseDownstreamForDrainClose();
}

void Filter::maybeCloseDownstreamForDrainClose() {
if (!config_->checkDrainClose() || downstream_closed_ ||
read_callbacks_->connection().state() != Network::Connection::State::Open ||
!config_->drainDecision().drainClose(config_->drainCloseScope())) {
return;
}

ENVOY_CONN_LOG(debug, "drain closing tcp_proxy connection", read_callbacks_->connection());
config_->stats().downstream_cx_drain_close_.inc();
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite,
StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose);
}

void Filter::onUpstreamEvent(Network::ConnectionEvent event) {
Expand Down
8 changes: 8 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_b
* All tcp proxy stats. @see stats_macros.h
*/
#define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \
COUNTER(downstream_cx_drain_close) \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this counter described in a doc?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's me take a check.

COUNTER(downstream_cx_no_route) \
COUNTER(downstream_cx_rx_bytes_total) \
COUNTER(downstream_cx_total) \
Expand Down Expand Up @@ -377,6 +378,9 @@ class Config {
}

const absl::optional<uint32_t>& maxEarlyDataBytes() const { return max_early_data_bytes_; }
bool checkDrainClose() const { return check_drain_close_; }
const Network::DrainDecision& drainDecision() const { return drain_decision_; }
Network::DrainDirection drainCloseScope() const { return drain_close_scope_; }

private:
struct SimpleRouteImpl : public Route {
Expand Down Expand Up @@ -433,6 +437,9 @@ class Config {
envoy::extensions::filters::network::tcp_proxy::v3::UpstreamConnectMode upstream_connect_mode_{
envoy::extensions::filters::network::tcp_proxy::v3::IMMEDIATE};
absl::optional<uint32_t> max_early_data_bytes_;
const Network::DrainDecision& drain_decision_;
const Network::DrainDirection drain_close_scope_{};
const bool check_drain_close_{false};
};

using ConfigSharedPtr = std::shared_ptr<Config>;
Expand Down Expand Up @@ -690,6 +697,7 @@ class Filter : public Network::ReadFilter,
void onDownstreamEvent(Network::ConnectionEvent event);
void onUpstreamData(Buffer::Instance& data, bool end_stream);
void onUpstreamEvent(Network::ConnectionEvent event);
void maybeCloseDownstreamForDrainClose();
void onUpstreamConnection();
void onIdleTimeout();
void resetIdleTimer();
Expand Down
68 changes: 68 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,74 @@ TEST_P(TcpProxyTest, HalfCloseProxy) {
upstream_callbacks_->onEvent(Network::ConnectionEvent::RemoteClose);
}

TEST_P(TcpProxyTest, DrainCloseIgnoredWhenFlagDisabled) {
setup(1);

EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All)).Times(0);
EXPECT_CALL(filter_callbacks_.connection_, close(_, _)).Times(0);

raiseEventUpstreamConnected(0);

Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false));
filter_->onData(buffer, false);
}

TEST_P(TcpProxyTest, DrainCloseAfterDownstreamRead) {
auto config = defaultConfig();
config.mutable_check_drain_close()->set_value(true);
setup(1, config);

EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All))
.WillOnce(Return(true));
EXPECT_CALL(filter_callbacks_.connection_,
close(Network::ConnectionCloseType::FlushWrite,
StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose));

raiseEventUpstreamConnected(0);

Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false));
filter_->onData(buffer, false);
}

TEST_P(TcpProxyTest, DrainCloseUsesInboundOnlyScopeForInboundListeners) {
auto config = defaultConfig();
config.mutable_check_drain_close()->set_value(true);
EXPECT_CALL(factory_context_.listener_info_, direction())
.WillRepeatedly(Return(envoy::config::core::v3::TrafficDirection::INBOUND));
setup(1, config);

EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::InboundOnly))
.WillOnce(Return(true));
EXPECT_CALL(filter_callbacks_.connection_,
close(Network::ConnectionCloseType::FlushWrite,
StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose));

raiseEventUpstreamConnected(0);

Buffer::OwnedImpl buffer("hello");
EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), false));
filter_->onData(buffer, false);
}

TEST_P(TcpProxyTest, DrainCloseAfterDownstreamWrite) {
auto config = defaultConfig();
config.mutable_check_drain_close()->set_value(true);
setup(1, config);

raiseEventUpstreamConnected(0);

Buffer::OwnedImpl buffer("world");
EXPECT_CALL(factory_context_.drain_manager_, drainClose(Network::DrainDirection::All))
.WillOnce(Return(true));
EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&buffer), false));
EXPECT_CALL(filter_callbacks_.connection_,
close(Network::ConnectionCloseType::FlushWrite,
StreamInfo::LocalCloseReasons::get().TcpProxyDrainClose));
upstream_callbacks_->onUpstreamData(buffer, false);
}

// Test with an explicitly configured upstream.
TEST_P(TcpProxyTest, ExplicitFactory) {
// Explicitly configure an HTTP upstream, to test factory creation.
Expand Down
13 changes: 13 additions & 0 deletions test/extensions/filters/network/tcp_proxy/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ TEST(ConfigTest, ConfigTest) {
cb(connection);
}

TEST(ConfigTest, ConfigWithDrainCloseCheck) {
NiceMock<Server::Configuration::MockFactoryContext> context;
ConfigFactory factory;
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy config =
*dynamic_cast<envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy*>(
factory.createEmptyConfigProto().get());
config.set_stat_prefix("prefix");
config.set_cluster("cluster");
config.mutable_check_drain_close()->set_value(true);

EXPECT_TRUE(factory.createFilterFactoryFromProto(config, context).ok());
}

TEST(ConfigTest, TunnelingConfigWithFormatters) {
Envoy::Formatter::TestCommandFactory test_factory;
Registry::InjectFactory<Envoy::Formatter::CommandParserFactory> register_factory(test_factory);
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/server/factory_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Envoy {
namespace Server {
namespace Configuration {

using ::testing::Return;
using ::testing::ReturnRef;

MockFactoryContext::MockFactoryContext() {
Expand All @@ -22,6 +23,9 @@ MockFactoryContext::MockFactoryContext() {

ON_CALL(*this, drainDecision()).WillByDefault(ReturnRef(drain_manager_));
ON_CALL(*this, listenerScope()).WillByDefault(ReturnRef(*listener_store_.rootScope()));
ON_CALL(*this, listenerInfo()).WillByDefault(ReturnRef(listener_info_));
ON_CALL(listener_info_, direction())
.WillByDefault(Return(envoy::config::core::v3::TrafficDirection::UNSPECIFIED));
}

MockFactoryContext::~MockFactoryContext() = default;
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/server/factory_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "source/common/router/context_impl.h"
#include "source/common/tls/context_manager_impl.h"

#include "test/mocks/network/mocks.h"
#include "test/mocks/server/admin.h"
#include "test/mocks/server/drain_manager.h"
#include "test/mocks/server/instance.h"
Expand Down Expand Up @@ -39,6 +40,7 @@ class MockFactoryContext : public virtual ListenerFactoryContext {
Stats::IsolatedStoreImpl listener_store_;
Stats::Scope& listener_scope_{*listener_store_.rootScope()};
testing::NiceMock<MockDrainManager> drain_manager_;
testing::NiceMock<Network::MockListenerInfo> listener_info_;
};

class MockUpstreamFactoryContext : public UpstreamFactoryContext {
Expand Down
Loading