Skip to content

Commit

Permalink
kvcoord: retry ReplicaUnavailableError against other replicas
Browse files Browse the repository at this point in the history
Previously, a `ReplicaUnavailableError` returned by a tripped replica
circuit breaker was not retried by the DistSender, instead failing fast
back to the client. This was sort of by design, since these errors are
intended to avoid clients getting stuck in infinite retry loops.

However, a single replica having a tripped circuit breaker does not
imply that the range is unavailable -- it's possible that e.g. this
replica is partially partitioned away from the quorum, or is a stale
replica waiting to be caught up. If a DistSender attempts this replica
first, e.g. because it was the last-known leaseholder or it's the lowest
latency replica, then it will always error out, giving the appearance of
an unavailable range to the client.

This patch adds DistSender retry handling of `ReplicaUnavailableError`,
which attempts to contact other replicas in case a functional lease
exists elsewhere, while also taking care to avoid retry loops where
other replicas return a `NotLeaseHolderError` pointing back to the
unavailable replica.

Epic: none
Release note (bug fix): if an individual replica's circuit breaker had
tripped but the range was otherwise functional, e.g. because the replica
was partially partitioned away from the leaseholder, it was possible for
a gateway to persistently error when contacting this replica instead of
retrying against a functional leaseholder elsewhere. The gateway will
now retry such errors against other replicas once.
  • Loading branch information
erikgrinaker committed Feb 19, 2024
1 parent 78485a3 commit f94e018
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 6 deletions.
90 changes: 84 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,11 +2205,14 @@ func maybeSetResumeSpan(
// ambiguousErr, if not nil, is the error we got from the first attempt when the
// success of the request cannot be ruled out by the error. lastAttemptErr is
// the error that the last attempt to execute the request returned.
func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
func noMoreReplicasErr(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
if ambiguousErr != nil {
return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)",
ambiguousErr, lastAttemptErr)
}
if replicaUnavailableErr != nil {
return replicaUnavailableErr
}

// TODO(bdarnell): The error from the last attempt is not necessarily the best
// one to return; we may want to remember the "best" error we've seen (for
Expand Down Expand Up @@ -2361,7 +2364,7 @@ func (ds *DistSender) sendToReplicas(

// This loop will retry operations that fail with errors that reflect
// per-replica state and may succeed on other replicas.
var ambiguousError error
var ambiguousError, replicaUnavailableError error
var br *kvpb.BatchResponse
attempts := int64(0)
for first := true; ; first, attempts = false, attempts+1 {
Expand All @@ -2377,7 +2380,7 @@ func (ds *DistSender) sendToReplicas(
if lastErr == nil && br != nil {
lastErr = br.Error.GoError()
}
err = skipStaleReplicas(transport, routing, ambiguousError, lastErr)
err = skipStaleReplicas(transport, routing, ambiguousError, replicaUnavailableError, lastErr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2591,6 +2594,70 @@ func (ds *DistSender) sendToReplicas(
// We'll try other replicas which typically gives us the leaseholder, either
// via the NotLeaseHolderError or nil error paths, both of which update the
// leaseholder in the range cache.
case *kvpb.ReplicaUnavailableError:
// The replica's circuit breaker is tripped. This only means that this
// replica is unable to propose writes -- the range may or may not be
// available with a quorum elsewhere (e.g. in the case of a partial
// network partition or stale replica). There are several possibilities:
//
// 0. This replica knows about a valid leaseholder elsewhere. It will
// return a NLHE instead of a RUE even with a tripped circuit
// breaker, so we'll take that branch instead and retry the
// leaseholder. We list this case explicitly, as a reminder.
//
// 1. This replica is the current leaseholder. The range cache can't
// tell us with certainty who the leaseholder is, so we try other
// replicas in case a lease exists elsewhere, but if we get an NLHE
// pointing back to this one we fail fast.
//
// 2. This replica is the current Raft leader, but it has lost quorum
// (prior to stepping down via CheckQuorum). We go on to try other
// replicas, which may return a NLHE pointing back to the leader
// instead of attempting to acquire a lease, which we'll fail fast.
//
// 3. This replica does not know about a current quorum or leaseholder,
// but one does exist elsewhere. Try other replicas to discover it,
// but error out if it's unreachable.
//
// 4. There is no quorum nor lease. Error out after trying all replicas.
//
// To handle these cases, we track RUEs in *replicaUnavailableError.
// This contains either:
//
// - The last RUE we received from a supposed leaseholder, as given by
// the range cache via routing.Leaseholder() at the time of the error.
//
// - Otherwise, the first RUE we received from any replica.
//
// If, when retrying a later replica, we receive a NLHE pointing to the
// same replica as the RUE, we ignore the NLHE and move on to the next
// replica. This also handles the case where a NLHE points to a new
// leaseholder and that leaseholder returns RUE, in which case the next
// NLHE will error out.
//
// Otherwise, if no NLHE points back to the RUE, we will return the RUE
// when we've tried all replicas unsuccessfully.
//
// NB: we can't return tErr directly, because GetDetail() strips error
// marks from the error (e.g. ErrBreakerOpen).
if !tErr.Replica.IsSame(curReplica) {
// The ReplicaUnavailableError may have been proxied via this replica.
// This can happen e.g. if the replica has to access a txn record on a
// different range during intent resolution, and that range returns a
// RUE. In this case we just return it directly, as br.Error.
return br, nil
} else if replicaUnavailableError == nil {
// This is the first time we see a RUE. Record it, such that we'll
// return it if all other replicas fail (regardless of error).
replicaUnavailableError = br.Error.GoError()
} else if lh := routing.Leaseholder(); lh != nil && lh.IsSame(curReplica) {
// This error came from the supposed leaseholder. Record it, such that
// subsequent NLHEs pointing back to this one can be ignored instead
// of getting stuck in a retry loop. This ensures we'll eventually
// error out when the transport is exhausted even if multiple replicas
// return NLHEs to different replicas all returning RUEs.
replicaUnavailableError = br.Error.GoError()
}
case *kvpb.NotLeaseHolderError:
ds.metrics.NotLeaseHolderErrCount.Inc(1)
// If we got some lease information, we use it. If not, we loop around
Expand All @@ -2608,14 +2675,22 @@ func (ds *DistSender) sendToReplicas(
// retry. Note that the leaseholder might not be the one indicated by
// the NLHE we just received, in case that error carried stale info.
if lh := routing.Leaseholder(); lh != nil {
// If we've already tried this replica and it's unavailable due to
// a tripped replica circuit breaker, skip it to avoid loops.
var lhRUE bool
if err := replicaUnavailableError; err != nil {
if rue := (*kvpb.ReplicaUnavailableError)(nil); errors.As(err, &rue) {
lhRUE = lh.IsSame(rue.Replica)
}
}
// If the leaseholder is the replica that we've just tried, and
// we've tried this replica a bunch of times already, let's move on
// and not try it again. This prevents us getting stuck on a replica
// that we think has the lease but keeps returning redirects to us
// (possibly because it hasn't applied its lease yet). Perhaps that
// lease expires and someone else gets a new one, so by moving on we
// get out of possibly infinite loops.
if !lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit {
if (!lh.IsSame(curReplica) || sameReplicaRetries < sameReplicaRetryLimit) && !lhRUE {
moved := transport.MoveToFront(*lh)
if !moved {
// The transport always includes the client's view of the
Expand Down Expand Up @@ -2888,7 +2963,9 @@ func (ds *DistSender) AllRangeSpans(
//
// Returns an error if the transport is exhausted.
func skipStaleReplicas(
transport Transport, routing rangecache.EvictionToken, ambiguousError error, lastErr error,
transport Transport,
routing rangecache.EvictionToken,
ambiguousError, replicaUnavailableError, lastErr error,
) error {
// Check whether the range cache told us that the routing info we had is
// very out-of-date. If so, there's not much point in trying the other
Expand All @@ -2898,12 +2975,13 @@ func skipStaleReplicas(
if !routing.Valid() {
return noMoreReplicasErr(
ambiguousError,
nil, // ignore the replicaUnavailableError, retry with new routing info
errors.Wrap(lastErr, "routing information detected to be stale"))
}

for {
if transport.IsExhausted() {
return noMoreReplicasErr(ambiguousError, lastErr)
return noMoreReplicasErr(ambiguousError, replicaUnavailableError, lastErr)
}

if _, ok := routing.Desc().GetReplicaDescriptorByID(transport.NextReplica().ReplicaID); ok {
Expand Down
204 changes: 204 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,210 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) {
}
}

// This tests that if the DistSender encounters individual replicas with
// ReplicaUnavailableError (i.e. tripped replica circuit breakers), then it will
// go on to try other replicas when possible (e.g. when there is a quorum
// elsewhere), but not get stuck in infinite retry loops (e.g. when replicas
// return NLHEs that point to a leaseholder or Raft leader which is unavailable,
// or when all replicas are unavailable).
//
// It does not use setupCircuitBreakerTest, which assumes only 2 nodes.
func TestReplicaCircuitBreaker_Partial_Retry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive.
skip.UnderRace(t)
skip.UnderDeadlock(t)

// Use a context timeout, to prevent test hangs on failures.
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

// Always use expiration-based leases, such that the lease will expire when we
// partition off Raft traffic on n3.
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)

// Use a manual clock, so we can expire leases at will.
manualClock := hlc.NewHybridManualClock()

// Set up a 3-node cluster.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
},
Settings: st,
RaftConfig: base.RaftConfig{
// Speed up the test.
RaftTickInterval: 200 * time.Millisecond,
RaftElectionTimeoutTicks: 5,
RaftHeartbeatIntervalTicks: 1,
},
},
})
defer tc.Stopper().Stop(ctx)

n1 := tc.Server(0)
n2 := tc.Server(1)
n3 := tc.Server(2)
db1 := n1.ApplicationLayer().DB()
db2 := n2.ApplicationLayer().DB()
db3 := n3.ApplicationLayer().DB()
dbs := []*kv.DB{db1, db2, db3}

// Specify the key and value to use.
prefix := append(n1.ApplicationLayer().Codec().TenantPrefix(), keys.ScratchRangeMin...)
key := append(prefix.Clone(), []byte("/foo")...)
value := []byte("bar")

// Split off a range and upreplicate it.
_, _, err := n1.StorageLayer().SplitRange(prefix)
require.NoError(t, err)
desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
t.Logf("split off range %s", desc)

repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix))
repl2 := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(prefix))
repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix))
repls := []*kvserver.Replica{repl1, repl2, repl3}

// Set up test helpers.
requireRUEs := func(t *testing.T, dbs []*kv.DB) {
t.Helper()
for _, db := range dbs {
err := db.Put(ctx, key, value)
require.Error(t, err)
require.True(t, errors.HasType(err, (*kvpb.ReplicaUnavailableError)(nil)),
"expected ReplicaUnavailableError, got %v", err)
}
t.Logf("writes failed with ReplicaUnavailableError")
}

requireNoRUEs := func(t *testing.T, dbs []*kv.DB) {
t.Helper()
for _, db := range dbs {
require.NoError(t, db.Put(ctx, key, value))
}
t.Logf("writes succeeded")
}

// Move the leaseholder to n3, and wait for it to become the Raft leader too.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(2))
t.Logf("transferred range lease to n3")

require.Eventually(t, func() bool {
for _, repl := range repls {
if repl.RaftStatus().Lead != 3 {
return false
}
}
return true
}, 5*time.Second, 100*time.Millisecond)
t.Logf("transferred raft leadership to n3")

requireNoRUEs(t, dbs)

// Partition Raft traffic on n3 away from n1 and n2, and eagerly trip its
// breaker. Note that we don't partition RPC traffic, such that client
// requests and node liveness heartbeats still succeed.
partitioned := &atomic.Bool{}
partitioned.Store(true)
dropRaftMessagesFrom(t, n1, desc.RangeID, []roachpb.ReplicaID{3}, partitioned)
dropRaftMessagesFrom(t, n2, desc.RangeID, []roachpb.ReplicaID{3}, partitioned)
dropRaftMessagesFrom(t, n3, desc.RangeID, []roachpb.ReplicaID{1, 2}, partitioned)
t.Logf("partitioned n3 raft traffic from n1 and n2")

repl3.TripBreaker()
t.Logf("tripped n3 circuit breaker")

// While n3 is the leaseholder, all gateways should return RUE.
requireRUEs(t, dbs)

// Expire the lease, but not Raft leadership. All gateways should still return
// RUE, since followers return NLHE pointing to the Raft leader, and it will
// return RUE.
lease, _ := repl3.GetLease()
manualClock.Forward(lease.Expiration.WallTime)
t.Logf("expired n3 lease")

requireRUEs(t, dbs)

// Wait for the leadership to move. Writes should now succeed -- they will
// initially go to n3, the previous leaseholder, but it will return NLHE. The
// DistSender will retry the other replicas, which eventually acquire a new
// lease and serve the write.
var leader uint64
require.Eventually(t, func() bool {
for _, repl := range repls {
if l := repl.RaftStatus().Lead; l == 3 {
return false
} else if l > 0 {
leader = l
}
}
return true
}, 5*time.Second, 100*time.Millisecond)
t.Logf("raft leadership moved to n%d", leader)

requireNoRUEs(t, dbs)

// Also partition n1 and n2 away from each other, and trip their breakers. All
// nodes are now completely partitioned away from each other.
dropRaftMessagesFrom(t, n1, desc.RangeID, []roachpb.ReplicaID{2, 3}, partitioned)
dropRaftMessagesFrom(t, n2, desc.RangeID, []roachpb.ReplicaID{1, 3}, partitioned)

repl1.TripBreaker()
repl2.TripBreaker()
t.Logf("partitioned all nodes and tripped their breakers")

// n1 or n2 still has the lease. Writes should return a
// ReplicaUnavailableError.
requireRUEs(t, dbs)

// Expire the lease, but not raft leadership. Writes should still error
// because the leader's circuit breaker is tripped.
lease, _ = repl1.GetLease()
manualClock.Forward(lease.Expiration.WallTime)
t.Logf("expired n%d lease", lease.Replica.ReplicaID)

requireRUEs(t, dbs)

// Wait for raft leadership to expire. Writes should error after the
// DistSender attempts all 3 replicas and they all fail.
require.Eventually(t, func() bool {
for _, repl := range repls {
if repl.RaftStatus().Lead != 0 {
return false
}
}
return true
}, 5*time.Second, 100*time.Millisecond)
t.Logf("no raft leader")

requireRUEs(t, dbs)

// Recover the partition. Writes should soon recover.
partitioned.Store(false)
t.Logf("partitioned healed")

require.Eventually(t, func() bool {
for _, db := range dbs {
if err := db.Put(ctx, key, value); err != nil {
return false
}
}
return true
}, 5*time.Second, 100*time.Millisecond)
t.Logf("writes succeeded")

require.NoError(t, ctx.Err())
}

// Test infrastructure below.

func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) {
Expand Down

0 comments on commit f94e018

Please sign in to comment.