From 08f2ec09165c5e542579ef484ef2c9f3bb8d783a Mon Sep 17 00:00:00 2001 From: code Date: Mon, 29 Apr 2024 23:26:59 +0800 Subject: [PATCH] generic proxy: expose the codec error to upper layer (#33788) Signed-off-by: wbpcode --- .../source/interface/codec_callbacks.h | 12 +- .../filters/network/source/interface/stream.h | 16 +- .../filters/network/source/proxy.cc | 9 +- .../filters/network/source/proxy.h | 3 +- .../filters/network/source/router/router.cc | 234 +++++++++--------- .../filters/network/source/router/router.h | 49 ++-- .../network/test/codecs/dubbo/config_test.cc | 4 +- .../network/test/codecs/http1/config_test.cc | 24 +- .../network/test/codecs/kafka/config_test.cc | 4 +- .../filters/network/test/integration_test.cc | 4 +- .../filters/network/test/mocks/codec.h | 5 +- .../filters/network/test/proxy_test.cc | 31 +++ .../network/test/router/router_test.cc | 48 +++- 13 files changed, 261 insertions(+), 182 deletions(-) diff --git a/contrib/generic_proxy/filters/network/source/interface/codec_callbacks.h b/contrib/generic_proxy/filters/network/source/interface/codec_callbacks.h index 45a8dc7c3205..cfb45cbb511a 100644 --- a/contrib/generic_proxy/filters/network/source/interface/codec_callbacks.h +++ b/contrib/generic_proxy/filters/network/source/interface/codec_callbacks.h @@ -31,8 +31,9 @@ class ServerCodecCallbacks { /** * If request decoding failure then this method will be called. + * @param reason the reason of decoding failure. */ - virtual void onDecodingFailure() PURE; + virtual void onDecodingFailure(absl::string_view reason = {}) PURE; /** * Write specified data to the downstream connection. This is could be used to write @@ -71,8 +72,9 @@ class ClientCodecCallbacks { /** * If response decoding failure then this method will be called. + * @param reason the reason of decoding failure. */ - virtual void onDecodingFailure() PURE; + virtual void onDecodingFailure(absl::string_view reason = {}) PURE; /** * Write specified data to the upstream connection. This is could be used to write @@ -114,6 +116,12 @@ class EncodingCallbacks { */ virtual void onEncodingSuccess(Buffer::Instance& buffer, bool end_stream) PURE; + /** + * If encoding failure then this method will be called. + * @param reason the reason of encoding failure. + */ + virtual void onEncodingFailure(absl::string_view reason = {}) PURE; + /** * The route that the request is matched to. This is optional when encoding the response * (by server codec) because the request may not be matched to any route and the diff --git a/contrib/generic_proxy/filters/network/source/interface/stream.h b/contrib/generic_proxy/filters/network/source/interface/stream.h index a345be85261b..a40d86570ecb 100644 --- a/contrib/generic_proxy/filters/network/source/interface/stream.h +++ b/contrib/generic_proxy/filters/network/source/interface/stream.h @@ -285,18 +285,10 @@ using ResponseSharedPtr = std::shared_ptr; template class StreamFramePtrHelper { public: - StreamFramePtrHelper(StreamFramePtr frame) { - auto frame_ptr = frame.release(); - - auto typed_frame_ptr = dynamic_cast(frame_ptr); - - if (typed_frame_ptr == nullptr) { - // If the frame is not the expected type, wrap it - // in the original StreamFramePtr. - frame_ = StreamFramePtr{frame_ptr}; - } else { - // If the frame is the expected type, wrap it - // in the typed frame unique pointer. + StreamFramePtrHelper(StreamFramePtr frame) : frame_(std::move(frame)) { + if (auto typed_frame_ptr = dynamic_cast(frame_.get()); typed_frame_ptr != nullptr) { + // If the frame is the expected type, wrap it in the typed frame unique pointer. + frame_.release(); typed_frame_ = std::unique_ptr{typed_frame_ptr}; } } diff --git a/contrib/generic_proxy/filters/network/source/proxy.cc b/contrib/generic_proxy/filters/network/source/proxy.cc index 13e5cef3f9b9..3e8b9b2d6e5b 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.cc +++ b/contrib/generic_proxy/filters/network/source/proxy.cc @@ -314,6 +314,11 @@ void ActiveStream::onEncodingSuccess(Buffer::Instance& buffer, bool end_stream) parent_.deferredStream(*this); } +void ActiveStream::onEncodingFailure(absl::string_view reason) { + ENVOY_LOG(error, "Generic proxy: response encoding failure: {}", reason); + resetStream(DownstreamStreamResetReason::ProtocolError); +} + void ActiveStream::initializeFilterChain(FilterChainFactory& factory) { factory.createFilterChain(*this); // Reverse the encoder filter chain so that the first encoder filter is the last filter in the @@ -390,9 +395,9 @@ void Filter::onDecodingSuccess(StreamFramePtr request) { onDecodingFailure(); } -void Filter::onDecodingFailure() { +void Filter::onDecodingFailure(absl::string_view reason) { + ENVOY_LOG(error, "generic proxy: request decoding failure: {}", reason); stats_helper_.onRequestDecodingError(); - resetDownstreamAllStreams(DownstreamStreamResetReason::ProtocolError); closeDownstreamConnection(); } diff --git a/contrib/generic_proxy/filters/network/source/proxy.h b/contrib/generic_proxy/filters/network/source/proxy.h index 71fdf1180436..611a663707c0 100644 --- a/contrib/generic_proxy/filters/network/source/proxy.h +++ b/contrib/generic_proxy/filters/network/source/proxy.h @@ -263,6 +263,7 @@ class ActiveStream : public FilterChainManager, // ResponseEncoderCallback void onEncodingSuccess(Buffer::Instance& buffer, bool end_stream) override; + void onEncodingFailure(absl::string_view reason = {}) override; OptRef routeEntry() const override { return makeOptRefFromPtr(cached_route_entry_.get()); } @@ -372,7 +373,7 @@ class Filter : public Envoy::Network::ReadFilter, // RequestDecoderCallback void onDecodingSuccess(StreamFramePtr request) override; - void onDecodingFailure() override; + void onDecodingFailure(absl::string_view reason = {}) override; void writeToConnection(Buffer::Instance& buffer) override; OptRef connection() override; diff --git a/contrib/generic_proxy/filters/network/source/router/router.cc b/contrib/generic_proxy/filters/network/source/router/router.cc index bd588e919cc2..c4d745102faf 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.cc +++ b/contrib/generic_proxy/filters/network/source/router/router.cc @@ -20,10 +20,22 @@ namespace GenericProxy { namespace Router { namespace { -absl::string_view resetReasonToStringView(StreamResetReason reason) { - static std::string Reasons[] = {"local_reset", "connection_failure", "connection_termination", - "overflow", "protocol_error"}; - return Reasons[static_cast(reason)]; + +struct ReasonViewAndFlag { + absl::string_view view{}; + absl::optional flag{}; +}; + +static constexpr ReasonViewAndFlag ReasonViewAndFlags[] = { + {"local_reset", absl::nullopt}, + {"connection_failure", StreamInfo::CoreResponseFlag::UpstreamConnectionFailure}, + {"connection_termination", StreamInfo::CoreResponseFlag::UpstreamConnectionTermination}, + {"overflow", StreamInfo::CoreResponseFlag::UpstreamOverflow}, + {"protocol_error", StreamInfo::CoreResponseFlag::UpstreamProtocolError}, +}; + +ReasonViewAndFlag resetReasonToViewAndFlag(StreamResetReason reason) { + return ReasonViewAndFlags[static_cast(reason)]; } constexpr absl::string_view RouterFilterName = "envoy.filters.generic.router"; @@ -82,7 +94,7 @@ void GenericUpstream::onPoolFailure(ConnectionPool::PoolFailureReason reason, host != nullptr ? host->address()->asStringView() : absl::string_view{}); tcp_pool_handle_ = nullptr; - upstream_host_ = std::move(host); + mayUpdateUpstreamHost(std::move(host)); onPoolFailureImpl(reason, transport_failure_reason); } @@ -94,7 +106,7 @@ void GenericUpstream::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_ host->address()->asStringView()); tcp_pool_handle_ = nullptr; - upstream_host_ = std::move(host); + mayUpdateUpstreamHost(std::move(host)); owned_conn_data_ = std::move(conn_data); owned_conn_data_->addUpstreamCallbacks(*this); @@ -150,36 +162,25 @@ void BoundGenericUpstream::onDownstreamConnectionEvent(Network::ConnectionEvent void BoundGenericUpstream::insertUpstreamRequest(uint64_t stream_id, UpstreamRequest* pending_request) { + if (waiting_response_requests_.contains(stream_id) || + waiting_upstream_requests_.contains(stream_id)) { + ENVOY_LOG(error, "generic proxy: stream_id {} already registered", stream_id); + // Close downstream connection because we treat this as request decoding failure. + // The downstream closing will trigger the upstream connection closing. + downstream_connection_.close(Network::ConnectionCloseType::FlushWrite); + return; + } + if (upstream_connection_ready_.has_value()) { // Upstream connection is already ready. If the upstream connection is failed then // all pending requests will be reset and no new upstream request will be created. - ASSERT(upstream_connection_ready_.value()); if (!upstream_connection_ready_.value()) { return; } - ASSERT(waiting_upstream_requests_.empty()); - - if (waiting_response_requests_.contains(stream_id)) { - ENVOY_LOG(error, "generic proxy: stream_id {} already registered for response", stream_id); - // Close downstream connection because we treat this as request decoding failure. - // The downstream closing will trigger the upstream connection closing. - downstream_connection_.close(Network::ConnectionCloseType::FlushWrite); - return; - } - waiting_response_requests_[stream_id] = pending_request; - pending_request->onUpstreamSuccess(upstream_host_); + pending_request->onUpstreamSuccess(); } else { - // Waiting for the upstream connection to be ready. - if (waiting_upstream_requests_.contains(stream_id)) { - ENVOY_LOG(error, "generic proxy: stream_id {} already registered for upstream", stream_id); - // Close downstream connection because we treat this as request decoding failure. - // The downstream closing will trigger the upstream connection closing. - downstream_connection_.close(Network::ConnectionCloseType::FlushWrite); - return; - } - waiting_upstream_requests_[stream_id] = pending_request; // Try to initialize the upstream connection after there is at least one pending request. @@ -199,8 +200,6 @@ void BoundGenericUpstream::onEventImpl(Network::ConnectionEvent event) { return; } - ASSERT(waiting_upstream_requests_.empty()); - while (!waiting_response_requests_.empty()) { auto it = waiting_response_requests_.begin(); auto cb = it->second; @@ -218,7 +217,7 @@ void BoundGenericUpstream::onEventImpl(Network::ConnectionEvent event) { void BoundGenericUpstream::cleanUp(bool close_connection) { // Shared upstream manager never release the connection back to the pool // because the connection is bound to the downstream connection. - if (!close_connection) { + if (!close_connection || prevent_clean_up_) { return; } // Only actually do the cleanup when we want to close the connection. @@ -244,7 +243,7 @@ void BoundGenericUpstream::onPoolSuccessImpl() { waiting_upstream_requests_.erase(it); // Now, notify the upstream request that the upstream connection is ready. - cb->onUpstreamSuccess(upstream_host_); + cb->onUpstreamSuccess(); } } @@ -266,7 +265,7 @@ void BoundGenericUpstream::onPoolFailureImpl(ConnectionPool::PoolFailureReason r waiting_upstream_requests_.erase(it); // Now, notify the upstream request that the upstream connection is failed. - cb->onUpstreamFailure(reason, transport_failure_reason, upstream_host_); + cb->onUpstreamFailure(reason, transport_failure_reason); } // If the downstream connection is not closed, close it. @@ -293,15 +292,23 @@ void BoundGenericUpstream::onDecodingSuccess(StreamFramePtr response) { return cb->onDecodingSuccess(std::move(response)); } -void BoundGenericUpstream::onDecodingFailure() { - ENVOY_LOG(error, "generic proxy bound upstream manager: decoding failure"); +void BoundGenericUpstream::onDecodingFailure(absl::string_view reason) { + ENVOY_LOG(error, "generic proxy bound upstream manager: decoding failure ({})", reason); - // This will trigger the upstream connection close event and all pending requests will be reset - // by the upstream connection close event. - cleanUp(true); + // Prevent the clean up to ensure the connection will not be closed by the following + // upstream request reset. + prevent_clean_up_ = true; - // All pending streams will be reset by the upstream connection close event. - ASSERT(waiting_response_requests_.empty()); + while (!waiting_response_requests_.empty()) { + auto it = waiting_response_requests_.begin(); + auto upstream_request = it->second; + waiting_response_requests_.erase(it); + upstream_request->onDecodingFailure(reason); + } + + // Now to clean up the connection after all upstream requests are reset. + prevent_clean_up_ = false; + cleanUp(true); } OwnedGenericUpstream::OwnedGenericUpstream(const CodecFactory& codec_factory, @@ -324,13 +331,13 @@ void OwnedGenericUpstream::onEventImpl(Network::ConnectionEvent event) { void OwnedGenericUpstream::onPoolSuccessImpl() { ASSERT(upstream_request_ != nullptr); - upstream_request_->onUpstreamSuccess(upstream_host_); + upstream_request_->onUpstreamSuccess(); } void OwnedGenericUpstream::onPoolFailureImpl(ConnectionPool::PoolFailureReason reason, absl::string_view transport_failure_reason) { ASSERT(upstream_request_ != nullptr); - upstream_request_->onUpstreamFailure(reason, transport_failure_reason, upstream_host_); + upstream_request_->onUpstreamFailure(reason, transport_failure_reason); } // ResponseDecoderCallback @@ -338,9 +345,9 @@ void OwnedGenericUpstream::onDecodingSuccess(StreamFramePtr response) { ASSERT(upstream_request_ != nullptr); upstream_request_->onDecodingSuccess(std::move(response)); } -void OwnedGenericUpstream::onDecodingFailure() { +void OwnedGenericUpstream::onDecodingFailure(absl::string_view reason) { ASSERT(upstream_request_ != nullptr); - upstream_request_->onDecodingFailure(); + upstream_request_->onDecodingFailure(reason); } UpstreamRequest::UpstreamRequest(RouterFilter& parent, GenericUpstreamSharedPtr generic_upstream) @@ -348,6 +355,9 @@ UpstreamRequest::UpstreamRequest(RouterFilter& parent, GenericUpstreamSharedPtr stream_info_(parent.time_source_, nullptr, StreamInfo::FilterState::LifeSpan::FilterChain), upstream_info_(std::make_shared()) { + // Host is known at this point and set the initial upstream host. + onUpstreamHostSelected(generic_upstream_->upstreamHost()); + // Set the upstream info for the stream info. stream_info_.setUpstreamInfo(upstream_info_); parent_.callbacks_->streamInfo().setUpstreamInfo(upstream_info_); @@ -374,7 +384,7 @@ void UpstreamRequest::startStream() { generic_upstream_->insertUpstreamRequest(stream_id_, this); } -void UpstreamRequest::resetStream(StreamResetReason reason) { +void UpstreamRequest::resetStream(StreamResetReason reason, absl::string_view reason_detail) { if (reset_or_response_complete_) { return; } @@ -387,7 +397,7 @@ void UpstreamRequest::resetStream(StreamResetReason reason) { if (span_ != nullptr) { span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); - span_->setTag(Tracing::Tags::get().ErrorReason, resetReasonToStringView(reason)); + span_->setTag(Tracing::Tags::get().ErrorReason, resetReasonToViewAndFlag(reason).view); TraceContextBridge trace_context{*parent_.request_stream_}; Tracing::TracerUtility::finalizeSpan(*span_, trace_context, stream_info_, tracing_config_.value().get(), true); @@ -397,7 +407,7 @@ void UpstreamRequest::resetStream(StreamResetReason reason) { deferredDelete(); // Notify the parent filter that the upstream request has been reset. - parent_.onUpstreamRequestReset(*this, reason); + parent_.onUpstreamRequestReset(*this, reason, reason_detail); } void UpstreamRequest::clearStream(bool close_connection) { @@ -475,40 +485,45 @@ void UpstreamRequest::onEncodingSuccess(Buffer::Instance& buffer, bool end_strea } } +void UpstreamRequest::onEncodingFailure(absl::string_view reason) { + // The request encoding failure is treated as a protocol error. + resetStream(StreamResetReason::ProtocolError, reason); +} + OptRef UpstreamRequest::routeEntry() const { return makeOptRefFromPtr(parent_.route_entry_); } -void UpstreamRequest::onUpstreamFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, - Upstream::HostDescriptionConstSharedPtr host) { +void UpstreamRequest::onUpstreamFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason) { ENVOY_LOG(debug, "upstream request: tcp connection (bound or owned) failure"); - - // Mimic an upstream reset. - onUpstreamHostSelected(std::move(host)); + onUpstreamHostSelected(generic_upstream_->upstreamHost()); + onUpstreamConnectionReady(); if (reason == ConnectionPool::PoolFailureReason::Overflow) { - resetStream(StreamResetReason::Overflow); + resetStream(StreamResetReason::Overflow, transport_failure_reason); return; } - - resetStream(StreamResetReason::ConnectionFailure); + resetStream(StreamResetReason::ConnectionFailure, transport_failure_reason); } -void UpstreamRequest::onUpstreamSuccess(Upstream::HostDescriptionConstSharedPtr host) { +void UpstreamRequest::onUpstreamSuccess() { ENVOY_LOG(debug, "upstream request: {} tcp connection has ready", parent_.config_->bindUpstreamConnection() ? "bound" : "owned"); + onUpstreamHostSelected(generic_upstream_->upstreamHost()); + onUpstreamConnectionReady(); + + const auto upstream_host = upstream_info_->upstream_host_.get(); + const Tracing::UpstreamContext upstream_context( + upstream_host, upstream_host ? &upstream_host->cluster() : nullptr, + Tracing::ServiceType::Unknown, false); - onUpstreamHostSelected(std::move(host)); + TraceContextBridge trace_context{*parent_.request_stream_}; if (span_ != nullptr) { - TraceContextBridge trace_context{*parent_.request_stream_}; - Tracing::UpstreamContext upstream_context( - upstream_info_->upstream_host_.get(), // host_ - &upstream_info_->upstream_host_->cluster(), // cluster_ - Tracing::ServiceType::Unknown, // service_type_ - false // async_client_span_ - ); span_->injectContext(trace_context, upstream_context); + } else { + parent_.callbacks_->activeSpan().injectContext(trace_context, upstream_context); } sendRequestStartToUpstream(); @@ -517,38 +532,33 @@ void UpstreamRequest::onUpstreamSuccess(Upstream::HostDescriptionConstSharedPtr void UpstreamRequest::onDecodingSuccess(StreamFramePtr response) { const bool end_stream = response->frameFlags().endStream(); + + if (!response_stream_header_received_) { + // The first frame of response is received. + upstream_info_->upstreamTiming().onFirstUpstreamRxByteReceived(parent_.time_source_); + } if (end_stream) { + // The response is fully received. + upstream_info_->upstreamTiming().onLastUpstreamRxByteReceived(parent_.time_source_); + + // The response is complete and clear the stream. clearStream(response->frameFlags().streamFlags().drainClose()); } if (response_stream_header_received_) { - if (end_stream) { - // The response is fully received. - upstream_info_->upstreamTiming().onLastUpstreamRxByteReceived(parent_.time_source_); - } - parent_.onResponseFrame(std::move(response)); } else { - // The first frame of response is received. - upstream_info_->upstreamTiming().onFirstUpstreamRxByteReceived(parent_.time_source_); - StreamFramePtrHelper helper(std::move(response)); if (helper.typed_frame_ == nullptr) { - ENVOY_LOG(error, "upstream request: first frame is not StreamResponse"); - resetStream(StreamResetReason::ProtocolError); + resetStream(StreamResetReason::ProtocolError, "first frame is not StreamResponse"); return; } response_stream_header_received_ = true; - - if (end_stream) { - // The response is fully received. - upstream_info_->upstreamTiming().onLastUpstreamRxByteReceived(parent_.time_source_); - } parent_.onResponseStart(std::move(helper.typed_frame_)); } } -void UpstreamRequest::onDecodingFailure() { +void UpstreamRequest::onDecodingFailure(absl::string_view reason) { // Decoding failure after the response is complete, close the connection. // This should only happen when some special cases, for example: // The HTTP response is complete but the request is not fully sent. @@ -557,7 +567,7 @@ void UpstreamRequest::onDecodingFailure() { generic_upstream_->cleanUp(true); return; } - resetStream(StreamResetReason::ProtocolError); + resetStream(StreamResetReason::ProtocolError, reason); } void UpstreamRequest::onConnectionClose(Network::ConnectionEvent event) { @@ -569,10 +579,10 @@ void UpstreamRequest::onConnectionClose(Network::ConnectionEvent event) { switch (event) { case Network::ConnectionEvent::LocalClose: - resetStream(StreamResetReason::LocalReset); + resetStream(StreamResetReason::LocalReset, {}); break; case Network::ConnectionEvent::RemoteClose: - resetStream(StreamResetReason::ConnectionTermination); + resetStream(StreamResetReason::ConnectionTermination, {}); break; default: break; @@ -580,11 +590,18 @@ void UpstreamRequest::onConnectionClose(Network::ConnectionEvent event) { } void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) { - ENVOY_LOG(debug, "upstream request: selected upstream {}", host->address()->asString()); + if (host == nullptr || host == upstream_info_->upstream_host_) { + return; + } + + ENVOY_LOG(debug, "upstream request: selected host {}", host->address()->asStringView()); + upstream_info_->upstream_host_ = std::move(host); +} + +void UpstreamRequest::onUpstreamConnectionReady() { ASSERT(connecting_start_time_.has_value()); upstream_info_->upstreamTiming().recordConnectionPoolCallbackLatency( connecting_start_time_.value(), parent_.time_source_); - upstream_info_->setUpstreamHost(std::move(host)); } void UpstreamRequest::encodeBufferToUpstream(Buffer::Instance& buffer) { @@ -610,22 +627,23 @@ void RouterFilter::completeDirectly() { callbacks_->completeDirectly(); } -void RouterFilter::onUpstreamRequestReset(UpstreamRequest&, StreamResetReason reason) { +void RouterFilter::onUpstreamRequestReset(UpstreamRequest&, StreamResetReason reason, + absl::string_view reason_detail) { if (filter_complete_) { return; } // TODO(wbpcode): To support retry policy. - resetStream(reason); + resetStream(reason, reason_detail); } -void RouterFilter::cleanUpstreamRequests(bool filter_complete) { - // If filter_complete_ is true then the resetStream() of RouterFilter will not be called on the - // onUpstreamRequestReset() of RouterFilter. - filter_complete_ = filter_complete; +void RouterFilter::cleanUpstreamRequests() { + // Set filter_complete_ to true then the resetStream() of RouterFilter will not be called + // on the onUpstreamRequestReset() of RouterFilter. + filter_complete_ = true; while (!upstream_requests_.empty()) { - (*upstream_requests_.back()).resetStream(StreamResetReason::LocalReset); + (*upstream_requests_.back()).resetStream(StreamResetReason::LocalReset, {}); } } @@ -633,42 +651,20 @@ void RouterFilter::onDestroy() { if (filter_complete_) { return; } - cleanUpstreamRequests(true); + cleanUpstreamRequests(); } -void RouterFilter::resetStream(StreamResetReason reason) { +void RouterFilter::resetStream(StreamResetReason reason, absl::string_view reason_detail) { if (filter_complete_) { return; } filter_complete_ = true; - ASSERT(upstream_requests_.empty()); - switch (reason) { - case StreamResetReason::LocalReset: - // Note if the connection is closed because of the downstream connection close, this - // resetStream() will not be called. So this means the connection is closed by the Envoy self - // with unknown reason. - callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, resetReasonToStringView(reason))); - break; - case StreamResetReason::ProtocolError: - callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamProtocolError); - callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, resetReasonToStringView(reason))); - break; - case StreamResetReason::ConnectionFailure: - callbacks_->streamInfo().setResponseFlag( - StreamInfo::CoreResponseFlag::UpstreamConnectionFailure); - callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, resetReasonToStringView(reason))); - break; - case StreamResetReason::ConnectionTermination: - callbacks_->streamInfo().setResponseFlag( - StreamInfo::CoreResponseFlag::UpstreamConnectionTermination); - callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, resetReasonToStringView(reason))); - break; - case StreamResetReason::Overflow: - callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow); - callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, resetReasonToStringView(reason))); - break; + const auto [view, flag] = resetReasonToViewAndFlag(reason); + if (flag.has_value()) { + callbacks_->streamInfo().setResponseFlag(flag.value()); } + callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, view), reason_detail); } void RouterFilter::kickOffNewUpstreamRequest() { diff --git a/contrib/generic_proxy/filters/network/source/router/router.h b/contrib/generic_proxy/filters/network/source/router/router.h index 89952742ae19..82888d291b6a 100644 --- a/contrib/generic_proxy/filters/network/source/router/router.h +++ b/contrib/generic_proxy/filters/network/source/router/router.h @@ -35,6 +35,8 @@ enum class StreamResetReason : uint32_t { Overflow, // Protocol error. ProtocolError, + + LastReason = ProtocolError, }; class RouterFilter; @@ -47,7 +49,8 @@ class GenericUpstream : public ClientCodecCallbacks, public Envoy::Logger::Loggable { public: GenericUpstream(Upstream::TcpPoolData&& tcp_pool_data, ClientCodecPtr&& client_codec) - : tcp_pool_data_(std::move(tcp_pool_data)), client_codec_(std::move(client_codec)) { + : tcp_pool_data_(std::move(tcp_pool_data)), client_codec_(std::move(client_codec)), + upstream_host_(tcp_pool_data_.host()) { client_codec_->setCodecCallbacks(*this); } ~GenericUpstream() override; @@ -68,14 +71,22 @@ class GenericUpstream : public ClientCodecCallbacks, void onEvent(Network::ConnectionEvent event) override { onEventImpl(event); } ClientCodec& clientCodec() { return *client_codec_; } + void mayUpdateUpstreamHost(Upstream::HostDescriptionConstSharedPtr real_host) { + if (real_host == nullptr || real_host == upstream_host_) { + return; + } + + // Update the upstream host iff the connection pool callbacks provide a different + // value. + upstream_host_ = std::move(real_host); + } + Upstream::HostDescriptionConstSharedPtr upstreamHost() const { return upstream_host_; } // ResponseDecoderCallback void writeToConnection(Buffer::Instance& buffer) override; OptRef connection() override; OptRef upstreamCluster() const override { - if (upstream_host_ == nullptr) { - return {}; - } + ASSERT(upstream_host_ != nullptr); return upstream_host_->cluster(); } @@ -89,6 +100,7 @@ class GenericUpstream : public ClientCodecCallbacks, protected: Upstream::TcpPoolData tcp_pool_data_; ClientCodecPtr client_codec_; + Upstream::HostDescriptionConstSharedPtr upstream_host_; // Whether the upstream connection is created. This will be set to true when the initialize() // is called. @@ -96,8 +108,6 @@ class GenericUpstream : public ClientCodecCallbacks, Tcp::ConnectionPool::Cancellable* tcp_pool_handle_{}; Tcp::ConnectionPool::ConnectionDataPtr owned_conn_data_; - - Upstream::HostDescriptionConstSharedPtr upstream_host_; }; using GenericUpstreamSharedPtr = std::shared_ptr; @@ -118,9 +128,9 @@ class BoundGenericUpstream : public GenericUpstream, void onEventImpl(Network::ConnectionEvent event) override; void cleanUp(bool close_connection) override; - // ResponseDecoderCallback + // ClientCodecCallbacks void onDecodingSuccess(StreamFramePtr response) override; - void onDecodingFailure() override; + void onDecodingFailure(absl::string_view reason = {}) override; // GenericUpstream void insertUpstreamRequest(uint64_t stream_id, UpstreamRequest* pending_request) override; @@ -147,6 +157,7 @@ class BoundGenericUpstream : public GenericUpstream, std::unique_ptr connection_event_watcher_; absl::optional upstream_connection_ready_; + bool prevent_clean_up_{}; // Pending upstream requests that are waiting for the upstream response to be received. absl::flat_hash_map waiting_response_requests_; @@ -169,7 +180,7 @@ class OwnedGenericUpstream : public GenericUpstream { // ResponseDecoderCallback void onDecodingSuccess(StreamFramePtr response) override; - void onDecodingFailure() override; + void onDecodingFailure(absl::string_view reason = {}) override; // GenericUpstream void insertUpstreamRequest(uint64_t stream_id, UpstreamRequest* pending_request) override; @@ -187,27 +198,28 @@ class UpstreamRequest : public LinkedObject, UpstreamRequest(RouterFilter& parent, GenericUpstreamSharedPtr generic_upstream); void startStream(); - void resetStream(StreamResetReason reason); + void resetStream(StreamResetReason reason, absl::string_view reason_detail); void clearStream(bool close_connection); // Called when the stream has been reset or completed. void deferredDelete(); void onUpstreamFailure(ConnectionPool::PoolFailureReason reason, - absl::string_view transport_failure_reason, - Upstream::HostDescriptionConstSharedPtr host); - void onUpstreamSuccess(Upstream::HostDescriptionConstSharedPtr host); + absl::string_view transport_failure_reason); + void onUpstreamSuccess(); void onConnectionClose(Network::ConnectionEvent event); void onDecodingSuccess(StreamFramePtr response); - void onDecodingFailure(); + void onDecodingFailure(absl::string_view reason); // RequestEncoderCallback void onEncodingSuccess(Buffer::Instance& buffer, bool end_stream) override; + void onEncodingFailure(absl::string_view reason) override; OptRef routeEntry() const override; void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host); + void onUpstreamConnectionReady(); void encodeBufferToUpstream(Buffer::Instance& buffer); void sendRequestStartToUpstream(); @@ -273,8 +285,8 @@ class RouterFilter : public DecoderFilter, void onResponseFrame(StreamFramePtr frame); void completeDirectly(); - void onUpstreamRequestReset(UpstreamRequest& upstream_request, StreamResetReason reason); - void cleanUpstreamRequests(bool filter_complete); + void onUpstreamRequestReset(UpstreamRequest& upstream_request, StreamResetReason reason, + absl::string_view reason_detail); void setRouteEntry(const RouteEntry* route_entry) { route_entry_ = route_entry; } @@ -292,7 +304,10 @@ class RouterFilter : public DecoderFilter, friend class UpstreamManagerImpl; void kickOffNewUpstreamRequest(); - void resetStream(StreamResetReason reason); + void resetStream(StreamResetReason reason, absl::string_view reason_detail); + + // Clean up all the upstream requests. + void cleanUpstreamRequests(); // Set filter_complete_ to true before any local or upstream response. Because the // response processing may complete and destroy the L7 filter chain directly and cause the diff --git a/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc b/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc index ea37d43a8d85..2d6d0308d639 100644 --- a/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc +++ b/contrib/generic_proxy/filters/network/test/codecs/dubbo/config_test.cc @@ -195,7 +195,7 @@ TEST(DubboServerCodecTest, DubboServerCodecTest) { buffer.writeBEInt(0); buffer.writeBEInt(0); - EXPECT_CALL(callbacks, onDecodingFailure()); + EXPECT_CALL(callbacks, onDecodingFailure(_)); server_codec.decode(buffer, false); } @@ -318,7 +318,7 @@ TEST(DubboClientCodecTest, DubboClientCodecTest) { buffer.writeBEInt(0); buffer.writeBEInt(0); - EXPECT_CALL(callbacks, onDecodingFailure()); + EXPECT_CALL(callbacks, onDecodingFailure(_)); client_codec.decode(buffer, false); } diff --git a/contrib/generic_proxy/filters/network/test/codecs/http1/config_test.cc b/contrib/generic_proxy/filters/network/test/codecs/http1/config_test.cc index 118c48d760c0..b9562fd19a39 100644 --- a/contrib/generic_proxy/filters/network/test/codecs/http1/config_test.cc +++ b/contrib/generic_proxy/filters/network/test/codecs/http1/config_test.cc @@ -422,7 +422,7 @@ TEST_F(Http1ServerCodecTest, UnexpectedRequestTest) { "custom: value\r\n" "\r\n"); - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -444,7 +444,7 @@ TEST_F(Http1ServerCodecTest, UnexpectedRequestTest) { "0\r\n" // Last chunk header. "\r\n"); // Last chunk footer. - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -465,7 +465,7 @@ TEST_F(Http1ServerCodecTest, UnexpectedRequestTest) { "0\r\n" // Last chunk header. "\r\n"); // Last chunk footer. - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -479,7 +479,7 @@ TEST_F(Http1ServerCodecTest, UnexpectedRequestTest) { "custom: value\r\n" "\r\n"); - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -494,7 +494,7 @@ TEST_F(Http1ServerCodecTest, UnexpectedRequestTest) { "custom: value\r\n" "\r\n"); - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } } @@ -731,7 +731,7 @@ TEST_F(Http1ServerCodecTest, NewRequestBeforeFirstRequestCompleteTest) { codec_->decode(buffer, false); // First request is not complete, so the codec should close the connection. - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer2, false); } @@ -786,7 +786,7 @@ TEST_F(Http1ServerCodecTest, SingleFrameModeRequestTooLargeTest) { "\r\n" "body~"); - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -1130,7 +1130,7 @@ TEST_F(Http1ClientCodecTest, UnexpectedResponseTest) { "0\r\n" // Last chunk header. "\r\n"); // Last chunk footer. - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -1152,7 +1152,7 @@ TEST_F(Http1ClientCodecTest, UnexpectedResponseTest) { "0\r\n" // Last chunk header. "\r\n"); // Last chunk footer. - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } } @@ -1309,9 +1309,9 @@ TEST_F(Http1ClientCodecTest, ResponseCompleteBeforeRequestCompleteTest) { // Decode the response. EXPECT_CALL(codec_callbacks_, onDecodingSuccess(_)).Times(3); - // Finally, the onDecodingFailure() is called because the request is not complete and the + // Finally, the onDecodingFailure(_) is called because the request is not complete and the // response is complete. - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } @@ -1394,7 +1394,7 @@ TEST_F(Http1ClientCodecTest, SingleFrameModeResponseTooLargeTest) { "\r\n" "body~"); - EXPECT_CALL(codec_callbacks_, onDecodingFailure()); + EXPECT_CALL(codec_callbacks_, onDecodingFailure(_)); codec_->decode(buffer, false); } diff --git a/contrib/generic_proxy/filters/network/test/codecs/kafka/config_test.cc b/contrib/generic_proxy/filters/network/test/codecs/kafka/config_test.cc index 7bee7cecd916..6b8f3df0bec6 100644 --- a/contrib/generic_proxy/filters/network/test/codecs/kafka/config_test.cc +++ b/contrib/generic_proxy/filters/network/test/codecs/kafka/config_test.cc @@ -68,7 +68,7 @@ TEST(KafkaCodecTest, KafkaRequestCallbacksTest) { } { - EXPECT_CALL(callbacks, onDecodingFailure()); + EXPECT_CALL(callbacks, onDecodingFailure(_)); request_callbacks.onFailedParse(nullptr); } } @@ -91,7 +91,7 @@ TEST(KafkaCodecTest, KafkaResponseCallbacksTest) { } { - EXPECT_CALL(callbacks, onDecodingFailure()); + EXPECT_CALL(callbacks, onDecodingFailure(_)); response_callbacks.onFailedParse(nullptr); } } diff --git a/contrib/generic_proxy/filters/network/test/integration_test.cc b/contrib/generic_proxy/filters/network/test/integration_test.cc index 78a8b51f2fdd..25dfe302f2b4 100644 --- a/contrib/generic_proxy/filters/network/test/integration_test.cc +++ b/contrib/generic_proxy/filters/network/test/integration_test.cc @@ -72,6 +72,7 @@ class IntegrationTest : public testing::TestWithParam routeEntry() const override { return {}; } void onEncodingSuccess(Buffer::Instance& buffer, bool) override { buffer_.move(buffer); } + void onEncodingFailure(absl::string_view) override {} Buffer::OwnedImpl buffer_; }; using TestRequestEncoderCallbackSharedPtr = std::shared_ptr; @@ -79,6 +80,7 @@ class IntegrationTest : public testing::TestWithParam routeEntry() const override { return {}; } void onEncodingSuccess(Buffer::Instance& buffer, bool) override { buffer_.move(buffer); } + void onEncodingFailure(absl::string_view) override {} Buffer::OwnedImpl buffer_; }; using TestResponseEncoderCallbackSharedPtr = std::shared_ptr; @@ -112,7 +114,7 @@ class IntegrationTest : public testing::TestWithParamdispatcher_->exit(); } } - void onDecodingFailure() override {} + void onDecodingFailure(absl::string_view) override {} void writeToConnection(Buffer::Instance&) override {} OptRef connection() override { if (parent_.upstream_connection_ != nullptr) { diff --git a/contrib/generic_proxy/filters/network/test/mocks/codec.h b/contrib/generic_proxy/filters/network/test/mocks/codec.h index f534d5c8685b..4229105383ff 100644 --- a/contrib/generic_proxy/filters/network/test/mocks/codec.h +++ b/contrib/generic_proxy/filters/network/test/mocks/codec.h @@ -11,7 +11,7 @@ namespace GenericProxy { class MockServerCodecCallbacks : public ServerCodecCallbacks { public: MOCK_METHOD(void, onDecodingSuccess, (StreamFramePtr request)); - MOCK_METHOD(void, onDecodingFailure, ()); + MOCK_METHOD(void, onDecodingFailure, (absl::string_view)); MOCK_METHOD(void, writeToConnection, (Buffer::Instance & buffer)); MOCK_METHOD(OptRef, connection, ()); }; @@ -19,7 +19,7 @@ class MockServerCodecCallbacks : public ServerCodecCallbacks { class MockClientCodecCallbacks : public ClientCodecCallbacks { public: MOCK_METHOD(void, onDecodingSuccess, (StreamFramePtr response)); - MOCK_METHOD(void, onDecodingFailure, ()); + MOCK_METHOD(void, onDecodingFailure, (absl::string_view)); MOCK_METHOD(void, writeToConnection, (Buffer::Instance & buffer)); MOCK_METHOD(OptRef, connection, ()); MOCK_METHOD(OptRef, upstreamCluster, (), (const)); @@ -28,6 +28,7 @@ class MockClientCodecCallbacks : public ClientCodecCallbacks { class MockEncodingCallbacks : public EncodingCallbacks { public: MOCK_METHOD(void, onEncodingSuccess, (Buffer::Instance & buffer, bool end_stream)); + MOCK_METHOD(void, onEncodingFailure, (absl::string_view)); MOCK_METHOD(OptRef, routeEntry, (), (const)); }; diff --git a/contrib/generic_proxy/filters/network/test/proxy_test.cc b/contrib/generic_proxy/filters/network/test/proxy_test.cc index a60d7c6fe5d8..a818e4d9b6ad 100644 --- a/contrib/generic_proxy/filters/network/test/proxy_test.cc +++ b/contrib/generic_proxy/filters/network/test/proxy_test.cc @@ -393,6 +393,37 @@ TEST_F(FilterTest, OnDecodingFailureWithActiveStreams) { 2); } +TEST_F(FilterTest, OnEncodingFailureWithActiveStreams) { + initializeFilter(); + + Buffer::OwnedImpl fake_empty_buffer; + EXPECT_CALL(*server_codec_, decode(_, _)).WillOnce(Invoke([&](Buffer::Instance&, bool) { + auto request_0 = std::make_unique(); + auto request_1 = std::make_unique(); + + filter_->onDecodingSuccess(std::move(request_0)); + filter_->onDecodingSuccess(std::move(request_1)); + })); + filter_->onData(fake_empty_buffer, false); + + EXPECT_EQ(2, filter_->activeStreamsForTest().size()); + + EXPECT_EQ(filter_config_->stats().downstream_rq_total_.value(), 2); + EXPECT_EQ(filter_config_->stats().downstream_rq_active_.value(), 2); + + // One of stream encoding failed. + filter_->activeStreamsForTest().begin()->get()->onEncodingFailure(); + + EXPECT_EQ(1, filter_->activeStreamsForTest().size()); + + EXPECT_EQ(filter_config_->stats().downstream_rq_total_.value(), 2); + EXPECT_EQ(filter_config_->stats().downstream_rq_active_.value(), 1); + EXPECT_EQ(filter_config_->stats().downstream_rq_reset_.value(), 1); + EXPECT_EQ( + factory_context_.store_.counter("generic_proxy.test_prefix.downstream_rq_flag.DPE").value(), + 1); +} + TEST_F(FilterTest, ActiveStreamRouteEntry) { initializeFilter(); diff --git a/contrib/generic_proxy/filters/network/test/router/router_test.cc b/contrib/generic_proxy/filters/network/test/router/router_test.cc index d79f4d6730a9..9923852c0e88 100644 --- a/contrib/generic_proxy/filters/network/test/router/router_test.cc +++ b/contrib/generic_proxy/filters/network/test/router/router_test.cc @@ -134,7 +134,7 @@ class RouterFilterTest : public testing::TestWithParam { } } - void notifyPoolReady() { + void notifyPoolReady(bool encoding_success = true) { if (creating_connection_) { creating_connection_ = false; @@ -143,7 +143,12 @@ class RouterFilterTest : public testing::TestWithParam { EXPECT_TRUE(boundUpstreamConnection()->waitingResponseRequestsForTest().empty()); } - EXPECT_CALL(mock_upstream_connection_, write(_, _)).Times(testing::AtLeast(1)); + if (!encoding_success) { + EXPECT_CALL(mock_upstream_connection_, write(_, _)).Times(0); + } else { + EXPECT_CALL(mock_upstream_connection_, write(_, _)).Times(testing::AtLeast(1)); + } + factory_context_.server_factory_context_.cluster_manager_.thread_local_cluster_.tcp_conn_pool_ .poolReady(mock_upstream_connection_); @@ -202,7 +207,7 @@ class RouterFilterTest : public testing::TestWithParam { upstream_request->generic_upstream_->onUpstreamData(test_buffer, false); } - void notifyDecodingFailure() { + void notifyDecodingFailure(absl::string_view reason) { ASSERT(!filter_->upstreamRequestsForTest().empty()); auto upstream_request = filter_->upstreamRequestsForTest().begin()->get(); @@ -222,7 +227,7 @@ class RouterFilterTest : public testing::TestWithParam { EXPECT_CALL(*mock_client_codec_, decode(BufferStringEqual("test_1"), _)) .WillOnce(Invoke([&](Buffer::Instance& buffer, bool) { buffer.drain(buffer.length()); - client_cb_->onDecodingFailure(); + client_cb_->onDecodingFailure(reason); })); Buffer::OwnedImpl test_buffer; @@ -461,6 +466,8 @@ TEST_P(RouterFilterTest, UpstreamRequestResetBeforePoolCallback) { kickOffNewUpstreamRequest(); if (with_tracing_) { + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().UpstreamAddress, _)); + EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().PeerAddress, _)); EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Error, "true")); EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().ErrorReason, "local_reset")); EXPECT_CALL(*child_span_, setTag(Tracing::Tags::get().Component, "proxy")); @@ -480,7 +487,7 @@ TEST_P(RouterFilterTest, UpstreamRequestResetBeforePoolCallback) { EXPECT_EQ(status.message(), "local_reset"); })); - filter_->upstreamRequestsForTest().begin()->get()->resetStream(StreamResetReason::LocalReset); + filter_->upstreamRequestsForTest().begin()->get()->resetStream(StreamResetReason::LocalReset, {}); EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); // Mock downstream closing. @@ -887,14 +894,35 @@ TEST_P(RouterFilterTest, UpstreamRequestPoolReadyAndResponseDecodingFailure) { EXPECT_NE(nullptr, upstream_request->generic_upstream_->connection().ptr()); EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _, _)) - .WillOnce(Invoke([this](Status status, absl::string_view, ResponseUpdateFunction) { + .WillOnce(Invoke([this](Status status, absl::string_view data, ResponseUpdateFunction) { + EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); + EXPECT_TRUE(status.message() == "protocol_error"); + EXPECT_EQ(data, "decoding-failure"); + })); + + notifyDecodingFailure("decoding-failure"); + + // Mock downstream closing. + mock_downstream_connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); +} + +TEST_P(RouterFilterTest, UpstreamRequestPoolReadyAndRequestEncodingFailure) { + setup(); + kickOffNewUpstreamRequest(); + + EXPECT_CALL(*mock_client_codec_, encode(_, _)) + .WillOnce(Invoke([&](const StreamFrame&, EncodingCallbacks& callback) -> void { + callback.onEncodingFailure("encoding-failure"); + })); + + EXPECT_CALL(mock_filter_callback_, sendLocalReply(_, _, _)) + .WillOnce(Invoke([this](Status status, absl::string_view data, ResponseUpdateFunction) { EXPECT_EQ(0, filter_->upstreamRequestsForTest().size()); - // Decoding error of bound upstream connection will not be notified to every requests - // and will be treated as local reset. - EXPECT_TRUE(status.message() == "protocol_error" || status.message() == "local_reset"); + EXPECT_TRUE(status.message() == "protocol_error"); + EXPECT_EQ(data, "encoding-failure"); })); - notifyDecodingFailure(); + notifyPoolReady(false); // Mock downstream closing. mock_downstream_connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);