From e2613cabc11dd13bb2185e65f7e6d5d13e426bb0 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Wed, 22 May 2024 15:53:48 -0500 Subject: [PATCH 1/7] mobile: Wait until the Engine is ready before calling terminate (#34287) When the Engine is not fully initialized, calling terminate() will cause an assertion failure. [2024-05-21 21:28:16.718][18][critical][assert] [external/envoy/source/common/stats/thread_local_store.cc:49] assert failure: scopes_.empty(). This PR adds a workaround to wait until the Engine before proceeding to call terminate(). Risk Level: low Testing: unit tests Docs Changes: n/a Release Notes: n/a Platform Specific Features: mobile Signed-off-by: Fredy Wijaya --- mobile/library/common/internal_engine.cc | 8 ++++ mobile/library/common/internal_engine.h | 2 + mobile/test/cc/engine_test.cc | 52 ++++++++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/mobile/library/common/internal_engine.cc b/mobile/library/common/internal_engine.cc index b3a0d668657e..46d6a09e2e61 100644 --- a/mobile/library/common/internal_engine.cc +++ b/mobile/library/common/internal_engine.cc @@ -11,6 +11,9 @@ #include "library/common/stats/utility.h" namespace Envoy { +namespace { +constexpr absl::Duration ENGINE_RUNNING_TIMEOUT = absl::Seconds(3); +} // namespace static std::atomic current_stream_handle_{0}; @@ -173,6 +176,7 @@ envoy_status_t InternalEngine::main(std::shared_ptr opti server_->serverFactoryContext().scope(), server_->api().randomGenerator()); dispatcher_->drain(server_->dispatcher()); + engine_running_.Notify(); callbacks_->on_engine_running_(); }); } // mutex_ @@ -212,6 +216,10 @@ envoy_status_t InternalEngine::terminate() { return ENVOY_FAILURE; } + // Wait until the Engine is ready before calling terminate to avoid assertion failures. + // TODO(fredyw): Fix this without having to wait. + ASSERT(engine_running_.WaitForNotificationWithTimeout(ENGINE_RUNNING_TIMEOUT)); + // We need to be sure that MainCommon is finished being constructed so we can dispatch shutdown. { Thread::LockGuard lock(mutex_); diff --git a/mobile/library/common/internal_engine.h b/mobile/library/common/internal_engine.h index 6a247719f785..f70af3a2a31a 100644 --- a/mobile/library/common/internal_engine.h +++ b/mobile/library/common/internal_engine.h @@ -7,6 +7,7 @@ #include "source/common/common/posix/thread_impl.h" #include "source/common/common/thread.h" +#include "absl/synchronization/notification.h" #include "absl/types/optional.h" #include "extension_registry.h" #include "library/common/engine_common.h" @@ -163,6 +164,7 @@ class InternalEngine : public Logger::Loggable { // instructions scheduled on the main_thread_ need to have a longer lifetime. Thread::PosixThreadPtr main_thread_{nullptr}; // Empty placeholder to be populated later. bool terminated_{false}; + absl::Notification engine_running_; }; } // namespace Envoy diff --git a/mobile/test/cc/engine_test.cc b/mobile/test/cc/engine_test.cc index 3bd96e8bc9f1..b8d111bce024 100644 --- a/mobile/test/cc/engine_test.cc +++ b/mobile/test/cc/engine_test.cc @@ -141,4 +141,56 @@ TEST(EngineTest, SetEventTracker) { EXPECT_TRUE(on_track.WaitForNotificationWithTimeout(absl::Seconds(3))); } +TEST(EngineTest, DontWaitForOnEngineRunning) { + Platform::EngineBuilder engine_builder; + engine_builder.setLogLevel(Logger::Logger::debug).enforceTrustChainVerification(false); + EngineWithTestServer engine_with_test_server(engine_builder, TestServerType::HTTP2_WITH_TLS); + + std::string actual_status_code; + bool actual_end_stream = false; + absl::Notification stream_complete; + EnvoyStreamCallbacks stream_callbacks; + stream_callbacks.on_headers_ = [&](const Http::ResponseHeaderMap& headers, bool end_stream, + envoy_stream_intel) { + actual_status_code = headers.getStatusValue(); + actual_end_stream = end_stream; + }; + stream_callbacks.on_data_ = [&](const Buffer::Instance&, uint64_t /* length */, bool end_stream, + envoy_stream_intel) { actual_end_stream = end_stream; }; + stream_callbacks.on_complete_ = [&](envoy_stream_intel, envoy_final_stream_intel) { + stream_complete.Notify(); + }; + stream_callbacks.on_error_ = [&](EnvoyError, envoy_stream_intel, envoy_final_stream_intel) { + stream_complete.Notify(); + }; + stream_callbacks.on_cancel_ = [&](envoy_stream_intel, envoy_final_stream_intel) { + stream_complete.Notify(); + }; + auto stream_prototype = engine_with_test_server.engine()->streamClient()->newStreamPrototype(); + Platform::StreamSharedPtr stream = stream_prototype->start(std::move(stream_callbacks)); + + auto headers = Http::Utility::createRequestHeaderMapPtr(); + headers->addCopy(Http::LowerCaseString(":method"), "GET"); + headers->addCopy(Http::LowerCaseString(":scheme"), "https"); + headers->addCopy(Http::LowerCaseString(":authority"), + engine_with_test_server.testServer().getAddress()); + headers->addCopy(Http::LowerCaseString(":path"), "/"); + stream->sendHeaders(std::move(headers), true); + stream_complete.WaitForNotification(); + + EXPECT_EQ(actual_status_code, "200"); + EXPECT_TRUE(actual_end_stream); +} + +TEST(EngineTest, TerminateWithoutWaitingForOnEngineRunning) { + absl::Notification engine_running; + auto engine_callbacks = std::make_unique(); + engine_callbacks->on_engine_running_ = [&] { engine_running.Notify(); }; + + Platform::EngineBuilder engine_builder; + auto engine = engine_builder.setLogLevel(Logger::Logger::debug).build(); + + engine->terminate(); +} + } // namespace Envoy From 5e7e6d6af92a73d336b45faff2c62ca85b78e6a6 Mon Sep 17 00:00:00 2001 From: Fernando Cainelli Date: Thu, 23 May 2024 01:48:00 +0200 Subject: [PATCH 2/7] ext_proc: propagate tracing context (#33665) --------- Signed-off-by: Fernando Cainelli --- changelogs/current.yaml | 5 + envoy/grpc/async_client.h | 1 - envoy/http/async_client.h | 107 +++---------- source/common/grpc/async_client_impl.cc | 48 +++++- source/common/grpc/async_client_impl.h | 5 + .../extensions/filters/http/ext_proc/client.h | 2 +- .../filters/http/ext_proc/client_impl.cc | 15 +- .../filters/http/ext_proc/client_impl.h | 6 +- .../filters/http/ext_proc/ext_proc.cc | 10 +- .../filters/http/gcp_authn/gcp_authn_impl.cc | 2 +- test/common/grpc/async_client_impl_test.cc | 19 ++- test/extensions/filters/http/ext_proc/BUILD | 23 +++ .../filters/http/ext_proc/client_test.cc | 30 +++- .../ext_proc/ext_proc_integration_test.cc | 113 +++++++++++++ .../filters/http/ext_proc/filter_test.cc | 2 +- .../filters/http/ext_proc/mock_server.h | 2 +- .../filters/http/ext_proc/ordering_test.cc | 4 +- .../http/ext_proc/tracer_test_filter.cc | 148 ++++++++++++++++++ .../http/ext_proc/tracer_test_filter.proto | 14 ++ .../unit_test_fuzz/ext_proc_unit_test_fuzz.cc | 35 ++--- .../http/ext_proc/unit_test_fuzz/mocks.h | 2 +- 21 files changed, 457 insertions(+), 136 deletions(-) create mode 100644 test/extensions/filters/http/ext_proc/tracer_test_filter.cc create mode 100644 test/extensions/filters/http/ext_proc/tracer_test_filter.proto diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 000fdd5faa8a..44cf3692d08e 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -25,6 +25,11 @@ behavior_changes: minor_behavior_changes: # *Changes that may cause incompatibilities for some users, but should not for most* +- area: grpc + change: | + Changes in ``AsyncStreamImpl`` now propagate tracing context headers in bidirectional streams when using + :ref:`Envoy gRPC client `. Previously, tracing context headers + were not being set when calling external services such as ``ext_proc``. - area: tracers change: | Set status code for OpenTelemetry tracers (previously unset). diff --git a/envoy/grpc/async_client.h b/envoy/grpc/async_client.h index d286d686e8f5..a8f505edcc04 100644 --- a/envoy/grpc/async_client.h +++ b/envoy/grpc/async_client.h @@ -175,7 +175,6 @@ class RawAsyncClient { /** * Start a gRPC stream asynchronously. - * TODO(mattklein123): Determine if tracing should be added to streaming requests. * @param service_full_name full name of the service (i.e. service_method.service()->full_name()). * @param method_name name of the method (i.e. service_method.name()). * @param callbacks the callbacks to be notified of stream status. diff --git a/envoy/http/async_client.h b/envoy/http/async_client.h index f16769c0451b..3b495a795aae 100644 --- a/envoy/http/async_client.h +++ b/envoy/http/async_client.h @@ -321,10 +321,25 @@ class AsyncClient { return *this; } + StreamOptions& setParentSpan(Tracing::Span& parent_span) { + parent_span_ = &parent_span; + return *this; + } + StreamOptions& setChildSpanName(const std::string& child_span_name) { + child_span_name_ = child_span_name; + return *this; + } + StreamOptions& setSampled(absl::optional sampled) { + sampled_ = sampled; + return *this; + } + // For gmock test bool operator==(const StreamOptions& src) const { return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry && - send_xff == src.send_xff && send_internal == src.send_internal; + send_xff == src.send_xff && send_internal == src.send_internal && + parent_span_ == src.parent_span_ && child_span_name_ == src.child_span_name_ && + sampled_ == src.sampled_; } // The timeout supplies the stream timeout, measured since when the frame with @@ -365,91 +380,6 @@ class AsyncClient { bool is_shadow{false}; bool is_shadow_suffixed_disabled{false}; - }; - - /** - * A structure to hold the options for AsyncRequest object. - */ - struct RequestOptions : public StreamOptions { - RequestOptions& setTimeout(const absl::optional& v) { - StreamOptions::setTimeout(v); - return *this; - } - RequestOptions& setTimeout(const std::chrono::milliseconds& v) { - StreamOptions::setTimeout(v); - return *this; - } - RequestOptions& setBufferBodyForRetry(bool v) { - StreamOptions::setBufferBodyForRetry(v); - return *this; - } - RequestOptions& setSendXff(bool v) { - StreamOptions::setSendXff(v); - return *this; - } - RequestOptions& setSendInternal(bool v) { - StreamOptions::setSendInternal(v); - return *this; - } - RequestOptions& setHashPolicy( - const Protobuf::RepeatedPtrField& v) { - StreamOptions::setHashPolicy(v); - return *this; - } - RequestOptions& setParentContext(const ParentContext& v) { - StreamOptions::setParentContext(v); - return *this; - } - RequestOptions& setMetadata(const envoy::config::core::v3::Metadata& m) { - StreamOptions::setMetadata(m); - return *this; - } - RequestOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) { - StreamOptions::setFilterState(fs); - return *this; - } - RequestOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) { - StreamOptions::setRetryPolicy(p); - return *this; - } - RequestOptions& setRetryPolicy(const Router::RetryPolicy& p) { - StreamOptions::setRetryPolicy(p); - return *this; - } - RequestOptions& setIsShadow(bool s) { - StreamOptions::setIsShadow(s); - return *this; - } - RequestOptions& setIsShadowSuffixDisabled(bool d) { - StreamOptions::setIsShadowSuffixDisabled(d); - return *this; - } - RequestOptions& setParentSpan(Tracing::Span& parent_span) { - parent_span_ = &parent_span; - return *this; - } - RequestOptions& setChildSpanName(const std::string& child_span_name) { - child_span_name_ = child_span_name; - return *this; - } - RequestOptions& setSampled(absl::optional sampled) { - sampled_ = sampled; - return *this; - } - RequestOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) { - account_ = account; - return *this; - } - RequestOptions& setBufferLimit(uint32_t limit) { - buffer_limit_ = limit; - return *this; - } - - // For gmock test - bool operator==(const RequestOptions& src) const { - return StreamOptions::operator==(src) && parent_span_ == src.parent_span_ && - child_span_name_ == src.child_span_name_ && sampled_ == src.sampled_; - } // The parent span that child spans are created under to trace egress requests/responses. // If not set, requests will not be traced. @@ -462,6 +392,11 @@ class AsyncClient { absl::optional sampled_{true}; }; + /** + * A structure to hold the options for AsyncRequest object. + */ + using RequestOptions = StreamOptions; + /** * Send an HTTP request asynchronously * @param request the request to send. diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index ca2e1e02177e..542db9af115f 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -85,12 +85,33 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view serv } // Configure the maximum frame length decoder_.setMaxFrameLength(parent_.max_recv_message_length_); + + if (nullptr != options.parent_span_) { + const std::string child_span_name = + options.child_span_name_.empty() + ? absl::StrCat("async ", service_full_name, ".", method_name, " egress") + : options.child_span_name_; + + current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name, + parent.time_source_.systemTime()); + current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.remote_cluster_name_); + current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.host_name_.empty() + ? parent.remote_cluster_name_ + : parent.host_name_); + current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy); + } else { + current_span_ = std::make_unique(); + } + + if (options.sampled_.has_value()) { + current_span_->setSampled(options.sampled_.value()); + } } void AsyncStreamImpl::initialize(bool buffer_body_for_retry) { const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_); if (thread_local_cluster == nullptr) { - callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available"); + notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available"); http_reset_ = true; return; } @@ -100,7 +121,7 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) { dispatcher_ = &http_async_client.dispatcher(); stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry)); if (stream_ == nullptr) { - callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING); + notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING); http_reset_ = true; return; } @@ -118,6 +139,13 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) { parent_.metadata_parser_->evaluateHeaders(headers_message_->headers(), options_.parent_context.stream_info); + Tracing::HttpTraceContext trace_context(headers_message_->headers()); + Tracing::UpstreamContext upstream_context(nullptr, // host_ + cluster_info_.get(), // cluster_ + Tracing::ServiceType::EnvoyGrpc, // service_type_ + true // async_client_span_ + ); + current_span_->injectContext(trace_context, upstream_context); callbacks_.onCreateInitialMetadata(headers_message_->headers()); stream_->sendHeaders(headers_message_->headers(), false); } @@ -198,16 +226,26 @@ void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) { if (!grpc_status) { grpc_status = Status::WellKnownGrpcStatus::Unknown; } - callbacks_.onRemoteClose(grpc_status.value(), grpc_message); + notifyRemoteClose(grpc_status.value(), grpc_message); cleanup(); } void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) { callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create()); - callbacks_.onRemoteClose(grpc_status, message); + notifyRemoteClose(grpc_status, message); resetStream(); } +void AsyncStreamImpl::notifyRemoteClose(Grpc::Status::GrpcStatus status, + const std::string& message) { + current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status)); + if (status != Grpc::Status::WellKnownGrpcStatus::Ok) { + current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); + } + current_span_->finishSpan(); + callbacks_.onRemoteClose(status, message); +} + void AsyncStreamImpl::onComplete() { // No-op since stream completion is handled within other callbacks. } @@ -229,6 +267,8 @@ void AsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& buffer, bool end_stre void AsyncStreamImpl::closeStream() { Buffer::OwnedImpl empty_buffer; stream_->sendData(empty_buffer, true); + current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled); + current_span_->finishSpan(); } void AsyncStreamImpl::resetStream() { cleanup(); } diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 78635134e41d..b088f34dc637 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -98,11 +98,16 @@ class AsyncStreamImpl : public RawAsyncStream, void trailerResponse(absl::optional grpc_status, const std::string& grpc_message); + // Deliver notification and update span when the connection closes. + void notifyRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message); + Event::Dispatcher* dispatcher_{}; Http::RequestMessagePtr headers_message_; AsyncClientImpl& parent_; std::string service_full_name_; std::string method_name_; + Tracing::SpanPtr current_span_; + RawAsyncStreamCallbacks& callbacks_; Http::AsyncClient::StreamOptions options_; bool http_reset_{}; diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 25594b3748e4..787d946e442e 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -42,7 +42,7 @@ class ExternalProcessorClient { virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const StreamInfo::StreamInfo& stream_info) PURE; + const Http::AsyncClient::StreamOptions& options) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index 1796e857eea7..51e60ddc423f 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -14,21 +14,21 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(Grpc::AsyncClientManage ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks, const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const StreamInfo::StreamInfo& stream_info) { + const Http::AsyncClient::StreamOptions& options) { auto client_or_error = client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true); THROW_IF_STATUS_NOT_OK(client_or_error, throw); Grpc::AsyncClient grpcClient(client_or_error.value()); - return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, stream_info); + return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, options); } ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create( Grpc::AsyncClient&& client, - ExternalProcessorCallbacks& callbacks, const StreamInfo::StreamInfo& stream_info) { + ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options) { auto stream = std::unique_ptr(new ExternalProcessorStreamImpl(callbacks)); - if (stream->startStream(std::move(client), stream_info)) { + if (stream->startStream(std::move(client), options)) { return stream; } // Return nullptr on the start failure. @@ -37,13 +37,10 @@ ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create( bool ExternalProcessorStreamImpl::startStream( Grpc::AsyncClient&& client, - const StreamInfo::StreamInfo& stream_info) { + const Http::AsyncClient::StreamOptions& options) { client_ = std::move(client); auto descriptor = Protobuf::DescriptorPool::generated_pool()->FindMethodByName(kExternalMethod); - grpc_context_.stream_info = &stream_info; - Http::AsyncClient::StreamOptions options; - options.setParentContext(grpc_context_); - options.setBufferBodyForRetry(true); + grpc_context_ = options.parent_context; stream_ = client_.start(*descriptor, *this, options); // Returns true if the start succeeded and returns false on start failure. return stream_ != nullptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 45c86fc9da5c..c0ed32363247 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -28,7 +28,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const StreamInfo::StreamInfo& stream_info) override; + const Http::AsyncClient::StreamOptions& options) override; private: Grpc::AsyncClientManager& client_manager_; @@ -42,7 +42,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream, // Factory method: create and return `ExternalProcessorStreamPtr`; return nullptr on failure. static ExternalProcessorStreamPtr create(Grpc::AsyncClient&& client, - ExternalProcessorCallbacks& callbacks, const StreamInfo::StreamInfo& stream_info); + ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options); void send(ProcessingRequest&& request, bool end_stream) override; // Close the stream. This is idempotent and will return true if we @@ -66,7 +66,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream, // Start the gRPC async stream: It returns true if the start succeeded. Otherwise it returns false // if it failed to start. bool startStream(Grpc::AsyncClient&& client, - const StreamInfo::StreamInfo& stream_info); + const Http::AsyncClient::StreamOptions& options); ExternalProcessorCallbacks& callbacks_; Grpc::AsyncClient client_; Grpc::AsyncStream stream_; diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 9b15ddf55418..c2910bf2ec2a 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -288,7 +288,15 @@ Filter::StreamOpenState Filter::openStream() { } if (!stream_) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); - stream_ = client_->start(*this, config_with_hash_key_, decoder_callbacks_->streamInfo()); + + Http::AsyncClient::ParentContext grpc_context; + grpc_context.stream_info = &decoder_callbacks_->streamInfo(); + auto options = Http::AsyncClient::StreamOptions() + .setParentSpan(decoder_callbacks_->activeSpan()) + .setParentContext(grpc_context) + .setBufferBodyForRetry(true); + + stream_ = client_->start(*this, config_with_hash_key_, options); if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called // Asserts that `stream_` is nullptr since it is not valid to be used any further diff --git a/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc b/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc index d4eb3e52a79d..ed16283f308c 100644 --- a/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc +++ b/source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc @@ -48,7 +48,7 @@ void GcpAuthnClient::fetchToken(RequestCallbacks& callbacks, Http::RequestMessag } // Set up the request options. - struct Envoy::Http::AsyncClient::RequestOptions options = + Envoy::Http::AsyncClient::RequestOptions options = Envoy::Http::AsyncClient::RequestOptions() .setTimeout(std::chrono::milliseconds( DurationUtil::durationToMilliseconds(config_.http_uri().timeout()))) diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc index 055c618ae00b..f6ff36a9db35 100644 --- a/test/common/grpc/async_client_impl_test.cc +++ b/test/common/grpc/async_client_impl_test.cc @@ -172,7 +172,24 @@ TEST_F(EnvoyAsyncClientImplTest, MetadataIsInitializedWithoutStreamInfo) { EXPECT_CALL(http_stream, sendHeaders(_, _)) .WillOnce(Invoke([&http_callbacks](Http::HeaderMap&, bool) { http_callbacks->onReset(); })); - Http::AsyncClient::StreamOptions stream_options; + Tracing::MockSpan parent_span; + Tracing::MockSpan* child_span{new Tracing::MockSpan()}; + + EXPECT_CALL(parent_span, spawnChild_(_, "async helloworld.Greeter.SayHello egress", _)) + .WillOnce(Return(child_span)); + EXPECT_CALL(*child_span, + setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy))); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("test_cluster"))); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamAddress), Eq("test_cluster"))); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().GrpcStatusCode), Eq("13"))); + EXPECT_CALL(*child_span, injectContext(_, _)); + EXPECT_CALL(*child_span, finishSpan()); + EXPECT_CALL(*child_span, setSampled(true)); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True))); + + auto stream_options = + Http::AsyncClient::StreamOptions().setParentSpan(parent_span).setSampled(true); + auto grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, stream_options); EXPECT_EQ(grpc_stream, nullptr); } diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index fc1b02bf0166..4f3a82ff7f01 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -150,6 +150,7 @@ envoy_extension_cc_test( ], deps = [ ":logging_test_filter_lib", + ":tracer_test_filter_lib", ":utils_lib", "//source/extensions/filters/http/ext_proc:config", "//source/extensions/filters/http/set_metadata:config", @@ -161,6 +162,7 @@ envoy_extension_cc_test( "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/config/trace/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto", "@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto", @@ -345,3 +347,24 @@ envoy_extension_cc_test_library( "//test/test_common:utility_lib", ], ) + +envoy_proto_library( + name = "tracer_test_filter_proto", + srcs = [":tracer_test_filter.proto"], +) + +envoy_extension_cc_test_library( + name = "tracer_test_filter_lib", + srcs = ["tracer_test_filter.cc"], + extension_names = ["envoy.filters.http.ext_proc"], + external_deps = [ + ], + deps = [ + ":tracer_test_filter_proto_cc_proto", + "//source/common/config:utility_lib", + "//source/common/protobuf", + "//source/common/tracing:http_tracer_lib", + "//source/common/tracing:trace_context_lib", + "//source/extensions/tracers/common:factory_base_lib", + ], +) diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index 63bd95f2b1aa..7e3c36d7a6fa 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -79,14 +79,20 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback }; TEST_F(ExtProcStreamTest, OpenCloseStream) { - auto stream = client_->start(*this, config_with_hash_key_, stream_info_); + Http::AsyncClient::ParentContext parent_context; + parent_context.stream_info = &stream_info_; + auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context); + auto stream = client_->start(*this, config_with_hash_key_, options); EXPECT_CALL(stream_, closeStream()); EXPECT_CALL(stream_, resetStream()); stream->close(); } TEST_F(ExtProcStreamTest, SendToStream) { - auto stream = client_->start(*this, config_with_hash_key_, stream_info_); + Http::AsyncClient::ParentContext parent_context; + parent_context.stream_info = &stream_info_; + auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context); + auto stream = client_->start(*this, config_with_hash_key_, options); // Send something and ensure that we get it. Doesn't really matter what. EXPECT_CALL(stream_, sendMessageRaw_(_, false)); ProcessingRequest req; @@ -97,14 +103,20 @@ TEST_F(ExtProcStreamTest, SendToStream) { } TEST_F(ExtProcStreamTest, SendAndClose) { - auto stream = client_->start(*this, config_with_hash_key_, stream_info_); + Http::AsyncClient::ParentContext parent_context; + parent_context.stream_info = &stream_info_; + auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context); + auto stream = client_->start(*this, config_with_hash_key_, options); EXPECT_CALL(stream_, sendMessageRaw_(_, true)); ProcessingRequest req; stream->send(std::move(req), true); } TEST_F(ExtProcStreamTest, ReceiveFromStream) { - auto stream = client_->start(*this, config_with_hash_key_, stream_info_); + Http::AsyncClient::ParentContext parent_context; + parent_context.stream_info = &stream_info_; + auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context); + auto stream = client_->start(*this, config_with_hash_key_, options); ASSERT_NE(stream_callbacks_, nullptr); // Send something and ensure that we get it. Doesn't really matter what. ProcessingResponse resp; @@ -134,7 +146,10 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) { } TEST_F(ExtProcStreamTest, StreamClosed) { - auto stream = client_->start(*this, config_with_hash_key_, stream_info_); + Http::AsyncClient::ParentContext parent_context; + parent_context.stream_info = &stream_info_; + auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context); + auto stream = client_->start(*this, config_with_hash_key_, options); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); @@ -147,7 +162,10 @@ TEST_F(ExtProcStreamTest, StreamClosed) { } TEST_F(ExtProcStreamTest, StreamError) { - auto stream = client_->start(*this, config_with_hash_key_, stream_info_); + Http::AsyncClient::ParentContext parent_context; + parent_context.stream_info = &stream_info_; + auto options = Http::AsyncClient::StreamOptions().setParentContext(parent_context); + auto stream = client_->start(*this, config_with_hash_key_, options); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 30ed198ba266..f5ab80b2b707 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -2,6 +2,7 @@ #include #include "envoy/config/core/v3/base.pb.h" +#include "envoy/config/trace/v3/opentelemetry.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/extensions/filters/http/set_metadata/v3/set_metadata.pb.h" #include "envoy/network/address.h" @@ -12,6 +13,8 @@ #include "test/common/http/common.h" #include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.h" #include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.validate.h" +#include "test/extensions/filters/http/ext_proc/tracer_test_filter.pb.h" +#include "test/extensions/filters/http/ext_proc/tracer_test_filter.pb.validate.h" #include "test/extensions/filters/http/ext_proc/utils.h" #include "test/integration/http_integration.h" #include "test/test_common/test_runtime.h" @@ -626,6 +629,41 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) { + if (!IsEnvoyGrpc()) { + GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC"; + } + initializeConfig(); + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + test::integration::filters::ExpectSpan ext_proc_span; + ext_proc_span.set_operation_name( + "async envoy.service.ext_proc.v3.ExternalProcessor.Process egress"); + ext_proc_span.set_context_injected(true); + ext_proc_span.set_sampled(true); + ext_proc_span.mutable_tags()->insert({"grpc.status_code", "0"}); + ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); + ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"}); + + test::integration::filters::TracerTestConfig test_config; + test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span); + + auto* tracing = cm.mutable_tracing(); + tracing->mutable_provider()->set_name("tracer-test-filter"); + tracing->mutable_provider()->mutable_typed_config()->PackFrom(test_config); + }); + + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg); + + processor_stream_->startGrpcStream(); + processor_stream_->finishGrpcStream(Grpc::Status::Ok); + handleUpstreamRequest(); + verifyDownstreamResponse(*response, 200); +} + TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithLogging) { ConfigOptions config_option = {}; config_option.add_logging_filter = true; @@ -658,6 +696,40 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStream) { verifyDownstreamResponse(*response, 500); } +TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) { + if (!IsEnvoyGrpc()) { + GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC"; + } + initializeConfig(); + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + test::integration::filters::ExpectSpan ext_proc_span; + ext_proc_span.set_operation_name( + "async envoy.service.ext_proc.v3.ExternalProcessor.Process egress"); + ext_proc_span.set_context_injected(true); + ext_proc_span.set_sampled(true); + ext_proc_span.mutable_tags()->insert({"grpc.status_code", "2"}); + ext_proc_span.mutable_tags()->insert({"error", "true"}); + ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); + ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"}); + + test::integration::filters::TracerTestConfig test_config; + test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span); + + auto* tracing = cm.mutable_tracing(); + tracing->mutable_provider()->set_name("tracer-test-filter"); + tracing->mutable_provider()->mutable_typed_config()->PackFrom(test_config); + }); + + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + ProcessingRequest request_headers_msg; + waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg); + // Fail the stream immediately + processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "500"}}, true); + verifyDownstreamResponse(*response, 500); +} + TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithLogging) { ConfigOptions config_option = {}; config_option.add_logging_filter = true; @@ -2282,6 +2354,47 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeout) { verifyDownstreamResponse(*response, 500); } +TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) { + if (!IsEnvoyGrpc()) { + GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC"; + } + + // ensure 200 ms timeout + proto_config_.mutable_message_timeout()->set_nanos(200000000); + initializeConfig(); + + config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { + test::integration::filters::ExpectSpan ext_proc_span; + ext_proc_span.set_operation_name( + "async envoy.service.ext_proc.v3.ExternalProcessor.Process egress"); + ext_proc_span.set_context_injected(true); + ext_proc_span.set_sampled(true); + ext_proc_span.mutable_tags()->insert({"status", "canceled"}); + ext_proc_span.mutable_tags()->insert({"error", ""}); // not an error + ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); + ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"}); + + test::integration::filters::TracerTestConfig test_config; + test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span); + + auto* tracing = cm.mutable_tracing(); + tracing->mutable_provider()->set_name("tracer-test-filter"); + tracing->mutable_provider()->mutable_typed_config()->PackFrom(test_config); + }); + + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + processRequestHeadersMessage(*grpc_upstreams_[0], true, + [this](const HttpHeaders&, HeadersResponse&) { + // Travel forward 400 ms + timeSystem().advanceTimeWaitImpl(400ms); + return false; + }); + + // We should immediately have an error response now + verifyDownstreamResponse(*response, 500); +} + TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithLogging) { // ensure 200 ms timeout proto_config_.mutable_message_timeout()->set_nanos(200000000); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index b362ca708143..eadedf1f26e7 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -179,7 +179,7 @@ class HttpFilterTest : public testing::Test { ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - testing::Unused) { + const Envoy::Http::AsyncClient::StreamOptions&) { if (final_expected_grpc_service_.has_value()) { EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), config_with_hash_key.config())); diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index 5f4dedb90173..5368d12de145 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -15,7 +15,7 @@ class MockClient : public ExternalProcessorClient { ~MockClient() override; MOCK_METHOD(ExternalProcessorStreamPtr, start, (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, - const StreamInfo::StreamInfo&)); + const Envoy::Http::AsyncClient::StreamOptions&)); }; class MockStream : public ExternalProcessorStream { diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index 74c559b8b330..273a5cfab987 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -86,7 +86,7 @@ class OrderingTest : public testing::Test { // Called by the "start" method on the stream by the filter virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, const Grpc::GrpcServiceConfigWithHashKey&, - const StreamInfo::StreamInfo&) { + const Envoy::Http::AsyncClient::StreamOptions&) { stream_callbacks_ = &callbacks; auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)).WillRepeatedly(Invoke(this, &OrderingTest::doSend)); @@ -224,7 +224,7 @@ class FastFailOrderingTest : public OrderingTest { // All tests using this class have gRPC streams that will fail while being opened. ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, const Grpc::GrpcServiceConfigWithHashKey&, - const StreamInfo::StreamInfo&) override { + const Envoy::Http::AsyncClient::StreamOptions&) override { callbacks.onGrpcError(Grpc::Status::Internal); // Returns nullptr on start stream failure. return nullptr; diff --git a/test/extensions/filters/http/ext_proc/tracer_test_filter.cc b/test/extensions/filters/http/ext_proc/tracer_test_filter.cc new file mode 100644 index 000000000000..ec59c159c555 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/tracer_test_filter.cc @@ -0,0 +1,148 @@ +#include "envoy/registry/registry.h" +#include "envoy/tracing/trace_context.h" + +#include "source/common/tracing/trace_context_impl.h" +#include "source/extensions/tracers/common/factory_base.h" + +#include "test/extensions/filters/http/ext_proc/tracer_test_filter.pb.h" +#include "test/extensions/filters/http/ext_proc/tracer_test_filter.pb.validate.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace ExternalProcessing { + +struct ExpectedSpan { + std::string operation_name; + bool sampled; + bool context_injected; + std::map tags; + bool tested; +}; + +using ExpectedSpansSharedPtr = std::shared_ptr>; + +class Span : public Tracing::Span { +public: + Span(const std::string& operation_name, ExpectedSpansSharedPtr& expected_spans) + : operation_name_(operation_name), expected_spans_(expected_spans){}; + + ~Span() { + EXPECT_TRUE(finished_) << fmt::format("span not finished in operation: {}", operation_name_); + for (auto& expect_span : *expected_spans_) { + if (expect_span.operation_name != operation_name_) { + continue; + } + EXPECT_EQ(expect_span.sampled, sampled_) << fmt::format("operation: {}", operation_name_); + EXPECT_EQ(expect_span.context_injected, context_injected_) + << fmt::format("operation: {}", operation_name_); + + std::string all_tags; + for (const auto& [key, value] : tags_) { + all_tags += fmt::format("{}: {}\n", key, value); + } + for (const auto& [key, want] : expect_span.tags) { + absl::string_view got = tags_[key]; + EXPECT_EQ(want, got) << fmt::format("{}: {} not found in tags:\n{}", key, want, all_tags); + } + expect_span.tested = true; + break; + } + } + + void setTag(absl::string_view name, absl::string_view value) { + tags_.insert_or_assign(name.data(), value.data()); + } + void setOperation(absl::string_view operation_name) { operation_name_ = operation_name; } + void setSampled(bool do_sample) { sampled_ = do_sample; } + + void injectContext(Tracing::TraceContext&, const Tracing::UpstreamContext&) { + context_injected_ = true; + } + void setBaggage(absl::string_view, absl::string_view) { /* not implemented */ + } + void log(SystemTime, const std::string&) { /* not implemented */ + } + std::string getBaggage(absl::string_view) { + /* not implemented */ + return EMPTY_STRING; + }; + std::string getTraceIdAsHex() const { + /* not implemented */ + return EMPTY_STRING; + }; + + Tracing::SpanPtr spawnChild(const Tracing::Config&, const std::string& operation_name, + SystemTime) { + return std::make_unique(operation_name, expected_spans_); + } + + void finishSpan() { finished_ = true; } + +private: + std::string operation_name_; + ExpectedSpansSharedPtr expected_spans_; + + std::map tags_; + bool context_injected_; + bool sampled_; + bool finished_; +}; + +class Driver : public Tracing::Driver, Logger::Loggable { +public: + Driver(const test::integration::filters::TracerTestConfig& test_config, + Server::Configuration::CommonFactoryContext&) + : expected_spans_(std::make_shared>()) { + for (auto expected_span : test_config.expect_spans()) { + ExpectedSpan span; + span.operation_name = expected_span.operation_name(); + span.sampled = expected_span.sampled(); + span.context_injected = expected_span.context_injected(); + span.tags.insert(expected_span.tags().begin(), expected_span.tags().end()); + expected_spans_->push_back(span); + }; + }; + // Tracing::Driver + Tracing::SpanPtr startSpan(const Tracing::Config&, Tracing::TraceContext&, + const StreamInfo::StreamInfo&, const std::string& operation_name, + Tracing::Decision) override { + + return std::make_unique(operation_name, expected_spans_); + }; + + ~Driver() { + for (auto& span : *expected_spans_) { + EXPECT_TRUE(span.tested) << fmt::format("missing span with operation '{}'", + span.operation_name); + } + }; + +private: + ExpectedSpansSharedPtr expected_spans_; +}; + +class TracerTestFactory + : public Tracers::Common::FactoryBase { +public: + TracerTestFactory(); + +private: + // FactoryBase + Tracing::DriverSharedPtr + createTracerDriverTyped(const test::integration::filters::TracerTestConfig& test_config, + Server::Configuration::TracerFactoryContext& context) override { + return std::make_shared(test_config, context.serverFactoryContext()); + }; +}; + +TracerTestFactory::TracerTestFactory() : FactoryBase("tracer-test-filter") {} + +REGISTER_FACTORY(TracerTestFactory, Server::Configuration::TracerFactory); + +} // namespace ExternalProcessing +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/tracer_test_filter.proto b/test/extensions/filters/http/ext_proc/tracer_test_filter.proto new file mode 100644 index 000000000000..5f323ff8d254 --- /dev/null +++ b/test/extensions/filters/http/ext_proc/tracer_test_filter.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package test.integration.filters; + +message ExpectSpan { + string operation_name = 1; + bool sampled = 2; + bool context_injected = 3; + map tags = 4; +} + +message TracerTestConfig { + repeated ExpectSpan expect_spans = 1; +} diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 464c271452d0..547f0ae2ea7a 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -95,24 +95,23 @@ DEFINE_PROTO_FUZZER( filter->setEncoderFilterCallbacks(mocks.encoder_callbacks_); EXPECT_CALL(*client, start(_, _, _)) - .WillRepeatedly(Invoke( - [&](ExternalProcessing::ExternalProcessorCallbacks&, - const Grpc::GrpcServiceConfigWithHashKey&, - const StreamInfo::StreamInfo&) -> ExternalProcessing::ExternalProcessorStreamPtr { - auto stream = std::make_unique(); - EXPECT_CALL(*stream, send(_, _)) - .WillRepeatedly( - Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&, bool) -> void { - auto response = - std::make_unique( - input.response()); - filter->onReceiveMessage(std::move(response)); - })); - EXPECT_CALL(*stream, streamInfo()) - .WillRepeatedly(ReturnRef(mocks.async_client_stream_info_)); - EXPECT_CALL(*stream, close()).WillRepeatedly(Return(false)); - return stream; - })); + .WillRepeatedly(Invoke([&](ExternalProcessing::ExternalProcessorCallbacks&, + const Grpc::GrpcServiceConfigWithHashKey&, + const Envoy::Http::AsyncClient::StreamOptions&) + -> ExternalProcessing::ExternalProcessorStreamPtr { + auto stream = std::make_unique(); + EXPECT_CALL(*stream, send(_, _)) + .WillRepeatedly(Invoke([&](envoy::service::ext_proc::v3::ProcessingRequest&&, + bool) -> void { + auto response = std::make_unique( + input.response()); + filter->onReceiveMessage(std::move(response)); + })); + EXPECT_CALL(*stream, streamInfo()) + .WillRepeatedly(ReturnRef(mocks.async_client_stream_info_)); + EXPECT_CALL(*stream, close()).WillRepeatedly(Return(false)); + return stream; + })); Envoy::Extensions::HttpFilters::HttpFilterFuzzer fuzzer; fuzzer.runData(static_cast(filter.get()), input.request()); diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h index 207b820e21df..17c31784d9e4 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h @@ -31,7 +31,7 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient { MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, (ExternalProcessing::ExternalProcessorCallbacks & callbacks, const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, - const StreamInfo::StreamInfo& stream_info)); + const Envoy::Http::AsyncClient::StreamOptions&)); }; } // namespace UnitTestFuzz From e383e8e13987118dbf89ec6e77b0cd627648fb93 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Wed, 22 May 2024 18:53:08 -0500 Subject: [PATCH 3/7] mobile: Fix xDS integration tests (#34309) This fixes the xDS related tests due to a missing SSL context. Risk Level: low Testing: integration tests Docs Changes: n/a Release Notes: n/a Platform Specific Features: n/a Signed-off-by: Fredy Wijaya --- mobile/test/common/integration/xds_integration_test.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mobile/test/common/integration/xds_integration_test.cc b/mobile/test/common/integration/xds_integration_test.cc index 1ede54812e4b..605936636741 100644 --- a/mobile/test/common/integration/xds_integration_test.cc +++ b/mobile/test/common/integration/xds_integration_test.cc @@ -3,6 +3,8 @@ #include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/cluster/v3/cluster.pb.h" +#include "source/common/tls/server_context_impl.h" + #include "test/common/grpc/grpc_client_integration.h" #include "test/common/integration/base_client_integration_test.h" #include "test/test_common/environment.h" @@ -26,6 +28,8 @@ void XdsIntegrationTest::initialize() { // Register the extensions required for Envoy Mobile. ExtensionRegistry::registerFactories(); + // For server TLS. + Extensions::TransportSockets::Tls::forceRegisterServerContextFactoryImpl(); if (sotw_or_delta_ == Grpc::SotwOrDelta::UnifiedSotw || sotw_or_delta_ == Grpc::SotwOrDelta::UnifiedDelta) { From c28f01c37a0ebd1cb48cab194ea75c1a5c63fbf0 Mon Sep 17 00:00:00 2001 From: zirain Date: Thu, 23 May 2024 08:30:12 +0800 Subject: [PATCH 4/7] accesslog: add %TRACE_ID% formatter (#34198) Risk Level: low Testing: unit test Docs Changes: formatter documented Release Notes: add Fixes #34102 Signed-off-by: zirain --- changelogs/current.yaml | 3 ++ .../observability/access_log/usage.rst | 6 ++++ .../formatter/http_specific_formatter.cc | 23 +++++++++++++ .../formatter/http_specific_formatter.h | 13 ++++++++ .../formatter/substitution_formatter_test.cc | 32 +++++++++++++++++++ 5 files changed, 77 insertions(+) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 44cf3692d08e..54f77ee15a56 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -182,6 +182,9 @@ new_features: change: | added support for :ref:`%UPSTREAM_HOST_NAME% ` for the upstream host identifier. +- area: access_loggers + change: | + Added ``TRACE_ID`` :ref:`access log formatter `. - area: healthcheck change: | Added support to healthcheck with ProxyProtocol in TCP Healthcheck by setting diff --git a/docs/root/configuration/observability/access_log/usage.rst b/docs/root/configuration/observability/access_log/usage.rst index 1b0440a1c607..cd2a68ad104f 100644 --- a/docs/root/configuration/observability/access_log/usage.rst +++ b/docs/root/configuration/observability/access_log/usage.rst @@ -1216,3 +1216,9 @@ UDP %ENVIRONMENT(X):Z% Environment value of environment variable X. If no valid environment variable X, '-' symbol will be used. Z is an optional parameter denoting string truncation up to Z characters long. + +%TRACE_ID% + HTTP + The trace ID of the request. If the request does not have a trace ID, this will be an empty string. + TCP/UDP + Not implemented ("-"). diff --git a/source/common/formatter/http_specific_formatter.cc b/source/common/formatter/http_specific_formatter.cc index 4ff9dd4fceae..7623c4b95ac9 100644 --- a/source/common/formatter/http_specific_formatter.cc +++ b/source/common/formatter/http_specific_formatter.cc @@ -197,6 +197,25 @@ HeadersByteSizeFormatter::formatValueWithContext(const HttpFormatterContext& con context.requestHeaders(), context.responseHeaders(), context.responseTrailers())); } +ProtobufWkt::Value TraceIDFormatter::formatValueWithContext(const HttpFormatterContext& context, + const StreamInfo::StreamInfo&) const { + auto trace_id = context.activeSpan().getTraceIdAsHex(); + if (trace_id.empty()) { + return SubstitutionFormatUtils::unspecifiedValue(); + } + return ValueUtil::stringValue(trace_id); +} + +absl::optional +TraceIDFormatter::formatWithContext(const HttpFormatterContext& context, + const StreamInfo::StreamInfo&) const { + auto trace_id = context.activeSpan().getTraceIdAsHex(); + if (trace_id.empty()) { + return absl::nullopt; + } + return trace_id; +} + GrpcStatusFormatter::Format GrpcStatusFormatter::parseFormat(absl::string_view format) { if (format.empty() || format == "CAMEL_STRING") { return GrpcStatusFormatter::CamelString; @@ -390,6 +409,10 @@ HttpBuiltInCommandParser::getKnownFormatters() { return std::make_unique(main_header, alternative_header, max_length); + }}}, + {"TRACE_ID", + {CommandSyntaxChecker::COMMAND_ONLY, [](const std::string&, absl::optional&) { + return std::make_unique(); }}}}); } diff --git a/source/common/formatter/http_specific_formatter.h b/source/common/formatter/http_specific_formatter.h index 106355aeb449..890322ff0c04 100644 --- a/source/common/formatter/http_specific_formatter.h +++ b/source/common/formatter/http_specific_formatter.h @@ -143,6 +143,19 @@ class ResponseTrailerFormatter : public FormatterProvider, HeaderFormatter { const StreamInfo::StreamInfo& stream_info) const override; }; +/** + * FormatterProvider for trace ID. + */ +class TraceIDFormatter : public FormatterProvider { +public: + absl::optional + formatWithContext(const HttpFormatterContext& context, + const StreamInfo::StreamInfo& stream_info) const override; + ProtobufWkt::Value + formatValueWithContext(const HttpFormatterContext& context, + const StreamInfo::StreamInfo& stream_info) const override; +}; + class GrpcStatusFormatter : public FormatterProvider, HeaderFormatter { public: enum Format { diff --git a/test/common/formatter/substitution_formatter_test.cc b/test/common/formatter/substitution_formatter_test.cc index b24cec5eae96..ef0b1704d9b4 100644 --- a/test/common/formatter/substitution_formatter_test.cc +++ b/test/common/formatter/substitution_formatter_test.cc @@ -25,6 +25,7 @@ #include "test/mocks/network/mocks.h" #include "test/mocks/ssl/mocks.h" #include "test/mocks/stream_info/mocks.h" +#include "test/mocks/tracing/mocks.h" #include "test/mocks/upstream/cluster_info.h" #include "test/test_common/environment.h" #include "test/test_common/printers.h" @@ -2144,6 +2145,37 @@ TEST(SubstitutionFormatterTest, responseTrailerFormatter) { } } +TEST(SubstitutionFormatterTest, TraceIDFormatter) { + StreamInfo::MockStreamInfo stream_info; + Http::TestRequestHeaderMapImpl request_header{}; + Http::TestResponseHeaderMapImpl response_header{}; + Http::TestResponseTrailerMapImpl response_trailer{}; + std::string body; + + Tracing::MockSpan active_span; + EXPECT_CALL(active_span, getTraceIdAsHex()) + .WillRepeatedly(Return("ae0046f9075194306d7de2931bd38ce3")); + + { + HttpFormatterContext formatter_context(&request_header, &response_header, &response_trailer, + body, AccessLogType::NotSet, &active_span); + TraceIDFormatter formatter{}; + EXPECT_EQ("ae0046f9075194306d7de2931bd38ce3", + formatter.formatWithContext(formatter_context, stream_info)); + EXPECT_THAT(formatter.formatValueWithContext(formatter_context, stream_info), + ProtoEq(ValueUtil::stringValue("ae0046f9075194306d7de2931bd38ce3"))); + } + + { + HttpFormatterContext formatter_context(&request_header, &response_header, &response_trailer, + body); + TraceIDFormatter formatter{}; + EXPECT_EQ(absl::nullopt, formatter.formatWithContext(formatter_context, stream_info)); + EXPECT_THAT(formatter.formatValueWithContext(formatter_context, stream_info), + ProtoEq(ValueUtil::nullValue())); + } +} + /** * Populate a metadata object with the following test data: * "com.test": {"test_key":"test_value","test_obj":{"inner_key":"inner_value"}} From ec198284cc6acac997a2f6ebf3d1afef3618ba2e Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Wed, 22 May 2024 21:03:25 -0500 Subject: [PATCH 5/7] mobile: Update a comment about on_data_ callback (#34312) This PR updates the comment for on_data_ callback related to the length. Risk Level: low (comment update only) Testing: n/a Docs Changes: n/a Release Notes: n/a Platform Specific Features: n/a Signed-off-by: Fredy Wijaya --- mobile/library/common/engine_types.h | 1 + 1 file changed, 1 insertion(+) diff --git a/mobile/library/common/engine_types.h b/mobile/library/common/engine_types.h index 34c32675af4c..a923751d69c0 100644 --- a/mobile/library/common/engine_types.h +++ b/mobile/library/common/engine_types.h @@ -64,6 +64,7 @@ struct EnvoyStreamCallbacks { * * The callback function pases the following parameters. * - buffer: the data received. + * - length: the length of data to read. It will always be <= `buffer.length()` * - end_stream: whether the data is the last data frame. * - stream_intel: contains internal stream metrics. */ From c7ce725c781966fa3fb391b3017a033fb50ce57e Mon Sep 17 00:00:00 2001 From: Raven Black Date: Thu, 23 May 2024 08:39:24 -0400 Subject: [PATCH 6/7] "Compiled" static header maps instead of big trie (#33932) Commit Message: "Compiled" static header maps instead of big trie Additional Description: This is slightly slower for "misses" but significantly faster for "hits". Given that the majority of headers are expected to be matches for the static table, this should be a big win. (Each faster hit pays for 30 slower misses!) This structure also uses significantly less memory than the big trie. Performance benchmarks detailed in https://github.com/envoyproxy/envoy/pull/33932 Risk Level: Could be high since headers are parsed millions of times per second. But also there's a lot of existing test cases. Testing: Added tests, existing tests involving headers will also provide coverage. Docs Changes: n/a Release Notes: Not yet Platform Specific Features: n/a Signed-off-by: Raven Black --- source/common/common/BUILD | 12 + source/common/common/compiled_string_map.h | 224 ++++++++++++++++++ source/common/common/compiled_string_map.md | 144 +++++++++++ source/common/http/BUILD | 1 + source/common/http/header_map_impl.cc | 16 +- source/common/http/header_map_impl.h | 13 +- test/common/common/BUILD | 5 + .../common/common/compiled_string_map_test.cc | 38 +++ 8 files changed, 443 insertions(+), 10 deletions(-) create mode 100644 source/common/common/compiled_string_map.h create mode 100644 source/common/common/compiled_string_map.md create mode 100644 test/common/common/compiled_string_map_test.cc diff --git a/source/common/common/BUILD b/source/common/common/BUILD index 67d035aed07b..cb4cb765b110 100644 --- a/source/common/common/BUILD +++ b/source/common/common/BUILD @@ -473,6 +473,18 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "compiled_string_map_lib", + hdrs = ["compiled_string_map.h"], + external_deps = [ + "abseil_strings", + ], + deps = [ + "//envoy/common:pure_lib", + "//source/common/common:assert_lib", + ], +) + envoy_cc_library( name = "packed_struct_lib", hdrs = ["packed_struct.h"], diff --git a/source/common/common/compiled_string_map.h b/source/common/common/compiled_string_map.h new file mode 100644 index 000000000000..a1f687ea173c --- /dev/null +++ b/source/common/common/compiled_string_map.h @@ -0,0 +1,224 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/common/pure.h" + +#include "source/common/common/assert.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" + +namespace Envoy { + +/** + * This is a specialized structure intended for static header maps, but + * there may be other use cases. + * + * See `compiled_string_map.md` for details. + */ +template class CompiledStringMap { + class Node { + public: + // While it is usual to take a string_view by value, in this + // performance-critical context with repeatedly passing the same + // value, passing it by reference benchmarks out slightly faster. + virtual Value find(const absl::string_view& key) PURE; + virtual ~Node() = default; + }; + + class LeafNode : public Node { + public: + LeafNode(absl::string_view key, Value&& value) : key_(key), value_(std::move(value)) {} + Value find(const absl::string_view& key) override { + // String comparison unnecessarily checks size equality first, we can skip + // to memcmp here because we already know the sizes are equal. + // Since this is a super-hot path we don't even ASSERT here, to avoid adding + // slowdown in debug builds. + if (memcmp(key.data(), key_.data(), key.size())) { + return {}; + } + return value_; + } + + private: + const std::string key_; + const Value value_; + }; + + class BranchNode : public Node { + public: + BranchNode(size_t index, uint8_t min, std::vector>&& branches) + : index_(index), min_(min), branches_(std::move(branches)) {} + Value find(const absl::string_view& key) override { + const uint8_t k = static_cast(key[index_]); + // Possible optimization was tried here, populating empty nodes with + // a function that returns {} to reduce branching vs checking for null + // nodes. Checking for null nodes benchmarked faster. + if (k < min_ || k >= min_ + branches_.size() || branches_[k - min_] == nullptr) { + return {}; + } + return branches_[k - min_]->find(key); + } + + private: + const size_t index_; + const uint8_t min_; + // Possible optimization was tried here, using std::array, 256> + // rather than a smaller-range vector with bounds, to keep locality and reduce + // comparisons. It didn't help. + const std::vector> branches_; + }; + +public: + // The caller owns the string-views during `compile`. Ownership of the passed in + // Values is transferred to the CompiledStringMap. + using KV = std::pair; + /** + * Returns the value with a matching key, or the default value + * (typically nullptr) if the key was not present. + * @param key the key to look up. + */ + Value find(absl::string_view key) const { + const size_t key_size = key.size(); + // Theoretically we could also bottom-cap the size range, but the + // cost of the extra comparison and operation would almost certainly + // outweigh the benefit of omitting 4 or 5 entries. + if (key_size >= table_.size() || table_[key_size] == nullptr) { + return {}; + } + return table_[key_size]->find(key); + }; + /** + * Construct the lookup table. This can be a somewhat slow multi-pass + * operation if the input table is large. + * @param contents a vector of key->value pairs. This is taken by value because + * we're going to modify it. If the caller still wants the original + * then it can be copied in, if not it can be moved in. + * Note that the keys are string_views - the base string data must + * exist for the duration of compile(). The leaf nodes take copies + * of the key strings, so the string_views can be invalidated once + * compile has completed. + */ + void compile(std::vector contents) { + if (contents.empty()) { + return; + } + std::sort(contents.begin(), contents.end(), + [](const KV& a, const KV& b) { return a.first.size() < b.first.size(); }); + const size_t longest = contents.back().first.size(); + // A key length of 0 is possible, and also we don't want to have to + // subtract [min length] every time we index, so the table size must + // be one larger than the longest key. + table_.resize(longest + 1); + auto range_start = contents.begin(); + // Populate the sub-nodes for each length of key that exists. + while (range_start != contents.end()) { + // Find the first key whose length differs from the current key length. + // Everything in between is keys with the same length. + const auto range_end = + std::find_if(range_start, contents.end(), [len = range_start->first.size()](const KV& e) { + return e.first.size() != len; + }); + std::vector node_contents; + // Populate a Node for the entries in that range. + node_contents.reserve(range_end - range_start); + std::move(range_start, range_end, std::back_inserter(node_contents)); + table_[range_start->first.size()] = createEqualLengthNode(node_contents); + range_start = range_end; + } + } + +private: + /** + * Details of a node branch point; the index into the string at which + * characters should be looked up, the lowest valued character in the + * branch, the highest valued character in the branch, and how many + * branches there are. + */ + struct IndexSplitInfo { + // The index to the character being considered for this split. + size_t index_; + // The smallest character value that appears at this index. + uint8_t min_; + // The largest character value that appears at this index. + uint8_t max_; + // The number of distinct characters that appear at this index. + uint8_t count_; + size_t size() const { return max_ - min_ + 1; } + size_t offsetOf(uint8_t c) const { return c - min_; } + }; + + /** + * @param node_contents the key-value pairs to be branched upon. + * @return details of the index on which the node should branch + * - the index which produces the most child branches. + */ + static IndexSplitInfo findBestSplitPoint(const std::vector& node_contents) { + ASSERT(node_contents.size() > 1); + IndexSplitInfo best{0, 0, 0, 0}; + const size_t key_length = node_contents[0].first.size(); + for (size_t i = 0; i < key_length; i++) { + std::array hits{}; + IndexSplitInfo info{static_cast(i), 255, 0, 0}; + for (const KV& pair : node_contents) { + uint8_t v = pair.first[i]; + if (!hits[v]) { + hits[v] = true; + info.count_++; + info.min_ = std::min(v, info.min_); + info.max_ = std::max(v, info.max_); + } + } + if (info.count_ > best.count_) { + best = info; + } + } + ASSERT(best.count_ > 1, absl::StrCat("duplicate key: ", node_contents[0].first)); + return best; + } + + /* + * @param node_contents the set of key-value pairs that will be children of + * this node. + * @return the recursively generated tree node that leads to all of node_contents. + * If there is only one entry in node_contents then a LeafNode, otherwise a BranchNode. + */ + static std::unique_ptr createEqualLengthNode(std::vector node_contents) { + if (node_contents.size() == 1) { + return std::make_unique(node_contents[0].first, std::move(node_contents[0].second)); + } + // best contains the index at which this node should be split, + // and the smallest and largest character values that occur at + // that index across all the keys in node_contents. + const IndexSplitInfo best = findBestSplitPoint(node_contents); + std::vector> nodes; + nodes.resize(best.size()); + std::sort(node_contents.begin(), node_contents.end(), + [index = best.index_](const KV& a, const KV& b) { + return a.first[index] < b.first[index]; + }); + auto range_start = node_contents.begin(); + // Populate the sub-nodes for each character-branch. + while (range_start != node_contents.end()) { + // Find the first key whose character at position [best.index_] differs from the + // character of the current range. + // Everything in the range has keys with the same character at this index. + auto range_end = std::find_if(range_start, node_contents.end(), + [index = best.index_, c = range_start->first[best.index_]]( + const KV& e) { return e.first[index] != c; }); + std::vector next_contents; + next_contents.reserve(range_end - range_start); + std::move(range_start, range_end, std::back_inserter(next_contents)); + nodes[best.offsetOf(range_start->first[best.index_])] = createEqualLengthNode(next_contents); + range_start = range_end; + } + return std::make_unique(best.index_, best.min_, std::move(nodes)); + } + std::vector> table_; +}; + +} // namespace Envoy diff --git a/source/common/common/compiled_string_map.md b/source/common/common/compiled_string_map.md new file mode 100644 index 000000000000..4740f3b80ed7 --- /dev/null +++ b/source/common/common/compiled_string_map.md @@ -0,0 +1,144 @@ +# Compiled string map algorithm + +## The trie-like structure + +The data structure consists of: +1. a length branch node - strings are grouped by length and a zero-indexed length lookup table is generated. Entries in the table may be nullptr, indicating there are no strings of that length, or a pointer to another node. +2. a branch node - similar to a standard trie, a node branches by a set of characters at an index. Unlike a standard trie, the index is not the "first" index, but instead is the index at which the most branches would be generated. The node contains the index to be branched on, the lowest character value in the branches, and a vector from lowest-to-highest character value, e.g. if branches c and f exist, a vector representing [c][d][e][f] will be in the node, with the [d] and [e] branches being nullptr. +3. a leaf node. Leaf nodes contain the entire string for final validation, and the value to be returned if the search key matches the string. + +## The compile step + +``` + ┌───────────────────┐ + │ x-envoy-banana │ + │ x-envoy-pineapple │ + │ x-envoy-babana │ + │ x-envoy-grape │ + │ x-envoy-bacana ├──┐ + │ x-envoy-banara │ │ + │ something-else │ │ + └───────────────────┘ │ + ▼ + split by length + │ + │ + │ ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ + └─►│ 0│ 1│ 2│ 3│ 4│ 5│ 6│ 7│ 8│ 9│10│11│12│13│14│15│16│17│ + └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴┬─┴┬─┴──┴──┴┬─┘ + │ │ │ + ┌────────────────────────────────────────────────────────────┘ │ │ + │ │ │ + ▼ ┌──────────────────────────────────────────────┘ │ +┌─────────────┐ │ ▼ +│x-envoy-grape│ │ ┌─────────────────┐ +└─────────────┘ │ │x-envoy-pineapple│ + ▼ └─────────────────┘ + ┌────────────────┐ + │ x-envoy-banana │ + │ x-envoy-babana │ + │ x-envoy-bacana │ + │ x-envoy-banara │ + │ something-else │ + └─┬──────────────┘ + │ + │ Find best branch index (maximum unique branches) + │ + │ + ▼ + + x-envoy-banana + b r + c + something-else + + 22222222224232 + + ^ best index is here with 4 branches, n b c and e + │ + │ + │ + ▼ branch node at position 10, index 0 = b + ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ + │b │c │d │e │f │g │h │i │j │k │l │m │n │ + └┬─┴┬─┴──┴┬─┴──┴──┴──┴──┴──┴──┴──┴──┴┬─┘ + │ │ │ │ + ▼ │ ▼ ▼ +x-envoy-babana │ something-else ┌────────────────┐ + ▼ │ x-envoy-banana │ + x-envoy-bacana │ x-envoy-banara │ + └───────┬────────┘ + │ + ▼ + Find best index + it's position 12 with 2 branches + │ + branch node at position 12, index 0 = n ▼ + ┌──┬──┬──┬──┬──┐ + │n │o │p │q │r │ + └┬─┴──┴──┴──┴┬─┘ + │ │ + ▼ ▼ + x-envoy-banana x-envoy-banara +``` + +## The lookup + +A lookup operation simply walks the generated tree in much the same way as a regular trie. +For example, given the tree generated above, if you were to search for the header `sponge`, +the length lookup would find nullptr at index 6, and the resulting value would be the null +value. + +If you were to search for `x-envoy-banaka`, the length would find the branch with most of +the entries, the index 10 branch would take branch `n`, and the index 12 branch would see that +`k` is lower than the minimum index in the vector, so the result would be the null value. + +If you were to search for `y-envoy-banara`, the length would find the branch with most of +the entries, the index 10 branch would take branch `n`, the index 12 branch would take +branch `r`, and the leaf node would do a final string compare, see that +`y-envoy-barana` != `x-envoy-banara`, and the result would be the null value. + +## Performance + +In most cases this will be faster than a regular trie, especially for hits - for example, +instead of 14 steps to match `x-envoy-banana` in the example above, it takes 3 and a +final memory comparison (which is much faster than one character at a time). For misses, +some cases will be faster (when the target shares a prefix with an entry), but some will +be very slightly slower due to more expensive comparisons and dynamic function selection +in the compiled version. + +Benchmark versus a fixed-size 256-branch trie, for the static header map - hit benchmark +searches for the full range of static headers, miss benchmark used a small set of arbitrary +non-matching headers: + +Benchmark | trie | compiled | trieStdDev | compiledStdDev | change +-- | -- | -- | -- | -- | -- +bmHeaderMapImplRequestStaticLookupHits | 47.2ns | 16.4ns | 0.629 | 0.378 | -65.3% +bmHeaderMapImplResponseStaticLookupHits | 34.7ns | 14.3ns | 0.571 | 0.085 | -58.8% +bmHeaderMapImplRequestStaticLookupMisses | 6.89ns | 6.83ns | 0.044 | 0.034 | -0.01% +bmHeaderMapImplResponseStaticLookupMisses | 6.40ns | 7.31ns | 0.028 | 0.057 | +14.2% + +Versus an `absl::flat_hash_map`, hits are slightly faster and misses are significantly +faster. (Benchmark has been lost to time.) + +In terms of memory, unlike a regular trie, this structure must contain the full keys +in the leaf nodes, which could potentially make it larger due to, e.g. containing +`x-envoy-` multiple times where a regular trie only has one 'branch' for that. +However, it also contains fewer nodes due to not having a node for every character, +and compared to the prior fixed-size-node 256-branch trie, each node is a lot smaller; +each node in the prior trie used 8kb, versus an average node in this trie uses less +than 60 bytes. + +## Limitations + +This structure is only useful for non-dynamic data - the cost of the compile step +will outweigh any lookup benefits if the contents need to be modified. + +Unlike a regular trie, this structure does not facilitate prefix-matching - you +can't find a "nearest prefix" or a "longest common prefix". + +A `gperf` hash would likely be faster than this for hits and slightly slower for +misses, but also has the additional constraint that the contents must be known +before binary-compile-time; envoy's use-case supports extensions dynamically +adding to the contents, which therefore precludes `gperf`, or at least makes it +impractical. diff --git a/source/common/http/BUILD b/source/common/http/BUILD index c7d29d9bcd15..0ea7a19241e7 100644 --- a/source/common/http/BUILD +++ b/source/common/http/BUILD @@ -449,6 +449,7 @@ envoy_cc_library( ":headers_lib", "//envoy/http:header_map_interface", "//source/common/common:assert_lib", + "//source/common/common:compiled_string_map_lib", "//source/common/common:dump_state_utils", "//source/common/common:empty_string", "//source/common/common:non_copyable", diff --git a/source/common/http/header_map_impl.cc b/source/common/http/header_map_impl.cc index 7b4827e673c9..4a6e8ac2c824 100644 --- a/source/common/http/header_map_impl.cc +++ b/source/common/http/header_map_impl.cc @@ -119,19 +119,21 @@ template <> HeaderMapImpl::StaticLookupTable::StaticLookupTabl INLINE_REQ_HEADERS(REGISTER_DEFAULT_REQUEST_HEADER) INLINE_REQ_RESP_HEADERS(REGISTER_DEFAULT_REQUEST_HEADER) - finalizeTable(); + auto input = finalizedTable(); // Special case where we map a legacy host header to :authority. const auto handle = CustomInlineHeaderRegistry::getInlineHeader( Headers::get().Host); - add(Headers::get().HostLegacy.get().c_str(), [handle](HeaderMapImpl& h) -> StaticLookupResponse { - return {&h.inlineHeaders()[handle.value().it_->second], &handle.value().it_->first}; - }); + input.emplace_back( + Headers::get().HostLegacy.get(), [handle](HeaderMapImpl& h) -> StaticLookupResponse { + return {&h.inlineHeaders()[handle.value().it_->second], &handle.value().it_->first}; + }); + compile(std::move(input)); } template <> HeaderMapImpl::StaticLookupTable::StaticLookupTable() { - finalizeTable(); + compile(finalizedTable()); } template <> HeaderMapImpl::StaticLookupTable::StaticLookupTable() { @@ -142,7 +144,7 @@ template <> HeaderMapImpl::StaticLookupTable::StaticLookupTab INLINE_REQ_RESP_HEADERS(REGISTER_RESPONSE_HEADER) INLINE_RESP_HEADERS_TRAILERS(REGISTER_RESPONSE_HEADER) - finalizeTable(); + compile(finalizedTable()); } template <> HeaderMapImpl::StaticLookupTable::StaticLookupTable() { @@ -151,7 +153,7 @@ template <> HeaderMapImpl::StaticLookupTable::StaticLookupTa Headers::get().name); INLINE_RESP_HEADERS_TRAILERS(REGISTER_RESPONSE_TRAILER) - finalizeTable(); + compile(finalizedTable()); } uint64_t HeaderMapImpl::appendToHeader(HeaderString& header, absl::string_view data, diff --git a/source/common/http/header_map_impl.h b/source/common/http/header_map_impl.h index 0b61be52c3e3..24ebc79bd414 100644 --- a/source/common/http/header_map_impl.h +++ b/source/common/http/header_map_impl.h @@ -11,6 +11,7 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/http/header_map.h" +#include "source/common/common/compiled_string_map.h" #include "source/common/common/non_copyable.h" #include "source/common/common/utility.h" #include "source/common/http/headers.h" @@ -146,18 +147,21 @@ class HeaderMapImpl : NonCopyable { */ template struct StaticLookupTable - : public TrieLookupTable> { + : public CompiledStringMap> { StaticLookupTable(); - void finalizeTable() { + std::vector finalizedTable() { CustomInlineHeaderRegistry::finalize(); auto& headers = CustomInlineHeaderRegistry::headers(); size_ = headers.size(); + std::vector input; + input.reserve(size_); for (const auto& header : headers) { - this->add(header.first.get().c_str(), [&header](HeaderMapImpl& h) -> StaticLookupResponse { + input.emplace_back(header.first.get(), [&header](HeaderMapImpl& h) -> StaticLookupResponse { return {&h.inlineHeaders()[header.second], &header.first}; }); } + return input; } static size_t size() { @@ -181,6 +185,9 @@ class HeaderMapImpl : NonCopyable { } } + // This is the size of the number of callbacks; in the case of Requests, + // this is one smaller than the number of entries in the lookup table, + // because of legacy `host` mapping to the same thing as `:authority`. size_t size_; }; diff --git a/test/common/common/BUILD b/test/common/common/BUILD index ede00f4bc149..f5876ec08f19 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -281,6 +281,11 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "compiled_string_map_test", + srcs = ["compiled_string_map_test.cc"], +) + envoy_cc_test( name = "packed_struct_test", srcs = ["packed_struct_test.cc"], diff --git a/test/common/common/compiled_string_map_test.cc b/test/common/common/compiled_string_map_test.cc new file mode 100644 index 000000000000..7d9ab360beef --- /dev/null +++ b/test/common/common/compiled_string_map_test.cc @@ -0,0 +1,38 @@ +#include "source/common/common/compiled_string_map.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { + +using testing::IsNull; + +TEST(CompiledStringMapTest, FindsEntriesCorrectly) { + CompiledStringMap map; + map.compile({ + {"key-1", "value-1"}, + {"key-2", "value-2"}, + {"longer-key", "value-3"}, + {"bonger-key", "value-4"}, + {"bonger-bey", "value-5"}, + {"only-key-of-this-length", "value-6"}, + }); + EXPECT_EQ(map.find("key-1"), "value-1"); + EXPECT_EQ(map.find("key-2"), "value-2"); + EXPECT_THAT(map.find("key-0"), IsNull()); + EXPECT_THAT(map.find("key-3"), IsNull()); + EXPECT_EQ(map.find("longer-key"), "value-3"); + EXPECT_EQ(map.find("bonger-key"), "value-4"); + EXPECT_EQ(map.find("bonger-bey"), "value-5"); + EXPECT_EQ(map.find("only-key-of-this-length"), "value-6"); + EXPECT_THAT(map.find("songer-key"), IsNull()); + EXPECT_THAT(map.find("absent-length-key"), IsNull()); +} + +TEST(CompiledStringMapTest, EmptyMapReturnsNull) { + CompiledStringMap map; + map.compile({}); + EXPECT_THAT(map.find("key-1"), IsNull()); +} + +} // namespace Envoy From eda7d32bd54d95008b82f2cc7327d63dbd887df0 Mon Sep 17 00:00:00 2001 From: alyssawilk Date: Thu, 23 May 2024 08:39:53 -0400 Subject: [PATCH 7/7] clusters: cleaning up some exceptions (#34248) Risk Level: low Testing: updated tests Docs Changes: n/a Release Notes: n/a envoyproxy/envoy-mobile#176 Signed-off-by: Alyssa Wilk --- source/common/upstream/upstream_impl.cc | 117 ++++++++++-------- source/common/upstream/upstream_impl.h | 16 +-- source/extensions/clusters/eds/eds.cc | 10 +- .../clusters/static/static_cluster.cc | 6 +- .../clusters/strict_dns/strict_dns_cluster.cc | 2 +- 5 files changed, 86 insertions(+), 65 deletions(-) diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 3a7715aab12a..2f59eaf52ffa 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -97,7 +97,7 @@ createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typ } if (factory == nullptr) { - throwEnvoyExceptionOrPanic( + return absl::InvalidArgumentError( fmt::format("Didn't find a registered network or http filter or protocol " "options implementation for name: '{}'", name)); @@ -106,7 +106,8 @@ createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typ ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto(); if (proto_config == nullptr) { - throwEnvoyExceptionOrPanic(fmt::format("filter {} does not support protocol options", name)); + return absl::InvalidArgumentError( + fmt::format("filter {} does not support protocol options", name)); } Envoy::Config::Utility::translateOpaqueConfig( @@ -219,7 +220,7 @@ buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_con return cluster_options; } -std::vector<::Envoy::Upstream::UpstreamLocalAddress> +absl::StatusOr> parseBindConfig(::Envoy::OptRef bind_config, const absl::optional& cluster_name, Network::ConnectionSocket::OptionsSharedPtr base_socket_options, @@ -233,7 +234,7 @@ parseBindConfig(::Envoy::OptRef bind_ auto address_or_error = ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address()); - THROW_IF_STATUS_NOT_OK(address_or_error, throw); + RETURN_IF_STATUS_NOT_OK(address_or_error); upstream_local_address.address_ = address_or_error.value(); } upstream_local_address.socket_options_ = std::make_shared(); @@ -249,7 +250,7 @@ parseBindConfig(::Envoy::OptRef bind_ UpstreamLocalAddress extra_upstream_local_address; auto address_or_error = ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address()); - THROW_IF_STATUS_NOT_OK(address_or_error, throw); + RETURN_IF_STATUS_NOT_OK(address_or_error); extra_upstream_local_address.address_ = address_or_error.value(); extra_upstream_local_address.socket_options_ = @@ -273,7 +274,7 @@ parseBindConfig(::Envoy::OptRef bind_ UpstreamLocalAddress additional_upstream_local_address; auto address_or_error = ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address); - THROW_IF_STATUS_NOT_OK(address_or_error, throw); + RETURN_IF_STATUS_NOT_OK(address_or_error); additional_upstream_local_address.address_ = address_or_error.value(); additional_upstream_local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); @@ -297,7 +298,7 @@ parseBindConfig(::Envoy::OptRef bind_ if (upstream_local_addresses.size() > 1) { for (auto const& upstream_local_address : upstream_local_addresses) { if (upstream_local_address.address_ == nullptr) { - throwEnvoyExceptionOrPanic(fmt::format( + return absl::InvalidArgumentError(fmt::format( "{}'s upstream binding config has invalid IP addresses.", !(cluster_name.has_value()) ? "Bootstrap" : fmt::format("Cluster {}", cluster_name.value()))); @@ -308,7 +309,8 @@ parseBindConfig(::Envoy::OptRef bind_ return upstream_local_addresses; } -Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalAddressSelector( +absl::StatusOr +createUpstreamLocalAddressSelector( const envoy::config::cluster::v3::Cluster& cluster_config, const absl::optional& bootstrap_bind_config) { @@ -327,7 +329,7 @@ Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalA if (bind_config.has_value()) { if (bind_config->additional_source_addresses_size() > 0 && bind_config->extra_source_addresses_size() > 0) { - throwEnvoyExceptionOrPanic(fmt::format( + return absl::InvalidArgumentError(fmt::format( "Can't specify both `extra_source_addresses` and `additional_source_addresses` " "in the {}'s upstream binding config", !(cluster_name.has_value()) ? "Bootstrap" @@ -337,7 +339,7 @@ Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalA if (!bind_config->has_source_address() && (bind_config->extra_source_addresses_size() > 0 || bind_config->additional_source_addresses_size() > 0)) { - throwEnvoyExceptionOrPanic(fmt::format( + return absl::InvalidArgumentError(fmt::format( "{}'s upstream binding config has extra/additional source addresses but no " "source_address. Extra/additional addresses cannot be specified if " "source_address is not set.", @@ -363,15 +365,17 @@ Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalA Config::Utility::getAndCheckFactory(typed_extension, false); } - auto selector_or_error = local_address_selector_factory->createLocalAddressSelector( + absl::StatusOr> config_or_error = parseBindConfig( bind_config, cluster_name, buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or( envoy::config::core::v3::BindConfig{})), buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or( - envoy::config::core::v3::BindConfig{}))), - cluster_name); - THROW_IF_STATUS_NOT_OK(selector_or_error, throw); + envoy::config::core::v3::BindConfig{}))); + RETURN_IF_STATUS_NOT_OK(config_or_error); + auto selector_or_error = local_address_selector_factory->createLocalAddressSelector( + config_or_error.value(), cluster_name); + RETURN_IF_STATUS_NOT_OK(selector_or_error); return selector_or_error.value(); } @@ -944,7 +948,7 @@ ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope, return {stat_names, scope}; } -std::shared_ptr +absl::StatusOr> createOptions(const envoy::config::cluster::v3::Cluster& config, std::shared_ptr&& options, ProtobufMessage::ValidationVisitor& validation_visitor) { @@ -955,7 +959,7 @@ createOptions(const envoy::config::cluster::v3::Cluster& config, if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) { // Make sure multiple protocol configurations are not present if (config.has_http_protocol_options() && config.has_http2_protocol_options()) { - throwEnvoyExceptionOrPanic( + return absl::InvalidArgumentError( fmt::format("cluster: Both HTTP1 and HTTP2 options may only be " "configured with non-default 'protocol_selection' values")); } @@ -972,7 +976,7 @@ createOptions(const envoy::config::cluster::v3::Cluster& config, config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL, config.has_http2_protocol_options(), validation_visitor); - THROW_IF_STATUS_NOT_OK(options_or_error, throw); + RETURN_IF_STATUS_NOT_OK(options_or_error); return options_or_error.value(); } @@ -1060,11 +1064,12 @@ ClusterInfoImpl::ClusterInfoImpl( ? std::make_unique(config.eds_cluster_config().service_name()) : nullptr), extension_protocol_options_(parseExtensionProtocolOptions(config, factory_context)), - http_protocol_options_( + http_protocol_options_(THROW_OR_RETURN_VALUE( createOptions(config, extensionProtocolOptionsTyped( "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"), - factory_context.messageValidationVisitor())), + factory_context.messageValidationVisitor()), + std::shared_ptr)), tcp_protocol_options_(extensionProtocolOptionsTyped( "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")), max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( @@ -1097,7 +1102,9 @@ ClusterInfoImpl::ClusterInfoImpl( resource_managers_(config, runtime, name_, *stats_scope_, factory_context.clusterManager().clusterCircuitBreakersStatNames()), maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)), - upstream_local_address_selector_(createUpstreamLocalAddressSelector(config, bind_config)), + upstream_local_address_selector_( + THROW_OR_RETURN_VALUE(createUpstreamLocalAddressSelector(config, bind_config), + Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr)), upstream_config_(config.has_upstream_config() ? std::make_unique( config.upstream_config()) @@ -1170,7 +1177,7 @@ ClusterInfoImpl::ClusterInfoImpl( config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) { // If load_balancing_policy is set we will use it directly, ignoring lb_policy. - configureLbPolicies(config, server_context); + THROW_IF_NOT_OK(configureLbPolicies(config, server_context)); } else { // If load_balancing_policy is not set, we will try to convert legacy lb_policy // to load_balancing_policy and use it. @@ -1306,15 +1313,16 @@ ClusterInfoImpl::ClusterInfoImpl( } // Configures the load balancer based on config.load_balancing_policy -void ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config, - Server::Configuration::ServerFactoryContext& context) { +absl::Status +ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config, + Server::Configuration::ServerFactoryContext& context) { // Check if load_balancing_policy is set first. if (!config.has_load_balancing_policy()) { - throwEnvoyExceptionOrPanic("cluster: field load_balancing_policy need to be set"); + return absl::InvalidArgumentError("cluster: field load_balancing_policy need to be set"); } if (config.has_lb_subset_config()) { - throwEnvoyExceptionOrPanic( + return absl::InvalidArgumentError( "cluster: load_balancing_policy cannot be combined with lb_subset_config"); } @@ -1322,7 +1330,7 @@ void ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Clus const auto& lb_config = config.common_lb_config(); if (lb_config.has_zone_aware_lb_config() || lb_config.has_locality_weighted_lb_config() || lb_config.has_consistent_hashing_lb_config()) { - throwEnvoyExceptionOrPanic( + return absl::InvalidArgumentError( "cluster: load_balancing_policy cannot be combined with partial fields " "(zone_aware_lb_config, " "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config"); @@ -1350,11 +1358,12 @@ void ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Clus } if (load_balancer_factory_ == nullptr) { - throwEnvoyExceptionOrPanic( + return absl::InvalidArgumentError( fmt::format("cluster: didn't find a registered load balancer factory " "implementation for cluster: '{}' with names from [{}]", name_, absl::StrJoin(missing_policies, ", "))); } + return absl::OkStatus(); } ProtocolOptionsConfigConstSharedPtr @@ -1426,7 +1435,7 @@ ClusterInfoImpl::upstreamHttpProtocol(absl::optional downstream_ : Http::Protocol::Http11}; } -bool validateTransportSocketSupportsQuic( +absl::StatusOr validateTransportSocketSupportsQuic( const envoy::config::core::v3::TransportSocket& transport_socket) { // The transport socket is valid for QUIC if it is either a QUIC transport socket, // or if it is a QUIC transport socket wrapped in an HTTP/1.1 proxy socket. @@ -1438,7 +1447,7 @@ bool validateTransportSocketSupportsQuic( } envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport http11_socket; - THROW_IF_NOT_OK(MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket)); + RETURN_IF_NOT_OK(MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket)); return http11_socket.transport_socket().name() == "envoy.transport_sockets.quic"; } @@ -1500,7 +1509,8 @@ ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& clus if (info_->features() & ClusterInfoImpl::Features::HTTP3) { #if defined(ENVOY_ENABLE_QUIC) - if (!validateTransportSocketSupportsQuic(cluster.transport_socket())) { + if (!THROW_OR_RETURN_VALUE(validateTransportSocketSupportsQuic(cluster.transport_socket()), + bool)) { throwEnvoyExceptionOrPanic( fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}", cluster.name(), cluster.transport_socket().DebugString())); @@ -1772,32 +1782,35 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) { } } -const Network::Address::InstanceConstSharedPtr +absl::StatusOr ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) { + absl::Status resolve_status; TRY_ASSERT_MAIN_THREAD { auto address_or_error = Network::Address::resolveProtoAddress(address); - THROW_IF_STATUS_NOT_OK(address_or_error, throw); - return address_or_error.value(); + if (address_or_error.value()) { + return address_or_error.value(); + } + resolve_status = address_or_error.status(); } END_TRY - CATCH(EnvoyException & e, { - if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC || - info_->type() == envoy::config::cluster::v3::Cluster::EDS) { - throwEnvoyExceptionOrPanic( - fmt::format("{}. Consider setting resolver_name or setting cluster type " - "to 'STRICT_DNS' or 'LOGICAL_DNS'", - e.what())); - } - throw e; - }); + CATCH(EnvoyException & e, { resolve_status = absl::InvalidArgumentError(e.what()); }); + if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC || + info_->type() == envoy::config::cluster::v3::Cluster::EDS) { + return absl::InvalidArgumentError( + fmt::format("{}. Consider setting resolver_name or setting cluster type " + "to 'STRICT_DNS' or 'LOGICAL_DNS'", + resolve_status.message())); + } + return resolve_status; } -void ClusterImplBase::validateEndpointsForZoneAwareRouting( +absl::Status ClusterImplBase::validateEndpointsForZoneAwareRouting( const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const { if (local_cluster_ && endpoints.priority() > 0) { - throwEnvoyExceptionOrPanic( + return absl::InvalidArgumentError( fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name())); } + return absl::OkStatus(); } ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats( @@ -1819,10 +1832,12 @@ ClusterInfoImpl::ResourceManagers::ResourceManagers( const std::string& cluster_name, Stats::Scope& stats_scope, const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names) : circuit_breakers_stat_names_(circuit_breakers_stat_names) { - managers_[enumToInt(ResourcePriority::Default)] = - load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT); - managers_[enumToInt(ResourcePriority::High)] = - load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH); + managers_[enumToInt(ResourcePriority::Default)] = THROW_OR_RETURN_VALUE( + load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT), + ResourceManagerImplPtr); + managers_[enumToInt(ResourcePriority::High)] = THROW_OR_RETURN_VALUE( + load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH), + ResourceManagerImplPtr); } ClusterCircuitBreakersStats @@ -1922,7 +1937,7 @@ ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager( [] { return std::make_shared(); }); } -ResourceManagerImplPtr +absl::StatusOr ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, const std::string& cluster_name, Stats::Scope& stats_scope, @@ -1983,7 +1998,7 @@ ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluste if (per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() || per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() || per_host_it->has_retry_budget()) { - throwEnvoyExceptionOrPanic("Unsupported field in per_host_thresholds"); + return absl::InvalidArgumentError("Unsupported field in per_host_thresholds"); } if (per_host_it->has_max_connections()) { max_connections_per_host = per_host_it->max_connections().value(); diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index f40ce0c35617..5537f13e8fbd 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -874,8 +874,8 @@ class ClusterInfoImpl : public ClusterInfo, const envoy::config::core::v3::HttpProtocolOptions& commonHttpProtocolOptions() const override { return http_protocol_options_->common_http_protocol_options_; } - void configureLbPolicies(const envoy::config::cluster::v3::Cluster& config, - Server::Configuration::ServerFactoryContext& context); + absl::Status configureLbPolicies(const envoy::config::cluster::v3::Cluster& config, + Server::Configuration::ServerFactoryContext& context); ProtocolOptionsConfigConstSharedPtr extensionProtocolOptions(const std::string& name) const override; envoy::config::cluster::v3::Cluster::DiscoveryType type() const override { return type_; } @@ -1020,10 +1020,10 @@ class ClusterInfoImpl : public ClusterInfo, ResourceManagers(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, const std::string& cluster_name, Stats::Scope& stats_scope, const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names); - ResourceManagerImplPtr load(const envoy::config::cluster::v3::Cluster& config, - Runtime::Loader& runtime, const std::string& cluster_name, - Stats::Scope& stats_scope, - const envoy::config::core::v3::RoutingPriority& priority); + absl::StatusOr + load(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, + const std::string& cluster_name, Stats::Scope& stats_scope, + const envoy::config::core::v3::RoutingPriority& priority); using Managers = std::array; @@ -1147,7 +1147,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable resolveProtoAddress(const envoy::config::core::v3::Address& address); // Partitions the provided list of hosts into three new lists containing the healthy, degraded @@ -1226,7 +1226,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable& all_new_hosts) { - const auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address()); + const auto address = + THROW_OR_RETURN_VALUE(parent_.resolveProtoAddress(lb_endpoint.endpoint().address()), + const Network::Address::InstanceConstSharedPtr); std::vector address_list; if (!lb_endpoint.endpoint().additional_addresses().empty()) { address_list.push_back(address); for (const auto& additional_address : lb_endpoint.endpoint().additional_addresses()) { - address_list.emplace_back(parent_.resolveProtoAddress(additional_address.address())); + address_list.emplace_back( + THROW_OR_RETURN_VALUE(parent_.resolveProtoAddress(additional_address.address()), + const Network::Address::InstanceConstSharedPtr)); } } diff --git a/source/extensions/clusters/static/static_cluster.cc b/source/extensions/clusters/static/static_cluster.cc index 37ee4c968e20..704926c9ea17 100644 --- a/source/extensions/clusters/static/static_cluster.cc +++ b/source/extensions/clusters/static/static_cluster.cc @@ -21,11 +21,13 @@ StaticClusterImpl::StaticClusterImpl(const envoy::config::cluster::v3::Cluster& Event::Dispatcher& dispatcher = context.serverFactoryContext().mainThreadDispatcher(); for (const auto& locality_lb_endpoint : cluster_load_assignment.endpoints()) { - validateEndpointsForZoneAwareRouting(locality_lb_endpoint); + THROW_IF_NOT_OK(validateEndpointsForZoneAwareRouting(locality_lb_endpoint)); priority_state_manager_->initializePriorityFor(locality_lb_endpoint); for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { priority_state_manager_->registerHostForPriority( - lb_endpoint.endpoint().hostname(), resolveProtoAddress(lb_endpoint.endpoint().address()), + lb_endpoint.endpoint().hostname(), + THROW_OR_RETURN_VALUE(resolveProtoAddress(lb_endpoint.endpoint().address()), + const Network::Address::InstanceConstSharedPtr), {}, locality_lb_endpoint, lb_endpoint, dispatcher.timeSource()); } } diff --git a/source/extensions/clusters/strict_dns/strict_dns_cluster.cc b/source/extensions/clusters/strict_dns/strict_dns_cluster.cc index dcb0c10f783d..66da67794d77 100644 --- a/source/extensions/clusters/strict_dns/strict_dns_cluster.cc +++ b/source/extensions/clusters/strict_dns/strict_dns_cluster.cc @@ -24,7 +24,7 @@ StrictDnsClusterImpl::StrictDnsClusterImpl(const envoy::config::cluster::v3::Clu std::list resolve_targets; const auto& locality_lb_endpoints = load_assignment_.endpoints(); for (const auto& locality_lb_endpoint : locality_lb_endpoints) { - validateEndpointsForZoneAwareRouting(locality_lb_endpoint); + THROW_IF_NOT_OK(validateEndpointsForZoneAwareRouting(locality_lb_endpoint)); for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { const auto& socket_address = lb_endpoint.endpoint().address().socket_address();