Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add linerizability #2287

Merged
merged 1 commit into from Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -165,15 +165,6 @@ func (n *Node) SetPeer(pid uint64, addr string) {
n.peers[pid] = addr
}

func (n *Node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
if read == nil || read.Ids == nil {
return nil
}
gid := n.RaftContext.Group
min := read.Ids[gid]
return n.Applied.WaitForMark(ctx, min)
}

func (n *Node) Send(m raftpb.Message) {
x.AssertTruef(n.Id != m.To, "Sending message to itself")
data, err := m.Marshal()
@@ -335,7 +335,23 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *intern.DirectedEdge
hasPendingDelete := (l.markdeleteAll != txn.StartTs) &&
l.markdeleteAll > 0 && t.Op == intern.DirectedEdge_DEL &&
bytes.Equal(t.Value, []byte(x.Star))
doAbort := hasPendingDelete
doAbort := false
if hasPendingDelete {
// commitOrAbort proposals are applied in goroutines and there is no
// fixed ordering, so do this check to ensure we don't reject a mutation
// which was applied on leader.
// Example: We do sp*, commit and then one more sp*. Even If the commit proposal
// was applied on leader before second sp*, that guarantee is not true on
// follower, since scheduler doesn't care about commitOrAbort proposals and second
// sp* can be applied in memory before the commitProposal.
if commitTs := Oracle().CommitTs(l.markdeleteAll); commitTs > 0 {
l.commitMutation(ctx, l.markdeleteAll, commitTs)
} else if Oracle().Aborted(l.markdeleteAll) {
l.abortTransaction(ctx, l.markdeleteAll)
} else {
doAbort = true
}
}

checkConflict := false

@@ -58,6 +58,13 @@ func (o *oracle) CommitTs(startTs uint64) uint64 {
return o.commits[startTs]
}

func (o *oracle) Aborted(startTs uint64) bool {
o.RLock()
defer o.RUnlock()
_, ok := o.aborts[startTs]
return ok
}

func (o *oracle) addToWaiters(startTs uint64) (chan struct{}, bool) {
o.Lock()
defer o.Unlock()

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.