kvserver: add timeout for lease acquisitions #81136

Merged · 2 commits · May 25, 2022
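
This change bounds lease acquisition by roughly two Raft election timeouts, since the replica may need to win a Raft election and repropose the lease command before it applies; when the deadline fires, the caller gets an error (ultimately a NotLeaseHolderError) instead of hanging and preventing another replica from acquiring the lease. A minimal sketch of that pattern, using the standard library's context.WithTimeout in place of the repo's contextutil.RunWithTimeout helper and a hypothetical requestLease stand-in:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// requestLease stands in for the synchronous lease request; it simply blocks
// until the context is done, simulating an acquisition that never applies.
func requestLease(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// acquireLeaseWithTimeout mirrors the PR's shape: bound the request by two
// Raft election timeouts and convert a deadline hit into a retryable error
// (the real code returns a NotLeaseHolderError so the client can nudge a
// different replica into acquiring the lease).
func acquireLeaseWithTimeout(ctx context.Context, electionTimeout time.Duration) error {
	timeout := 2 * electionTimeout
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := requestLease(ctx); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return fmt.Errorf("lease acquisition timed out after %s: %w", timeout, err)
		}
		return err
	}
	return nil
}

func main() {
	fmt.Println(acquireLeaseWithTimeout(context.Background(), 50*time.Millisecond))
}
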
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/client_lease_test.go
@@ -723,8 +723,6 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStress(t, "https://github.com/cockroachdb/cockroach/issues/70113")

stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
ctx := context.Background()
@@ -764,6 +762,11 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
DefaultZoneConfigOverride: &zcfg,
StickyEngineRegistry: stickyRegistry,
},
Store: &kvserver.StoreTestingKnobs{
// The Raft leadership may not end up on the eu node, but it needs to
// be able to acquire the lease anyway.
AllowLeaseRequestProposalsWhenNotLeader: true,
},
},
StoreSpecs: []base.StoreSpec{
{
266 changes: 154 additions & 112 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -313,10 +314,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
status kvserverpb.LeaseStatus,
leaseReq roachpb.Request,
) error {
// Create a new context *without* a timeout. Instead, we multiplex the
// cancellation of all contexts onto this new one, only canceling it if all
// coalesced requests timeout/cancel. p.cancelLocked (defined below) is the
// cancel function that must be called; calling just cancel is insufficient.
// Create a new context. We multiplex the cancellation of all contexts onto
// this new one, canceling it if all coalesced requests timeout/cancel.
// p.cancelLocked (defined below) is the cancel function that must be called;
// calling just cancel is insufficient.
ctx := p.repl.AnnotateCtx(context.Background())

const opName = "request range lease"
@@ -348,10 +349,15 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
p.nextLease = roachpb.Lease{}
}

// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()

const taskName = "pendingLeaseRequest: requesting lease"
err := p.repl.store.Stopper().RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: "pendingLeaseRequest: requesting lease",
TaskName: taskName,
// Trace the lease acquisition as a child even though it might outlive the
// parent in case the parent's ctx is canceled. Other requests might
// later block on this lease acquisition too, and we can't include the
@@ -362,116 +368,22 @@
func(ctx context.Context) {
defer sp.Finish()

// If requesting an epoch-based lease & current state is expired,
// potentially heartbeat our own liveness or increment epoch of
// prior owner. Note we only do this if the previous lease was
// epoch-based.
var pErr *roachpb.Error
if reqLease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED &&
status.Lease.Type() == roachpb.LeaseEpoch {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) &&
p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
} else if status.Liveness.Epoch == status.Lease.Epoch {
// If not owner, increment epoch if necessary to invalidate lease.
// However, we only do so in the event that the next leaseholder is
// considered live at this time. If not, there's no sense in
// incrementing the expired leaseholder's epoch.
if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil {
if liveErr != nil {
err = errors.Wrapf(liveErr, "not incrementing epoch on n%d because next leaseholder (n%d) not live",
status.Liveness.NodeID, nextLeaseHolder.NodeID)
} else {
err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = nil)",
status.Liveness.NodeID, nextLeaseHolder.NodeID)
}
log.VEventf(ctx, 1, "%v", err)
} else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil {
// If we get ErrEpochAlreadyIncremented, someone else beat
// us to it. This proves that the target node is truly
// dead *now*, but it doesn't prove that it was dead at
// status.Timestamp (which we've encoded into our lease
// request). It's possible that the node was temporarily
// considered dead but revived without having its epoch
// incremented, i.e. that it was in fact live at
// status.Timestamp.
//
// It would be incorrect to simply proceed to sending our
// lease request since our lease.Start may precede the
// effective end timestamp of the predecessor lease (the
// expiration of the last successful heartbeat before the
// epoch increment), and so under this lease this node's
// timestamp cache would not necessarily reflect all reads
// served by the prior leaseholder.
//
// It would be correct to bump the timestamp in the lease
// request and proceed, but that just sets up another race
// between this node and the one that already incremented
// the epoch. They're probably going to beat us this time
// too, so just return the NotLeaseHolderError here
// instead of trying to fix up the timestamps and submit
// the lease request.
//
// ErrEpochAlreadyIncremented is not an unusual situation,
// so we don't log it as an error.
//
// https://github.com/cockroachdb/cockroach/issues/35986
if !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
log.Errorf(ctx, "failed to increment leaseholder's epoch: %s", err)
}
}
}
// Set error for propagation to all waiters below. Don't include the
// previous lease, which we know isn't valid. In particular, if it was
// ours but we failed to reacquire it (e.g. because our heartbeat failed
// due to a stalled disk) then we don't want DistSender to retry us.
if err != nil {
pErr = roachpb.NewError(newNotLeaseHolderError(
roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err)))
}
}
// Run the lease acquisition request with a timeout. We must eventually
// return a NotLeaseHolderError rather than hanging, otherwise we could
// prevent the caller from nudging a different replica into acquiring the
// lease.
err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
})
// Error will be handled below.

// Send the RequestLeaseRequest or TransferLeaseRequest and wait for the new
// lease to be applied.
if pErr == nil {
// The Replica circuit breakers together with round-tripping a ProbeRequest
// here before asking for the lease could provide an alternative, simpler
// solution to the below issue:
//
// https://github.com/cockroachdb/cockroach/issues/37906
ba := roachpb.BatchRequest{}
ba.Timestamp = p.repl.store.Clock().Now()
ba.RangeID = p.repl.RangeID
// NB:
// RequestLease always bypasses the circuit breaker (i.e. will prefer to
// get stuck on an unavailable range rather than failing fast; see
// `(*RequestLeaseRequest).flags()`). This enables the caller to choose
// between either behavior for themselves: if they too want to bypass
// the circuit breaker, they simply don't check for the circuit breaker
// while waiting for their lease handle. If they want to fail-fast, they
// do. If the lease instead adopted the caller's preference, we'd have
// to handle the case of multiple preferences joining onto one lease
// request, which is more difficult.
//
// TransferLease will observe the circuit breaker, as transferring a
// lease when the range is unavailable results in, essentially, giving
// up on the lease and thus worsening the situation.
ba.Add(leaseReq)
_, pErr = p.repl.Send(ctx, ba)
}
// We reset our state below regardless of whether we've gotten an error or
// not, but note that an error is ambiguous - there's no guarantee that the
// transfer will not still apply. That's OK, however, as the "in transfer"
// state maintained by the pendingLeaseRequest is not relied on for
// correctness (see repl.mu.minLeaseProposedTS), and resetting the state
// is beneficial as it'll allow the replica to attempt to transfer again or
// extend the existing lease in the future.

p.repl.mu.Lock()
defer p.repl.mu.Unlock()
if ctx.Err() != nil {
@@ -480,15 +392,14 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// active so we don't want to do anything else.
return
}

// Send result of lease to all waiter channels and cleanup request.
for llHandle := range p.llHandles {
// Don't send the same transaction object twice; this can lead to races.
if pErr != nil {
pErrClone := *pErr
if err != nil {
pErr := roachpb.NewError(err)
// TODO(tbg): why?
pErrClone.SetTxn(pErr.GetTxn())
llHandle.resolve(&pErrClone)
pErr.SetTxn(pErr.GetTxn())
llHandle.resolve(pErr)
} else {
llHandle.resolve(nil)
}
@@ -504,6 +415,118 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
return nil
}

// requestLease sends a synchronous transfer lease or lease request to the
// specified replica. It is only meant to be called from requestLeaseAsync,
// since it does not coordinate with other in-flight lease requests.
func (p *pendingLeaseRequest) requestLease(
ctx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
reqLease roachpb.Lease,
status kvserverpb.LeaseStatus,
leaseReq roachpb.Request,
) error {
// If requesting an epoch-based lease & current state is expired,
// potentially heartbeat our own liveness or increment epoch of
// prior owner. Note we only do this if the previous lease was
// epoch-based.
if reqLease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED &&
status.Lease.Type() == roachpb.LeaseEpoch {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) &&
p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
} else if status.Liveness.Epoch == status.Lease.Epoch {
// If not owner, increment epoch if necessary to invalidate lease.
// However, we only do so in the event that the next leaseholder is
// considered live at this time. If not, there's no sense in
// incrementing the expired leaseholder's epoch.
if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil {
if liveErr != nil {
err = errors.Wrapf(liveErr, "not incrementing epoch on n%d because next leaseholder (n%d) not live",
status.Liveness.NodeID, nextLeaseHolder.NodeID)
} else {
err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = nil)",
status.Liveness.NodeID, nextLeaseHolder.NodeID)
}
log.VEventf(ctx, 1, "%v", err)
} else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil {
// If we get ErrEpochAlreadyIncremented, someone else beat
// us to it. This proves that the target node is truly
// dead *now*, but it doesn't prove that it was dead at
// status.Timestamp (which we've encoded into our lease
// request). It's possible that the node was temporarily
// considered dead but revived without having its epoch
// incremented, i.e. that it was in fact live at
// status.Timestamp.
//
// It would be incorrect to simply proceed to sending our
// lease request since our lease.Start may precede the
// effective end timestamp of the predecessor lease (the
// expiration of the last successful heartbeat before the
// epoch increment), and so under this lease this node's
// timestamp cache would not necessarily reflect all reads
// served by the prior leaseholder.
//
// It would be correct to bump the timestamp in the lease
// request and proceed, but that just sets up another race
// between this node and the one that already incremented
// the epoch. They're probably going to beat us this time
// too, so just return the NotLeaseHolderError here
// instead of trying to fix up the timestamps and submit
// the lease request.
//
// ErrEpochAlreadyIncremented is not an unusual situation,
// so we don't log it as an error.
//
// https://github.com/cockroachdb/cockroach/issues/35986
if !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
log.Errorf(ctx, "failed to increment leaseholder's epoch: %s", err)
}
}
}
if err != nil {
// Return an NLHE with an empty lease, since we know the previous lease
// isn't valid. In particular, if it was ours but we failed to reacquire
// it (e.g. because our heartbeat failed due to a stalled disk) then we
// don't want DistSender to retry us.
return newNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
}

// Send the RequestLeaseRequest or TransferLeaseRequest and wait for the new
// lease to be applied.
//
// The Replica circuit breakers together with round-tripping a ProbeRequest
// here before asking for the lease could provide an alternative, simpler
// solution to the below issue:
//
// https://github.com/cockroachdb/cockroach/issues/37906
ba := roachpb.BatchRequest{}
ba.Timestamp = p.repl.store.Clock().Now()
ba.RangeID = p.repl.RangeID
// NB:
// RequestLease always bypasses the circuit breaker (i.e. will prefer to
// get stuck on an unavailable range rather than failing fast; see
// `(*RequestLeaseRequest).flags()`). This enables the caller to choose
// between either behavior for themselves: if they too want to bypass
// the circuit breaker, they simply don't check for the circuit breaker
// while waiting for their lease handle. If they want to fail-fast, they
// do. If the lease instead adopted the caller's preference, we'd have
// to handle the case of multiple preferences joining onto one lease
// request, which is more difficult.
//
// TransferLease will observe the circuit breaker, as transferring a
// lease when the range is unavailable results in, essentially, giving
// up on the lease and thus worsening the situation.
ba.Add(leaseReq)
_, pErr := p.repl.Send(ctx, ba)
return pErr.GoError()
}

// JoinRequest adds one more waiter to the currently pending request.
// It is the caller's responsibility to ensure that there is a pending request,
// and that the request is compatible with whatever the caller is currently
Expand Down Expand Up @@ -1140,6 +1163,25 @@ func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStat
// the request is operating at the current time.
func (r *Replica) redirectOnOrAcquireLeaseForRequest(
ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
) (status kvserverpb.LeaseStatus, pErr *roachpb.Error) {
// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * r.store.cfg.RaftElectionTimeout()
if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
func(ctx context.Context) error {
status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
return nil
},
); err != nil {
return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
}
return status, pErr
}

// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
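
The wrapper added to redirectOnOrAcquireLeaseForRequest above works by having the closure write into the function's named return values, since the timeout helper only accepts a func(ctx) error; the helper's error takes precedence only when it fails. A self-contained sketch of that shape, with a simplified stand-in for contextutil.RunWithTimeout and invented leaseStatus/acquireOnce names:

package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout is a simplified stand-in for contextutil.RunWithTimeout: run
// fn under a derived deadline and report an error if the deadline fired, even
// when fn itself returned nil.
func runWithTimeout(ctx context.Context, op string, timeout time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := fn(ctx); err != nil {
		return err
	}
	if ctx.Err() != nil {
		return fmt.Errorf("%s timed out after %s: %w", op, timeout, ctx.Err())
	}
	return nil
}

// leaseStatus stands in for kvserverpb.LeaseStatus in this sketch.
type leaseStatus struct{ valid bool }

// acquire mirrors the wrapper: the closure writes into the named returns, and
// a timeout discards them in favor of the helper's error.
func acquire(ctx context.Context, timeout time.Duration) (status leaseStatus, err error) {
	if tErr := runWithTimeout(ctx, "acquire-lease", timeout, func(ctx context.Context) error {
		status, err = acquireOnce(ctx)
		return nil // results travel via the captured named returns
	}); tErr != nil {
		return leaseStatus{}, tErr
	}
	return status, err
}

// acquireOnce stands in for redirectOnOrAcquireLeaseForRequestWithoutTimeout.
func acquireOnce(ctx context.Context) (leaseStatus, error) {
	select {
	case <-time.After(10 * time.Millisecond):
		return leaseStatus{valid: true}, nil
	case <-ctx.Done():
		return leaseStatus{}, ctx.Err()
	}
}

func main() {
	fmt.Println(acquire(context.Background(), 50*time.Millisecond)) // succeeds
	fmt.Println(acquire(context.Background(), time.Millisecond))    // times out
}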