Skip to content

Commit

Permalink
async_client: add support for filter state (#33772)
Browse files Browse the repository at this point in the history
Add support for FilterState to AsyncClient

This allows upstream filters used in an AsyncClient request to access the FilterState.

Signed-off-by: Isabel Mccarten <imccarten@google.com>
  • Loading branch information
imccarten1 committed Apr 29, 2024
1 parent bfb5443 commit 5451efd
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 13 deletions.
12 changes: 12 additions & 0 deletions envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/http/filter.h"
#include "envoy/http/header_map.h"
#include "envoy/http/message.h"
#include "envoy/stream_info/filter_state.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/tracing/tracer.h"

Expand Down Expand Up @@ -267,6 +268,12 @@ class AsyncClient {
return *this;
}

// Set FilterState on async stream allowing upstream filters to access it.
StreamOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) {
filter_state = fs;
return *this;
}

// Set buffer restriction and accounting for the stream.
StreamOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) {
account_ = account;
Expand Down Expand Up @@ -331,6 +338,7 @@ class AsyncClient {
ParentContext parent_context;

envoy::config::core::v3::Metadata metadata;
Envoy::StreamInfo::FilterStateSharedPtr filter_state;

// Buffer memory account for tracking bytes.
Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
Expand Down Expand Up @@ -378,6 +386,10 @@ class AsyncClient {
StreamOptions::setMetadata(m);
return *this;
}
RequestOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) {
StreamOptions::setFilterState(fs);
return *this;
}
RequestOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) {
StreamOptions::setRetryPolicy(p);
return *this;
Expand Down
6 changes: 5 additions & 1 deletion source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "source/common/http/null_route_impl.h"
#include "source/common/http/utility.h"
#include "source/common/protobuf/message_validator_impl.h"
#include "source/common/stream_info/filter_state_impl.h"
#include "source/common/tracing/http_tracer_impl.h"
#include "source/common/upstream/retry_factory.h"

Expand Down Expand Up @@ -100,7 +101,10 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
router_(options.filter_config_ ? options.filter_config_ : parent.config_,
parent.config_->async_stats_),
stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr,
StreamInfo::FilterState::LifeSpan::FilterChain),
options.filter_state != nullptr
? options.filter_state
: std::make_shared<StreamInfo::FilterStateImpl>(
StreamInfo::FilterState::LifeSpan::FilterChain)),
tracing_config_(Tracing::EgressConfig::get()),
retry_policy_(createRetryPolicy(parent, options, parent_.factory_context_)),
route_(std::make_shared<NullRouteImpl>(
Expand Down
24 changes: 12 additions & 12 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ struct StreamInfoImpl : public StreamInfo {
std::move(parent_filter_state), parent_life_span),
life_span)) {}

StreamInfoImpl(
absl::optional<Http::Protocol> protocol, TimeSource& time_source,
const Network::ConnectionInfoProviderSharedPtr& downstream_connection_info_provider,
FilterStateSharedPtr filter_state)
: time_source_(time_source), start_time_(time_source.systemTime()),
start_time_monotonic_(time_source.monotonicTime()), protocol_(protocol),
filter_state_(std::move(filter_state)),
downstream_connection_info_provider_(downstream_connection_info_provider != nullptr
? downstream_connection_info_provider
: emptyDownstreamAddressProvider()),
trace_reason_(Tracing::Reason::NotTraceable) {}

SystemTime startTime() const override { return start_time_; }

MonotonicTime startTimeMonotonic() const override { return start_time_monotonic_; }
Expand Down Expand Up @@ -460,18 +472,6 @@ struct StreamInfoImpl : public StreamInfo {
std::make_shared<Network::ConnectionInfoSetterImpl>(nullptr, nullptr));
}

StreamInfoImpl(
absl::optional<Http::Protocol> protocol, TimeSource& time_source,
const Network::ConnectionInfoProviderSharedPtr& downstream_connection_info_provider,
FilterStateSharedPtr filter_state)
: time_source_(time_source), start_time_(time_source.systemTime()),
start_time_monotonic_(time_source.monotonicTime()), protocol_(protocol),
filter_state_(std::move(filter_state)),
downstream_connection_info_provider_(downstream_connection_info_provider != nullptr
? downstream_connection_info_provider
: emptyDownstreamAddressProvider()),
trace_reason_(Tracing::Reason::NotTraceable) {}

std::shared_ptr<UpstreamInfo> upstream_info_;
uint64_t bytes_received_{};
uint64_t bytes_retransmitted_{};
Expand Down
60 changes: 60 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/route/v3/route_components.pb.h"
#include "envoy/stream_info/filter_state.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/http/async_client_impl.h"
Expand All @@ -13,6 +14,7 @@
#include "source/common/http/utility.h"
#include "source/common/router/context_impl.h"
#include "source/common/router/upstream_codec_filter.h"
#include "source/common/stream_info/filter_state_impl.h"

#include "test/common/http/common.h"
#include "test/mocks/buffer/mocks.h"
Expand Down Expand Up @@ -701,6 +703,64 @@ TEST_F(AsyncClientImplTest, WithMetadata) {
response_decoder_->decodeData(data, true);
}

class TestStateObject : public StreamInfo::FilterState::Object {
public:
TestStateObject(std::string value) : value_(value) {}

const std::string& value() const { return value_; }

private:
std::string value_;
};

TEST_F(AsyncClientImplTest, WithFilterState) {
message_->body().add("test-body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
.WillOnce(Invoke(
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _))
.WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional<Http::Protocol>,
Upstream::LoadBalancerContext* context) {
const StreamInfo::FilterState& filter_state = context->requestStreamInfo()->filterState();
const TestStateObject* state = filter_state.getDataReadOnly<TestStateObject>("test-filter");
EXPECT_NE(state, nullptr);
EXPECT_EQ(state->value(), "stored-test-state");
return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_);
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));

AsyncClient::RequestOptions options;
auto state_object = std::make_shared<TestStateObject>("stored-test-state");
auto filter_state =
std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::FilterChain);
filter_state->setData("test-filter", state_object, StreamInfo::FilterState::StateType::Mutable);
options.setFilterState(filter_state);

auto* request = client_.send(std::move(message_), callbacks_, options);
EXPECT_NE(request, nullptr);

expectSuccess(request, 200);

ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, Retry) {
ON_CALL(factory_context_.runtime_loader_.snapshot_, featureEnabled("upstream.use_retry", 100))
.WillByDefault(Return(true));
Expand Down

0 comments on commit 5451efd

Please sign in to comment.