Skip to content

Commit

Permalink
codec: support half-close in HTTP (#22802)
Browse files Browse the repository at this point in the history
Signed-off-by: Kuat Yessenov kuat@google.com

Commit Message: Hold ActiveRequest destruction until a full close instead of doing immediately on response decode.
Additional Description: This is needed to support half closed tcp_proxy CONNECT streams. Closing immediately on response triggers TCP connection close on the draining upstream which breaks tcp_proxy continuing to send data. Fixes #22748. See H2 state transition diagram here: https://httpwg.org/specs/rfc9113.html#StreamStates.

HTTP/1.1 is particularly difficult to change because message completion is triggered by the parser. This PR keeps the behavior unchanged for HTTP/1.1 codec.

QUIC requires adding STOP_SENDING support on client side, otherwise, the underlying stream is destroyed before the callbacks kick in. The test in protocol integration is updated to ensure that test processes client STOP_SENDING before closing the connection.

Risk Level: medium / high
Testing: manually verified the crash is fixed, reproducing test case added
Docs Changes: none
Release Notes: yes
Runtime feature: envoy.reloadable_features.http_response_half_close
  • Loading branch information
kyessenov committed Sep 1, 2022
1 parent 2397773 commit 3e2f150
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 27 deletions.
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ behavior_changes:
- area: tls
change: |
Change TLS and QUIC transport sockets to support asynchronous cert validation extension. This behavior change can be reverted by setting runtime guard ``envoy.reloadable_features.tls_async_cert_validation`` to false.
- area: http
change: |
For HTTP/2 and HTTP/3 codecs, all clients now continue sending data upstream after receiving an end of the server
stream. This supports the server half-close semantics for TCP tunneling with CONNECT as well as bi-directional
streaming calls. This behavior change can be reverted by setting the
``envoy.reloadable_features.http_response_half_close`` runtime flag to false.
minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
1 change: 1 addition & 0 deletions source/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ envoy_cc_library(
"//source/common/http/http1:codec_lib",
"//source/common/http/http2:codec_lib",
"//source/common/network:filter_lib",
"//source/common/runtime:runtime_features_lib",
] + envoy_select_enable_http3([
"//source/common/quic:codec_lib",
]),
Expand Down
26 changes: 21 additions & 5 deletions source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ void CodecClient::deleteRequest(ActiveRequest& request) {

RequestEncoder& CodecClient::newStream(ResponseDecoder& response_decoder) {
ActiveRequestPtr request(new ActiveRequest(*this, response_decoder));
request->encoder_ = &codec_->newStream(*request);
request->encoder_->getStream().addCallbacks(*request);
request->setEncoder(codec_->newStream(*request));
LinkedList::moveIntoList(std::move(request), active_requests_);

auto upstream_info = connection_->streamInfo().upstreamInfo();
upstream_info->setUpstreamNumStreams(upstream_info->upstreamNumStreams() + 1);

disableIdleTimer();
return *active_requests_.front()->encoder_;
return *active_requests_.front();
}

void CodecClient::onEvent(Network::ConnectionEvent event) {
Expand Down Expand Up @@ -118,7 +117,7 @@ void CodecClient::onEvent(Network::ConnectionEvent event) {
}
while (!active_requests_.empty()) {
// Fake resetting all active streams so that reset() callbacks get invoked.
active_requests_.front()->encoder_->getStream().resetStream(reason);
active_requests_.front()->getStream().resetStream(reason);
}
}
}
Expand All @@ -128,12 +127,29 @@ void CodecClient::responsePreDecodeComplete(ActiveRequest& request) {
if (codec_client_callbacks_) {
codec_client_callbacks_->onStreamPreDecodeComplete();
}
request.decode_complete_ = true;
if (request.encode_complete_ || !request.wait_encode_complete_) {
completeRequest(request);
} else {
ENVOY_CONN_LOG(debug, "waiting for encode to complete", *connection_);
}
}

void CodecClient::requestEncodeComplete(ActiveRequest& request) {
ENVOY_CONN_LOG(debug, "encode complete", *connection_);
request.encode_complete_ = true;
if (request.decode_complete_) {
completeRequest(request);
}
}

void CodecClient::completeRequest(ActiveRequest& request) {
deleteRequest(request);

// HTTP/2 can send us a reset after a complete response if the request was not complete. Users
// of CodecClient will deal with the premature response case and we should not handle any
// further reset notification.
request.encoder_->getStream().removeCallbacks(request);
request.removeEncoderCallbacks();
}

void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) {
Expand Down
35 changes: 32 additions & 3 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "source/common/common/logger.h"
#include "source/common/http/codec_wrappers.h"
#include "source/common/network/filter_impl.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace Http {
Expand Down Expand Up @@ -219,9 +220,23 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
struct ActiveRequest : LinkedObject<ActiveRequest>,
public Event::DeferredDeletable,
public StreamCallbacks,
public ResponseDecoderWrapper {
public ResponseDecoderWrapper,
public RequestEncoderWrapper {
ActiveRequest(CodecClient& parent, ResponseDecoder& inner)
: ResponseDecoderWrapper(inner), parent_(parent) {}
: ResponseDecoderWrapper(inner), RequestEncoderWrapper(nullptr), parent_(parent) {
switch (parent.protocol()) {
case Protocol::Http10:
case Protocol::Http11:
// HTTP/1.1 codec does not support half-close on the response completion.
wait_encode_complete_ = false;
break;
case Protocol::Http2:
case Protocol::Http3:
wait_encode_complete_ =
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_response_half_close");
break;
}
}

// StreamCallbacks
void onResetStream(StreamResetReason reason, absl::string_view) override {
Expand All @@ -234,8 +249,20 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); }
void onDecodeComplete() override {}

RequestEncoder* encoder_{};
// RequestEncoderWrapper
void onEncodeComplete() override { parent_.requestEncodeComplete(*this); }

void setEncoder(RequestEncoder& encoder) {
inner_encoder_ = &encoder;
inner_encoder_->getStream().addCallbacks(*this);
}

void removeEncoderCallbacks() { inner_encoder_->getStream().removeCallbacks(*this); }

CodecClient& parent_;
bool wait_encode_complete_{true};
bool encode_complete_{false};
bool decode_complete_{false};
};

using ActiveRequestPtr = std::unique_ptr<ActiveRequest>;
Expand All @@ -245,6 +272,8 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
* wrapped decoder.
*/
void responsePreDecodeComplete(ActiveRequest& request);
void requestEncodeComplete(ActiveRequest& request);
void completeRequest(ActiveRequest& request);

void deleteRequest(ActiveRequest& request);
void onReset(ActiveRequest& request, StreamResetReason reason);
Expand Down
29 changes: 20 additions & 9 deletions source/common/http/codec_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,47 +73,58 @@ class RequestEncoderWrapper : public RequestEncoder {
public:
// RequestEncoder
Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override {
RETURN_IF_ERROR(inner_.encodeHeaders(headers, end_stream));
ASSERT(inner_encoder_);
RETURN_IF_ERROR(inner_encoder_->encodeHeaders(headers, end_stream));
if (end_stream) {
onEncodeComplete();
}
return okStatus();
}

void encodeData(Buffer::Instance& data, bool end_stream) override {
inner_.encodeData(data, end_stream);
ASSERT(inner_encoder_);
inner_encoder_->encodeData(data, end_stream);
if (end_stream) {
onEncodeComplete();
}
}

void encodeTrailers(const RequestTrailerMap& trailers) override {
inner_.encodeTrailers(trailers);
ASSERT(inner_encoder_);
inner_encoder_->encodeTrailers(trailers);
onEncodeComplete();
}

void enableTcpTunneling() override { inner_.enableTcpTunneling(); }
void enableTcpTunneling() override {
ASSERT(inner_encoder_);
inner_encoder_->enableTcpTunneling();
}

void encodeMetadata(const MetadataMapVector& metadata_map_vector) override {
inner_.encodeMetadata(metadata_map_vector);
ASSERT(inner_encoder_);
inner_encoder_->encodeMetadata(metadata_map_vector);
}

Stream& getStream() override { return inner_.getStream(); }
Stream& getStream() override {
ASSERT(inner_encoder_);
return inner_encoder_->getStream();
}

Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override {
return inner_.http1StreamEncoderOptions();
ASSERT(inner_encoder_);
return inner_encoder_->http1StreamEncoderOptions();
}

protected:
RequestEncoderWrapper(RequestEncoder& inner) : inner_(inner) {}
RequestEncoderWrapper(RequestEncoder* inner) : inner_encoder_(inner) {}

/**
* Consumers of the wrapper generally want to know when an encode is complete. This is called at
* that time and is implemented by derived classes.
*/
virtual void onEncodeComplete() PURE;

RequestEncoder& inner_;
RequestEncoder* inner_encoder_;
};

} // namespace Http
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ namespace Http {
namespace Http1 {

ActiveClient::StreamWrapper::StreamWrapper(ResponseDecoder& response_decoder, ActiveClient& parent)
: RequestEncoderWrapper(parent.codec_client_->newStream(*this)),
: RequestEncoderWrapper(&parent.codec_client_->newStream(*this)),
ResponseDecoderWrapper(response_decoder), parent_(parent) {
RequestEncoderWrapper::inner_.getStream().addCallbacks(*this);
RequestEncoderWrapper::inner_encoder_->getStream().addCallbacks(*this);
}

ActiveClient::StreamWrapper::~StreamWrapper() {
Expand Down
24 changes: 23 additions & 1 deletion source/common/quic/envoy_quic_client_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,23 @@ void EnvoyQuicClientStream::OnStreamFrame(const quic::QuicStreamFrame& frame) {
quic::QuicSpdyClientStream::OnStreamFrame(frame);
}

bool EnvoyQuicClientStream::OnStopSending(quic::QuicResetStreamError error) {
// Only called in IETF Quic to close write side.
ENVOY_STREAM_LOG(debug, "received STOP_SENDING with reset code={}", *this, error.internal_code());
bool end_stream_encoded = local_end_stream_;
// This call will close write.
if (!quic::QuicSpdyClientStream::OnStopSending(error)) {
return false;
}
if (read_side_closed() && !end_stream_encoded) {
stats_.rx_reset_.inc();
// If both directions are closed but end stream hasn't been encoded yet, notify reset callbacks.
// Treat this as a remote reset, since the stream will be closed in both directions.
runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(error.internal_code()));
}
return true;
}

void EnvoyQuicClientStream::OnBodyAvailable() {
ASSERT(FinishedReadingHeaders());
if (read_side_closed()) {
Expand Down Expand Up @@ -315,8 +332,13 @@ void EnvoyQuicClientStream::maybeDecodeTrailers() {
void EnvoyQuicClientStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
ENVOY_STREAM_LOG(debug, "received reset code={}", *this, frame.error_code);
stats_.rx_reset_.inc();
bool end_stream_decoded_and_encoded = read_side_closed() && local_end_stream_;
// This closes read side in IETF Quic, but doesn't close write side.
quic::QuicSpdyClientStream::OnStreamReset(frame);
runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(frame.error_code));
ASSERT(read_side_closed());
if (write_side_closed() && !end_stream_decoded_and_encoded) {
runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(frame.error_code));
}
}

void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) {
Expand Down
1 change: 1 addition & 0 deletions source/common/quic/envoy_quic_client_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream,

// quic::QuicStream
void OnStreamFrame(const quic::QuicStreamFrame& frame) override;
bool OnStopSending(quic::QuicResetStreamError error) override;
// quic::QuicSpdyStream
void OnBodyAvailable() override;
void OnStreamReset(const quic::QuicRstStreamFrame& frame) override;
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ RUNTIME_GUARD(envoy_reloadable_features_http2_new_codec_wrapper);
RUNTIME_GUARD(envoy_reloadable_features_http3_sends_early_data);
RUNTIME_GUARD(envoy_reloadable_features_http_100_continue_case_insensitive);
RUNTIME_GUARD(envoy_reloadable_features_http_reject_path_with_fragment);
RUNTIME_GUARD(envoy_reloadable_features_http_response_half_close);
RUNTIME_GUARD(envoy_reloadable_features_http_skip_adding_content_length_to_upgrade);
RUNTIME_GUARD(envoy_reloadable_features_http_strip_fragment_from_path_unsafe_if_disabled);
RUNTIME_GUARD(envoy_reloadable_features_local_ratelimit_match_all_descriptors);
Expand Down
1 change: 1 addition & 0 deletions test/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ envoy_cc_test(
"//test/mocks/upstream:cluster_info_mocks",
"//test/test_common:environment_lib",
"//test/test_common:network_utility_lib",
"//test/test_common:status_utility_lib",
"//test/test_common:utility_lib",
],
)
Expand Down
18 changes: 15 additions & 3 deletions test/common/http/codec_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "test/test_common/environment.h"
#include "test/test_common/network_utility.h"
#include "test/test_common/printers.h"
#include "test/test_common/status_utility.h"
#include "test/test_common/utility.h"

#include "gmock/gmock.h"
Expand Down Expand Up @@ -54,6 +55,7 @@ class CodecClientTest : public Event::TestUsingSimulatedTime, public testing::Te
Invoke([this](Network::ReadFilterSharedPtr filter) -> void { filter_ = filter; }));

codec_ = new Http::MockClientConnection();
EXPECT_CALL(*codec_, protocol()).WillRepeatedly(Return(Protocol::Http11));

Network::ClientConnectionPtr connection{connection_};
EXPECT_CALL(dispatcher_, createTimer_(_));
Expand Down Expand Up @@ -105,8 +107,11 @@ TEST_F(CodecClientTest, BasicHeaderOnlyResponse) {
}));

Http::MockResponseDecoder outer_decoder;
client_->newStream(outer_decoder);
Http::RequestEncoder& request_encoder = client_->newStream(outer_decoder);

TestRequestHeaderMapImpl request_headers{
{":authority", "host"}, {":path", "/"}, {":method", "GET"}};
EXPECT_OK(request_encoder.encodeHeaders(request_headers, true));
ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
EXPECT_CALL(outer_decoder, decodeHeaders_(Pointee(Ref(*response_headers)), true));
inner_decoder->decodeHeaders(std::move(response_headers), true);
Expand All @@ -123,8 +128,11 @@ TEST_F(CodecClientTest, BasicResponseWithBody) {
}));

Http::MockResponseDecoder outer_decoder;
client_->newStream(outer_decoder);
Http::RequestEncoder& request_encoder = client_->newStream(outer_decoder);

TestRequestHeaderMapImpl request_headers{
{":authority", "host"}, {":path", "/"}, {":method", "GET"}};
EXPECT_OK(request_encoder.encodeHeaders(request_headers, true));
ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
EXPECT_CALL(outer_decoder, decodeHeaders_(Pointee(Ref(*response_headers)), false));
inner_decoder->decodeHeaders(std::move(response_headers), false);
Expand Down Expand Up @@ -168,11 +176,14 @@ TEST_F(CodecClientTest, IdleTimerWithNoActiveRequests) {
}));

Http::MockResponseDecoder outer_decoder;
Http::StreamEncoder& request_encoder = client_->newStream(outer_decoder);
Http::RequestEncoder& request_encoder = client_->newStream(outer_decoder);
Http::MockStreamCallbacks callbacks;
request_encoder.getStream().addCallbacks(callbacks);
connection_cb_->onEvent(Network::ConnectionEvent::Connected);

TestRequestHeaderMapImpl request_headers{
{":authority", "host"}, {":path", "/"}, {":method", "GET"}};
EXPECT_OK(request_encoder.encodeHeaders(request_headers, true));
ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
EXPECT_CALL(outer_decoder, decodeHeaders_(Pointee(Ref(*response_headers)), false));
inner_decoder->decodeHeaders(std::move(response_headers), false);
Expand Down Expand Up @@ -299,6 +310,7 @@ class CodecNetworkTest : public Event::TestUsingSimulatedTime,
client_connection_->addConnectionCallbacks(client_callbacks_);

codec_ = new Http::MockClientConnection();
EXPECT_CALL(*codec_, protocol()).WillRepeatedly(Return(Protocol::Http11));
client_ = std::make_unique<CodecClientForTest>(CodecType::HTTP1, std::move(client_connection),
codec_, nullptr, host_, *dispatcher_);

Expand Down
2 changes: 1 addition & 1 deletion test/common/http/codec_wrappers_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Http {

class MockRequestEncoderWrapper : public RequestEncoderWrapper {
public:
MockRequestEncoderWrapper() : RequestEncoderWrapper(inner_encoder_) {}
MockRequestEncoderWrapper() : RequestEncoderWrapper(&inner_encoder_) {}
void onEncodeComplete() override { encode_complete_ = true; }

MockRequestEncoder& innerEncoder() { return inner_encoder_; }
Expand Down
11 changes: 10 additions & 1 deletion test/common/quic/envoy_quic_client_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,22 @@ TEST_F(EnvoyQuicClientStreamTest, ResetStream) {
EXPECT_TRUE(quic_stream_->rst_sent());
}

TEST_F(EnvoyQuicClientStreamTest, ReceiveResetStream) {
TEST_F(EnvoyQuicClientStreamTest, ReceiveResetStreamWriteClosed) {
auto result = quic_stream_->encodeHeaders(request_headers_, true);
EXPECT_TRUE(result.ok());
EXPECT_CALL(stream_callbacks_, onResetStream(Http::StreamResetReason::RemoteReset, _));
quic_stream_->OnStreamReset(quic::QuicRstStreamFrame(
quic::kInvalidControlFrameId, quic_stream_->id(), quic::QUIC_STREAM_NO_ERROR, 0));
EXPECT_TRUE(quic_stream_->rst_received());
}

TEST_F(EnvoyQuicClientStreamTest, ReceiveResetStreamWriteOpen) {
quic_stream_->OnStreamReset(quic::QuicRstStreamFrame(
quic::kInvalidControlFrameId, quic_stream_->id(), quic::QUIC_STREAM_NO_ERROR, 0));
EXPECT_TRUE(quic_stream_->rst_received());
EXPECT_CALL(stream_callbacks_, onResetStream(_, _));
}

TEST_F(EnvoyQuicClientStreamTest, CloseConnectionDuringDecodingHeader) {
const auto result = quic_stream_->encodeHeaders(request_headers_, false);
EXPECT_TRUE(result.ok());
Expand Down

0 comments on commit 3e2f150

Please sign in to comment.