Skip to content

Commit

Permalink
Second attempt: client channel: don't hold mutexes while calling the …
Browse files Browse the repository at this point in the history
…ConfigSelector or the LB picker (#32326)

Original attempt was #31973, reverted in #32324 due to test flakiness.

There were two problems causing test flakiness here.

The first problem was that, upon resolver error, we were dispatching an
async callback to re-process each of the queued picks *before* we
updated the channel's connectivity state, which meant that the queued
picks might be re-processed in another thread before the new
connectivity state was set, so tests that expected the state to be
TRANSIENT_FAILURE once RPCs failed might not see the expected state.

The second problem affected the xDS ring hash tests, and it's a bit more
involved to explain.

We have an e2e test that simulates an aggregate cluster failover from a
primary cluster using ring_hash at startup. The primary cluster has two
addresses, both of which are unreachable when the client starts up, so
the client should immediately fail over to the secondary cluster, which
does have reachable endpoints. The test requires that no RPCs are failed
while this failover occurs. The original PR made this test flaky.

The problem here was caused by a combination of two factors:

1. Prior to the original PR, when the picker was updated (which happens
inside the WorkSerializer), we re-processed previously queued picks
synchronously, so it was not possible for another subchannel
connectivity state update (which also happens in the WorkSerializer) to
be processed between the time that we updated the picker and the time
that we re-processed the previously queued picks. The original PR
changed this such that the queued picks are re-processed asynchronously
(outside of the WorkSerializer), so it is now possible for a subchannel
connectivity state update to be processed between when the picker is
updated and when we re-process the previously queued picks.

2. Unlike most LB policies, where the picker does not see updated
subchannel connectivity states until a new picker is created, the
ring_hash picker gets the subchannel connectivity states from the LB
policy via a lock, so it can wind up seeing the new states before it
gets updated. This means that when a subchannel connectivity state
update is processed by the ring_hash policy in the WorkSerializer, it
will immediately be seen by the existing picker, even without a picker
update.

With those two points in mind, the sequence of events in the failing
test were as follows:

1. The pick is attempted in the ring_hash picker for the primary
cluster. This causes the first subchannel to attempt to connect.
2. The subchannel transitions from IDLE to CONNECTING. A new picker is
returned due to the subchannel connectivity state change, and the
channel retries the queued pick. The retried pick is done
asynchronously, but in this case it does not matter: the call will be
re-queued.
3. The connection attempt fails, and the subchannel reports
TRANSIENT_FAILURE. A new picker is again returned, and the channel
retries the queued pick. The retried pick is done asynchronously, but in
this case it does not matter: this causes the picker to trigger a
connection attempt for the second subchannel.
4. The second subchannel transitions from IDLE to CONNECTING. A new
picker is again returned, and the channel retries the queued pick. The
retried pick is done asynchronously, and in this case it *does* matter.
5. The second subchannel now transitions to TRANSIENT_FAILURE. The
ring_hash policy will now report TRANSIENT_FAILURE, but before it can
finish that...
6. ...In another thread, the channel now tries to re-process the queued
pick using the CONNECTING picker from step 4. However, because the
ring_hash policy has already seen the TRANSIENT_FAILURE report from the
second subchannel, that picker will now fail the pick instead of queuing
it.

After discussion with @ejona86 and @dfawley (since this bug actually
exists in Java and Go as well), we agreed that the right solution is to
change the ring_hash picker to contain its own copy of the subchannel
connectivity state information, rather than sharing that information
with the LB policy using synchronization.
  • Loading branch information
markdroth committed Feb 16, 2023
1 parent 6589340 commit 8249fc1
Show file tree
Hide file tree
Showing 14 changed files with 579 additions and 691 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2785,6 +2785,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/status",
"absl/status:statusor",
Expand Down
6 changes: 6 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2495,6 +2495,7 @@ grpc_cc_library(
srcs = ["lib/load_balancing/lb_policy.cc"],
hdrs = ["lib/load_balancing/lb_policy.h"],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/strings",
Expand All @@ -2514,6 +2515,7 @@ grpc_cc_library(
"//:debug_location",
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
"//:grpc_trace",
"//:orphanable",
Expand Down Expand Up @@ -3849,6 +3851,7 @@ grpc_cc_library(
"absl/base:core_headers",
"absl/functional:bind_front",
"absl/memory",
"absl/random",
"absl/status",
"absl/status:statusor",
"absl/strings",
Expand Down Expand Up @@ -4421,6 +4424,7 @@ grpc_cc_library(
"json_object_loader",
"lb_policy",
"lb_policy_factory",
"ref_counted",
"subchannel_interface",
"unique_type_name",
"validation_errors",
Expand All @@ -4445,6 +4449,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/round_robin/round_robin.cc",
],
external_deps = [
"absl/random",
"absl/status",
"absl/status:statusor",
"absl/strings",
Expand Down Expand Up @@ -4631,6 +4636,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc",
],
external_deps = [
"absl/base:core_headers",
"absl/random",
"absl/status",
"absl/status:statusor",
Expand Down
781 changes: 352 additions & 429 deletions src/core/ext/filters/client_channel/client_channel.cc

Large diffs are not rendered by default.

85 changes: 27 additions & 58 deletions src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
Expand Down Expand Up @@ -222,15 +222,6 @@ class ClientChannel {
std::atomic<bool> done_{false};
};

struct ResolverQueuedCall {
grpc_call_element* elem;
ResolverQueuedCall* next = nullptr;
};
struct LbQueuedCall {
LoadBalancedCall* lb_call;
LbQueuedCall* next = nullptr;
};

ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error);
~ClientChannel();

Expand All @@ -246,6 +237,9 @@ class ClientChannel {
// Note: All methods with "Locked" suffix must be invoked from within
// work_serializer_.

void ReprocessQueuedResolverCalls()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&resolution_mu_);

void OnResolverResultChangedLocked(Resolver::Result result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void OnResolverErrorLocked(absl::Status status)
Expand Down Expand Up @@ -284,20 +278,6 @@ class ClientChannel {

void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);

// These methods all require holding resolution_mu_.
void AddResolverQueuedCall(ResolverQueuedCall* call,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);

// These methods all require holding data_plane_mu_.
void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);

//
// Fields set at construction and never modified.
//
Expand All @@ -316,9 +296,9 @@ class ClientChannel {
// Fields related to name resolution. Guarded by resolution_mu_.
//
mutable Mutex resolution_mu_;
// Linked list of calls queued waiting for resolver result.
ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
nullptr;
// List of calls queued waiting for resolver result.
absl::flat_hash_set<grpc_call_element*> resolver_queued_calls_
ABSL_GUARDED_BY(resolution_mu_);
// Data from service config.
absl::Status resolver_transient_failure_error_
ABSL_GUARDED_BY(resolution_mu_);
Expand All @@ -330,13 +310,13 @@ class ClientChannel {
ABSL_GUARDED_BY(resolution_mu_);

//
// Fields used in the data plane. Guarded by data_plane_mu_.
// Fields related to LB picks. Guarded by lb_mu_.
//
mutable Mutex data_plane_mu_;
mutable Mutex lb_mu_;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(data_plane_mu_);
// Linked list of calls queued waiting for LB pick.
LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;
ABSL_GUARDED_BY(lb_mu_);
absl::flat_hash_set<LoadBalancedCall*> lb_queued_calls_
ABSL_GUARDED_BY(lb_mu_);

//
// Fields used in the control plane. Guarded by work_serializer.
Expand All @@ -360,7 +340,7 @@ class ClientChannel {
// The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the map is updated in the control-plane
// work_serializer when the SubchannelWrappers are created and destroyed.
std::set<SubchannelWrapper*> subchannel_wrappers_
absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_
ABSL_GUARDED_BY(*work_serializer_);
int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1;
grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_);
Expand Down Expand Up @@ -422,16 +402,11 @@ class ClientChannel::LoadBalancedCall

void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

// Invoked by channel for queued LB picks when the picker is updated.
static void PickSubchannel(void* arg, grpc_error_handle error);
// Helper function for performing an LB pick while holding the data plane
// mutex. Returns true if the pick is complete, in which case the caller
// must invoke PickDone() or AsyncPickDone() with the returned error.
bool PickSubchannelLocked(grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Schedules a callback to process the completed pick. The callback
// will not run until after this method returns.
void AsyncPickDone(grpc_error_handle error);
void PickSubchannel(bool was_queued);

// Called by channel when removing a call from the list of queued calls.
void RemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);

RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_;
Expand Down Expand Up @@ -479,14 +454,14 @@ class ClientChannel::LoadBalancedCall
void RecordCallCompletion(absl::Status status);

void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure.
static void PickDone(void* arg, grpc_error_handle error);
// Removes the call from the channel's list of queued picks if present.
void MaybeRemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);

// Helper function for performing an LB pick with a specified picker.
// Returns true if the pick is complete.
bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker,
grpc_error_handle* error);
// Adds the call to the channel's list of queued picks if not already present.
void MaybeAddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
void AddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);

ClientChannel* chand_;

Expand All @@ -513,15 +488,9 @@ class ClientChannel::LoadBalancedCall
// Set when we fail inside the LB call.
grpc_error_handle failure_error_;

grpc_closure pick_closure_;

// Accessed while holding ClientChannel::data_plane_mu_.
ClientChannel::LbQueuedCall queued_call_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
false;
// Accessed while holding ClientChannel::lb_mu_.
LbQueuedCallCanceller* lb_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;
ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr;

RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const BackendMetricData* backend_metric_data_ = nullptr;
Expand Down
17 changes: 7 additions & 10 deletions src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include <string.h>

#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <map>
#include <memory>
Expand Down Expand Up @@ -389,19 +390,15 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns the LB token to use for a drop, or null if the call
// should not be dropped.
//
// Note: This is called from the picker, so it will be invoked in
// the channel's data plane mutex, NOT the control plane
// work_serializer. It should not be accessed by any other part of the LB
// policy.
// Note: This is called from the picker, NOT from inside the control
// plane work_serializer.
const char* ShouldDrop();

private:
std::vector<GrpcLbServer> serverlist_;

// Guarded by the channel's data plane mutex, NOT the control
// plane work_serializer. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;
// Accessed from the picker, so needs synchronization.
std::atomic<size_t> drop_index_{0};
};

class Picker : public SubchannelPicker {
Expand Down Expand Up @@ -717,8 +714,8 @@ bool GrpcLb::Serverlist::ContainsAllDropEntries() const {

const char* GrpcLb::Serverlist::ShouldDrop() {
if (serverlist_.empty()) return nullptr;
GrpcLbServer& server = serverlist_[drop_index_];
drop_index_ = (drop_index_ + 1) % serverlist_.size();
size_t index = drop_index_.fetch_add(1, std::memory_order_relaxed);
GrpcLbServer& server = serverlist_[index % serverlist_.size()];
return server.drop ? server.load_balance_token : nullptr;
}

Expand Down
Loading

0 comments on commit 8249fc1

Please sign in to comment.