address review lni#1
abbccdda committed Nov 25, 2019
1 parent eaf0d0f commit 54bbe16
Showing 4 changed files with 23 additions and 9 deletions.
4 changes: 2 additions & 2 deletions config/config.go
@@ -245,14 +245,14 @@ type NodeHostConfig struct {
WALDir string
// NodeHostDir is where everything else is stored.
NodeHostDir string
- // RTTMillisecond defines the average Rround Trip Time (RTT) in milliseconds
+ // RTTMillisecond defines the average Round Trip Time (RTT) in milliseconds
// between two NodeHost instances. Such a RTT interval is internally used as
// a logical clock tick, Raft heartbeat and election intervals are both
// defined in term of how many such RTT intervals.
// Note that RTTMillisecond is the combined delays between two NodeHost
// instances including all delays caused by network transmission, delays
// caused by NodeHost queuing and processing. As an example, when fully
- // loaded, the average Rround Trip Time between two of our NodeHost instances
+ // loaded, the average Round Trip Time between two of our NodeHost instances
// used for benchmarking purposes is up to 500 microseconds when the ping time
// between them is 100 microseconds. Set RTTMillisecond to 1 when it is less
// than 1 million in your environment.
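The comment above is the whole contract for RTTMillisecond: it is the logical clock tick, and Raft heartbeat and election intervals are multiples of it. A minimal usage sketch, assuming the standard dragonboat config package (the import path, directory paths and the 200ms figure are illustrative, not from this commit):

package main

import (
    "fmt"

    // Assumed import path; adjust to the dragonboat version or fork in use.
    "github.com/lni/dragonboat/v3/config"
)

func main() {
    // Measure the fully loaded round trip time between your NodeHost
    // instances, round up to whole milliseconds, and use 1 when the measured
    // RTT is below one millisecond.
    nhc := config.NodeHostConfig{
        WALDir:         "/data/raft/wal", // hypothetical paths
        NodeHostDir:    "/data/raft",
        RTTMillisecond: 200, // e.g. a cross-region deployment with ~200ms RTT
    }
    fmt.Printf("heartbeat and election intervals are multiples of %dms\n",
        nhc.RTTMillisecond)
}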
10 changes: 3 additions & 7 deletions execengine.go
@@ -348,12 +348,12 @@ func (s *execEngine) taskWorkerMain(workerID uint64) {
return
case <-ticker.C:
nodes, cci = s.loadSMs(workerID, cci, nodes)
- s.execSMs(workerID, make(map[uint64]struct{}), nodes, batch, entries, true)
+ s.execSMs(workerID, make(map[uint64]struct{}), nodes, batch, entries)
batch = make([]rsm.Task, 0, taskBatchSize)
entries = make([]sm.Entry, 0, taskBatchSize)
case <-s.taskWorkReady.waitCh(workerID):
clusterIDMap := s.taskWorkReady.getReadyMap(workerID)
- s.execSMs(workerID, clusterIDMap, nodes, batch, entries, false)
+ s.execSMs(workerID, clusterIDMap, nodes, batch, entries)
}
}
}
@@ -376,7 +376,7 @@ func (s *execEngine) loadSMs(workerID uint64, cci uint64,

func (s *execEngine) execSMs(workerID uint64,
idmap map[uint64]struct{},
- nodes map[uint64]*node, batch []rsm.Task, entries []sm.Entry, fromTick bool) {
+ nodes map[uint64]*node, batch []rsm.Task, entries []sm.Entry) {
if len(idmap) == 0 {
for k := range nodes {
idmap[k] = struct{}{}
@@ -403,10 +403,6 @@ func (s *execEngine) execSMs(workerID uint64,
if task.IsSnapshotTask() {
node.handleSnapshotTask(task)
}

- if fromTick {
- node.maybeUpdateApTracker()
- }
}
if p != nil {
p.exec.end()
7 changes: 7 additions & 0 deletions nodehost.go
@@ -1677,6 +1677,7 @@ func (nh *NodeHost) tickWorkerMain() {
idx, nodes, qs = nh.getCurrentClusters(idx, nodes, qs)
nh.snapshotStatus.pushReady(nh.getTick())
nh.sendTickMessage(nodes, qs)
+ nh.checkNodePromotion(nodes)
}
return false
}
@@ -1756,6 +1757,12 @@ func (nh *NodeHost) sendTickMessage(clusters []*node,
}
}

+ func (nh *NodeHost) checkNodePromotion(nodes []*node) {
+ for _, node := range nodes {
+ node.maybeUpdateApTracker()
+ }
+ }

func (nh *NodeHost) checkTransportLatency(clusterID uint64,
to uint64, from uint64, term uint64) {
v := atomic.AddUint64(&nh.msgCount, 1)
11 changes: 11 additions & 0 deletions promotion_manager.go
@@ -35,6 +35,7 @@ type promotionManager struct {
readIndex func(resultHandler *PromotionReadIndexResultHandler, readIndexTimeout time.Duration) (*RequestState, error)
promote func(configChangeTimeout time.Duration) (*RequestState, error)
electionTimeout uint64
+ readIndexC chan RequestResult
promoC chan RequestResult
}

@@ -72,6 +73,12 @@ func (m *promotionManager) process(currentTime time.Time) {
m.promoC = nil
default:
}
+ } else if m.hasPendingReadIndex() {
+ select {
+ case r := <-m.readIndexC:
+ m.processReadIndexResult(r.result, time.Now(), time.Now())
+ default:
+ }
}

if m.readIndexNeeded() {
@@ -92,6 +99,10 @@ func (m *promotionManager) readIndexNeeded() bool {
return m.incomplete && !m.hasPendingPromotion()
}

+ func (m *promotionManager) hasPendingReadIndex() bool {
+ return m.readIndexC != nil
+ }

func (m *promotionManager) hasPendingPromotion() bool {
return m.promoC != nil
}
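The branch added to process() drains readIndexC with a non-blocking select, so a pending read index result is consumed when available and the method otherwise returns to the caller immediately. A standalone sketch of that pattern (the names below are illustrative, not from the source):

package main

import "fmt"

// pollResult mirrors the non-blocking receive added above: consume a result
// if one is ready, otherwise return immediately without blocking the caller.
func pollResult(resultC chan string) {
    select {
    case r := <-resultC:
        fmt.Println("got result:", r)
    default:
        // nothing pending yet; fall through
    }
}

func main() {
    c := make(chan string, 1)
    pollResult(c) // nothing buffered, returns immediately
    c <- "read index confirmed"
    pollResult(c) // consumes the buffered result
}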
