Skip to content

Commit

Permalink
ext_proc: Stop timers during ExternalProcessing::Filter::onDestroy (#…
Browse files Browse the repository at this point in the history
…22541)

If a timer fires after onDestroy is called, the ext_proc filter may crash while
trying to access invalid encoder or decoder filter callbacks. The documentation
for StreamFilterBase::onDestroy() says, "Filters must not invoke either
encoder or decoder filter callbacks after having onDestroy() invoked."

Signed-off-by: Rick Stewart <ristewart@google.com>
  • Loading branch information
ristewar committed Aug 12, 2022
1 parent a602db7 commit 073ed39
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 19 deletions.
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ void Filter::onDestroy() {
// per the filter contract.
processing_complete_ = true;
closeStream();
decoding_state_.stopMessageTimer();
encoding_state_.stopMessageTimer();
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Expand Down
11 changes: 8 additions & 3 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis

void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state) {
if (message_timer_) {
message_timer_->disableTimer();
}
stopMessageTimer();

if (call_start_time_.has_value()) {
std::chrono::microseconds duration = std::chrono::duration_cast<std::chrono::microseconds>(
filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value());
Expand All @@ -43,6 +42,12 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
callback_state_ = next_state;
}

void ProcessorState::stopMessageTimer() {
if (message_timer_) {
message_timer_->disableTimer();
}
}

absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& response) {
if (callback_state_ == CallbackState::HeadersCallback) {
ENVOY_LOG(debug, "applying headers response. body mode = {}",
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
CallbackState callback_state);
void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state = CallbackState::Idle);
void stopMessageTimer();

// Idempotent methods for watermarking the body
virtual void requestWatermark() PURE;
Expand Down
22 changes: 22 additions & 0 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,28 @@ TEST_P(ExtProcIntegrationTest, ResponseMessageTimeoutIgnoreError) {
verifyDownstreamResponse(*response, 200);
}

// While waiting for a response from the external processor, trigger a
// downstream disconnect followed by a response message timeout.
TEST_P(ExtProcIntegrationTest, ResponseMessageTimeoutDownstreamDisconnect) {
proto_config_.set_failure_mode_allow(true);
proto_config_.mutable_message_timeout()->set_nanos(200000000);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(absl::nullopt);
processRequestHeadersMessage(*grpc_upstreams_[0], true, absl::nullopt);
handleUpstreamRequest();
processResponseHeadersMessage(*grpc_upstreams_[0], false,
[this](const HttpHeaders&, HeadersResponse&) {
// Downstream disconnect
codec_client_->close();
// Travel forward 400 ms
timeSystem().advanceTimeWaitImpl(400ms);
return false;
});

ASSERT_TRUE(response->waitForReset());
}

// Test how the filter responds when asked to buffer a request body for a POST
// request with an empty body. We should get an empty body message because
// the Envoy filter stream received the body after all the headers.
Expand Down
42 changes: 38 additions & 4 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,19 @@ class HttpFilterTest : public testing::Test {
}

void TearDown() override {
// This will fail if, at the end of the test, we left any timers enabled.
// (This particular test suite does not actually let timers expire,
// although other test suites do.)
EXPECT_TRUE(allTimersDisabled());
}

bool allTimersDisabled() {
for (auto* t : timers_) {
// This will fail if, at the end of the test, we left any timers enabled.
// (This particular test suite does not actually let timers expire,
// although other test suites do.)
EXPECT_FALSE(t->enabled_);
if (t->enabled_) {
return false;
}
}
return true;
}

ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, testing::Unused,
Expand Down Expand Up @@ -1734,6 +1741,33 @@ TEST_F(HttpFilterTest, PostAndClose) {
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

// Mimic a downstream client reset while the filter waits for a response from
// the processor.
TEST_F(HttpFilterTest, PostAndDownstreamReset) {
initialize(R"EOF(
grpc_service:
envoy_grpc:
cluster_name: "ext_proc_server"
)EOF");

EXPECT_FALSE(config_->failureModeAllow());

// Create synthetic HTTP request
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));

EXPECT_FALSE(last_request_.async_mode());
ASSERT_TRUE(last_request_.has_request_headers());
EXPECT_FALSE(allTimersDisabled());

// Call onDestroy to mimic a downstream client reset.
filter_->onDestroy();

EXPECT_TRUE(allTimersDisabled());
EXPECT_EQ(1, config_->stats().streams_started_.value());
EXPECT_EQ(1, config_->stats().stream_msgs_sent_.value());
EXPECT_EQ(1, config_->stats().streams_closed_.value());
}

// Using a processing mode, configure the filter to only send the request_headers
// message.
TEST_F(HttpFilterTest, ProcessingModeRequestHeadersOnly) {
Expand Down
24 changes: 12 additions & 12 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,15 @@ TEST_F(OrderingTest, DefaultOrderingGetWithTimer) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(*request_timer, disableTimer());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
sendRequestHeadersReply();

MockTimer* response_timer = new MockTimer(&dispatcher_);
EXPECT_CALL(*response_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(stream_delegate_, send(_, false));
sendResponseHeaders(true);
EXPECT_CALL(encoder_callbacks_, continueEncoding());
EXPECT_CALL(*response_timer, disableTimer());
EXPECT_CALL(*response_timer, disableTimer()).Times(2);
sendResponseHeadersReply();
Buffer::OwnedImpl req_body("Hello!");
EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(req_body, true));
Expand Down Expand Up @@ -547,7 +547,7 @@ TEST_F(OrderingTest, ImmediateResponseOnRequest) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
EXPECT_CALL(*request_timer, disableTimer());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
sendImmediateResponse500();
// The rest of the filter isn't necessarily called after this.
}
Expand All @@ -562,7 +562,7 @@ TEST_F(OrderingTest, ImmediateResponseOnResponse) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
EXPECT_CALL(*request_timer, disableTimer()).Times(3);
sendRequestHeadersReply();

MockTimer* response_timer = new MockTimer(&dispatcher_);
Expand All @@ -571,7 +571,7 @@ TEST_F(OrderingTest, ImmediateResponseOnResponse) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendResponseHeaders(true);
EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
EXPECT_CALL(*response_timer, disableTimer());
EXPECT_CALL(*response_timer, disableTimer()).Times(2);
sendImmediateResponse500();
Buffer::OwnedImpl resp_body("Hello!");
EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_body, true));
Expand Down Expand Up @@ -710,7 +710,7 @@ TEST_F(OrderingTest, GrpcErrorInline) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
EXPECT_CALL(*request_timer, disableTimer());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
sendGrpcError();
// The rest of the filter isn't called after this.
}
Expand All @@ -728,7 +728,7 @@ TEST_F(OrderingTest, GrpcErrorInlineIgnored) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(*request_timer, disableTimer());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
sendGrpcError();

// After that we ignore the processor
Expand All @@ -748,7 +748,7 @@ TEST_F(OrderingTest, GrpcErrorOutOfLine) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
EXPECT_CALL(*request_timer, disableTimer()).Times(3);
sendRequestHeadersReply();

EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
Expand Down Expand Up @@ -782,7 +782,7 @@ TEST_F(OrderingTest, GrpcErrorAfterTimeout) {
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersGet(true);
EXPECT_CALL(encoder_callbacks_, sendLocalReply(Http::Code::InternalServerError, _, _, _, _));
EXPECT_CALL(*request_timer, disableTimer());
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
request_timer->invokeCallback();
// Nothing should happen now despite the gRPC error
sendGrpcError();
Expand Down Expand Up @@ -812,7 +812,7 @@ TEST_F(OrderingTest, TimeoutOnResponseBody) {
EXPECT_CALL(*request_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(stream_delegate_, send(_, false));
EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(req_body, true));
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
EXPECT_CALL(*request_timer, disableTimer()).Times(3);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
sendRequestBodyReply();

Expand All @@ -827,7 +827,7 @@ TEST_F(OrderingTest, TimeoutOnResponseBody) {
EXPECT_CALL(*response_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(stream_delegate_, send(_, false));
sendResponseHeaders(true);
EXPECT_CALL(*response_timer, disableTimer()).Times(2);
EXPECT_CALL(*response_timer, disableTimer()).Times(3);
sendResponseHeadersReply();
EXPECT_CALL(*response_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(stream_delegate_, send(_, false));
Expand All @@ -851,7 +851,7 @@ TEST_F(OrderingTest, TimeoutOnRequestBody) {
EXPECT_CALL(*request_timer, enableTimer(kMessageTimeout, nullptr));
EXPECT_CALL(stream_delegate_, send(_, false));
sendRequestHeadersPost(true);
EXPECT_CALL(*request_timer, disableTimer()).Times(2);
EXPECT_CALL(*request_timer, disableTimer()).Times(3);
sendRequestHeadersReply();

Buffer::OwnedImpl req_body;
Expand Down

0 comments on commit 073ed39

Please sign in to comment.