Commit 339c47e

Avoid race condition between predicate move and commit (#2392)
- The cause of bug #2338 was a race condition between a mutation and a predicate move. Zero was not checking whether a predicate was under move before allowing a commit. Thus, a mutation could get proposed in a group, a move could then start, and the mutation would still get committed by Zero (after the move had started).
- This change fixes the issue by ensuring that Zero checks whether a predicate is being moved before allowing a commit.
- Any pending transactions are also cancelled once the move starts, so this could only happen as part of a race condition and not afterward.

Mechanism (see the sketch below):
- Send the real keys back to Zero, as part of the transaction context.
- Zero uses these keys to parse out the predicate, and checks whether that predicate is currently moving. If so, it aborts the transaction.
- Also check for `_predicate_` being moved. For some reason, if we don't consider this predicate, we could still lose data.
- Before doing a mutation in Dgraph alpha, check whether that tablet can be written to.
- Loop until all transactions corresponding to the predicate move are aborted. Only then start the move.

Tangential changes:
- Update the port number for the bank integration test.
- Remove the separate key-value-or-clean channel; run that work as part of the main Node.Run loop.
- Add a max function.
- Small refactoring here and there.
1 parent 7796a40 commit 339c47e
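The heart of the mechanism is a key round-trip between Alpha and Zero, shown in the oracle.go and mvcc.go diffs below: Alpha base64-encodes each conflict key into the transaction context, and Zero decodes the keys, recovers the predicate from each one, and aborts the commit if any of them (always including `_predicate_`) sits on a tablet that is read-only because of a move. Here is a minimal, self-contained sketch of that check; `parsePredicate`, the toy key layout, and `movingTablets` are stand-ins for `x.Parse` and Zero's tablet state, not the real API.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// parsePredicate is a stand-in for x.Parse: Dgraph keys embed the predicate
// name, so Zero can recover it from the raw key bytes. We assume a toy
// "<predicate>|<rest>" layout purely for illustration.
func parsePredicate(key []byte) string {
	return strings.SplitN(string(key), "|", 2)[0]
}

// movingTablets stands in for Zero's tablet state (ServingTablet plus
// GetReadOnly in the real code): a tablet under move is marked read-only.
var movingTablets = map[string]bool{"name": true}

// shouldAbort mirrors the check this commit adds to Server.commit: decode each
// base64 key from the TxnContext, map it to a predicate, and abort if any
// predicate (always including _predicate_) is currently being moved.
func shouldAbort(txnKeys []string) bool {
	preds := map[string]struct{}{"_predicate_": {}}
	for _, k := range txnKeys {
		key, err := base64.StdEncoding.DecodeString(k)
		if err != nil {
			continue
		}
		preds[parsePredicate(key)] = struct{}{}
	}
	for p := range preds {
		if movingTablets[p] {
			return true
		}
	}
	return false
}

func main() {
	k := base64.StdEncoding.EncodeToString([]byte("name|0x01"))
	fmt.Println(shouldAbort([]string{k})) // true: "name" is under move
}
```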

File tree

11 files changed: +112 -88 lines

contrib/integration/bank/main.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -175,7 +175,7 @@ func (s *State) loop(wg *sync.WaitGroup) {
 func main() {
 	flag.Parse()

-	conn, err := grpc.Dial("localhost:9081", grpc.WithInsecure())
+	conn, err := grpc.Dial("localhost:9080", grpc.WithInsecure())
 	if err != nil {
 		log.Fatal(err)
 	}
```

dgraph/cmd/zero/oracle.go

Lines changed: 33 additions & 0 deletions
```diff
@@ -8,6 +8,7 @@
 package zero

 import (
+	"encoding/base64"
 	"errors"
 	"math/rand"
 	"time"
@@ -284,6 +285,38 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
 		return s.proposeTxn(ctx, src)
 	}

+	// Check if any of these tablets is being moved. If so, abort the transaction.
+	preds := make(map[string]struct{})
+	// _predicate_ would never be part of conflict detection, so keys corresponding to any
+	// modifications to this predicate would not be sent to Zero. But, we still need to abort
+	// transactions which are coming in while this predicate is being moved. This means that if
+	// _predicate_ expansion is enabled, and a move for this predicate is happening, NO transactions
+	// across the entire cluster would commit. Sorry! But if we don't do this, we might lose commits
+	// which sneaked in during the move.
+	preds["_predicate_"] = struct{}{}
+
+	for _, k := range src.Keys {
+		key, err := base64.StdEncoding.DecodeString(k)
+		if err != nil {
+			continue
+		}
+		pk := x.Parse(key)
+		if pk != nil {
+			preds[pk.Attr] = struct{}{}
+		}
+	}
+	for pred := range preds {
+		tablet := s.ServingTablet(pred)
+		if tablet == nil || tablet.GetReadOnly() {
+			src.Aborted = true
+			return s.proposeTxn(ctx, src)
+		}
+	}
+
+	// TODO: We could take a fingerprint of the keys, and store them in uint64, allowing the rowCommit
+	// map to be keyed by uint64, which would be cheaper. But, unsure about the repercussions of
+	// that. It would save some memory. So, worth a try.

 	var num intern.Num
 	num.Val = 1
 	assigned, err := s.lease(ctx, &num, true)
```

dgraph/cmd/zero/tablet.go

Lines changed: 5 additions & 0 deletions
```diff
@@ -64,6 +64,10 @@ func (s *Server) rebalanceTablets() {
 }

 func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
+	// Typically move predicate is run only on the leader. But, in this case, an external HTTP
+	// request can also trigger a predicate move. We could render those invalid here by checking
+	// whether this node is actually the leader. But, I have noticed no side effects from allowing
+	// them to run, even if this node is a follower node.
 	tab := s.ServingTablet(predicate)
 	x.AssertTruef(tab != nil, "Tablet to be moved: [%v] should not be nil", predicate)
 	x.Printf("Going to move predicate: [%v], size: [%v] from group %d to %d\n", predicate,
@@ -81,6 +85,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error
 		break
 	}

+	x.Printf("Sleeping before we run recovery for tablet move")
 	// We might have initiated predicate move on some other node, give it some
 	// time to get cancelled. On cancellation the other node would set the predicate
 	// to write mode again and we need to be sure that it doesn't happen after we
```
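The diff above only shows the added logging and commentary; per the commit message, the move also loops until every pending transaction on the predicate has been aborted before it proceeds. Below is a rough sketch of that ordering; `abortPending` and the 100ms backoff are assumptions for illustration, not the real Zero API.

```go
package main

import (
	"fmt"
	"time"
)

// abortPending stands in for Zero asking the group to abort any transactions
// still touching the predicate; it reports how many remain. Both the name and
// the return value are assumptions for this sketch.
func abortPending(predicate string) int {
	return 0 // pretend nothing is pending anymore
}

// waitForAborts mirrors the ordering the commit message describes: keep
// aborting until no transaction on the predicate remains, and only then is it
// safe to start the move.
func waitForAborts(predicate string) {
	for abortPending(predicate) > 0 {
		time.Sleep(100 * time.Millisecond) // assumed retry backoff
	}
	fmt.Printf("no pending txns on %q; starting move\n", predicate)
}

func main() {
	waitForAborts("name")
}
```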

posting/index.go

Lines changed: 2 additions & 4 deletions
```diff
@@ -435,13 +435,11 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
 		if !remove(item.Key()) {
 			continue
 		}
-		nkey := make([]byte, len(item.Key()))
-		copy(nkey, item.Key())
+		nkey := item.KeyCopy(nil)
 		version := item.Version()

 		txn := pstore.NewTransactionAt(version, true)
-		// Purge doesn't delete anything, so write an empty pl
-		txn.SetWithDiscard(nkey, nil, BitEmptyPosting)
+		txn.Delete(nkey)
 		wg.Add(1)
 		err := txn.CommitAt(version, func(e error) {
 			defer wg.Done()
```
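For context, `deleteEntries` runs against Badger in managed mode: versions are supplied by the caller via `NewTransactionAt`/`CommitAt` rather than by Badger's own oracle, which is why the diff can delete a key at an explicit version. A standalone sketch of that pattern follows, assuming the Badger v1-era API; the temp directory and version number are made up for illustration.

```go
package main

import (
	"io/ioutil"
	"log"

	"github.com/dgraph-io/badger"
)

func main() {
	dir, err := ioutil.TempDir("", "badger")
	if err != nil {
		log.Fatal(err)
	}
	opts := badger.DefaultOptions
	opts.Dir, opts.ValueDir = dir, dir

	// Managed mode: the caller owns all timestamps.
	db, err := badger.OpenManaged(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	const version = 5
	txn := db.NewTransactionAt(version, true) // read ts, update=true
	defer txn.Discard()

	// Write a delete marker for the key at this version, as the diff does.
	if err := txn.Delete([]byte("key")); err != nil {
		log.Fatal(err)
	}
	if err := txn.CommitAt(version, nil); err != nil { // commit ts
		log.Fatal(err)
	}
}
```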

posting/mvcc.go

Lines changed: 5 additions & 4 deletions
```diff
@@ -10,9 +10,9 @@ package posting
 import (
 	"bytes"
 	"context"
+	"encoding/base64"
 	"math"
 	"sort"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -21,7 +21,6 @@ import (
 	"github.com/dgraph-io/dgo/protos/api"
 	"github.com/dgraph-io/dgraph/protos/intern"
 	"github.com/dgraph-io/dgraph/x"
-	farm "github.com/dgryski/go-farm"
 )

 var (
@@ -216,8 +215,10 @@ func (t *Txn) Fill(ctx *api.TxnContext) {
 	for i := t.nextKeyIdx; i < len(t.deltas); i++ {
 		d := t.deltas[i]
 		if d.checkConflict {
-			fp := farm.Fingerprint64(d.key)
-			ctx.Keys = append(ctx.Keys, strconv.FormatUint(fp, 36))
+			// Instead of taking a fingerprint of the keys, send the whole key to Zero. So, Zero can
+			// parse the key and check if that predicate is undergoing a move, hence avoiding #2338.
+			k := base64.StdEncoding.EncodeToString(d.key)
+			ctx.Keys = append(ctx.Keys, k)
 		}
 	}
 	t.nextKeyIdx = len(t.deltas)
```

worker/draft.go

Lines changed: 12 additions & 42 deletions
```diff
@@ -372,34 +372,7 @@ func (n *node) applyConfChange(e raftpb.Entry) {
 	groups().triggerMembershipSync()
 }

-type KeyValueOrCleanProposal struct {
-	raftIdx  uint64
-	proposal *intern.Proposal
-}
-
-func (n *node) processKeyValueOrCleanProposals(
-	kvChan chan KeyValueOrCleanProposal) {
-	// Run KeyValueProposals and CleanPredicate one by one always.
-	// During predicate move we first clean the predicate and then
-	// propose key values, we wait for clean predicate to be done before
-	// we propose key values. But during replay if we run these proposals
-	// in goroutine then we will have no such guarantees so always run
-	// them sequentially.
-	for e := range kvChan {
-		if len(e.proposal.Kv) > 0 {
-			n.processKeyValues(e.raftIdx, e.proposal.Key, e.proposal.Kv)
-		} else if len(e.proposal.CleanPredicate) > 0 {
-			n.deletePredicate(e.raftIdx, e.proposal.Key, e.proposal.CleanPredicate)
-		} else {
-			x.Fatalf("Unknown proposal, %+v\n", e.proposal)
-		}
-	}
-}
-
 func (n *node) processApplyCh() {
-	kvChan := make(chan KeyValueOrCleanProposal, 1000)
-	go n.processKeyValueOrCleanProposals(kvChan)
-
 	for e := range n.applyCh {
 		if len(e.Data) == 0 {
 			// This is not in the proposal map
@@ -443,30 +416,27 @@ func (n *node) processApplyCh() {
 		if proposal.Mutations != nil {
 			// syncmarks for this shouldn't be marked done until it's comitted.
 			n.sch.schedule(proposal, e.Index)
+
 		} else if len(proposal.Kv) > 0 {
-			kvChan <- KeyValueOrCleanProposal{
-				raftIdx:  e.Index,
-				proposal: proposal,
-			}
+			n.processKeyValues(e.Index, proposal.Key, proposal.Kv)
+
 		} else if proposal.State != nil {
 			// This state needn't be snapshotted in this group, on restart we would fetch
 			// a state which is latest or equal to this.
 			groups().applyState(proposal.State)
 			// When proposal is done it emits done watermarks.
 			posting.TxnMarks().Done(e.Index)
 			n.props.Done(proposal.Key, nil)
+
 		} else if len(proposal.CleanPredicate) > 0 {
-			kvChan <- KeyValueOrCleanProposal{
-				raftIdx:  e.Index,
-				proposal: proposal,
-			}
+			n.deletePredicate(e.Index, proposal.Key, proposal.CleanPredicate)
+
 		} else if proposal.TxnContext != nil {
 			go n.commitOrAbort(e.Index, proposal.Key, proposal.TxnContext)
 		} else {
 			x.Fatalf("Unknown proposal")
 		}
 	}
-	close(kvChan)
 }

 func (n *node) commitOrAbort(index uint64, pid string, tctx *api.TxnContext) {
@@ -492,11 +462,11 @@ func (n *node) deletePredicate(index uint64, pid string, predicate string) {
 	n.props.Done(pid, err)
 }

-func (n *node) processKeyValues(index uint64, pid string, kvs []*intern.KV) error {
-	ctx, _ := n.props.CtxAndTxn(pid)
+func (n *node) processKeyValues(index uint64, pkey string, kvs []*intern.KV) error {
+	ctx, _ := n.props.CtxAndTxn(pkey)
 	err := populateKeyValues(ctx, kvs)
 	posting.TxnMarks().Done(index)
-	n.props.Done(pid, err)
+	n.props.Done(pkey, err)
 	return nil
 }
@@ -865,7 +835,7 @@ func (n *node) joinPeers() error {

 	gconn := pl.Get()
 	c := intern.NewRaftClient(gconn)
-	x.Printf("Calling JoinCluster")
+	x.Printf("Calling JoinCluster via leader: %s", pl.Addr)
 	if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
 		return x.Errorf("Error while joining cluster: %+v\n", err)
 	}
@@ -935,11 +905,11 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
 		n.SetRaft(raft.RestartNode(n.Cfg))
 	} else {
 		x.Printf("New Node for group: %d\n", n.gid)
-		if _, hasPeer := groups().MyPeer(); hasPeer {
+		if peerId, hasPeer := groups().MyPeer(); hasPeer {
 			// Get snapshot before joining peers as it can take time to retrieve it and we dont
 			// want the quorum to be inactive when it happens.

-			x.Println("Retrieving snapshot.")
+			x.Println("Retrieving snapshot from peer: %d", peerId)
 			n.retryUntilSuccess(n.retrieveSnapshot, time.Second)

 			x.Println("Trying to join peers.")
```

worker/groups.go

Lines changed: 8 additions & 0 deletions
```diff
@@ -306,6 +306,14 @@ func (g *groupi) BelongsTo(key string) uint32 {
 	return 0
 }

+func (g *groupi) ServesTabletRW(key string) bool {
+	tablet := g.Tablet(key)
+	if tablet != nil && !tablet.ReadOnly && tablet.GroupId == groups().groupId() {
+		return true
+	}
+	return false
+}
+
 func (g *groupi) ServesTablet(key string) bool {
 	tablet := g.Tablet(key)
 	if tablet != nil && tablet.GroupId == groups().groupId() {
```

worker/mutation.go

Lines changed: 4 additions & 7 deletions
```diff
@@ -38,13 +38,12 @@ func deletePredicateEdge(edge *intern.DirectedEdge) bool {
 	return edge.Entity == 0 && bytes.Equal(edge.Value, []byte(x.Star))
 }

-// runMutation goes through all the edges and applies them. It returns the
-// mutations which were not applied in left.
+// runMutation goes through all the edges and applies them.
 func runMutation(ctx context.Context, edge *intern.DirectedEdge, txn *posting.Txn) error {
 	if tr, ok := trace.FromContext(ctx); ok {
 		tr.LazyPrintf("In run mutations")
 	}
-	if !groups().ServesTablet(edge.Attr) {
+	if !groups().ServesTabletRW(edge.Attr) {
 		// Don't assert, can happen during replay of raft logs if server crashes immediately
 		// after predicate move and before snapshot.
 		return errUnservedTablet
@@ -411,10 +410,8 @@ func fillTxnContext(tctx *api.TxnContext, gid uint32, startTs uint64) {
 	// applied watermark can be less than this proposal's index so return the maximum.
 	// For some proposals like dropPredicate, we don't store them in txns map, so we
 	// don't know the raft index. For them we would return applied watermark.
-	if x := node.Applied.DoneUntil(); x > index {
-		index = x
-	}
-	tctx.LinRead.Ids[gid] = index
+	doneUntil := node.Applied.DoneUntil()
+	tctx.LinRead.Ids[gid] = x.Max(index, doneUntil)
 }

 // proposeOrSend either proposes the mutation if the node serves the group gid or sends it to
```
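`x.Max` itself is added elsewhere in this commit; the `x` package change isn't among the diffs shown here. A guess at its shape, given that this commit predates Go generics and `fillTxnContext` calls it on uint64 watermarks:

```go
package main

import "fmt"

// Max returns the larger of two uint64s. This is an assumed minimal
// definition of the helper the commit message mentions, not the actual
// x package code.
func Max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	index, doneUntil := uint64(10), uint64(42)
	fmt.Println(Max(index, doneUntil)) // 42: the later watermark wins
}
```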
