Skip to content
Browse files
Avoid race condition between predicate move and commit (#2392)
- The reason for bug #2338 was that there was a race condition between a mutation and predicate move. Zero was not checking if a predicate is under move before allowing a commit. Thus, a mutation could get proposed in a group, then a move starts, and get committed by Zero (after the move starts).
- This change this issue by ensuring that Zero checks if a predicate is being moved, before allowing commit.
- Any pending transactions are also cancelled once the move starts, so this would only happen as part of a race condition and not afterward.

- Send the real keys back to Zero, as part of Transaction Context.
- Zero uses these keys to parse the predicate, and checks if that predicate is currently moving. If so, it would abort the transaction.
- Also, check for `_predicate_` being moved. For some reason, if we don't consider this predicate, we could still lose data.
- Before doing a mutation in Dgraph alpha, check if that tablet can be written to.
- Loop until all transactions corresponding to the predicate move are aborted. Only then start the move.

Tangential changes:
- Update the port number for bank integration test.
- Remove the separate key value or clean channel. Make it run as part of the main Node.Run loop.
- Add a max function.
- Small refactoring here and there.
  • Loading branch information
manishrjain committed May 16, 2018
1 parent 7796a40 commit 339c47e0e6bf99da52821ae9d1b1da97e70161fe
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 88 deletions.
@@ -175,7 +175,7 @@ func (s *State) loop(wg *sync.WaitGroup) {
func main() {

conn, err := grpc.Dial("localhost:9081", grpc.WithInsecure())
conn, err := grpc.Dial("localhost:9080", grpc.WithInsecure())
if err != nil {
@@ -8,6 +8,7 @@
package zero

import (
@@ -284,6 +285,38 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
return s.proposeTxn(ctx, src)

// Check if any of these tablets is being moved. If so, abort the transaction.
preds := make(map[string]struct{})
// _predicate_ would never be part of conflict detection, so keys corresponding to any
// modifications to this predicate would not be sent to Zero. But, we still need to abort
// transactions which are coming in, while this predicate is being moved. This means that if
// _predicate_ expansion is enabled, and a move for this predicate is happening, NO transactions
// across the entire cluster would commit. Sorry! But if we don't do this, we might lose commits
// which sneaked in during the move.
preds["_predicate_"] = struct{}{}

for _, k := range src.Keys {
key, err := base64.StdEncoding.DecodeString(k)
if err != nil {
pk := x.Parse(key)
if pk != nil {
preds[pk.Attr] = struct{}{}
for pred := range preds {
tablet := s.ServingTablet(pred)
if tablet == nil || tablet.GetReadOnly() {
src.Aborted = true
return s.proposeTxn(ctx, src)

// TODO: We could take fingerprint of the keys, and store them in uint64, allowing the rowCommit
// map to be keyed by uint64, which would be cheaper. But, unsure about the repurcussions of
// that. It would save some memory. So, worth a try.

var num intern.Num
num.Val = 1
assigned, err :=, &num, true)
@@ -64,6 +64,10 @@ func (s *Server) rebalanceTablets() {

func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
// Typically move predicate is run only on leader. But, in this case, an external HTTP request
// can also trigger a predicate move. We could render them invalid here by checking if this node
// is actually the leader. But, I have noticed no side effects with allowing them to run, even
// if this node is a follower node.
tab := s.ServingTablet(predicate)
x.AssertTruef(tab != nil, "Tablet to be moved: [%v] should not be nil", predicate)
x.Printf("Going to move predicate: [%v], size: [%v] from group %d to %d\n", predicate,
@@ -81,6 +85,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro

x.Printf("Sleeping before we run recovery for tablet move")
// We might have initiated predicate move on some other node, give it some
// time to get cancelled. On cancellation the other node would set the predicate
// to write mode again and we need to be sure that it doesn't happen after we
@@ -435,13 +435,11 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
if !remove(item.Key()) {
nkey := make([]byte, len(item.Key()))
copy(nkey, item.Key())
nkey := item.KeyCopy(nil)
version := item.Version()

txn := pstore.NewTransactionAt(version, true)
// Purge doesn't delete anything, so write an empty pl
txn.SetWithDiscard(nkey, nil, BitEmptyPosting)
err := txn.CommitAt(version, func(e error) {
defer wg.Done()
@@ -10,9 +10,9 @@ package posting
import (
@@ -21,7 +21,6 @@ import (
farm ""

var (
@@ -216,8 +215,10 @@ func (t *Txn) Fill(ctx *api.TxnContext) {
for i := t.nextKeyIdx; i < len(t.deltas); i++ {
d := t.deltas[i]
if d.checkConflict {
fp := farm.Fingerprint64(d.key)
ctx.Keys = append(ctx.Keys, strconv.FormatUint(fp, 36))
// Instead of taking a fingerprint of the keys, send the whole key to Zero. So, Zero can
// parse the key and check if that predicate is undergoing a move, hence avoiding #2338.
k := base64.StdEncoding.EncodeToString(d.key)
ctx.Keys = append(ctx.Keys, k)
t.nextKeyIdx = len(t.deltas)
@@ -372,34 +372,7 @@ func (n *node) applyConfChange(e raftpb.Entry) {

type KeyValueOrCleanProposal struct {
raftIdx uint64
proposal *intern.Proposal

func (n *node) processKeyValueOrCleanProposals(
kvChan chan KeyValueOrCleanProposal) {
// Run KeyValueProposals and CleanPredicate one by one always.
// During predicate move we first clean the predicate and then
// propose key values, we wait for clean predicate to be done before
// we propose key values. But during replay if we run these proposals
// in goroutine then we will have no such guarantees so always run
// them sequentially.
for e := range kvChan {
if len(e.proposal.Kv) > 0 {
n.processKeyValues(e.raftIdx, e.proposal.Key, e.proposal.Kv)
} else if len(e.proposal.CleanPredicate) > 0 {
n.deletePredicate(e.raftIdx, e.proposal.Key, e.proposal.CleanPredicate)
} else {
x.Fatalf("Unknown proposal, %+v\n", e.proposal)

func (n *node) processApplyCh() {
kvChan := make(chan KeyValueOrCleanProposal, 1000)
go n.processKeyValueOrCleanProposals(kvChan)

for e := range n.applyCh {
if len(e.Data) == 0 {
// This is not in the proposal map
@@ -443,30 +416,27 @@ func (n *node) processApplyCh() {
if proposal.Mutations != nil {
// syncmarks for this shouldn't be marked done until it's comitted.
n.sch.schedule(proposal, e.Index)

} else if len(proposal.Kv) > 0 {
kvChan <- KeyValueOrCleanProposal{
raftIdx: e.Index,
proposal: proposal,
n.processKeyValues(e.Index, proposal.Key, proposal.Kv)

} else if proposal.State != nil {
// This state needn't be snapshotted in this group, on restart we would fetch
// a state which is latest or equal to this.
// When proposal is done it emits done watermarks.
n.props.Done(proposal.Key, nil)

} else if len(proposal.CleanPredicate) > 0 {
kvChan <- KeyValueOrCleanProposal{
raftIdx: e.Index,
proposal: proposal,
n.deletePredicate(e.Index, proposal.Key, proposal.CleanPredicate)

} else if proposal.TxnContext != nil {
go n.commitOrAbort(e.Index, proposal.Key, proposal.TxnContext)
} else {
x.Fatalf("Unknown proposal")

func (n *node) commitOrAbort(index uint64, pid string, tctx *api.TxnContext) {
@@ -492,11 +462,11 @@ func (n *node) deletePredicate(index uint64, pid string, predicate string) {
n.props.Done(pid, err)

func (n *node) processKeyValues(index uint64, pid string, kvs []*intern.KV) error {
ctx, _ := n.props.CtxAndTxn(pid)
func (n *node) processKeyValues(index uint64, pkey string, kvs []*intern.KV) error {
ctx, _ := n.props.CtxAndTxn(pkey)
err := populateKeyValues(ctx, kvs)
n.props.Done(pid, err)
n.props.Done(pkey, err)
return nil

@@ -865,7 +835,7 @@ func (n *node) joinPeers() error {

gconn := pl.Get()
c := intern.NewRaftClient(gconn)
x.Printf("Calling JoinCluster")
x.Printf("Calling JoinCluster via leader: %s", pl.Addr)
if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
return x.Errorf("Error while joining cluster: %+v\n", err)
@@ -935,11 +905,11 @@ func (n *node) InitAndStartNode(wal *raftwal.Wal) {
} else {
x.Printf("New Node for group: %d\n", n.gid)
if _, hasPeer := groups().MyPeer(); hasPeer {
if peerId, hasPeer := groups().MyPeer(); hasPeer {
// Get snapshot before joining peers as it can take time to retrieve it and we dont
// want the quorum to be inactive when it happens.

x.Println("Retrieving snapshot.")
x.Println("Retrieving snapshot from peer: %d", peerId)
n.retryUntilSuccess(n.retrieveSnapshot, time.Second)

x.Println("Trying to join peers.")
@@ -306,6 +306,14 @@ func (g *groupi) BelongsTo(key string) uint32 {
return 0

func (g *groupi) ServesTabletRW(key string) bool {
tablet := g.Tablet(key)
if tablet != nil && !tablet.ReadOnly && tablet.GroupId == groups().groupId() {
return true
return false

func (g *groupi) ServesTablet(key string) bool {
tablet := g.Tablet(key)
if tablet != nil && tablet.GroupId == groups().groupId() {
@@ -38,13 +38,12 @@ func deletePredicateEdge(edge *intern.DirectedEdge) bool {
return edge.Entity == 0 && bytes.Equal(edge.Value, []byte(x.Star))

// runMutation goes through all the edges and applies them. It returns the
// mutations which were not applied in left.
// runMutation goes through all the edges and applies them.
func runMutation(ctx context.Context, edge *intern.DirectedEdge, txn *posting.Txn) error {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("In run mutations")
if !groups().ServesTablet(edge.Attr) {
if !groups().ServesTabletRW(edge.Attr) {
// Don't assert, can happen during replay of raft logs if server crashes immediately
// after predicate move and before snapshot.
return errUnservedTablet
@@ -411,10 +410,8 @@ func fillTxnContext(tctx *api.TxnContext, gid uint32, startTs uint64) {
// applied watermark can be less than this proposal's index so return the maximum.
// For some proposals like dropPredicate, we don't store them in txns map, so we
// don't know the raft index. For them we would return applied watermark.
if x := node.Applied.DoneUntil(); x > index {
index = x
tctx.LinRead.Ids[gid] = index
doneUntil := node.Applied.DoneUntil()
tctx.LinRead.Ids[gid] = x.Max(index, doneUntil)

// proposeOrSend either proposes the mutation if the node serves the group gid or sends it to

0 comments on commit 339c47e

Please sign in to comment.