Skip to content

Commit

Permalink
HCM: on downstream timeout, set response 504 if the request is fully …
Browse files Browse the repository at this point in the history
…read (#20026)

Signed-off-by: Yuchen Dai <silentdai@gmail.com>
  • Loading branch information
lambdai committed Mar 15, 2022
1 parent e626e89 commit d7edf3b
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Minor Behavior Changes
* grpc: flip runtime guard ``envoy.reloadable_features.enable_grpc_async_client_cache`` to be default enabled. async grpc client created through getOrCreateRawAsyncClient will be cached by default.
* health_checker: exposing `initial_metadata` to GrpcHealthCheck in a way similar to `request_headers_to_add` of HttpHealthCheck.
* http: avoiding delay-close for HTTP/1.0 responses framed by connection: close as well as HTTP/1.1 if the request is fully read. This means for responses to such requests, the FIN will be sent immediately after the response. This behavior can be temporarily reverted by setting ``envoy.reloadable_features.skip_delay_close`` to false. If clients are seen to be receiving sporadic partial responses and flipping this flag fixes it, please notify the project immediately.
* http: changed the HTTP status code from 408 to 504 if the request times out after the request has been fully read. This behavior can be temporarily reverted by setting the runtime guard ``envoy.reloadable_features.override_request_timeout_by_gateway_timeout`` to false.
* http: lazy disable downstream connection reading in the HTTP/1 codec to reduce unnecessary system calls. This behavioral change can be temporarily reverted by setting runtime guard ``envoy.reloadable_features.http1_lazy_read_disable`` to false.
* http: now the max concurrent streams of http2 connection can not only be adjusted down according to the SETTINGS frame but also can be adjusted up, of course, it can not exceed the configured upper bounds. This fix is guarded by ``envoy.reloadable_features.http2_allow_capacity_increase_by_settings``.
* http: when writing custom filters, `injectEncodedDataToFilterChain` and `injectDecodedDataToFilterChain` now trigger sending of headers if they were not yet sent due to `StopIteration`. Previously, calling one of the inject functions in that state would trigger an assertion. See issue #19891 for more details.
Expand Down
30 changes: 26 additions & 4 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,26 @@ void ConnectionManagerImpl::ActiveStream::resetIdleTimer() {

void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();

// See below for more information on this early return block.
if (Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.override_request_timeout_by_gateway_timeout")) {
filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
return;
}

// There are 2 issues in the code below. First, `responseHeaders().has_value()` is not the best
// predicate; `remoteDecodeComplete()` is preferable. Second, `sendLocalReply()` already ends the
// stream if any response was pushed to the decoder, so an explicit `endStream()` is not required.
//
// The code above is expected to resolve both. The original code is kept here until the new
// behavior is fully verified.
//
// TODO(lambdai): delete the block below along with the removal of
// `override_request_timeout_by_gateway_timeout`.

// If headers have not been sent to the user, send a 408.
if (responseHeaders().has_value()) {
// TODO(htuch): We could send trailers here with an x-envoy timeout header
Expand All @@ -764,7 +784,6 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
connection_manager_.doEndStream(*this);
} else {
// TODO(mattklein) this may result in multiple flags. This Ok?
filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
sendLocalReply(Http::Code::RequestTimeout, "stream timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
Expand All @@ -773,20 +792,23 @@ void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {

void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
sendLocalReply(Http::Code::RequestTimeout, "request timeout", nullptr, absl::nullopt,
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"request timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
}

void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
sendLocalReply(Http::Code::RequestTimeout, "request header timeout", nullptr, absl::nullopt,
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"request header timeout", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
}

void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
sendLocalReply(Http::Code::RequestTimeout, "downstream duration timeout", nullptr,
sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
"downstream duration timeout", nullptr,
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
}
Expand Down
1 change: 0 additions & 1 deletion source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,6 @@ void FilterManager::sendLocalReply(
}

stream_info_.setResponseCodeDetails(details);

StreamFilterBase::LocalReplyData data{code, details, false};
FilterManager::onLocalReply(data);
if (data.reset_imminent_) {
Expand Down
10 changes: 10 additions & 0 deletions source/common/http/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1134,5 +1134,15 @@ bool Utility::isSafeRequest(Http::RequestHeaderMap& request_headers) {
method == Http::Headers::get().MethodValues.Trace;
}

// Selects the status code for a locally generated timeout reply: GatewayTimeout when the
// downstream request has been fully decoded and the runtime guard is enabled, RequestTimeout
// otherwise.
Http::Code Utility::maybeRequestTimeoutCode(bool remote_decode_complete) {
  // The runtime guard is only consulted once the request is known to be complete.
  if (remote_decode_complete &&
      Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.override_request_timeout_by_gateway_timeout")) {
    return Http::Code::GatewayTimeout;
  }
  // Http::Code::RequestTimeout is more expensive because HTTP1 client cannot use the
  // connection any more.
  return Http::Code::RequestTimeout;
}

} // namespace Http
} // namespace Envoy
5 changes: 5 additions & 0 deletions source/common/http/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ convertCoreToRouteRetryPolicy(const envoy::config::core::v3::RetryPolicy& retry_
*/
bool isSafeRequest(Http::RequestHeaderMap& request_headers);

/**
 * Choose the HTTP status code to use for a timeout local reply.
 * @param remote_decode_complete whether the downstream request has been fully received.
 * @return Http::Code::GatewayTimeout if the request is fully received and the runtime guard
 *         "envoy.reloadable_features.override_request_timeout_by_gateway_timeout" is enabled;
 *         Http::Code::RequestTimeout otherwise.
 */
Http::Code maybeRequestTimeoutCode(bool remote_decode_complete);

} // namespace Utility
} // namespace Http
} // namespace Envoy
11 changes: 9 additions & 2 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,17 @@ void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) {

callbacks_->streamInfo().setResponseFlag(
StreamInfo::ResponseFlag::UpstreamMaxStreamDurationReached);
// Grab the const ref to call the const method of StreamInfo.
const auto& stream_info = callbacks_->streamInfo();
const bool downstream_decode_complete =
stream_info.downstreamTiming().has_value() &&
stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value();

// sendLocalReply may instead reset the stream if downstream_response_started_ is true.
callbacks_->sendLocalReply(
Http::Code::RequestTimeout, "upstream max stream duration reached", modify_headers_,
absl::nullopt, StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete),
"upstream max stream duration reached", modify_headers_, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
}

void Filter::updateOutlierDetection(Upstream::Outlier::Result result,
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ RUNTIME_GUARD(envoy_reloadable_features_internal_address);
RUNTIME_GUARD(envoy_reloadable_features_listener_reuse_port_default_enabled);
RUNTIME_GUARD(envoy_reloadable_features_listener_wildcard_match_ip_family);
RUNTIME_GUARD(envoy_reloadable_features_new_tcp_connection_pool);
RUNTIME_GUARD(envoy_reloadable_features_override_request_timeout_by_gateway_timeout);
RUNTIME_GUARD(envoy_reloadable_features_postpone_h3_client_connect_to_next_loop);
RUNTIME_GUARD(envoy_reloadable_features_proxy_102_103);
RUNTIME_GUARD(envoy_reloadable_features_sanitize_http_header_referer);
Expand Down
59 changes: 59 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3379,6 +3379,65 @@ TEST_F(HttpConnectionManagerImplTest, MaxStreamDurationCallbackResetStream) {
EXPECT_EQ(1U, stats_.named_.downstream_rq_rx_reset_.value());
}

// Verifies that when the max stream duration timer fires while the request is still incomplete
// (headers were decoded with the end-of-stream argument set to false), the local reply carries
// status 408.
TEST_F(HttpConnectionManagerImplTest, MaxStreamDurationFiredReturn408IfRequestWasNotComplete) {
// Configure a max stream duration so that a duration timer is created for the stream.
max_stream_duration_ = std::chrono::milliseconds(10);
setup(false, "");
Event::MockTimer* duration_timer = setUpTimer();

// Simulate the codec creating a new stream and delivering request headers on the first dispatch.
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> Http::Status {
EXPECT_CALL(*duration_timer, enableTimer(max_stream_duration_.value(), _)).Times(2);
decoder_ = &conn_manager_->newStream(response_encoder_);

RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{
{":authority", "localhost:8080"}, {":path", "/"}, {":method", "GET"}}};
// The codec will dispatch data after the header.
decoder_->decodeHeaders(std::move(headers), false);
// Consume the 4 bytes of fake input passed to onData() below.
data.drain(4);
return Http::okStatus();
}));
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*duration_timer, disableTimer());
// Response code 408 after downstream max stream timeout because the request is not fully read by
// the decoder.
EXPECT_CALL(response_encoder_, encodeHeaders(_, false))
.WillOnce(Invoke([](const ResponseHeaderMap& headers, bool) -> void {
EXPECT_EQ("408", headers.getStatusValue());
}));
EXPECT_CALL(response_encoder_, encodeData(_, true));
// Fire the max stream duration timer manually to trigger the local reply.
duration_timer->invokeCallback();
}

// Verifies that when the max stream duration timer fires after a header-only request has been
// fully decoded (the end-of-stream argument of decodeHeaders is true), the local reply carries
// status 504.
TEST_F(HttpConnectionManagerImplTest, MaxStreamDurationFiredReturn504IfRequestWasFullyRead) {
// Configure a max stream duration so that a duration timer is created for the stream.
max_stream_duration_ = std::chrono::milliseconds(10);
setup(false, "");
Event::MockTimer* duration_timer = setUpTimer();

// Simulate the codec creating a new stream and delivering a complete request on the first
// dispatch.
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> Http::Status {
EXPECT_CALL(*duration_timer, enableTimer(max_stream_duration_.value(), _)).Times(2);
decoder_ = &conn_manager_->newStream(response_encoder_);

RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{
{":authority", "localhost:8080"}, {":path", "/"}, {":method", "GET"}}};
// This is a header only request.
decoder_->decodeHeaders(std::move(headers), true);
// Consume the 4 bytes of fake input passed to onData() below.
data.drain(4);
return Http::okStatus();
}));
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

EXPECT_CALL(*duration_timer, disableTimer());
// 504 direct response after downstream max stream timeout because the request is fully read.
EXPECT_CALL(response_encoder_, encodeHeaders(_, false))
.WillOnce(Invoke([](const ResponseHeaderMap& headers, bool) -> void {
EXPECT_EQ("504", headers.getStatusValue());
}));
EXPECT_CALL(response_encoder_, encodeData(_, true));
// Fire the max stream duration timer manually to trigger the local reply.
duration_timer->invokeCallback();
}

TEST_F(HttpConnectionManagerImplTest, Http10Rejected) {
setup(false, "");
EXPECT_CALL(*codec_, protocol()).Times(AnyNumber()).WillRepeatedly(Return(Protocol::Http10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ std::string jsonStrToPbStrucStr(std::string json) {
}

TEST_P(GrpcJsonTranscoderIntegrationTest, DeepStruct) {
// Lower the timeout for the 408 response.
// Lower the timeout for an incomplete response.
config_helper_.addConfigModifier(
[&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
hcm) -> void {
Expand All @@ -842,7 +842,7 @@ TEST_P(GrpcJsonTranscoderIntegrationTest, DeepStruct) {
Http::TestRequestHeaderMapImpl{
{":method", "POST"}, {":path", "/echoStruct"}, {":authority", "host"}},
createDeepJson(100, true), {}, {}, Status(),
Http::TestResponseHeaderMapImpl{{":status", "408"}, {"content-type", "text/plain"}}, "");
Http::TestResponseHeaderMapImpl{{":status", "504"}, {"content-type", "text/plain"}}, "");

// The invalid deep struct is detected.
testTranscoding<bookstore::EchoStructReqResp, bookstore::EchoStructReqResp>(
Expand Down
5 changes: 3 additions & 2 deletions test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,12 @@ void IntegrationCodecClient::sendMetadata(Http::RequestEncoder& encoder,
}

std::pair<Http::RequestEncoder&, IntegrationStreamDecoderPtr>
IntegrationCodecClient::startRequest(const Http::RequestHeaderMap& headers) {
IntegrationCodecClient::startRequest(const Http::RequestHeaderMap& headers,
bool header_only_request) {
auto response = std::make_unique<IntegrationStreamDecoder>(dispatcher_);
Http::RequestEncoder& encoder = newStream(*response);
encoder.getStream().addCallbacks(*response);
encoder.encodeHeaders(headers, false).IgnoreError();
encoder.encodeHeaders(headers, /*end_stream=*/header_only_request).IgnoreError();
flushWrite();
return {encoder, std::move(response)};
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/http_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class IntegrationCodecClient : public Http::CodecClientProd {
// Intentionally makes a copy of metadata_map.
void sendMetadata(Http::RequestEncoder& encoder, Http::MetadataMap metadata_map);
std::pair<Http::RequestEncoder&, IntegrationStreamDecoderPtr>
startRequest(const Http::RequestHeaderMap& headers);
startRequest(const Http::RequestHeaderMap& headers, bool header_only_request = false);
ABSL_MUST_USE_RESULT AssertionResult
waitForDisconnect(std::chrono::milliseconds time_to_wait = TestUtility::DefaultTimeout);
Network::ClientConnection* connection() const { return connection_.get(); }
Expand Down
2 changes: 1 addition & 1 deletion test/integration/overload_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ TEST_P(OverloadScaledTimerIntegrationTest, CloseIdleHttpStream) {
test_server_->waitForCounterGe("http.config_test.downstream_rq_idle_timeout", 1);
ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ(response->headers().getStatusValue(), "408");
EXPECT_EQ(response->headers().getStatusValue(), "504");
EXPECT_THAT(response->body(), HasSubstr("stream timeout"));
}

Expand Down
60 changes: 60 additions & 0 deletions test/integration/protocol_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2564,6 +2564,66 @@ TEST_P(DownstreamProtocolIntegrationTest, BasicMaxStreamTimeout) {

test_server_->waitForCounterGe("http.config_test.downstream_rq_max_duration_reached", 1);
ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());
EXPECT_EQ("408", response->headers().getStatusValue());
}

// Test that when the downstream max stream duration is reached after the request has been
// completed, gateway timeout (504) is returned as the response code instead of request timeout
// (408).
// NOTE(review): the test name says "RequestIsNotComplete", but the request sent below is a
// complete, header-only request and the expected status is 504 -- consider renaming.
TEST_P(ProtocolIntegrationTest, MaxStreamTimeoutWhenRequestIsNotComplete) {
config_helper_.setDownstreamMaxStreamDuration(std::chrono::milliseconds(500));

autonomous_upstream_ = false;

initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

// Send a header-only request, so the request is already complete when the max stream duration
// timer fires.
auto encoder_decoder =
codec_client_->startRequest(default_request_headers_, /*header_only_request=*/true);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);

// Wait for the request headers to reach the fake upstream; no upstream response is sent.
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForHeadersComplete());

// Wait for the downstream max duration stat to be recorded, then for the response to end.
test_server_->waitForCounterGe("http.config_test.downstream_rq_max_duration_reached", 1);
ASSERT_TRUE(response->waitForEndStream());

EXPECT_TRUE(upstream_request_->complete());
ASSERT_TRUE(response->complete());
EXPECT_EQ("504", response->headers().getStatusValue());
}

// Same scenario as the header-only max stream timeout test, except that the runtime guard
// "override_request_timeout_by_gateway_timeout" is disabled.
// Verify the old behavior (408) is restored by disabling the runtime guard.
// NOTE(review): the test name says "RequestIsNotComplete", but the request sent below is a
// complete, header-only request -- consider renaming.
TEST_P(ProtocolIntegrationTest, MaxStreamTimeoutWhenRequestIsNotCompleteRuntimeDisabled) {
config_helper_.setDownstreamMaxStreamDuration(std::chrono::milliseconds(500));

// Turn the runtime guard off so the pre-change status code is expected.
config_helper_.addRuntimeOverride(
"envoy.reloadable_features.override_request_timeout_by_gateway_timeout", "false");
autonomous_upstream_ = false;

initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

// Send a header-only request, so the request is already complete when the max stream duration
// timer fires.
auto encoder_decoder =
codec_client_->startRequest(default_request_headers_, /*header_only_request=*/true);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);

// Wait for the request headers to reach the fake upstream; no upstream response is sent.
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForHeadersComplete());

// Wait for the downstream max duration stat to be recorded, then for the response to end.
test_server_->waitForCounterGe("http.config_test.downstream_rq_max_duration_reached", 1);
ASSERT_TRUE(response->waitForEndStream());

EXPECT_TRUE(upstream_request_->complete());
ASSERT_TRUE(response->complete());
EXPECT_EQ("408", response->headers().getStatusValue());
}

TEST_P(DownstreamProtocolIntegrationTest, MaxRequestsPerConnectionReached) {
Expand Down

0 comments on commit d7edf3b

Please sign in to comment.