Permalink
Browse files

Fix remaning jepsen issues

  • Loading branch information...
janardhan1993 committed Mar 28, 2018
1 parent f66c7df commit 978c7498a46cdbce593245bda65336b0deb789a4
Showing with 22 additions and 5 deletions.
  1. +1 −1 dgraph/cmd/zero/raft.go
  2. +1 −1 posting/list.go
  3. +4 −2 posting/list_test.go
  4. +7 −0 worker/draft.go
  5. +9 −1 worker/mutation.go
View
@@ -570,8 +570,8 @@ func (n *node) Run() {
if rd.SoftState != nil {
if rd.RaftState == raft.StateLeader && !leader {
n.server.updateLeases()
leader = true
}
leader = rd.RaftState == raft.StateLeader
// Oracle stream would close the stream once it steps down as leader
// predicate move would cancel any in progress move on stepping down.
n.triggerLeaderChange()
View
@@ -345,7 +345,7 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *intern.DirectedEdge
hasPendingDelete := (l.markdeleteAll != txn.StartTs) &&
l.markdeleteAll > 0 && t.Op == intern.DirectedEdge_DEL &&
bytes.Equal(t.Value, []byte(x.Star))
doAbort := hasPendingDelete || txn.StartTs < l.commitTs
doAbort := hasPendingDelete
checkConflict := false
View
@@ -271,14 +271,16 @@ func TestAddMutation_DelRead(t *testing.T) {
require.EqualValues(t, 0, ol.Length(3, 0))
// Commit sp* only in oracle, don't apply to pl yet
Oracle().commits[1] = 5
Oracle().commits[3] = 5
defer func() {
delete(Oracle().commits, 1)
delete(Oracle().commits, 3)
}()
// This read should ignore sp*, since readts is 4 and it was committed at 5
require.EqualValues(t, 1, ol.Length(4, 0))
checkValue(t, ol, "newcars", 4)
require.EqualValues(t, 0, ol.Length(6, 0))
}
func TestAddMutation_jchiu2(t *testing.T) {
View
@@ -278,6 +278,13 @@ func (n *node) processMutation(task *task) error {
}
rv := x.RaftValue{Group: n.gid, Index: ridx}
ctx = context.WithValue(ctx, "raft", rv)
// Index updates would be wrong if we don't wait.
// Say we do <0x1> <name> "janardhan", <0x1> <name> "pawan",
// while applying the second mutation we check the old value
// of name and delete it from "janardhan"'s index. If we don't
// wait for commit information then mutation won't see the value
posting.Oracle().WaitForTs(context.Background(), txn.StartTs)
if err := runMutation(ctx, edge, txn); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("process mutation: %v", err)
View
@@ -410,13 +410,21 @@ func Timestamps(ctx context.Context, num *intern.Num) (*api.AssignedIds, error)
func fillTxnContext(tctx *api.TxnContext, gid uint32, startTs uint64) {
node := groups().Node
var index uint64
if txn := posting.Txns().Get(startTs); txn != nil {
txn.Fill(tctx)
index = txn.LastIndex()
}
tctx.LinRead = &api.LinRead{
Ids: make(map[uint32]uint64),
}
tctx.LinRead.Ids[gid] = node.Applied.DoneUntil()
// applied watermark can be less than this proposal's index so return the maximum.
// For some proposals like dropPredicate, we don't store them in txns map, so we
// don't know the raft index. For them we would return applied watermark.
if x := node.Applied.DoneUntil(); x > index {
index = x
}
tctx.LinRead.Ids[gid] = index
}
// proposeOrSend either proposes the mutation if the node serves the group gid or sends it to

0 comments on commit 978c749

Please sign in to comment.