diff --git a/api/envoy/extensions/filters/network/reverse_tunnel/v3/reverse_tunnel.proto b/api/envoy/extensions/filters/network/reverse_tunnel/v3/reverse_tunnel.proto index 6dce5bda0608f..77e1552166593 100644 --- a/api/envoy/extensions/filters/network/reverse_tunnel/v3/reverse_tunnel.proto +++ b/api/envoy/extensions/filters/network/reverse_tunnel/v3/reverse_tunnel.proto @@ -96,7 +96,7 @@ message Validation { // Configuration for the reverse tunnel network filter. // This filter handles reverse tunnel connection acceptance and rejection by processing // HTTP requests where required identification values are provided via HTTP headers. -// [#next-free-field: 8] +// [#next-free-field: 9] message ReverseTunnel { // Ping interval for health checks on established reverse tunnel connections. // If not specified, defaults to ``2 seconds``. @@ -140,4 +140,9 @@ message ReverseTunnel { // flag to the same value. // Defaults to ``false``. bool use_http_upgrade = 7; + + // When true, skip worker-thread rebalancing for accepted reverse tunnel connections. + // This avoids the cross-worker lock in pickLeastLoadedSocketManager. + // Default: false (rebalancing enabled). + bool skip_rebalancing = 8; } diff --git a/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc b/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc index 9bfb9bc811fa0..85613a45c5aea 100644 --- a/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc +++ b/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.cc @@ -142,8 +142,7 @@ ReverseTunnelFilterConfig::ReverseTunnelFilterConfig( ? std::chrono::milliseconds( DurationUtil::durationToMilliseconds(proto_config.ping_interval())) : std::chrono::milliseconds(2000)), - auto_close_connections_( - proto_config.auto_close_connections() ? proto_config.auto_close_connections() : false), + auto_close_connections_(proto_config.auto_close_connections()), request_path_( proto_config.request_path().empty() ? std::string(::Envoy::Extensions::Bootstrap::ReverseConnection:: @@ -167,7 +166,8 @@ ReverseTunnelFilterConfig::ReverseTunnelFilterConfig( ? proto_config.validation().dynamic_metadata_namespace() : "envoy.filters.network.reverse_tunnel"), required_cluster_name_(proto_config.required_cluster_name()), - use_http_upgrade_(proto_config.use_http_upgrade()) {} + use_http_upgrade_(proto_config.use_http_upgrade()), + skip_rebalancing_(proto_config.skip_rebalancing()) {} bool ReverseTunnelFilterConfig::validateIdentifiers( absl::string_view node_id, absl::string_view cluster_id, absl::string_view tenant_id, @@ -587,7 +587,7 @@ void ReverseTunnelFilter::processAcceptedConnection(absl::string_view node_id, ENVOY_CONN_LOG(trace, "reverse_tunnel: registering wrapped socket for reuse", connection); socket_manager->addConnectionSocket(std::string(node_id), std::string(cluster_id), std::move(wrapped_socket), ping_seconds, - false /* rebalanced */, tenant_id); + /* rebalanced= */ config_->skipRebalancing(), tenant_id); ENVOY_CONN_LOG(debug, "reverse_tunnel: successfully registered wrapped socket for reuse", connection); diff --git a/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.h b/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.h index a665c806998d7..af87b72caf460 100644 --- a/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.h +++ b/source/extensions/filters/network/reverse_tunnel/reverse_tunnel_filter.h @@ -65,6 +65,9 @@ class ReverseTunnelFilterConfig : public Logger::Loggable { // Returns whether the handshake is negotiated as an HTTP/1.1 Upgrade exchange. bool useHttpUpgrade() const { return use_http_upgrade_; } + // Returns whether worker-thread rebalancing should be skipped for accepted connections. + bool skipRebalancing() const { return skip_rebalancing_; } + private: ReverseTunnelFilterConfig( const envoy::extensions::filters::network::reverse_tunnel::v3::ReverseTunnel& proto_config, @@ -89,6 +92,8 @@ class ReverseTunnelFilterConfig : public Logger::Loggable { // When true, expect `Connection: Upgrade` + `Upgrade: reverse-tunnel` and respond `101`. const bool use_http_upgrade_{false}; + + const bool skip_rebalancing_{false}; }; using ReverseTunnelFilterConfigSharedPtr = std::shared_ptr; diff --git a/test/extensions/filters/network/reverse_tunnel/config_test.cc b/test/extensions/filters/network/reverse_tunnel/config_test.cc index ac2eae1e54cc3..70c7bfd648ae3 100644 --- a/test/extensions/filters/network/reverse_tunnel/config_test.cc +++ b/test/extensions/filters/network/reverse_tunnel/config_test.cc @@ -358,6 +358,25 @@ request_method: GET EXPECT_THAT(result.status().message(), testing::HasSubstr("Failed to parse tenant_id_format")); } +// Tests that the ReverseTunnelFilterConfig is formed properly and the filter construction works. +TEST(ReverseTunnelFilterConfigFactoryTest, ConfigurationSkipRebalancingEnabled) { + envoy::extensions::filters::network::reverse_tunnel::v3::ReverseTunnel proto_config; + proto_config.set_skip_rebalancing(true); + proto_config.set_request_path("/request"); + proto_config.set_request_method(envoy::config::core::v3::POST); + + ReverseTunnelFilterConfigFactory factory; + NiceMock context; + auto result = factory.createFilterFactoryFromProto(proto_config, context); + ASSERT_TRUE(result.ok()); + Network::FilterFactoryCb cb = result.value(); + EXPECT_TRUE(cb != nullptr); + + Network::MockFilterManager filter_manager; + EXPECT_CALL(filter_manager, addReadFilter(_)); + cb(filter_manager); +} + } // namespace } // namespace ReverseTunnel } // namespace NetworkFilters diff --git a/test/extensions/filters/network/reverse_tunnel/filter_unit_test.cc b/test/extensions/filters/network/reverse_tunnel/filter_unit_test.cc index 5d331e5b51895..e7b9e990882e2 100644 --- a/test/extensions/filters/network/reverse_tunnel/filter_unit_test.cc +++ b/test/extensions/filters/network/reverse_tunnel/filter_unit_test.cc @@ -480,6 +480,7 @@ TEST_F(ReverseTunnelFilterUnitTest, ConfigurationDefaults) { EXPECT_FALSE(config->autoCloseConnections()); EXPECT_EQ("/reverse_connections/request", config->requestPath()); EXPECT_EQ("GET", config->requestMethod()); + EXPECT_FALSE(config->skipRebalancing()); } // Test RequestDecoder methods not fully covered. @@ -2183,6 +2184,15 @@ TEST_F(ReverseTunnelFilterWithUpstreamTest, UpgradeMode_MissingUpgradeRejected) EXPECT_EQ(1, parse_error->value()); } +TEST_F(ReverseTunnelFilterUnitTest, FilterConfigLoadsSkipRebalancing) { + envoy::extensions::filters::network::reverse_tunnel::v3::ReverseTunnel cfg; + cfg.set_skip_rebalancing(true); + + auto config_or_error = ReverseTunnelFilterConfig::create(cfg, factory_context_); + ASSERT_TRUE(config_or_error.ok()); + EXPECT_TRUE(config_or_error.value()->skipRebalancing()); +} + } // namespace } // namespace ReverseTunnel } // namespace NetworkFilters