Skip to content

Commit

Permalink
fixing implementation for ludicrous mode
Browse files Browse the repository at this point in the history
  • Loading branch information
aman-bansal committed Feb 4, 2021
1 parent e110b35 commit ffb28bd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
42 changes: 20 additions & 22 deletions worker/change_data_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ import (
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto/z"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"

"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/protos/pb"
"go.etcd.io/etcd/raft/raftpb"

"github.com/dgraph-io/dgraph/x"
)

const defaultCDCConfig = "enabled=false; max_recovery=10000"
Expand Down Expand Up @@ -183,41 +179,38 @@ func (cd *ChangeData) processCDCEvents() {
continue
}
// In ludicrous events send the events as soon as you get it.
// We wont wait for oracle delta in case of ludicrous mode
// We wont wait for oracle delta in case of ludicrous mode.
// Since all mutations will eventually succeed.
// We can set the read ts here only.
if x.WorkerConfig.LudicrousMode {
if err := sendEvents(nil, events); err != nil {
return
}
cd.cdcIndex = entry.Index
cd.updateMinReadTs(proposal.Mutations.StartTs)
continue
}
if cd.pendingEvents[proposal.Mutations.StartTs] == nil {
cd.pendingEvents[proposal.Mutations.StartTs] = make([]CDCEvent, 0)
}
if proposal.Mutations.StartTs == 10005 {
glog.Infof("pending event is %v", proposal.Mutations)
}
cd.pendingEvents[proposal.Mutations.StartTs] =
append(cd.pendingEvents[proposal.Mutations.StartTs], events...)
}

if proposal.Delta != nil {
for _, ts := range proposal.Delta.Txns {
cd.maxReadTs = x.Max(cd.maxReadTs, ts.StartTs)
if !x.WorkerConfig.LudicrousMode {
pending := cd.pendingEvents[ts.StartTs]
if ts.CommitTs > 0 && len(pending) > 0 {
if err := sendEvents(ts, pending); err != nil {
return
}
pending := cd.pendingEvents[ts.StartTs]
if ts.CommitTs > 0 && len(pending) > 0 {
if err := sendEvents(ts, pending); err != nil {
return
}
// delete from pending events once events are sent
delete(cd.pendingEvents, ts.StartTs)
_ = cd.evaluateAndSetMinReadTs()
}
// delete from pending events once events are sent
delete(cd.pendingEvents, ts.StartTs)
_ = cd.evaluateAndSetMinReadTs()
}
}

cd.cdcIndex = entry.Index
}
}
Expand All @@ -240,7 +233,6 @@ func (cd *ChangeData) processCDCEvents() {
iter = 0
minTs := cd.evaluateAndSetMinReadTs()
glog.V(2).Infof("proposing CDC minReadTs %d", minTs)
glog.V(2).Infof("lenght of pending events %d", len(cd.pendingEvents))
if err := groups().Node.proposeCDCMinReadTs(minTs); err != nil {
glog.Errorf("not able to propose cdc minReadTs %v", err)
}
Expand All @@ -256,6 +248,12 @@ func (cd *ChangeData) processCDCEvents() {
// thus making followers to clear raft logs between next proposal.
// In this way we can loose some events.
func (cd *ChangeData) evaluateAndSetMinReadTs() uint64 {
if cd == nil {
return math.MaxUint64
}
if x.WorkerConfig.LudicrousMode {
return cd.getCDCMinReadTs()
}
min := cd.maxReadTs
for ts := range cd.pendingEvents {
if ts < min {
Expand Down
5 changes: 5 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,11 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
}

func (n *node) proposeCDCMinReadTs(minTs uint64) error {
// in case of ludicrous mode it could be zero. no need to send it
if minTs == 0 {
return nil
}

proposal := &pb.Proposal{
CDCMinReadTs: minTs,
}
Expand Down

0 comments on commit ffb28bd

Please sign in to comment.