Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load shed: add load shed check in downstream decoder filters #33366

Merged
merged 11 commits into from
Apr 15, 2024
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,10 @@ new_features:
Added maximum gRPC message size that is allowed to be received in Envoy gRPC. If a message over this limit is received,
the gRPC stream is terminated with the RESOURCE_EXHAUSTED error. This limit is applied to individual messages in the
streaming response and not the total size of streaming response. Defaults to 0, which means unlimited.
- area: load shed point
change: |
Added load shed point ``envoy.load_shed_points.http_downstream_filter_check`` that makes the load shed check available in HTTP filters.
Right now it is available in the router, which will send a local reply directly when Envoy is under pressure, typically memory.
botengyao marked this conversation as resolved.
Show resolved Hide resolved

deprecated:
- area: listener
Expand Down
5 changes: 5 additions & 0 deletions envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks,
*/
virtual absl::optional<Upstream::LoadBalancerContext::OverrideHost>
upstreamOverrideHost() const PURE;

/**
 * Tells a decoder filter whether it should shed load. The default implementation never
 * sheds load; implementations of these callbacks may override it.
 * @return true if the filter should shed load based on the system pressure, typically memory.
 */
virtual bool shouldLoadShed() const { return false; }
botengyao marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down
3 changes: 3 additions & 0 deletions envoy/server/overload/load_shed_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class LoadShedPointNameValues {
// Envoy will close the connections before creating codec if Envoy is under pressure,
// typically memory. This happens once getting data from the connection.
const std::string HcmCodecCreation = "envoy.load_shed_points.hcm_ondata_creating_codec";

// Envoy will send a local reply directly from HTTP filters (currently the router) if Envoy
// is under pressure, typically memory.
const std::string HttpDownstreamFilterCheck =
"envoy.load_shed_points.http_downstream_filter_check";
};

using LoadShedPointName = ConstSingleton<LoadShedPointNameValues>;
Expand Down
16 changes: 8 additions & 8 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -794,14 +794,14 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect
: makeOptRef<const TracingConnectionManagerConfig>(
*connection_manager_.config_.tracingConfig())),
stream_id_(connection_manager.random_generator_.random()),
filter_manager_(*this, *connection_manager_.dispatcher_,
connection_manager_.read_callbacks_->connection(), stream_id_,
std::move(account), connection_manager_.config_.proxy100Continue(),
buffer_limit, connection_manager_.config_.filterFactory(),
connection_manager_.config_.localReply(),
connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
StreamInfo::FilterState::LifeSpan::Connection),
filter_manager_(
*this, *connection_manager_.dispatcher_,
connection_manager_.read_callbacks_->connection(), stream_id_, std::move(account),
connection_manager_.config_.proxy100Continue(), buffer_limit,
connection_manager_.config_.filterFactory(), connection_manager_.config_.localReply(),
connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
StreamInfo::FilterState::LifeSpan::Connection, connection_manager_.overload_manager_),
request_response_timespan_(new Stats::HistogramCompletableTimespanImpl(
connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())),
header_validator_(
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ const Buffer::Instance* ActiveStreamDecoderFilter::decodingBuffer() {
return parent_.buffered_request_data_.get();
}

bool ActiveStreamDecoderFilter::shouldLoadShed() const {
  // The decision belongs to the owning filter manager; this callback only forwards the query.
  return parent_.shouldLoadShed();
}

void ActiveStreamDecoderFilter::modifyDecodingBuffer(
std::function<void(Buffer::Instance&)> callback) {
ASSERT(parent_.state_.latest_data_decoding_filter_ == this);
Expand Down
23 changes: 21 additions & 2 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
Buffer::BufferMemoryAccountSharedPtr account() const override;
void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override;
absl::optional<Upstream::LoadBalancerContext::OverrideHost> upstreamOverrideHost() const override;
bool shouldLoadShed() const override;

// Each decoder filter instance checks if the request passed to the filter is gRPC
// so that we can issue gRPC local responses to gRPC requests. Filter's decodeHeaders()
Expand Down Expand Up @@ -859,6 +860,8 @@ class FilterManager : public ScopeTrackedObject,
void onDownstreamReset() { state_.saw_downstream_reset_ = true; }
bool sawDownstreamReset() { return state_.saw_downstream_reset_; }

/**
 * @return true if filters run by this manager should shed load. The base implementation
 *         never sheds load; subclasses may override it.
 */
virtual bool shouldLoadShed() { return false; }

protected:
struct State {
State()
Expand Down Expand Up @@ -1098,12 +1101,20 @@ class DownstreamFilterManager : public FilterManager {
const LocalReply::LocalReply& local_reply, Http::Protocol protocol,
TimeSource& time_source,
StreamInfo::FilterStateSharedPtr parent_filter_state,
StreamInfo::FilterState::LifeSpan filter_state_life_span)
StreamInfo::FilterState::LifeSpan filter_state_life_span,
Server::OverloadManager& overload_manager)
: FilterManager(filter_manager_callbacks, dispatcher, connection, stream_id, account,
proxy_100_continue, buffer_limit, filter_chain_factory),
stream_info_(protocol, time_source, connection.connectionInfoProviderSharedPtr(),
parent_filter_state, filter_state_life_span),
local_reply_(local_reply) {}
local_reply_(local_reply),
downstream_filter_load_shed_point_(overload_manager.getLoadShedPoint(
Server::LoadShedPointName::get().HttpDownstreamFilterCheck)) {
ENVOY_LOG_ONCE_IF(
trace, downstream_filter_load_shed_point_ == nullptr,
"LoadShedPoint envoy.load_shed_points.http_downstream_filter_check is not found. "
"Is it configured?");
}
~DownstreamFilterManager() override {
ASSERT(prepared_local_reply_ == nullptr,
"Filter Manager destroyed without executing prepared local reply");
Expand Down Expand Up @@ -1139,6 +1150,13 @@ class DownstreamFilterManager : public FilterManager {
streamInfo().downstreamTiming()->lastDownstreamRxByteReceived().has_value();
}

// Sheds load only when the downstream filter load shed point is present and it reports that
// load should be shed; a missing load shed point never sheds.
bool shouldLoadShed() override {
  return downstream_filter_load_shed_point_ != nullptr &&
         downstream_filter_load_shed_point_->shouldShedLoad();
}

private:
/**
* Sends a local reply by constructing a response and passing it through all the encoder
Expand Down Expand Up @@ -1177,6 +1195,7 @@ class DownstreamFilterManager : public FilterManager {
OverridableRemoteConnectionInfoSetterStreamInfo stream_info_;
const LocalReply::LocalReply& local_reply_;
Utility::PreparedLocalReplyPtr prepared_local_reply_{nullptr};
// Load shed point consulted by shouldLoadShed(). It is looked up from the overload manager
// under the HttpDownstreamFilterCheck name and may be nullptr when that lookup finds nothing.
Server::LoadShedPoint* downstream_filter_load_shed_point_{nullptr};
};

} // namespace Http
Expand Down
1 change: 1 addition & 0 deletions source/common/router/context_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace Router {
COUNTER(passthrough_internal_redirect_too_many_redirects) \
COUNTER(passthrough_internal_redirect_unsafe_scheme) \
COUNTER(rq_direct_response) \
COUNTER(rq_overload_local_reply) \
COUNTER(rq_redirect) \
COUNTER(rq_reset_after_downstream_response_started) \
COUNTER(rq_total) \
Expand Down
7 changes: 7 additions & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,13 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
return Http::FilterHeadersStatus::StopIteration;
}

if (callbacks_->shouldLoadShed()) {
callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr,
absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload);
stats_.rq_overload_local_reply_.inc();
return Http::FilterHeadersStatus::StopIteration;
}

hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);

timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
Expand Down
1 change: 1 addition & 0 deletions test/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ envoy_cc_test(
"//test/mocks/http:http_mocks",
"//test/mocks/local_reply:local_reply_mocks",
"//test/mocks/network:network_mocks",
"//test/mocks/server:overload_manager_mocks",
"//test/test_common:test_runtime_lib",
],
)
Expand Down
6 changes: 6 additions & 0 deletions test/common/http/conn_manager_impl_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,9 @@ TEST_F(HttpConnectionManagerImplTest, CodecCreationLoadShedPointBypasscheck) {
EXPECT_CALL(overload_manager_,
getLoadShedPoint(Server::LoadShedPointName::get().HcmDecodeHeaders))
.WillOnce(Return(nullptr));
EXPECT_CALL(overload_manager_,
getLoadShedPoint(Server::LoadShedPointName::get().HttpDownstreamFilterCheck))
.WillOnce(Return(nullptr));

setup(false, "");

Expand Down Expand Up @@ -2661,6 +2664,9 @@ TEST_F(HttpConnectionManagerImplTest, DecodeHeaderLoadShedPointCanRejectNewStrea
EXPECT_CALL(overload_manager_,
getLoadShedPoint(Server::LoadShedPointName::get().HcmCodecCreation))
.WillOnce(Return(nullptr));
EXPECT_CALL(overload_manager_,
getLoadShedPoint(Server::LoadShedPointName::get().HttpDownstreamFilterCheck))
.WillRepeatedly(Return(nullptr));

setup(false, "");
setupFilterChain(1, 0);
Expand Down
4 changes: 3 additions & 1 deletion test/common/http/filter_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "test/mocks/http/mocks.h"
#include "test/mocks/local_reply/mocks.h"
#include "test/mocks/network/mocks.h"
#include "test/mocks/server/overload_manager.h"
#include "test/test_common/test_runtime.h"

#include "gtest/gtest.h"
Expand All @@ -35,7 +36,7 @@ class FilterManagerTest : public testing::Test {
filter_manager_ = std::make_unique<DownstreamFilterManager>(
filter_manager_callbacks_, dispatcher_, connection_, 0, nullptr, true, 10000,
filter_factory_, local_reply_, protocol_, time_source_, filter_state_,
StreamInfo::FilterState::LifeSpan::Connection);
StreamInfo::FilterState::LifeSpan::Connection, overload_manager_);
}

// Simple helper to wrapper filter to the factory function.
Expand Down Expand Up @@ -81,6 +82,7 @@ class FilterManagerTest : public testing::Test {
NiceMock<MockTimeSystem> time_source_;
StreamInfo::FilterStateSharedPtr filter_state_ =
std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::Connection);
NiceMock<Server::MockOverloadManager> overload_manager_;
};

TEST_F(FilterManagerTest, RequestHeadersOrResponseHeadersAccess) {
Expand Down
10 changes: 10 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,16 @@ TEST_F(RouterTest, NoHost) {
EXPECT_EQ(callbacks_.details(), "no_healthy_upstream");
}

// Verifies that the router answers with an "overload" local reply, stops filter iteration,
// and bumps rq_overload_local_reply when the decoder callbacks report that load should be shed.
TEST_F(RouterTest, RouterLoadShedTest) {
  Http::TestRequestHeaderMapImpl headers;
  HttpTestUtility::addDefaultHeaders(headers);
  ON_CALL(callbacks_, shouldLoadShed()).WillByDefault(Return(true));
  // The shed path must halt iteration instead of continuing on to upstream selection.
  EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, router_->decodeHeaders(headers, true));
  EXPECT_TRUE(verifyHostUpstreamStats(0, 0));
  EXPECT_EQ(callbacks_.details(), "overload");
  EXPECT_EQ(1UL, router_->stats().rq_overload_local_reply_.value());
}

TEST_F(RouterTest, MaintenanceMode) {
EXPECT_CALL(*cm_.thread_local_cluster_.cluster_.info_, maintenanceMode()).WillOnce(Return(true));

Expand Down
33 changes: 33 additions & 0 deletions test/integration/overload_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -849,4 +849,37 @@ TEST_P(LoadShedPointIntegrationTest, HttpConnectionMnagerCloseConnectionCreating
EXPECT_EQ(response->headers().getStatusValue(), "200");
}

// End-to-end check of the envoy.load_shed_points.http_downstream_filter_check load shed point:
// a request gets a 503 while the fake resource is above the threshold and a 200 once the
// pressure drops back below it.
TEST_P(LoadShedPointIntegrationTest, HttpDownstreamFilterLoadShed) {
  // The HTTP/3 downstream variant of this test is not exercised.
  if (downstreamProtocol() == Http::CodecClient::Type::HTTP3) {
    return;
  }
  autonomous_upstream_ = true;
  initializeOverloadManager(
      TestUtility::parseYaml<envoy::config::overload::v3::LoadShedPoint>(R"EOF(
name: "envoy.load_shed_points.http_downstream_filter_check"
triggers:
- name: "envoy.resource_monitors.testonly.fake_resource_monitor"
threshold:
value: 0.90
)EOF"));

  codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));

  // Put envoy in overloaded state and check that it sends a local reply from router.
  updateResource(0.95);
  test_server_->waitForGaugeEq(
      "overload.envoy.load_shed_points.http_downstream_filter_check.scale_percent", 100);
  auto shed_response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
  ASSERT_TRUE(shed_response->waitForEndStream());
  EXPECT_EQ(shed_response->headers().getStatusValue(), "503");

  // Drop the pressure below the threshold; the next request should succeed again.
  updateResource(0.80);
  test_server_->waitForGaugeEq(
      "overload.envoy.load_shed_points.http_downstream_filter_check.scale_percent", 0);

  auto proxied_response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
  ASSERT_TRUE(proxied_response->waitForEndStream());
  EXPECT_EQ(proxied_response->headers().getStatusValue(), "200");
}

} // namespace Envoy
1 change: 1 addition & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
MOCK_METHOD(void, setUpstreamOverrideHost, (Upstream::LoadBalancerContext::OverrideHost));
MOCK_METHOD(absl::optional<Upstream::LoadBalancerContext::OverrideHost>, upstreamOverrideHost, (),
(const));
MOCK_METHOD(bool, shouldLoadShed, (), (const));

Buffer::InstancePtr buffer_;
std::list<DownstreamWatermarkCallbacks*> callbacks_{};
Expand Down