Skip to content

Commit

Permalink
OTel AccessLogger: support log_written/dropped for OTel partial rejec…
Browse files Browse the repository at this point in the history
…ted log records (#34072)


---------

Signed-off-by: Xuyang Tao <taoxuy@google.com>
  • Loading branch information
TAOXUY committed May 16, 2024
1 parent 3e5412c commit f3613b4
Show file tree
Hide file tree
Showing 11 changed files with 432 additions and 167 deletions.
19 changes: 19 additions & 0 deletions source/extensions/access_loggers/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,29 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "grpc_access_logger_clients_lib",
hdrs = ["grpc_access_logger_clients.h"],
deps = [
":grpc_access_logger_utils_lib",
"//envoy/event:dispatcher_interface",
"//envoy/grpc:async_client_manager_interface",
"//envoy/singleton:instance_interface",
"//envoy/stats:stats_interface",
"//envoy/thread_local:thread_local_interface",
"//source/common/common:assert_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/common/protobuf:utility_lib",
"@com_google_absl//absl/types:optional",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "grpc_access_logger",
hdrs = ["grpc_access_logger.h"],
deps = [
":grpc_access_logger_clients_lib",
":grpc_access_logger_utils_lib",
"//envoy/event:dispatcher_interface",
"//envoy/grpc:async_client_manager_interface",
Expand Down
175 changes: 36 additions & 139 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "source/common/http/utility.h"
#include "source/common/protobuf/utility.h"
#include "source/common/tracing/null_span_impl.h"
#include "source/extensions/access_loggers/common/grpc_access_logger_clients.h"
#include "source/extensions/access_loggers/common/grpc_access_logger_utils.h"

#include "absl/container/flat_hash_map.h"
Expand Down Expand Up @@ -75,126 +76,6 @@ template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLogge
getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) PURE;
};

template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
public:
virtual ~GrpcAccessLogClient() = default;
virtual bool isConnected() PURE;
virtual bool log(const LogRequest& request) PURE;

protected:
GrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method,
OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
: client_(client), service_method_(service_method),
opts_(createRequestOptionsForRetry(retry_policy)) {}

Grpc::AsyncClient<LogRequest, LogResponse> client_;
const Protobuf::MethodDescriptor& service_method_;
const Http::AsyncClient::RequestOptions opts_;

private:
Http::AsyncClient::RequestOptions
createRequestOptionsForRetry(OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy) {
auto opt = Http::AsyncClient::RequestOptions();

if (!retry_policy) {
return opt;
}

const auto grpc_retry_policy =
Http::Utility::convertCoreToRouteRetryPolicy(*retry_policy, "connect-failure");
opt.setBufferBodyForRetry(true);
opt.setRetryPolicy(grpc_retry_policy);
return opt;
}
};

template <typename LogRequest, typename LogResponse>
class UnaryGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
public:
UnaryGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method,
OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
: GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}

bool isConnected() override { return false; }

bool log(const LogRequest& request) override {
GrpcAccessLogClient<LogRequest, LogResponse>::client_->send(
GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, request, request_cb_,
Tracing::NullSpan::instance(), GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
return true;
}

struct RequestCallbacks : public Grpc::AsyncRequestCallbacks<LogResponse> {
// Grpc::AsyncRequestCallbacks
void onSuccess(Grpc::ResponsePtr<LogResponse>&&, Tracing::Span&) override {}
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onFailure(Grpc::Status::GrpcStatus, const std::string&, Tracing::Span&) override {}
};

private:
RequestCallbacks request_cb_;
};

template <typename LogRequest, typename LogResponse>
class StreamingGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> {
public:
StreamingGrpcAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client,
const Protobuf::MethodDescriptor& service_method,
OptRef<const envoy::config::core::v3::RetryPolicy> retry_policy)
: GrpcAccessLogClient<LogRequest, LogResponse>(client, service_method, retry_policy) {}

public:
struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
LocalStream(StreamingGrpcAccessLogClient& parent) : parent_(parent) {}

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(std::unique_ptr<LogResponse>&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
ASSERT(parent_.stream_ != nullptr);
if (parent_.stream_->stream_ != nullptr) {
// Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
// stream data in send().
parent_.stream_.reset();
}
}

StreamingGrpcAccessLogClient& parent_;
Grpc::AsyncStream<LogRequest> stream_{};
};

bool isConnected() override { return stream_ != nullptr && stream_->stream_ != nullptr; }

bool log(const LogRequest& request) override {
if (!stream_) {
stream_ = std::make_unique<LocalStream>(*this);
}

if (stream_->stream_ == nullptr) {
stream_->stream_ = GrpcAccessLogClient<LogRequest, LogResponse>::client_->start(
GrpcAccessLogClient<LogRequest, LogResponse>::service_method_, *stream_,
GrpcAccessLogClient<LogRequest, LogResponse>::opts_);
}

if (stream_->stream_ != nullptr) {
if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
return false;
}
stream_->stream_->sendMessage(request, false);
} else {
// Clear out the stream data due to stream creation failure.
stream_.reset();
}
return true;
}

std::unique_ptr<LocalStream> stream_;
};

} // namespace Detail

/**
Expand Down Expand Up @@ -222,26 +103,22 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
public:
using Interface = Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto>;
GrpcAccessLogger(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
Event::Dispatcher& dispatcher, Stats::Scope& scope, std::string access_log_prefix,
const Protobuf::MethodDescriptor& service_method, bool stream = true)
: buffer_flush_interval_msec_(
PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
Event::Dispatcher& dispatcher, Stats::Scope& scope,
absl::optional<std::string> access_log_prefix,
std::unique_ptr<GrpcAccessLogClient<LogRequest, LogResponse>> client)
: client_(std::move(client)), buffer_flush_interval_msec_(PROTOBUF_GET_MS_OR_DEFAULT(
config, buffer_flush_interval, 1000)),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
})),
max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)),
stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {
if (stream) {
client_ = std::make_unique<Detail::StreamingGrpcAccessLogClient<LogRequest, LogResponse>>(
client, service_method, GrpcCommon::optionalRetryPolicy(config));
} else {
client_ = std::make_unique<Detail::UnaryGrpcAccessLogClient<LogRequest, LogResponse>>(
client, service_method, GrpcCommon::optionalRetryPolicy(config));
}
max_buffer_size_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384)) {
flush_timer_->enableTimer(buffer_flush_interval_msec_);
if (access_log_prefix.has_value()) {
stats_ = std::make_unique<GrpcAccessLoggerStats>(GrpcAccessLoggerStats{
ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix.value()))});
}
}

void log(HttpLogProto&& entry) override {
Expand All @@ -264,7 +141,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
}

protected:
std::unique_ptr<Detail::GrpcAccessLogClient<LogRequest, LogResponse>> client_;
std::unique_ptr<GrpcAccessLogClient<LogRequest, LogResponse>> client_;
LogRequest message_;

private:
Expand All @@ -291,25 +168,45 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
}
}

// `canLogMore()` is only for streaming gRPC client only which could run into
// AboveWriteBufferHighWatermark during `flush()` so that we tracks the log entries dropped caused
// by that.
//
// For unary gRPC client, `canLogMore` always returns True[1][2] so `stats_` here is meaningless.
//
// [1]https://github.com/envoyproxy/envoy/blob/cd5ef906026160ec2cd766d8d18217e668c256d8/source/extensions/access_loggers/common/grpc_access_logger.h#L287.
// [2]https://github.com/envoyproxy/envoy/blob/cd5ef906026160ec2cd766d8d18217e668c256d8/source/extensions/access_loggers/common/grpc_access_logger.h#L126
bool canLogMore() {
if (max_buffer_size_bytes_ == 0 || approximate_message_size_bytes_ < max_buffer_size_bytes_) {
stats_.logs_written_.inc();
incLogsWrittenStats();
return true;
}
flush();
if (approximate_message_size_bytes_ < max_buffer_size_bytes_) {
stats_.logs_written_.inc();
incLogsWrittenStats();
return true;
}
stats_.logs_dropped_.inc();
incLogsDroppedStats();
return false;
}

void incLogsDroppedStats() {
if (stats_) {
stats_->logs_dropped_.inc();
}
}

void incLogsWrittenStats() {
if (stats_) {
stats_->logs_written_.inc();
}
}

const std::chrono::milliseconds buffer_flush_interval_msec_;
const Event::TimerPtr flush_timer_;
const uint64_t max_buffer_size_bytes_;
uint64_t approximate_message_size_bytes_ = 0;
GrpcAccessLoggerStats stats_;
std::unique_ptr<GrpcAccessLoggerStats> stats_ = nullptr;
};

/**
Expand Down

0 comments on commit f3613b4

Please sign in to comment.