kvserver: add timeout for lease acquisitions
This patch adds a timeout for lease acquisitions. It is set to
twice the Raft election timeout (6 seconds total), since an acquisition
may need to hold a Raft election and then repropose the lease acquisition
command, each of which can take up to one election timeout.

Without this timeout, a lease acquisition can stall indefinitely (e.g.
on a stalled disk). A stalled acquisition prevents a
`NotLeaseHolderError` from being returned to the client's DistSender,
which in turn prevents the DistSender from trying other replicas that
could acquire the lease instead. This can leave the lease invalid forever.

Release note (bug fix): Fixed a bug where an unresponsive node (e.g.
with a stalled disk) could prevent other nodes from acquiring its
leases, effectively stalling these ranges until the node was shut down
or recovered.
erikgrinaker committed May 24, 2022
1 parent 7100ff9 commit ab74b97
Showing 1 changed file with 154 additions and 112 deletions.
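
For orientation before the diff, the pattern the commit applies can be sketched with only the Go standard library (the commit itself uses the repository's contextutil.RunWithTimeout helper). The electionTimeout constant and the acquireLease placeholder below are assumptions for illustration, not code from the commit:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// electionTimeout stands in for the store's configured Raft election
// timeout (RaftElectionTimeout() in the real code); 3s yields the
// 6 seconds total mentioned in the commit message.
const electionTimeout = 3 * time.Second

// acquireLease is a placeholder for the actual lease acquisition, which
// may need to win a Raft election and repropose the lease command. Here
// it simulates a stalled disk by blocking until the context is canceled.
func acquireLease(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// Allow one election plus one reproposal, per the commit message.
	timeout := 2 * electionTimeout
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	if err := acquireLease(ctx); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			// In the real code this is surfaced as a NotLeaseHolderError,
			// which lets the DistSender try another replica.
			fmt.Println("lease acquisition timed out; redirecting client")
			return
		}
		fmt.Println("lease acquisition failed:", err)
	}
}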
266 changes: 154 additions & 112 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -313,10 +314,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
status kvserverpb.LeaseStatus,
leaseReq roachpb.Request,
) error {
- // Create a new context *without* a timeout. Instead, we multiplex the
- // cancellation of all contexts onto this new one, only canceling it if all
- // coalesced requests timeout/cancel. p.cancelLocked (defined below) is the
- // cancel function that must be called; calling just cancel is insufficient.
+ // Create a new context. We multiplex the cancellation of all contexts onto
+ // this new one, canceling it if all coalesced requests timeout/cancel.
+ // p.cancelLocked (defined below) is the cancel function that must be called;
+ // calling just cancel is insufficient.
ctx := p.repl.AnnotateCtx(context.Background())

const opName = "request range lease"
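
A minimal model of the coalesced-cancellation contract described in the comment above: the shared context is canceled only once every joined caller has given up. sharedRequest, join, and leave are illustrative names, not the actual pendingLeaseRequest API:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// sharedRequest coalesces many callers onto one in-flight operation whose
// context is canceled only when the last caller has canceled, mirroring
// the p.cancelLocked contract sketched in the comment above.
type sharedRequest struct {
	mu      sync.Mutex
	waiters int
	cancel  context.CancelFunc
}

func newSharedRequest() (*sharedRequest, context.Context) {
	ctx, cancel := context.WithCancel(context.Background())
	return &sharedRequest{cancel: cancel}, ctx
}

// join registers a caller and returns the function that caller invokes when
// it gives up. Only the last departing caller cancels the shared context.
func (s *sharedRequest) join() (leave func()) {
	s.mu.Lock()
	s.waiters++
	s.mu.Unlock()
	return func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.waiters--
		if s.waiters == 0 {
			s.cancel()
		}
	}
}

func main() {
	s, ctx := newSharedRequest()
	leave1, leave2 := s.join(), s.join()

	go func() {
		<-ctx.Done()
		fmt.Println("shared lease request canceled")
	}()

	leave1() // first caller gives up: operation keeps running
	leave2() // last caller gives up: now the shared context is canceled
	time.Sleep(100 * time.Millisecond)
}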
@@ -348,10 +349,15 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
p.nextLease = roachpb.Lease{}
}

+ // We may need to hold a Raft election and repropose the lease acquisition
+ // command, which can take a couple of Raft election timeouts.
+ timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()
+
+ const taskName = "pendingLeaseRequest: requesting lease"
err := p.repl.store.Stopper().RunAsyncTaskEx(
ctx,
stop.TaskOpts{
- TaskName: "pendingLeaseRequest: requesting lease",
+ TaskName: taskName,
// Trace the lease acquisition as a child even though it might outlive the
// parent in case the parent's ctx is canceled. Other requests might
// later block on this lease acquisition too, and we can't include the
@@ -362,116 +368,22 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
func(ctx context.Context) {
defer sp.Finish()

- // If requesting an epoch-based lease & current state is expired,
- // potentially heartbeat our own liveness or increment epoch of
- // prior owner. Note we only do this if the previous lease was
- // epoch-based.
- var pErr *roachpb.Error
- if reqLease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED &&
- status.Lease.Type() == roachpb.LeaseEpoch {
- var err error
- // If this replica is previous & next lease holder, manually heartbeat to become live.
- if status.OwnedBy(nextLeaseHolder.StoreID) &&
- p.repl.store.StoreID() == nextLeaseHolder.StoreID {
- if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
- log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
- }
- } else if status.Liveness.Epoch == status.Lease.Epoch {
- // If not owner, increment epoch if necessary to invalidate lease.
- // However, we only do so in the event that the next leaseholder is
- // considered live at this time. If not, there's no sense in
- // incrementing the expired leaseholder's epoch.
- if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil {
- if liveErr != nil {
- err = errors.Wrapf(liveErr, "not incrementing epoch on n%d because next leaseholder (n%d) not live",
- status.Liveness.NodeID, nextLeaseHolder.NodeID)
- } else {
- err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = nil)",
- status.Liveness.NodeID, nextLeaseHolder.NodeID)
- }
- log.VEventf(ctx, 1, "%v", err)
- } else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil {
- // If we get ErrEpochAlreadyIncremented, someone else beat
- // us to it. This proves that the target node is truly
- // dead *now*, but it doesn't prove that it was dead at
- // status.Timestamp (which we've encoded into our lease
- // request). It's possible that the node was temporarily
- // considered dead but revived without having its epoch
- // incremented, i.e. that it was in fact live at
- // status.Timestamp.
- //
- // It would be incorrect to simply proceed to sending our
- // lease request since our lease.Start may precede the
- // effective end timestamp of the predecessor lease (the
- // expiration of the last successful heartbeat before the
- // epoch increment), and so under this lease this node's
- // timestamp cache would not necessarily reflect all reads
- // served by the prior leaseholder.
- //
- // It would be correct to bump the timestamp in the lease
- // request and proceed, but that just sets up another race
- // between this node and the one that already incremented
- // the epoch. They're probably going to beat us this time
- // too, so just return the NotLeaseHolderError here
- // instead of trying to fix up the timestamps and submit
- // the lease request.
- //
- // ErrEpochAlreadyIncremented is not an unusual situation,
- // so we don't log it as an error.
- //
- // https://github.com/cockroachdb/cockroach/issues/35986
- if !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
- log.Errorf(ctx, "failed to increment leaseholder's epoch: %s", err)
- }
- }
- }
- // Set error for propagation to all waiters below. Don't include the
- // previous lease, which we know isn't valid. In particular, if it was
- // ours but we failed to reacquire it (e.g. because our heartbeat failed
- // due to a stalled disk) then we don't want DistSender to retry us.
- if err != nil {
- pErr = roachpb.NewError(newNotLeaseHolderError(
- roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
- fmt.Sprintf("failed to manipulate liveness record: %s", err)))
- }
- }
+ // Run the lease acquisition request with a timeout. We must eventually
+ // return a NotLeaseHolderError rather than hanging, otherwise we could
+ // prevent the caller from nudging a different replica into acquiring the
+ // lease.
+ err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
+ return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
+ })
+ // Error will be handled below.

- // Send the RequestLeaseRequest or TransferLeaseRequest and wait for the new
- // lease to be applied.
- if pErr == nil {
- // The Replica circuit breakers together with round-tripping a ProbeRequest
- // here before asking for the lease could provide an alternative, simpler
- // solution to the below issue:
- //
- // https://github.com/cockroachdb/cockroach/issues/37906
- ba := roachpb.BatchRequest{}
- ba.Timestamp = p.repl.store.Clock().Now()
- ba.RangeID = p.repl.RangeID
- // NB:
- // RequestLease always bypasses the circuit breaker (i.e. will prefer to
- // get stuck on an unavailable range rather than failing fast; see
- // `(*RequestLeaseRequest).flags()`). This enables the caller to chose
- // between either behavior for themselves: if they too want to bypass
- // the circuit breaker, they simply don't check for the circuit breaker
- // while waiting for their lease handle. If they want to fail-fast, they
- // do. If the lease instead adopted the caller's preference, we'd have
- // to handle the case of multiple preferences joining onto one lease
- // request, which is more difficult.
- //
- // TransferLease will observe the circuit breaker, as transferring a
- // lease when the range is unavailable results in, essentially, giving
- // up on the lease and thus worsening the situation.
- ba.Add(leaseReq)
- _, pErr = p.repl.Send(ctx, ba)
- }
// We reset our state below regardless of whether we've gotten an error or
// not, but note that an error is ambiguous - there's no guarantee that the
// transfer will not still apply. That's OK, however, as the "in transfer"
// state maintained by the pendingLeaseRequest is not relied on for
// correctness (see repl.mu.minLeaseProposedTS), and resetting the state
// is beneficial as it'll allow the replica to attempt to transfer again or
// extend the existing lease in the future.

p.repl.mu.Lock()
defer p.repl.mu.Unlock()
if ctx.Err() != nil {
@@ -480,15 +392,14 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// active so we don't want to do anything else.
return
}
-
// Send result of lease to all waiter channels and cleanup request.
for llHandle := range p.llHandles {
// Don't send the same transaction object twice; this can lead to races.
- if pErr != nil {
- pErrClone := *pErr
+ if err != nil {
+ pErr := roachpb.NewError(err)
// TODO(tbg): why?
- pErrClone.SetTxn(pErr.GetTxn())
- llHandle.resolve(&pErrClone)
+ pErr.SetTxn(pErr.GetTxn())
+ llHandle.resolve(pErr)
} else {
llHandle.resolve(nil)
}
@@ -504,6 +415,118 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
return nil
}

+ // requestLease sends a synchronous transfer lease or lease request to the
+ // specified replica. It is only meant to be called from requestLeaseAsync,
+ // since it does not coordinate with other in-flight lease requests.
+ func (p *pendingLeaseRequest) requestLease(
+ ctx context.Context,
+ nextLeaseHolder roachpb.ReplicaDescriptor,
+ reqLease roachpb.Lease,
+ status kvserverpb.LeaseStatus,
+ leaseReq roachpb.Request,
+ ) error {
+ // If requesting an epoch-based lease & current state is expired,
+ // potentially heartbeat our own liveness or increment epoch of
+ // prior owner. Note we only do this if the previous lease was
+ // epoch-based.
+ if reqLease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED &&
+ status.Lease.Type() == roachpb.LeaseEpoch {
+ var err error
+ // If this replica is previous & next lease holder, manually heartbeat to become live.
+ if status.OwnedBy(nextLeaseHolder.StoreID) &&
+ p.repl.store.StoreID() == nextLeaseHolder.StoreID {
+ if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
+ log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
+ }
+ } else if status.Liveness.Epoch == status.Lease.Epoch {
+ // If not owner, increment epoch if necessary to invalidate lease.
+ // However, we only do so in the event that the next leaseholder is
+ // considered live at this time. If not, there's no sense in
+ // incrementing the expired leaseholder's epoch.
+ if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil {
+ if liveErr != nil {
+ err = errors.Wrapf(liveErr, "not incrementing epoch on n%d because next leaseholder (n%d) not live",
+ status.Liveness.NodeID, nextLeaseHolder.NodeID)
+ } else {
+ err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = nil)",
+ status.Liveness.NodeID, nextLeaseHolder.NodeID)
+ }
+ log.VEventf(ctx, 1, "%v", err)
+ } else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil {
+ // If we get ErrEpochAlreadyIncremented, someone else beat
+ // us to it. This proves that the target node is truly
+ // dead *now*, but it doesn't prove that it was dead at
+ // status.Timestamp (which we've encoded into our lease
+ // request). It's possible that the node was temporarily
+ // considered dead but revived without having its epoch
+ // incremented, i.e. that it was in fact live at
+ // status.Timestamp.
+ //
+ // It would be incorrect to simply proceed to sending our
+ // lease request since our lease.Start may precede the
+ // effective end timestamp of the predecessor lease (the
+ // expiration of the last successful heartbeat before the
+ // epoch increment), and so under this lease this node's
+ // timestamp cache would not necessarily reflect all reads
+ // served by the prior leaseholder.
+ //
+ // It would be correct to bump the timestamp in the lease
+ // request and proceed, but that just sets up another race
+ // between this node and the one that already incremented
+ // the epoch. They're probably going to beat us this time
+ // too, so just return the NotLeaseHolderError here
+ // instead of trying to fix up the timestamps and submit
+ // the lease request.
+ //
+ // ErrEpochAlreadyIncremented is not an unusual situation,
+ // so we don't log it as an error.
+ //
+ // https://github.com/cockroachdb/cockroach/issues/35986
+ if !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
+ log.Errorf(ctx, "failed to increment leaseholder's epoch: %s", err)
+ }
+ }
+ }
+ if err != nil {
+ // Return an NLHE with an empty lease, since we know the previous lease
+ // isn't valid. In particular, if it was ours but we failed to reacquire
+ // it (e.g. because our heartbeat failed due to a stalled disk) then we
+ // don't want DistSender to retry us.
+ return newNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
+ fmt.Sprintf("failed to manipulate liveness record: %s", err))
+ }
+ }
+
+ // Send the RequestLeaseRequest or TransferLeaseRequest and wait for the new
+ // lease to be applied.
+ //
+ // The Replica circuit breakers together with round-tripping a ProbeRequest
+ // here before asking for the lease could provide an alternative, simpler
+ // solution to the below issue:
+ //
+ // https://github.com/cockroachdb/cockroach/issues/37906
+ ba := roachpb.BatchRequest{}
+ ba.Timestamp = p.repl.store.Clock().Now()
+ ba.RangeID = p.repl.RangeID
+ // NB:
+ // RequestLease always bypasses the circuit breaker (i.e. will prefer to
+ // get stuck on an unavailable range rather than failing fast; see
+ // `(*RequestLeaseRequest).flags()`). This enables the caller to choose
+ // between either behavior for themselves: if they too want to bypass
+ // the circuit breaker, they simply don't check for the circuit breaker
+ // while waiting for their lease handle. If they want to fail-fast, they
+ // do. If the lease instead adopted the caller's preference, we'd have
+ // to handle the case of multiple preferences joining onto one lease
+ // request, which is more difficult.
+ //
+ // TransferLease will observe the circuit breaker, as transferring a
+ // lease when the range is unavailable results in, essentially, giving
+ // up on the lease and thus worsening the situation.
+ ba.Add(leaseReq)
+ _, pErr := p.repl.Send(ctx, ba)
+ return pErr.GoError()
+ }
+
// JoinRequest adds one more waiter to the currently pending request.
// It is the caller's responsibility to ensure that there is a pending request,
// and that the request is compatible with whatever the caller is currently
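
The NotLeaseHolderError that requestLease returns is what lets the system recover: the commit message relies on the DistSender trying other replicas when it sees that error. A hedged stdlib sketch of that retry behavior, with errNotLeaseHolder, tryReplica, and send as illustrative stand-ins rather than the real DistSender code:

package main

import (
	"context"
	"errors"
	"fmt"
)

// errNotLeaseHolder is a stand-in for roachpb.NotLeaseHolderError.
var errNotLeaseHolder = errors.New("not lease holder")

// tryReplica stands in for sending the request to one replica; in this
// sketch only replica 2 can serve it.
func tryReplica(ctx context.Context, id int) error {
	if id != 2 {
		return errNotLeaseHolder
	}
	return nil
}

// send mirrors the behavior the commit message depends on: a
// NotLeaseHolderError is the signal to move on to the next replica, which
// is why lease acquisition must fail with it instead of hanging forever.
func send(ctx context.Context, replicas []int) error {
	for _, id := range replicas {
		err := tryReplica(ctx, id)
		if err == nil {
			fmt.Printf("request served by replica %d\n", id)
			return nil
		}
		if errors.Is(err, errNotLeaseHolder) {
			continue // nudge the next replica into acquiring the lease
		}
		return err
	}
	return errors.New("all replicas exhausted")
}

func main() {
	_ = send(context.Background(), []int{1, 2, 3})
}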
@@ -1140,6 +1163,25 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat
// the request is operating at the current time.
func (r *Replica) redirectOnOrAcquireLeaseForRequest(
ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
+ ) (status kvserverpb.LeaseStatus, pErr *roachpb.Error) {
+ // We may need to hold a Raft election and repropose the lease acquisition
+ // command, which can take a couple of Raft election timeouts.
+ timeout := 2 * r.store.cfg.RaftElectionTimeout()
+ if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
+ func(ctx context.Context) error {
+ status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
+ return nil
+ },
+ ); err != nil {
+ return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
+ }
+ return status, pErr
+ }
+
+ // redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
+ // redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
+ func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
+ ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
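The wrapper in the hunk above passes the inner call's results out through captured named return values, keeping the closure's own return value free for the timeout machinery. A stdlib approximation of that Go idiom, where runWithTimeout stands in for contextutil.RunWithTimeout and slowLeaseCheck for the WithoutTimeout variant:

package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout approximates contextutil.RunWithTimeout: run fn under a
// derived deadline and return its error.
func runWithTimeout(ctx context.Context, timeout time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return fn(ctx)
}

// slowLeaseCheck stands in for redirectOnOrAcquireLeaseForRequestWithoutTimeout:
// it respects context cancellation, so a timeout surfaces through its error.
func slowLeaseCheck(ctx context.Context) (string, error) {
	select {
	case <-time.After(50 * time.Millisecond):
		return "VALID", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	var status string
	var err error
	// The closure writes to the captured variables; its own return value is
	// reserved for the timeout machinery, mirroring the diff above.
	_ = runWithTimeout(context.Background(), time.Second, func(ctx context.Context) error {
		status, err = slowLeaseCheck(ctx)
		return nil
	})
	fmt.Println(status, err)
}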
