Skip to content

Commit

Permalink
Fix remaning jepsen issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Janardhan Reddy authored Mar 28, 2018
1 parent f66c7df commit 978c749
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,8 @@ func (n *node) Run() {
if rd.SoftState != nil {
if rd.RaftState == raft.StateLeader && !leader {
n.server.updateLeases()
leader = true
}
leader = rd.RaftState == raft.StateLeader
// Oracle stream would close the stream once it steps down as leader
// predicate move would cancel any in progress move on stepping down.
n.triggerLeaderChange()
Expand Down
2 changes: 1 addition & 1 deletion posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *intern.DirectedEdge
hasPendingDelete := (l.markdeleteAll != txn.StartTs) &&
l.markdeleteAll > 0 && t.Op == intern.DirectedEdge_DEL &&
bytes.Equal(t.Value, []byte(x.Star))
doAbort := hasPendingDelete || txn.StartTs < l.commitTs
doAbort := hasPendingDelete

checkConflict := false

Expand Down
6 changes: 4 additions & 2 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,16 @@ func TestAddMutation_DelRead(t *testing.T) {
require.EqualValues(t, 0, ol.Length(3, 0))

// Commit sp* only in oracle, don't apply to pl yet
Oracle().commits[1] = 5
Oracle().commits[3] = 5
defer func() {
delete(Oracle().commits, 1)
delete(Oracle().commits, 3)
}()

// This read should ignore sp*, since readts is 4 and it was committed at 5
require.EqualValues(t, 1, ol.Length(4, 0))
checkValue(t, ol, "newcars", 4)

require.EqualValues(t, 0, ol.Length(6, 0))
}

func TestAddMutation_jchiu2(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ func (n *node) processMutation(task *task) error {
}
rv := x.RaftValue{Group: n.gid, Index: ridx}
ctx = context.WithValue(ctx, "raft", rv)

// Index updates would be wrong if we don't wait.
// Say we do <0x1> <name> "janardhan", <0x1> <name> "pawan",
// while applying the second mutation we check the old value
// of name and delete it from "janardhan"'s index. If we don't
// wait for commit information then mutation won't see the value
posting.Oracle().WaitForTs(context.Background(), txn.StartTs)
if err := runMutation(ctx, edge, txn); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("process mutation: %v", err)
Expand Down
10 changes: 9 additions & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,21 @@ func Timestamps(ctx context.Context, num *intern.Num) (*api.AssignedIds, error)

func fillTxnContext(tctx *api.TxnContext, gid uint32, startTs uint64) {
node := groups().Node
var index uint64
if txn := posting.Txns().Get(startTs); txn != nil {
txn.Fill(tctx)
index = txn.LastIndex()
}
tctx.LinRead = &api.LinRead{
Ids: make(map[uint32]uint64),
}
tctx.LinRead.Ids[gid] = node.Applied.DoneUntil()
// applied watermark can be less than this proposal's index so return the maximum.
// For some proposals like dropPredicate, we don't store them in txns map, so we
// don't know the raft index. For them we would return applied watermark.
if x := node.Applied.DoneUntil(); x > index {
index = x
}
tctx.LinRead.Ids[gid] = index
}

// proposeOrSend either proposes the mutation if the node serves the group gid or sends it to
Expand Down

0 comments on commit 978c749

Please sign in to comment.