Skip to content

Commit

Permalink
ext_proc: observability mode part 4: Implement the deferred closure t…
Browse files Browse the repository at this point in the history
…imeout. (#35084)


---------

Signed-off-by: tyxia <tyxia@google.com>
  • Loading branch information
tyxia committed Jul 9, 2024
1 parent 1e5e731 commit 10ac2b6
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ message ExternalProcessor {
RouteCacheAction route_cache_action = 18
[(udpa.annotations.field_migrate).oneof_promotion = "clear_route_cache_type"];

// [#not-implemented-hide:]
// Specifies the deferred closure timeout for gRPC stream that connects to external processor. Currently, the deferred stream closure
// is only used in :ref:`observability_mode <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.observability_mode>`.
// In observability mode, gRPC streams may be held open to the external processor longer than the lifetime of the regular client to
Expand Down
11 changes: 7 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ FilterConfig::FilterConfig(
Server::Configuration::CommonFactoryContext& context)
: failure_mode_allow_(config.failure_mode_allow()),
observability_mode_(config.observability_mode()),
route_cache_action_(config.route_cache_action()), message_timeout_(message_timeout),
max_message_timeout_ms_(max_message_timeout_ms),
route_cache_action_(config.route_cache_action()),
deferred_close_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, deferred_close_timeout,
DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS)),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
stats_(generateStats(stats_prefix, config.stat_prefix(), scope)),
processing_mode_(config.processing_mode()),
mutation_checker_(config.mutation_rules(), context.regexEngine()),
Expand Down Expand Up @@ -356,7 +358,8 @@ Filter::StreamOpenState Filter::openStream() {

// TODO(tyxia) Switch to address of stream
stream_ = config_->threadLocalStreamManager().store(decoder_callbacks_->streamId(),
std::move(stream_object), config_->stats());
std::move(stream_object), config_->stats(),
config_->deferredCloseTimeout());
// For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not
// have a proper implementation of streamInfo.
if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) {
Expand Down Expand Up @@ -1378,7 +1381,7 @@ void DeferredDeletableStream::deferredClose(Envoy::Event::Dispatcher& dispatcher
uint64_t stream_id) {
derferred_close_timer =
dispatcher.createTimer([this, stream_id] { closeStreamOnTimer(stream_id); });
derferred_close_timer->enableTimer(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
derferred_close_timer->enableTimer(std::chrono::milliseconds(deferred_close_timeout));
}

std::string responseCaseToString(const ProcessingResponse::ResponseCase response_case) {
Expand Down
18 changes: 12 additions & 6 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,17 @@ class ImmediateMutationChecker {
};

class ThreadLocalStreamManager;
// TODO(tyxia) Make it configurable.
inline constexpr uint32_t DEFAULT_CLOSE_TIMEOUT_MS = 1000;
// Default value is 5000 milliseconds (5 seconds)
inline constexpr uint32_t DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS = 5000;

// Deferred deletable stream wrapper.
struct DeferredDeletableStream : public Logger::Loggable<Logger::Id::ext_proc> {
explicit DeferredDeletableStream(ExternalProcessorStreamPtr stream,
ThreadLocalStreamManager& stream_manager,
const ExtProcFilterStats& stat)
: stream_(std::move(stream)), parent(stream_manager), stats(stat) {}
const ExtProcFilterStats& stat,
const std::chrono::milliseconds& timeout)
: stream_(std::move(stream)), parent(stream_manager), stats(stat),
deferred_close_timeout(timeout) {}

void deferredClose(Envoy::Event::Dispatcher& dispatcher, uint64_t stream_id);

Expand All @@ -163,6 +165,7 @@ struct DeferredDeletableStream : public Logger::Loggable<Logger::Id::ext_proc> {
ThreadLocalStreamManager& parent;
ExtProcFilterStats stats;
Event::TimerPtr derferred_close_timer;
const std::chrono::milliseconds deferred_close_timeout;
};

using DeferredDeletableStreamPtr = std::unique_ptr<DeferredDeletableStream>;
Expand All @@ -172,9 +175,10 @@ class ThreadLocalStreamManager : public Envoy::ThreadLocal::ThreadLocalObject {
// Store the ExternalProcessorStreamPtr (as a wrapper object) in the map and return the raw
// pointer of ExternalProcessorStream.
ExternalProcessorStream* store(uint64_t stream_id, ExternalProcessorStreamPtr stream,
const ExtProcFilterStats& stat) {
const ExtProcFilterStats& stat,
const std::chrono::milliseconds& timeout) {
stream_manager_[stream_id] =
std::make_unique<DeferredDeletableStream>(std::move(stream), *this, stat);
std::make_unique<DeferredDeletableStream>(std::move(stream), *this, stat, timeout);
return stream_manager_[stream_id]->stream_.get();
}

Expand Down Expand Up @@ -207,6 +211,7 @@ class FilterConfig {

bool observabilityMode() const { return observability_mode_; }

const std::chrono::milliseconds& deferredCloseTimeout() const { return deferred_close_timeout_; }
const std::chrono::milliseconds& messageTimeout() const { return message_timeout_; }

uint32_t maxMessageTimeout() const { return max_message_timeout_ms_; }
Expand Down Expand Up @@ -270,6 +275,7 @@ class FilterConfig {
const bool observability_mode_;
envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor::RouteCacheAction
route_cache_action_;
const std::chrono::milliseconds deferred_close_timeout_;
const std::chrono::milliseconds message_timeout_;
const uint32_t max_message_timeout_ms_;

Expand Down
23 changes: 12 additions & 11 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ using envoy::service::ext_proc::v3::ImmediateResponse;
using envoy::service::ext_proc::v3::ProcessingRequest;
using envoy::service::ext_proc::v3::ProcessingResponse;
using envoy::service::ext_proc::v3::TrailersResponse;
using Extensions::HttpFilters::ExternalProcessing::DEFAULT_CLOSE_TIMEOUT_MS;
using Extensions::HttpFilters::ExternalProcessing::DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS;
using Extensions::HttpFilters::ExternalProcessing::HasNoHeader;
using Extensions::HttpFilters::ExternalProcessing::HeaderProtosEqual;
using Extensions::HttpFilters::ExternalProcessing::makeHeaderValue;
Expand Down Expand Up @@ -4157,7 +4157,7 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithHeader) {

verifyDownstreamResponse(*response, 200);

timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS));
}

TEST_P(ExtProcIntegrationTest, ObservabilityModeWithBody) {
Expand Down Expand Up @@ -4200,7 +4200,7 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithBody) {

verifyDownstreamResponse(*response, 200);

timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS));
}

TEST_P(ExtProcIntegrationTest, ObservabilityModeWithWrongBodyMode) {
Expand All @@ -4218,7 +4218,7 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithWrongBodyMode) {
handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);

timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS));
}

TEST_P(ExtProcIntegrationTest, ObservabilityModeWithTrailer) {
Expand Down Expand Up @@ -4250,11 +4250,13 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithTrailer) {
verifyDownstreamResponse(*response, 200);
EXPECT_THAT(*(response->trailers()), HasNoHeader("x-modified-trailers"));

timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS));
}

TEST_P(ExtProcIntegrationTest, ObservabilityModeWithFullRequest) {
proto_config_.set_observability_mode(true);
uint32_t deferred_close_timeout_ms = 1000;
proto_config_.mutable_deferred_close_timeout()->set_seconds(deferred_close_timeout_ms / 1000);

proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
Expand All @@ -4272,11 +4274,13 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithFullRequest) {
handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);

timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(deferred_close_timeout_ms));
}

TEST_P(ExtProcIntegrationTest, ObservabilityModeWithFullResponse) {
proto_config_.set_observability_mode(true);
uint32_t deferred_close_timeout_ms = 1000;
proto_config_.mutable_deferred_close_timeout()->set_seconds(deferred_close_timeout_ms / 1000);

proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_response_body_mode(ProcessingMode::STREAMED);
Expand All @@ -4294,7 +4298,7 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithFullResponse) {

verifyDownstreamResponse(*response, 200);

timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(deferred_close_timeout_ms));
}

TEST_P(ExtProcIntegrationTest, ObservabilityModeWithLogging) {
Expand Down Expand Up @@ -4386,9 +4390,6 @@ TEST_P(ExtProcIntegrationTest, GetAndSetHeadersUpstreamObservabilityMode) {
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

// EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-remove-this"));
// EXPECT_THAT(upstream_request_->headers(), HasNoHeader("x-new-header", "new"));

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(100, true);

Expand Down Expand Up @@ -4486,7 +4487,7 @@ TEST_P(ExtProcIntegrationTest, InvalidServerOnResponseInObservabilityMode) {
handleUpstreamRequest();
EXPECT_FALSE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, processor_connection_,
std::chrono::milliseconds(25000)));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS));
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS));
}

TEST_P(ExtProcIntegrationTest, SidestreamPushbackDownstream) {
Expand Down
6 changes: 3 additions & 3 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4104,7 +4104,7 @@ TEST_F(HttpFilterTest, HeaderProcessingInObservabilityMode) {
// Deferred close timer is expected to be enabled by `DeferredDeletableStream`'s deferredClose(),
// which is triggered by filter onDestroy() function below.
EXPECT_CALL(*deferred_close_timer_,
enableTimer(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS), _));
enableTimer(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS), _));
filter_->onDestroy();
deferred_close_timer_->invokeCallback();

Expand Down Expand Up @@ -4192,7 +4192,7 @@ TEST_F(HttpFilterTest, StreamingBodiesInObservabilityMode) {
// Deferred close timer is expected to be enabled by `DeferredDeletableStream`'s deferredClose(),
// which is triggered by filter onDestroy() function.
EXPECT_CALL(*deferred_close_timer_,
enableTimer(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS), _));
enableTimer(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS), _));
filter_->onDestroy();
deferred_close_timer_->invokeCallback();

Expand Down Expand Up @@ -4240,7 +4240,7 @@ TEST_F(HttpFilterTest, StreamingAllDataInObservabilityMode) {
// Deferred close timer is expected to be enabled by `DeferredDeletableStream`'s deferredClose(),
// which is triggered by filter onDestroy() function.
EXPECT_CALL(*deferred_close_timer_,
enableTimer(std::chrono::milliseconds(DEFAULT_CLOSE_TIMEOUT_MS), _));
enableTimer(std::chrono::milliseconds(DEFAULT_DEFERRED_CLOSE_TIMEOUT_MS), _));
filter_->onDestroy();
deferred_close_timer_->invokeCallback();

Expand Down

0 comments on commit 10ac2b6

Please sign in to comment.