Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp tunneling: optionally move response header to downstream info #23118

Merged
merged 14 commits into from
Sep 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ message TcpProxy {
// Neither ``:-prefixed`` pseudo-headers nor the Host: header can be overridden.
repeated config.core.v3.HeaderValueOption headers_to_add = 3
[(validate.rules).repeated = {max_items: 1000}];

// Emit response headers to the downstream info filter state for consumption
// by the network filters. The filter state key is ``envoy.tcp_proxy.tunnel_response_headers``.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe make the key propagate as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

bool emit_response_headers = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd lean towards something akin to
save_connect_headers but I'll defer to api shephards on this one :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@markdroth For API review and deciding on naming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about calling this propagate_response_headers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed.

}

message OnDemand {
Expand Down
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ new_features:
- area: redis
change: |
added support for quit command to the redis proxy.
- area: tcp_proxy
change: |
added support for emitting the response headers in :ref:`TunnelingConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emitting -> propagating ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

<envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.TunnelingConfig.emit_response_headers>` to
the downstream info filter state.

deprecated:
- area: http
Expand Down
4 changes: 4 additions & 0 deletions envoy/tcp/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class TunnelingConfigHelper {

// The evaluator to add additional HTTP request headers to the upstream request.
virtual Envoy::Http::HeaderEvaluator& headerEvaluator() const PURE;

// Emit HTTP response headers to the downstream filter state.
virtual void emitResponseHeaders(Http::ResponseHeaderMapPtr&& headers,
const StreamInfo::FilterStateSharedPtr& filter_state) const PURE;
};

using TunnelingConfigHelperOptConstRef = OptRef<const TunnelingConfigHelper>;
Expand Down
5 changes: 5 additions & 0 deletions envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class LoadBalancerContext {
*/
virtual const Network::Connection* downstreamConnection() const PURE;

/**
* @return StreamInfo::StreamInfo* from the incoming connection or nullptr.
*/
virtual StreamInfo::StreamInfo* downstreamInfo() const PURE;

/**
* @return const Http::HeaderMap* the incoming headers or nullptr to use during load
* balancing.
Expand Down
1 change: 1 addition & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
const Network::Connection* downstreamConnection() const override {
return callbacks_->connection().ptr();
}
StreamInfo::StreamInfo* downstreamInfo() const override { return &callbacks_->streamInfo(); }
kyessenov marked this conversation as resolved.
Show resolved Hide resolved
const Http::RequestHeaderMap* downstreamHeaders() const override { return downstream_headers_; }

bool shouldSelectAnotherHost(const Upstream::Host& host) override {
Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ envoy_cc_library(
"//source/common/upstream:load_balancer_lib",
"//source/extensions/upstreams/tcp/generic:config",
"@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
],
)
30 changes: 29 additions & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "envoy/buffer/buffer.h"
#include "envoy/config/accesslog/v3/accesslog.pb.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.h"
Expand Down Expand Up @@ -541,12 +542,28 @@ const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
}
}

ProtobufTypes::MessagePtr TunnelResponseHeaders::serializeAsProto() const {
auto proto_out = std::make_unique<envoy::config::core::v3::HeaderMap>();
response_headers_->iterate([&proto_out](const Http::HeaderEntry& e) -> Http::HeaderMap::Iterate {
auto* new_header = proto_out->add_headers();
new_header->set_key(std::string(e.key().getStringView()));
new_header->set_value(std::string(e.value().getStringView()));
return Http::HeaderMap::Iterate::Continue;
});
return proto_out;
}

const std::string& TunnelResponseHeaders::key() {
CONSTRUCT_ON_FIRST_USE(std::string, "envoy.tcp_proxy.tunnel_response_headers");
}

TunnelingConfigHelperImpl::TunnelingConfigHelperImpl(
const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig&
config_message,
Server::Configuration::FactoryContext& context)
: use_post_(config_message.use_post()),
header_parser_(Envoy::Router::HeaderParser::configure(config_message.headers_to_add())) {
header_parser_(Envoy::Router::HeaderParser::configure(config_message.headers_to_add())),
emit_response_headers_(config_message.emit_response_headers()) {
envoy::config::core::v3::SubstitutionFormatString substitution_format_config;
substitution_format_config.mutable_text_format_source()->set_inline_string(
config_message.hostname());
Expand All @@ -561,6 +578,17 @@ std::string TunnelingConfigHelperImpl::host(const StreamInfo::StreamInfo& stream
absl::string_view());
}

void TunnelingConfigHelperImpl::emitResponseHeaders(
Http::ResponseHeaderMapPtr&& headers,
const StreamInfo::FilterStateSharedPtr& filter_state) const {
if (!emit_response_headers_) {
return;
}
filter_state->setData(
TunnelResponseHeaders::key(), std::make_shared<TunnelResponseHeaders>(std::move(headers)),
StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
}

void Filter::onConnectTimeout() {
ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection());
read_callbacks_->upstreamHost()->outlierDetector().putResult(
Expand Down
24 changes: 23 additions & 1 deletion source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,23 @@ using RouteConstSharedPtr = std::shared_ptr<const Route>;
using TunnelingConfig =
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

class TunnelingConfigHelperImpl : public TunnelingConfigHelper {
/**
* Response headers for the tunneling connections.
*/
class TunnelResponseHeaders : public StreamInfo::FilterState::Object {
public:
TunnelResponseHeaders(Http::ResponseHeaderMapPtr&& response_headers)
: response_headers_(std::move(response_headers)) {}
const Http::ResponseHeaderMap& value() const { return *response_headers_; }
ProtobufTypes::MessagePtr serializeAsProto() const override;
static const std::string& key();

private:
const Http::ResponseHeaderMapPtr response_headers_;
};

class TunnelingConfigHelperImpl : public TunnelingConfigHelper,
protected Logger::Loggable<Logger::Id::filter> {
public:
TunnelingConfigHelperImpl(
const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig&
Expand All @@ -121,11 +137,14 @@ class TunnelingConfigHelperImpl : public TunnelingConfigHelper {
std::string host(const StreamInfo::StreamInfo& stream_info) const override;
bool usePost() const override { return use_post_; }
Envoy::Http::HeaderEvaluator& headerEvaluator() const override { return *header_parser_; }
void emitResponseHeaders(Http::ResponseHeaderMapPtr&& headers,
const StreamInfo::FilterStateSharedPtr& filter_state) const override;

private:
const bool use_post_;
std::unique_ptr<Envoy::Router::HeaderParser> header_parser_;
Formatter::FormatterPtr hostname_fmt_;
const bool emit_response_headers_;
};

/**
Expand Down Expand Up @@ -360,6 +379,9 @@ class Filter : public Network::ReadFilter,
const Network::Connection* downstreamConnection() const override {
return &read_callbacks_->connection();
}
StreamInfo::StreamInfo* downstreamInfo() const override {
return &read_callbacks_->connection().streamInfo();
}

Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const override {
return transport_socket_options_;
Expand Down
8 changes: 4 additions & 4 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {

HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config,
const StreamInfo::StreamInfo& downstream_info)
StreamInfo::StreamInfo& downstream_info)
: config_(config), downstream_info_(downstream_info), response_decoder_(*this),
upstream_callbacks_(callbacks) {}

Expand Down Expand Up @@ -199,7 +199,7 @@ HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
Http::CodecType type)
: config_(config), type_(type), upstream_callbacks_(upstream_callbacks),
downstream_info_(context->downstreamConnection()->streamInfo()) {
downstream_info_(*context->downstreamInfo()) {
absl::optional<Http::Protocol> protocol;
if (type_ == Http::CodecType::HTTP3) {
protocol = Http::Protocol::Http3;
Expand Down Expand Up @@ -259,7 +259,7 @@ void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& h

Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config,
const StreamInfo::StreamInfo& downstream_info)
StreamInfo::StreamInfo& downstream_info)
: HttpUpstream(callbacks, config, downstream_info) {}

bool Http2Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
Expand Down Expand Up @@ -298,7 +298,7 @@ void Http2Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, boo

Http1Upstream::Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config,
const StreamInfo::StreamInfo& downstream_info)
StreamInfo::StreamInfo& downstream_info)
: HttpUpstream(callbacks, config, downstream_info) {}

void Http1Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool) {
Expand Down
12 changes: 7 additions & 5 deletions source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
GenericConnectionPoolCallbacks* callbacks_{};
Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks_;
std::unique_ptr<HttpUpstream> upstream_;
const StreamInfo::StreamInfo& downstream_info_;
StreamInfo::StreamInfo& downstream_info_;
};

class TcpUpstream : public GenericUpstream {
Expand Down Expand Up @@ -149,15 +149,15 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {

protected:
HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, const StreamInfo::StreamInfo& downstream_info);
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);
void resetEncoder(Network::ConnectionEvent event, bool inform_downstream = true);

// The encoder offered by the upstream http client.
Http::RequestEncoder* request_encoder_{};
// The config object that is owned by the downstream network filter chain factory.
const TunnelingConfigHelper& config_;
// The downstream info that is owned by the downstream connection.
const StreamInfo::StreamInfo& downstream_info_;
StreamInfo::StreamInfo& downstream_info_;

private:
class DecoderShim : public Http::ResponseDecoder {
Expand All @@ -169,6 +169,8 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
if (!parent_.isValidResponse(*headers) || end_stream) {
parent_.resetEncoder(Network::ConnectionEvent::LocalClose);
} else if (parent_.conn_pool_callbacks_ != nullptr) {
parent_.config_.emitResponseHeaders(std::move(headers),
parent_.downstream_info_.filterState());
parent_.conn_pool_callbacks_->onSuccess(*parent_.request_encoder_);
parent_.conn_pool_callbacks_.reset();
}
Expand Down Expand Up @@ -201,7 +203,7 @@ class HttpUpstream : public GenericUpstream, protected Http::StreamCallbacks {
class Http1Upstream : public HttpUpstream {
public:
Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, const StreamInfo::StreamInfo& downstream_info);
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);

void encodeData(Buffer::Instance& data, bool end_stream) override;
void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) override;
Expand All @@ -211,7 +213,7 @@ class Http1Upstream : public HttpUpstream {
class Http2Upstream : public HttpUpstream {
public:
Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
const TunnelingConfigHelper& config, const StreamInfo::StreamInfo& downstream_info);
const TunnelingConfigHelper& config, StreamInfo::StreamInfo& downstream_info);

void setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) override;
bool isValidResponse(const Http::ResponseHeaderMap& headers) override;
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class LoadBalancerContextBase : public LoadBalancerContext {

const Network::Connection* downstreamConnection() const override { return nullptr; }

StreamInfo::StreamInfo* downstreamInfo() const override { return nullptr; }

const Router::MetadataMatchCriteria* metadataMatchCriteria() override { return nullptr; }

const Http::RequestHeaderMap* downstreamHeaders() const override { return nullptr; }
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/subset_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable<Logger::Id::ups
const Network::Connection* downstreamConnection() const override {
return wrapped_->downstreamConnection();
}
StreamInfo::StreamInfo* downstreamInfo() const override { return wrapped_->downstreamInfo(); }
const Http::RequestHeaderMap* downstreamHeaders() const override {
return wrapped_->downstreamHeaders();
}
Expand Down
1 change: 1 addition & 0 deletions source/extensions/clusters/aggregate/lb_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class AggregateLoadBalancerContext : public Upstream::LoadBalancerContextBase {
const Network::Connection* downstreamConnection() const override {
return context_->downstreamConnection();
}
StreamInfo::StreamInfo* downstreamInfo() const override { return context_->downstreamInfo(); }
const Router::MetadataMatchCriteria* metadataMatchCriteria() override {
return context_->metadataMatchCriteria();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ const Network::Connection* Router::downstreamConnection() const {
return callbacks_ != nullptr ? callbacks_->connection() : nullptr;
}

StreamInfo::StreamInfo* Router::downstreamInfo() const {
if (callbacks_ != nullptr) {
return &callbacks_->streamInfo();
}
return nullptr;
}

void Router::cleanup() {
if (upstream_request_) {
upstream_request_.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
// Upstream::LoadBalancerContextBase
const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override;
const Network::Connection* downstreamConnection() const override;
StreamInfo::StreamInfo* downstreamInfo() const override;

// Tcp::ConnectionPool::UpstreamCallbacks
void onUpstreamData(Buffer::Instance& data, bool end_stream) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,13 @@ const Network::Connection* Router::downstreamConnection() const {
return nullptr;
}

StreamInfo::StreamInfo* Router::downstreamInfo() const {
if (callbacks_ != nullptr) {
return &callbacks_->streamInfo();
}
return nullptr;
}

void Router::cleanup() { upstream_request_.reset(); }

} // namespace Router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,

// Upstream::LoadBalancerContext
const Network::Connection* downstreamConnection() const override;
StreamInfo::StreamInfo* downstreamInfo() const override;
const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override {
const Envoy::Router::MetadataMatchCriteria* route_criteria =
(route_entry_ != nullptr) ? route_entry_->metadataMatchCriteria() : nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class ShadowRouterImpl : public ShadowRouterHandle,

// Upstream::LoadBalancerContextBase
const Network::Connection* downstreamConnection() const override { return nullptr; }
StreamInfo::StreamInfo* downstreamInfo() const override { return nullptr; }
const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override { return nullptr; }

// Event::DeferredDeletable
Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/network/dubbo_proxy/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ class DubboRouterTestBase {
router_ = std::make_unique<Router>(context_.clusterManager());

EXPECT_EQ(nullptr, router_->downstreamConnection());
EXPECT_EQ(nullptr, router_->downstreamInfo());

router_->setDecoderFilterCallbacks(callbacks_);
router_->setEncoderFilterCallbacks(encoder_callbacks_);
Expand Down Expand Up @@ -275,6 +276,7 @@ class DubboRouterTestBase {

EXPECT_CALL(callbacks_, connection()).WillRepeatedly(Return(&connection_));
EXPECT_EQ(&connection_, router_->downstreamConnection());
EXPECT_EQ(&callbacks_.stream_info_, router_->downstreamInfo());

// Not yet implemented:
EXPECT_EQ(absl::optional<uint64_t>(), router_->computeHashKey());
Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/network/thrift_proxy/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ThriftRouterTestBase {
shadow_writer, close_downstream_on_error);

EXPECT_EQ(nullptr, router_->downstreamConnection());
EXPECT_EQ(nullptr, router_->downstreamInfo());

router_->setDecoderFilterCallbacks(callbacks_);
}
Expand Down Expand Up @@ -267,6 +268,7 @@ class ThriftRouterTestBase {
EXPECT_CALL(callbacks_, connection()).WillRepeatedly(Return(&connection_));
EXPECT_CALL(callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_EQ(&connection_, router_->downstreamConnection());
EXPECT_EQ(&callbacks_.stream_info_, router_->downstreamInfo());

// Not yet implemented:
EXPECT_EQ(absl::optional<uint64_t>(), router_->computeHashKey());
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/upstreams/tcp/generic/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ namespace Generic {
class TcpConnPoolTest : public ::testing::Test {
public:
TcpConnPoolTest() {
EXPECT_CALL(connection_, streamInfo()).WillRepeatedly(ReturnRef(downstream_stream_info_));
EXPECT_CALL(lb_context_, downstreamConnection()).WillRepeatedly(Return(&connection_));
EXPECT_CALL(lb_context_, downstreamInfo()).WillRepeatedly(Return(&downstream_stream_info_));
}
NiceMock<Upstream::MockThreadLocalCluster> thread_local_cluster_;
GenericConnPoolFactory factory_;
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,7 @@ envoy_cc_test(
"//source/extensions/filters/network/tcp_proxy:config",
"//source/extensions/upstreams/http/tcp:config",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/file/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/upstreams/http/tcp/v3:pkg_cc_proto",
],
Expand Down
Loading