Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: implement RetryHostPredicate #4385

Merged
merged 10 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ class RetryPolicy {
* @return uint32_t a local OR of RETRY_ON values above.
*/
virtual uint32_t retryOn() const PURE;

/**
* Initializes the RetryHostPredicates to be used with this retry attempt.
* @return list of RetryHostPredicates to use
*/
virtual std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const PURE;
};

/**
Expand Down Expand Up @@ -204,6 +210,21 @@ class RetryState {
virtual RetryStatus shouldRetry(const Http::HeaderMap* response_headers,
const absl::optional<Http::StreamResetReason>& reset_reason,
DoRetryCallback callback) PURE;

/**
* Called when a host was attempted but the request failed and is eligible for another retry.
* Should be used to update whatever internal state depends on previously attempted hosts.
* @param host the previously attempted host.
*/
virtual void onHostAttempted(Upstream::HostDescriptionConstSharedPtr host) PURE;

/**
* Determine whether host selection should be reattempted. Applies to host selection during
* retries, and is used to provide configurable host selection for retries.
* @param host the host under consideration
* @return whether host selection should be reattempted
*/
virtual bool shouldSelectAnotherHost(const Upstream::Host& host) PURE;
};

typedef std::unique_ptr<RetryState> RetryStatePtr;
Expand Down
9 changes: 7 additions & 2 deletions include/envoy/upstream/retry.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class RetryPriority {
*
* @param attempted_host the host that was previously attempted.
*/
virtual void onHostAttempted(HostSharedPtr attempted_host) PURE;
virtual void onHostAttempted(HostDescriptionConstSharedPtr attempted_host) PURE;
};

typedef std::shared_ptr<RetryPriority> RetryPrioritySharedPtr;
Expand Down Expand Up @@ -67,7 +67,7 @@ class RetryHostPredicate {
*
* @param attempted_host the host that was previously attempted.
*/
virtual void onHostAttempted(HostSharedPtr attempted_host) PURE;
virtual void onHostAttempted(HostDescriptionConstSharedPtr attempted_host) PURE;
};

typedef std::shared_ptr<RetryHostPredicate> RetryHostPredicateSharedPtr;
Expand Down Expand Up @@ -116,6 +116,11 @@ class RetryHostPredicateFactory {
virtual ~RetryHostPredicateFactory() {}

virtual void createHostPredicate(RetryHostPredicateFactoryCallbacks& callbacks) PURE;

/**
* @return name name of this factory.
*/
virtual std::string name() PURE;
};

} // namespace Upstream
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
std::chrono::milliseconds perTryTimeout() const override {
return std::chrono::milliseconds(0);
}
std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const override {
return {};
}
uint32_t numRetries() const override { return 0; }
uint32_t retryOn() const override { return 0; }
};
Expand Down
9 changes: 9 additions & 0 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ RetryPolicyImpl::RetryPolicyImpl(const envoy::api::v2::route::RouteAction& confi
num_retries_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.retry_policy(), num_retries, 1);
retry_on_ = RetryStateImpl::parseRetryOn(config.retry_policy().retry_on());
retry_on_ |= RetryStateImpl::parseRetryGrpcOn(config.retry_policy().retry_on());

for (auto& host_predicate : config.retry_policy().retry_host_predicate()) {
snowp marked this conversation as resolved.
Show resolved Hide resolved
// TODO(snowp): support passing the config Struct during initialization.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable to do in this PR, should only be a few lines?

auto factory = Registry::FactoryRegistry<Upstream::RetryHostPredicateFactory>::getFactory(
host_predicate.name());

ASSERT(factory);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: no need to ASSERT if doing an immediate dereference next line.

factory->createHostPredicate(*this);
}
}

CorsPolicyImpl::CorsPolicyImpl(const envoy::api::v2::route::CorsPolicy& config) {
Expand Down
11 changes: 10 additions & 1 deletion source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,28 @@ typedef std::shared_ptr<VirtualHostImpl> VirtualHostSharedPtr;
/**
* Implementation of RetryPolicy that reads from the proto route config.
*/
class RetryPolicyImpl : public RetryPolicy {
class RetryPolicyImpl : public RetryPolicy, Upstream::RetryHostPredicateFactoryCallbacks {
public:
RetryPolicyImpl(const envoy::api::v2::route::RouteAction& config);

// Router::RetryPolicy
std::chrono::milliseconds perTryTimeout() const override { return per_try_timeout_; }
uint32_t numRetries() const override { return num_retries_; }
uint32_t retryOn() const override { return retry_on_; }
std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const override {
return retry_host_predicates_;
}

// Upstream::RetryHostPredicateFactoryCallbacks
void addHostPredicate(Upstream::RetryHostPredicateSharedPtr predicate) override {
retry_host_predicates_.emplace_back(predicate);
}

private:
std::chrono::milliseconds per_try_timeout_{0};
uint32_t num_retries_{};
uint32_t retry_on_{};
std::vector<Upstream::RetryHostPredicateSharedPtr> retry_host_predicates_;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/retry_state_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ RetryStateImpl::RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap&
Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher,
Upstream::ResourcePriority priority)
: cluster_(cluster), runtime_(runtime), random_(random), dispatcher_(dispatcher),
priority_(priority) {
priority_(priority), retry_host_predicates_(route_policy.retryHostPredicates()) {

if (request_headers.EnvoyRetryOn()) {
retry_on_ = parseRetryOn(request_headers.EnvoyRetryOn()->value().c_str());
Expand Down
11 changes: 11 additions & 0 deletions source/common/router/retry_state_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class RetryStateImpl : public RetryState {
RetryStatus shouldRetry(const Http::HeaderMap* response_headers,
const absl::optional<Http::StreamResetReason>& reset_reason,
DoRetryCallback callback) override;
bool shouldSelectAnotherHost(const Upstream::Host& host) override {
return std::any_of(
retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { return predicate->shouldSelectAnotherHost(host); });
}

void onHostAttempted(Upstream::HostDescriptionConstSharedPtr host) override {
std::for_each(retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { predicate->onHostAttempted(host); });
}

private:
RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap& request_headers,
Expand All @@ -61,6 +71,7 @@ class RetryStateImpl : public RetryState {
Event::TimerPtr retry_timer_;
Upstream::ResourcePriority priority_;
BackOffStrategyPtr backoff_strategy_;
std::vector<Upstream::RetryHostPredicateSharedPtr> retry_host_predicates_;
};

} // namespace Router
Expand Down
7 changes: 7 additions & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,9 @@ void Filter::onUpstreamReset(UpstreamResetType type,

// We don't retry on a global timeout or if we already started the response.
if (type != UpstreamResetType::GlobalTimeout && !downstream_response_started_ && retry_state_) {
// Notify retry modifiers about the attempted host.
retry_state_->onHostAttempted(upstream_host);

RetryStatus retry_status =
retry_state_->shouldRetry(nullptr, reset_reason, [this]() -> void { doRetry(); });
if (retry_status == RetryStatus::Yes && setupRetry(true)) {
Expand Down Expand Up @@ -589,6 +592,9 @@ void Filter::onUpstreamHeaders(const uint64_t response_code, Http::HeaderMapPtr&
}

if (retry_state_) {
// Notify retry modifiers about the attempted host.
retry_state_->onHostAttempted(upstream_request_->upstream_host_);

RetryStatus retry_status = retry_state_->shouldRetry(
headers.get(), absl::optional<Http::StreamResetReason>(), [this]() -> void { doRetry(); });
// Capture upstream_host since setupRetry() in the following line will clear
Expand Down Expand Up @@ -744,6 +750,7 @@ bool Filter::setupRetry(bool end_stream) {
}

void Filter::doRetry() {
is_retry_ = true;
Http::ConnectionPool::Instance* conn_pool = getConnPool();
if (!conn_pool) {
sendNoHealthyUpstreamResponse();
Expand Down
14 changes: 13 additions & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
public:
Filter(FilterConfig& config)
: config_(config), downstream_response_started_(false), downstream_end_stream_(false),
do_shadowing_(false) {}
do_shadowing_(false), is_retry_(false) {}

~Filter();

Expand Down Expand Up @@ -204,6 +204,17 @@ class Filter : Logger::Loggable<Logger::Id::router>,
}
const Http::HeaderMap* downstreamHeaders() const override { return downstream_headers_; }

bool shouldSelectAnotherHost(const Upstream::Host& host) override {
// We only care about host selection when performing a retry, at which point we consult the
// RetryState to see if we're configured to avoid certain hosts during retries.
if (!is_retry_) {
return false;
}

ASSERT(retry_state_);
return retry_state_->shouldSelectAnotherHost(host);
}

/**
* Set a computed cookie to be sent with the downstream headers.
* @param key supplies the size of the cookie
Expand Down Expand Up @@ -377,6 +388,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
bool downstream_response_started_ : 1;
bool downstream_end_stream_ : 1;
bool do_shadowing_ : 1;
bool is_retry_ : 1;
};

class ProdFilter : public Filter {
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class LoadBalancerContextBase : public LoadBalancerContext {

bool shouldSelectAnotherHost(const Host&) override { return false; }

uint32_t hostSelectionRetryCount() const override { return 0; }
uint32_t hostSelectionRetryCount() const override { return 1; }
};

/**
Expand Down
2 changes: 1 addition & 1 deletion test/common/router/retry_state_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class RouterRetryStateImplTest : public testing::Test {
EXPECT_CALL(*retry_timer_, enableTimer(_));
}

TestRetryPolicy policy_;
NiceMock<TestRetryPolicy> policy_;
NiceMock<Upstream::MockClusterInfo> cluster_;
NiceMock<Runtime::MockLoader> runtime_;
NiceMock<Runtime::MockRandomGenerator> random_;
Expand Down
71 changes: 71 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class TestFilter : public Filter {
Upstream::ResourcePriority) override {
EXPECT_EQ(nullptr, retry_state_);
retry_state_ = new NiceMock<MockRetryState>();
if (reject_all_hosts_) {
// Set up RetryState to always reject the host
ON_CALL(*retry_state_, shouldSelectAnotherHost(_)).WillByDefault(Return(true));
}
return RetryStatePtr{retry_state_};
}

Expand All @@ -63,6 +67,7 @@ class TestFilter : public Filter {

NiceMock<Network::MockConnection> downstream_connection_;
MockRetryState* retry_state_{};
bool reject_all_hosts_;
};

class RouterTestBase : public testing::Test {
Expand Down Expand Up @@ -1692,6 +1697,72 @@ TEST_F(RouterTest, RetryUpstreamGrpcCancelled) {
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));
}

// Verifies that the initial request accepts any host, but during retries
// RetryPolicy will be consulted.
TEST_F(RouterTest, RetryRespectsRetryHostPredicate) {
router_.reject_all_hosts_ = true;

NiceMock<Http::MockStreamEncoder> encoder1;
Http::StreamDecoder* response_decoder = nullptr;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

NiceMock<Upstream::MockHost> host;
// The router should accept any host at this point, since we're not in a retry.
EXPECT_FALSE(router_.shouldSelectAnotherHost(host));

Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello"));
EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false));

Http::TestHeaderMapImpl trailers{{"some", "trailer"}};
router_.decodeTrailers(trailers);

// 5xx response.
router_.retry_state_->expectRetry();
Http::HeaderMapPtr response_headers1(new Http::TestHeaderMapImpl{{":status", "503"}});
EXPECT_CALL(encoder1.stream_, resetStream(Http::StreamResetReason::LocalReset));
EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(503));
response_decoder->decodeHeaders(std::move(response_headers1), false);
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));

// We expect the 5xx response to kick off a new request.
NiceMock<Http::MockStreamEncoder> encoder2;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_);
return nullptr;
}));
ON_CALL(callbacks_, decodingBuffer()).WillByDefault(Return(body_data.get()));
EXPECT_CALL(encoder2, encodeHeaders(_, false));
EXPECT_CALL(encoder2, encodeData(_, false));
EXPECT_CALL(encoder2, encodeTrailers(_));
router_.retry_state_->callback_();

// Now that we're triggered a retry, we should see the router reject hosts.
EXPECT_TRUE(router_.shouldSelectAnotherHost(host));

// Normal response.
EXPECT_CALL(*router_.retry_state_, shouldRetry(_, _, _)).WillOnce(Return(RetryStatus::No));
EXPECT_CALL(cm_.conn_pool_.host_->health_checker_, setUnhealthy()).Times(0);
Http::HeaderMapPtr response_headers2(new Http::TestHeaderMapImpl{{":status", "200"}});
EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(200));
response_decoder->decodeHeaders(std::move(response_headers2), true);
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));
}

TEST_F(RouterTest, Shadow) {
callbacks_.route_->route_entry_.shadow_policy_.cluster_ = "foo";
callbacks_.route_->route_entry_.shadow_policy_.runtime_key_ = "bar";
Expand Down
4 changes: 3 additions & 1 deletion test/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ void ConfigHelper::setConnectTimeout(std::chrono::milliseconds timeout) {
void ConfigHelper::addRoute(const std::string& domains, const std::string& prefix,
const std::string& cluster, bool validate_clusters,
envoy::api::v2::route::RouteAction::ClusterNotFoundResponseCode code,
envoy::api::v2::route::VirtualHost::TlsRequirementType type) {
envoy::api::v2::route::VirtualHost::TlsRequirementType type,
envoy::api::v2::route::RouteAction::RetryPolicy retry_policy) {
RELEASE_ASSERT(!finalized_, "");
envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager hcm_config;
loadHttpConnectionManager(hcm_config);
Expand All @@ -324,6 +325,7 @@ void ConfigHelper::addRoute(const std::string& domains, const std::string& prefi
virtual_host->add_routes()->mutable_match()->set_prefix(prefix);
virtual_host->mutable_routes(0)->mutable_route()->set_cluster(cluster);
virtual_host->mutable_routes(0)->mutable_route()->set_cluster_not_found_response_code(code);
virtual_host->mutable_routes(0)->mutable_route()->mutable_retry_policy()->Swap(&retry_policy);
virtual_host->set_require_tls(type);

storeHttpConnectionManager(hcm_config);
Expand Down
3 changes: 2 additions & 1 deletion test/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class ConfigHelper {
bool validate_clusters,
envoy::api::v2::route::RouteAction::ClusterNotFoundResponseCode code,
envoy::api::v2::route::VirtualHost::TlsRequirementType type =
envoy::api::v2::route::VirtualHost::NONE);
envoy::api::v2::route::VirtualHost::NONE,
envoy::api::v2::route::RouteAction::RetryPolicy retry_policy = {});

// Add an HTTP filter prior to existing filters.
void addFilter(const std::string& filter_yaml);
Expand Down
14 changes: 14 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ envoy_cc_test(
],
)

envoy_cc_test_library(
name = "test_host_predicate_lib",
srcs = [
"test_host_predicate.h",
"test_host_predicate_config.cc",
"test_host_predicate_config.h",
],
deps = [
"//include/envoy/registry",
"//include/envoy/upstream:retry_interface",
],
)

envoy_cc_test_library(
name = "add_trailers_filter_config_lib",
srcs = [
Expand Down Expand Up @@ -205,6 +218,7 @@ envoy_cc_test_library(
deps = [
":add_trailers_filter_config_lib",
":integration_lib",
":test_host_predicate_lib",
"//source/extensions/filters/http/router:config",
"//source/extensions/filters/network/http_connection_manager:config",
"//test/common/upstream:utility_lib",
Expand Down
2 changes: 2 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ TEST_P(Http2IntegrationTest, HittingEncoderFilterLimit) { testHittingEncoderFilt

TEST_P(Http2IntegrationTest, GrpcRouterNotFound) { testGrpcRouterNotFound(); }

TEST_P(Http2IntegrationTest, RetryHostPredicateFilter) { testRetryHostPredicateFilter(); }

TEST_P(Http2IntegrationTest, GrpcRetry) { testGrpcRetry(); }

// Send a request with overly large headers, and ensure it results in stream reset.
Expand Down
Loading