Skip to content

Commit 84853c0

Browse files
ext_proc: log gRPC status if openStream failed (#42257)
If openStream() fails (for example, when the ext_proc cluster configuration is wrong), log the gRPC status in ExtProcLoggingInfo. --------- Signed-off-by: Yanjun Xiang <yanjunxiang@google.com>
1 parent 2370128 commit 84853c0

File tree

5 files changed

+110
-19
lines changed

5 files changed

+110
-19
lines changed

source/extensions/filters/http/ext_proc/ext_proc.cc

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la
7373
constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status";
7474
constexpr absl::string_view BytesSentField = "bytes_sent";
7575
constexpr absl::string_view BytesReceivedField = "bytes_received";
76+
constexpr absl::string_view GrpcStatusBeforeFirstCallField = "grpc_status_before_first_call";
7677

7778
absl::optional<ProcessingMode> initProcessingMode(const ExtProcPerRoute& config) {
7879
if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) {
@@ -416,6 +417,8 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const {
416417
static_cast<double>(bytes_sent_));
417418
(*struct_msg->mutable_fields())[BytesReceivedField].set_number_value(
418419
static_cast<double>(bytes_received_));
420+
(*struct_msg->mutable_fields())[GrpcStatusBeforeFirstCallField].set_number_value(
421+
static_cast<double>(static_cast<int>(grpc_status_before_first_call_)));
419422
return struct_msg;
420423
}
421424

@@ -457,6 +460,7 @@ absl::optional<std::string> ExtProcLoggingInfo::serializeAsString() const {
457460
}
458461
parts.push_back(absl::StrCat("bs:", bytes_sent_));
459462
parts.push_back(absl::StrCat("br:", bytes_received_));
463+
parts.push_back(absl::StrCat("os:", static_cast<int>(grpc_status_before_first_call_)));
460464

461465
return absl::StrJoin(parts, ",");
462466
}
@@ -520,6 +524,9 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const {
520524
if (field_name == BytesReceivedField) {
521525
return static_cast<int64_t>(bytes_received_);
522526
}
527+
if (field_name == GrpcStatusBeforeFirstCallField) {
528+
return static_cast<int64_t>(grpc_status_before_first_call_);
529+
}
523530
return {};
524531
}
525532

@@ -639,8 +646,7 @@ void Filter::onError() {
639646
} else {
640647
// Return an error and stop processing the current stream.
641648
processing_complete_ = true;
642-
decoding_state_.onFinishProcessorCall(Grpc::Status::Aborted);
643-
encoding_state_.onFinishProcessorCall(Grpc::Status::Aborted);
649+
onFinishProcessorCalls(Grpc::Status::Aborted);
644650
ImmediateResponse errorResponse;
645651
errorResponse.mutable_status()->set_code(
646652
static_cast<StatusCode>(static_cast<uint32_t>(config_->statusOnError())));
@@ -1911,25 +1917,35 @@ void Filter::onMessageTimeout() {
19111917
// Return an error and stop processing the current stream.
19121918
processing_complete_ = true;
19131919
closeStream();
1914-
decoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded);
1915-
encoding_state_.onFinishProcessorCall(Grpc::Status::DeadlineExceeded);
1920+
onFinishProcessorCalls(Grpc::Status::DeadlineExceeded);
19161921
ImmediateResponse errorResponse;
19171922
errorResponse.mutable_status()->set_code(StatusCode::GatewayTimeout);
19181923
errorResponse.set_details(absl::StrFormat("%s_per-message_timeout_exceeded", ErrorPrefix));
19191924
sendImmediateResponse(errorResponse);
19201925
}
19211926
}
19221927

1928+
// Stores `call_status` in the logging info, but only while neither the decoding nor the
// encoding state has a call start time, i.e. the status belongs to a failure that happened
// before any processor call was started (for example when openStream() fails).
void Filter::recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status) {
  const bool call_already_started = decoding_state_.getCallStartTime().has_value() ||
                                    encoding_state_.getCallStartTime().has_value();
  if (call_already_started) {
    return;
  }

  // Nothing to record into when no logging info object is available.
  if (loggingInfo() == nullptr) {
    return;
  }
  loggingInfo()->recordGrpcStatusBeforeFirstCall(call_status);
}
1936+
19231937
// Regardless of the current filter state, reset it to "IDLE", continue
19241938
// the current callback, and reset timers. This is used in a few error-handling situations.
19251939
// Passes `call_status` to recordGrpcStatusBeforeFirstCall() and then to clearAsyncState() on
// both the decoding and the encoding state.
void Filter::clearAsyncState(Grpc::Status::GrpcStatus call_status) {
1940+
// NOTE(review): recordGrpcStatusBeforeFirstCall() inspects getCallStartTime() on both states;
// presumably it has to run before the per-direction calls below touch that state - confirm.
recordGrpcStatusBeforeFirstCall(call_status);
19261941
decoding_state_.clearAsyncState(call_status);
19271942
encoding_state_.clearAsyncState(call_status);
19281943
}
19291944

19301945
// Regardless of the current state, ensure that the timers won't fire
19311946
// again.
19321947
// Passes `call_status` to recordGrpcStatusBeforeFirstCall() and then to
// onFinishProcessorCall() on both the decoding and the encoding state. onError() and
// onMessageTimeout() call this instead of finishing each direction separately.
void Filter::onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status) {
1948+
// NOTE(review): recordGrpcStatusBeforeFirstCall() inspects getCallStartTime() on both states;
// presumably it has to run before the per-direction calls below touch that state - confirm.
recordGrpcStatusBeforeFirstCall(call_status);
19331949
decoding_state_.onFinishProcessorCall(call_status);
19341950
encoding_state_.onFinishProcessorCall(call_status);
19351951
}

source/extensions/filters/http/ext_proc/ext_proc.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
9696
void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
9797
ProcessorState::CallbackState callback_state,
9898
envoy::config::core::v3::TrafficDirection traffic_direction);
99+
// Stores `call_status` in grpc_status_before_first_call_, overwriting any value recorded
// earlier.
void recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status) {
100+
grpc_status_before_first_call_ = call_status;
101+
}
102+
// Returns the status last stored by recordGrpcStatusBeforeFirstCall(). The member is
// initialized to Grpc::Status::Ok, so Ok is returned when nothing was recorded.
Grpc::Status::GrpcStatus getGrpcStatusBeforeFirstCall() const {
103+
return grpc_status_before_first_call_;
104+
}
105+
99106
void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; }
100107
void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; }
101108
void setClusterInfo(absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info) {
@@ -144,6 +151,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
144151
Upstream::HostDescriptionConstSharedPtr upstream_host_;
145152
// The status details of the underlying HTTP/2 stream. Envoy gRPC only.
146153
std::string http_response_code_details_;
154+
// The gRPC status when the openStream() operation fails.
155+
Grpc::Status::GrpcStatus grpc_status_before_first_call_ = Grpc::Status::Ok;
147156
};
148157

149158
class ThreadLocalStreamManager;
@@ -548,6 +557,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
548557
void closeStream();
549558
void halfCloseAndWaitForRemoteClose();
550559

560+
void recordGrpcStatusBeforeFirstCall(Grpc::Status::GrpcStatus call_status);
551561
void onFinishProcessorCalls(Grpc::Status::GrpcStatus call_status);
552562
void clearAsyncState(Grpc::Status::GrpcStatus call_status = Grpc::Status::Aborted);
553563
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

source/extensions/filters/http/ext_proc/processor_state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
166166
virtual const Http::RequestOrResponseHeaderMap* responseHeaders() const PURE;
167167
const Http::HeaderMap* responseTrailers() const { return trailers_; }
168168

169+
// Exposes call_start_time_ by const reference. Filter::recordGrpcStatusBeforeFirstCall() uses
// has_value() on the result to decide whether a status should be recorded.
const absl::optional<MonotonicTime>& getCallStartTime() const { return call_start_time_; }
169170
void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
170171
CallbackState callback_state);
171172
void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,

test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ using namespace std::chrono_literals;
7373

7474
INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDeferredProcessing, ExtProcIntegrationTest,
7575
GRPC_CLIENT_INTEGRATION_PARAMS);
76-
7776
// Test the filter using the default configuration by connecting to
7877
// an ext_proc server that responds to the request_headers message
7978
// by immediately closing the stream.
@@ -5136,6 +5135,34 @@ TEST_P(ExtProcIntegrationTest, AccessLogExtProcInCompositeFilter) {
51365135
EXPECT_THAT(log_content, testing::HasSubstr("response_header_latency_us"));
51375136
}
51385137

5138+
// With an invalid ext_proc gRPC server configured, the request is answered with a 500 and the
// access log exposes a non-"0" grpc_status_before_first_call value through the ext_proc
// filter state. Only meaningful for the Envoy gRPC client.
TEST_P(ExtProcIntegrationTest, ExtProcLoggingInfoWithWrongCluster) {
  if (!IsEnvoyGrpc()) {
    GTEST_SKIP() << "Google gRPC stream open does not fail immediately with wrong ext_proc cluster";
  }

  // Install a JSON file access log that emits just the field under test.
  auto access_log_path = TestEnvironment::temporaryPath("ext_proc_open_stream_wrong_cluster.log");
  config_helper_.addConfigModifier([&access_log_path](HttpConnectionManager& cm) {
    envoy::extensions::access_loggers::file::v3::FileAccessLog file_log_config;
    file_log_config.set_path(access_log_path);

    auto* json_format = file_log_config.mutable_log_format()->mutable_json_format();
    auto& json_fields = *json_format->mutable_fields();
    json_fields["field_grpc_status_before_first_call"].set_string_value(
        "%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:grpc_status_before_first_call)%");

    auto* log_entry = cm.add_access_log();
    log_entry->set_name("accesslog");
    log_entry->mutable_typed_config()->PackFrom(file_log_config);
  });

  // Bring the proxy up against an invalid ext_proc gRPC server.
  ConfigOptions options = {};
  options.valid_grpc_server = false;
  initializeConfig(options);
  HttpIntegrationTest::initialize();

  auto response = sendDownstreamRequest(absl::nullopt);
  verifyDownstreamResponse(*response, 500);

  // The logged status must differ from "0", the numeric value of Grpc::Status::Ok.
  std::string log_line = waitForAccessLog(access_log_path, 0, true);
  auto parsed_log = Json::Factory::loadFromString(log_line).value();
  auto grpc_status_field = parsed_log->getString("field_grpc_status_before_first_call");
  EXPECT_NE(*grpc_status_field, "0");
}
5165+
51395166
} // namespace ExternalProcessing
51405167
} // namespace HttpFilters
51415168
} // namespace Extensions

test/extensions/filters/http/ext_proc/filter_test.cc

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ static const std::string filter_config_name = "scooby.dooby.doo";
9797

9898
class HttpFilterTest : public testing::Test {
9999
protected:
100+
// Selects how the stream-start mock that checks do_start_option_ behaves.
enum DoStartOption {
101+
// Neither early-exit branch of the mock is taken.
DEFAULT = 1,
102+
// The mock invokes callbacks.onGrpcError(Grpc::Status::Internal, "foo") and returns nullptr.
ON_GRPC_ERROR = 2,
103+
// The mock invokes callbacks.onGrpcClose() and returns nullptr.
ON_GRPC_CLOSE = 3,
104+
};
100105
void initialize(std::string&& yaml, bool is_upstream_filter = false) {
101106
scoped_runtime_.mergeValues(
102107
{{"envoy.reloadable_features.ext_proc_stream_close_optimization", "true"}});
@@ -190,6 +195,16 @@ class HttpFilterTest : public testing::Test {
190195
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
191196
const Envoy::Http::AsyncClient::StreamOptions&,
192197
Envoy::Http::StreamFilterSidestreamWatermarkCallbacks&) {
198+
if (do_start_option_ == ON_GRPC_ERROR) {
199+
callbacks.onGrpcError(Grpc::Status::Internal, "foo");
200+
return nullptr;
201+
}
202+
203+
if (do_start_option_ == ON_GRPC_CLOSE) {
204+
callbacks.onGrpcClose();
205+
return nullptr;
206+
}
207+
193208
if (final_expected_grpc_service_.has_value()) {
194209
EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(),
195210
config_with_hash_key.config()));
@@ -460,17 +475,16 @@ class HttpFilterTest : public testing::Test {
460475
stream_callbacks_->onReceiveMessage(std::move(response));
461476
}
462477

478+
// Looks up the ExtProcLoggingInfo object stored in the stream's filter state under
// filter_config_name and returns it as a read-only pointer.
const ExtProcLoggingInfo* getExtProcLoggingInfo() {
  using LoggingInfo = Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo;
  const auto& filter_state = stream_info_.filterState();
  return filter_state->getDataReadOnly<LoggingInfo>(filter_config_name);
}
483+
463484
// Get the gRPC call stats data from the filter state.
464485
const ExtProcLoggingInfo::GrpcCalls&
465486
getGrpcCalls(const envoy::config::core::v3::TrafficDirection traffic_direction) {
466-
// The number of processor grpc calls made in the encoding and decoding path.
467-
const ExtProcLoggingInfo::GrpcCalls& grpc_calls =
468-
stream_info_.filterState()
469-
->getDataReadOnly<
470-
Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>(
471-
filter_config_name)
472-
->grpcCalls(traffic_direction);
473-
return grpc_calls;
487+
return getExtProcLoggingInfo()->grpcCalls(traffic_direction);
474488
}
475489

476490
// Check gRPC call stats for headers and trailers.
@@ -631,12 +645,7 @@ class HttpFilterTest : public testing::Test {
631645
// The metadata configured as part of ext_proc filter should be in the filter state.
632646
// In addition, bytes sent/received should also be stored.
633647
void expectFilterState(const Envoy::Protobuf::Struct& expected_metadata) {
634-
const auto* filterState =
635-
stream_info_.filterState()
636-
->getDataReadOnly<
637-
Envoy::Extensions::HttpFilters::ExternalProcessing::ExtProcLoggingInfo>(
638-
filter_config_name);
639-
const Envoy::Protobuf::Struct& loggedMetadata = filterState->filterMetadata();
648+
const Envoy::Protobuf::Struct& loggedMetadata = getExtProcLoggingInfo()->filterMetadata();
640649
EXPECT_THAT(loggedMetadata, ProtoEq(expected_metadata));
641650
}
642651

@@ -668,6 +677,7 @@ class HttpFilterTest : public testing::Test {
668677
NiceMock<Server::Configuration::MockServerFactoryContext> factory_context_;
669678
Extensions::Filters::Common::Expr::BuilderInstanceSharedConstPtr builder_;
670679
TestScopedRuntime scoped_runtime_;
680+
DoStartOption do_start_option_ = DEFAULT;
671681
};
672682

673683
// Using the default configuration, test the filter with a processor that
@@ -1773,6 +1783,7 @@ TEST_F(HttpFilterTest, StreamingSendRequestDataGrpcFail) {
17731783
Unused) { modify_headers(immediate_response_headers); }));
17741784
server_closed_stream_ = true;
17751785
stream_callbacks_->onGrpcError(Grpc::Status::Internal, "error message");
1786+
EXPECT_EQ(Grpc::Status::Ok, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall());
17761787

17771788
// Sending another chunk of data. No more gRPC call.
17781789
EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(req_data, false));
@@ -5749,6 +5760,32 @@ TEST_F(HttpFilterTest, FilterMetadataOverridesClusterMetadata) {
57495760
filter_->onDestroy();
57505761
}
57515762

5763+
// Stream start is mocked to report a gRPC error (Grpc::Status::Internal) instead of returning a
// stream. decodeHeaders() is expected to stop iteration, and the logging info is expected to
// hold Internal as the status before the first call.
TEST_F(HttpFilterTest, GrpcErrorOnOpenStream) {
5764+
initialize(R"EOF(
5765+
grpc_service:
5766+
envoy_grpc:
5767+
cluster_name: "ext_proc_server"
5768+
)EOF");
5769+
5770+
do_start_option_ = ON_GRPC_ERROR;
5771+
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
5772+
filter_->onDestroy();
5773+
EXPECT_EQ(Grpc::Status::Internal, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall());
5774+
}
5775+
5776+
// Stream start is mocked to call onGrpcClose() instead of returning a stream. decodeHeaders()
// is expected to continue, and the logging info is expected to hold Grpc::Status::Aborted as
// the status before the first call.
TEST_F(HttpFilterTest, GrpcCloseOnOpenStream) {
5777+
initialize(R"EOF(
5778+
grpc_service:
5779+
envoy_grpc:
5780+
cluster_name: "ext_proc_server"
5781+
)EOF");
5782+
5783+
do_start_option_ = ON_GRPC_CLOSE;
5784+
EXPECT_EQ(FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, false));
5785+
filter_->onDestroy();
5786+
EXPECT_EQ(Grpc::Status::Aborted, getExtProcLoggingInfo()->getGrpcStatusBeforeFirstCall());
5787+
}
5788+
57525789
} // namespace
57535790
} // namespace ExternalProcessing
57545791
} // namespace HttpFilters

0 commit comments

Comments
 (0)