Skip to content

Commit

Permalink
[DNM][WIP] client/leasemanager: additions for leases/advisory locks
Browse files Browse the repository at this point in the history
Plumbing to update the ts cache when removing a "leasing intent".

Release note: None
  • Loading branch information
sumeerbhola authored and yuzefovich committed Feb 19, 2020
1 parent cff7c11 commit e8d6ae1
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 11 deletions.
10 changes: 6 additions & 4 deletions pkg/internal/client/leasemanager/leasemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package leasemanager

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (lm *LeaseManager) AcquireShared(
if err = txn.ForceHeartbeat(); err != nil {
return nil, err
}
return &leaseImpl{txn: txn}, nil
return &leaseImpl{txn: txn, maxOffset: lm.db.Clock().MaxOffset()}, nil
}

func (lm *LeaseManager) AcquireExclusive(
Expand All @@ -92,7 +93,7 @@ func (lm *LeaseManager) AcquireExclusive(
if err := txn.ForceHeartbeat(); err != nil {
return nil, err
}
return &leaseImpl{txn: txn}, nil
return &leaseImpl{txn: txn, maxOffset: lm.db.Clock().MaxOffset()}, nil
}

// TODO(ajwerner): Optimize allocations here by allocating all of the keys from
Expand Down Expand Up @@ -133,9 +134,10 @@ func makeSharedKey(prefix, key []byte, id uuid.UUID) roachpb.Key {
}

type leaseImpl struct {
txn *client.Txn
txn *client.Txn
maxOffset time.Duration
}

func (l *leaseImpl) GetExpiration() hlc.Timestamp {
return l.txn.ExpiryTimestamp()
return l.txn.ExpiryTimestamp().Add(-l.maxOffset.Nanoseconds(), 0)
}
2 changes: 1 addition & 1 deletion pkg/internal/client/leasemanager/leasemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestLeases(t *testing.T) {
}
txn := tc.Server(0).DB().NewTxn(ctx, "")

if err := acquire(ctx, txn, lockKeys[s.lock]); err != nil {
if _, err := acquire(ctx, txn, lockKeys[s.lock]); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,7 @@ func TestSequenceNumbers(t *testing.T) {
for i := 0; i < 5; i++ {
var ba roachpb.BatchRequest
for j := 0; j < i; j++ {
ba.Add(roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("foo")).(*roachpb.PutRequest))
ba.Add(roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("foo")))
}
if _, pErr := txn.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ func declareKeysPushTransaction(
// activity and extend the waiting period until a transaction is
// considered expired. This waiting period is a "courtesy" - if we
// simply aborted txns right away then we would see worse performance
// under contention, but everything would still be correct.
// under contention, but everything would still be correct. Note that
// for transactions containing "lease intents" this waiting is not a
// courtesy -- a successful heartbeat is a guarantee that the
// transaction will not be aborted before the expiry. The expiry
// decision may be made at a node that is ahead of the node where
// the intent is removed. So we subtract the max offset from
// the promise made to the leaseholder and do the same when bumping
// the ts cache (the latter is optional for correctness).

//
// Txn record not expired: If the pushee txn is not expired, its
// priority is compared against the pusher's (see CanPushWithPriority).
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ func ResolveIntent(
}

intent := args.AsIntent()
var res result.Result
leasingIntentFunc := func(intent roachpb.Intent) {
// TODO(sbhola): update ts cache
res.Local.AbortedLeasingIntents = append(res.Local.AbortedLeasingIntents, intent)
}
ok, err := engine.MVCCResolveWriteIntent(ctx, readWriter, ms, intent, leasingIntentFunc)
if err != nil {
return result.Result{}, err
}

var res result.Result
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

if WriteAbortSpanOnResolve(args.Status, args.Poison, ok) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func ResolveIntentRange(
iterAndBuf := engine.GetIterAndBuf(readWriter, engine.IterOptions{UpperBound: args.EndKey})
defer iterAndBuf.Cleanup()

var res result.Result
leasingIntentFunc := func(intent roachpb.Intent) {
// TODO(sbhola): update ts cache
res.Local.AbortedLeasingIntents = append(res.Local.AbortedLeasingIntents, intent)
}
numKeys, resumeSpan, err := engine.MVCCResolveWriteIntentRangeUsingIter(ctx, readWriter, iterAndBuf, ms, intent, cArgs.MaxKeys, leasingIntentFunc)
if err != nil {
Expand All @@ -61,7 +62,6 @@ func ResolveIntentRange(
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}

var res result.Result
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

if WriteAbortSpanOnResolve(args.Status, args.Poison, numKeys > 0) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type LocalResult struct {
// live.
EndTxns []EndTxnIntents

AbortedLeasingIntents []roachpb.Intent

// When set (in which case we better be the first range), call
// GossipFirstRange if the Replica holds the lease.
GossipFirstRange bool
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -605,6 +606,10 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
if lResult.EndTxns != nil {
log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
}
for _, intent := range lResult.AbortedLeasingIntents {
expiryTs := intent.HeartbeatTimestamp.Add(txnwait.TxnLivenessThreshold.Nanoseconds()-r.Clock().MaxOffset().Nanoseconds(), 0)
r.store.tsCache.Add(intent.Key, nil, expiryTs, intent.Txn.ID)
}
if lResult.MaybeWatchForMerge {
log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
}
Expand Down

0 comments on commit e8d6ae1

Please sign in to comment.