From 5451efd9b8f8a444431197050e45ba974ed4e9d8 Mon Sep 17 00:00:00 2001 From: imccarten1 Date: Mon, 29 Apr 2024 13:45:58 -0400 Subject: [PATCH] async_client: add support for filter state (#33772) Add support for FilterState to AsyncClient This allows upstream filters used in an AsyncClient request to access the FilterState. Signed-off-by: Isabel Mccarten --- envoy/http/async_client.h | 12 ++++ source/common/http/async_client_impl.cc | 6 +- source/common/stream_info/stream_info_impl.h | 24 ++++---- test/common/http/async_client_impl_test.cc | 60 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 13 deletions(-) diff --git a/envoy/http/async_client.h b/envoy/http/async_client.h index 892b5c6a1869..49cf51a31197 100644 --- a/envoy/http/async_client.h +++ b/envoy/http/async_client.h @@ -9,6 +9,7 @@ #include "envoy/http/filter.h" #include "envoy/http/header_map.h" #include "envoy/http/message.h" +#include "envoy/stream_info/filter_state.h" #include "envoy/stream_info/stream_info.h" #include "envoy/tracing/tracer.h" @@ -267,6 +268,12 @@ class AsyncClient { return *this; } + // Set FilterState on async stream allowing upstream filters to access it. + StreamOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) { + filter_state = fs; + return *this; + } + // Set buffer restriction and accounting for the stream. StreamOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) { account_ = account; @@ -331,6 +338,7 @@ class AsyncClient { ParentContext parent_context; envoy::config::core::v3::Metadata metadata; + Envoy::StreamInfo::FilterStateSharedPtr filter_state; // Buffer memory account for tracking bytes. Buffer::BufferMemoryAccountSharedPtr account_{nullptr}; @@ -378,6 +386,10 @@ class AsyncClient { StreamOptions::setMetadata(m); return *this; } + RequestOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) { + StreamOptions::setFilterState(fs); + return *this; + } RequestOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) { StreamOptions::setRetryPolicy(p); return *this; diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index bf71e2febadd..9d283cd4118b 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -10,6 +10,7 @@ #include "source/common/http/null_route_impl.h" #include "source/common/http/utility.h" #include "source/common/protobuf/message_validator_impl.h" +#include "source/common/stream_info/filter_state_impl.h" #include "source/common/tracing/http_tracer_impl.h" #include "source/common/upstream/retry_factory.h" @@ -100,7 +101,10 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal router_(options.filter_config_ ? options.filter_config_ : parent.config_, parent.config_->async_stats_), stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr, - StreamInfo::FilterState::LifeSpan::FilterChain), + options.filter_state != nullptr + ? options.filter_state + : std::make_shared( + StreamInfo::FilterState::LifeSpan::FilterChain)), tracing_config_(Tracing::EgressConfig::get()), retry_policy_(createRetryPolicy(parent, options, parent_.factory_context_)), route_(std::make_shared( diff --git a/source/common/stream_info/stream_info_impl.h b/source/common/stream_info/stream_info_impl.h index 7e770d6d7145..833f57444a19 100644 --- a/source/common/stream_info/stream_info_impl.h +++ b/source/common/stream_info/stream_info_impl.h @@ -133,6 +133,18 @@ struct StreamInfoImpl : public StreamInfo { std::move(parent_filter_state), parent_life_span), life_span)) {} + StreamInfoImpl( + absl::optional protocol, TimeSource& time_source, + const Network::ConnectionInfoProviderSharedPtr& downstream_connection_info_provider, + FilterStateSharedPtr filter_state) + : time_source_(time_source), start_time_(time_source.systemTime()), + start_time_monotonic_(time_source.monotonicTime()), protocol_(protocol), + filter_state_(std::move(filter_state)), + downstream_connection_info_provider_(downstream_connection_info_provider != nullptr + ? downstream_connection_info_provider + : emptyDownstreamAddressProvider()), + trace_reason_(Tracing::Reason::NotTraceable) {} + SystemTime startTime() const override { return start_time_; } MonotonicTime startTimeMonotonic() const override { return start_time_monotonic_; } @@ -460,18 +472,6 @@ struct StreamInfoImpl : public StreamInfo { std::make_shared(nullptr, nullptr)); } - StreamInfoImpl( - absl::optional protocol, TimeSource& time_source, - const Network::ConnectionInfoProviderSharedPtr& downstream_connection_info_provider, - FilterStateSharedPtr filter_state) - : time_source_(time_source), start_time_(time_source.systemTime()), - start_time_monotonic_(time_source.monotonicTime()), protocol_(protocol), - filter_state_(std::move(filter_state)), - downstream_connection_info_provider_(downstream_connection_info_provider != nullptr - ? downstream_connection_info_provider - : emptyDownstreamAddressProvider()), - trace_reason_(Tracing::Reason::NotTraceable) {} - std::shared_ptr upstream_info_; uint64_t bytes_received_{}; uint64_t bytes_retransmitted_{}; diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 5f52b2980876..3aefd2475118 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -5,6 +5,7 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/config/route/v3/route_components.pb.h" +#include "envoy/stream_info/filter_state.h" #include "source/common/buffer/buffer_impl.h" #include "source/common/http/async_client_impl.h" @@ -13,6 +14,7 @@ #include "source/common/http/utility.h" #include "source/common/router/context_impl.h" #include "source/common/router/upstream_codec_filter.h" +#include "source/common/stream_info/filter_state_impl.h" #include "test/common/http/common.h" #include "test/mocks/buffer/mocks.h" @@ -701,6 +703,64 @@ TEST_F(AsyncClientImplTest, WithMetadata) { response_decoder_->decodeData(data, true); } +class TestStateObject : public StreamInfo::FilterState::Object { +public: + TestStateObject(std::string value) : value_(value) {} + + const std::string& value() const { return value_; } + +private: + std::string value_; +}; + +TEST_F(AsyncClientImplTest, WithFilterState) { + message_->body().add("test-body"); + Buffer::Instance& data = message_->body(); + + EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _)) + .WillOnce(Invoke( + [&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks, + const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, + stream_info_, {}); + response_decoder_ = &decoder; + return nullptr; + })); + + EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) + .WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) { + const StreamInfo::FilterState& filter_state = context->requestStreamInfo()->filterState(); + const TestStateObject* state = filter_state.getDataReadOnly("test-filter"); + EXPECT_NE(state, nullptr); + EXPECT_EQ(state->value(), "stored-test-state"); + return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_); + })); + + TestRequestHeaderMapImpl copy(message_->headers()); + copy.addCopy("x-envoy-internal", "true"); + copy.addCopy("x-forwarded-for", "127.0.0.1"); + copy.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(©), false)); + + AsyncClient::RequestOptions options; + auto state_object = std::make_shared("stored-test-state"); + auto filter_state = + std::make_shared(StreamInfo::FilterState::LifeSpan::FilterChain); + filter_state->setData("test-filter", state_object, StreamInfo::FilterState::StateType::Mutable); + options.setFilterState(filter_state); + + auto* request = client_.send(std::move(message_), callbacks_, options); + EXPECT_NE(request, nullptr); + + expectSuccess(request, 200); + + ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(data, true); +} + TEST_F(AsyncClientImplTest, Retry) { ON_CALL(factory_context_.runtime_loader_.snapshot_, featureEnabled("upstream.use_retry", 100)) .WillByDefault(Return(true));