Skip to content

Commit

Permalink
Refactoring; don't allow multiple enqueued ticks for a task; allow to…
Browse files Browse the repository at this point in the history
…pology txns to back off much much further.

--HG--
branch : dev
  • Loading branch information
msackman committed Nov 14, 2016
1 parent e13e1d8 commit e81fd4a
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions network/topologytransmogrifier.go
Expand Up @@ -428,15 +428,19 @@ func (tt *TopologyTransmogrifier) selectGoal(goal *configuration.NextConfigurati
}
}

func (tt *TopologyTransmogrifier) enqueueTick(task topologyTask, backoff *server.BinaryBackoffEngine) {
backoff.After(func() {
tt.enqueueQuery(topologyTransmogrifierMsgExe(func() error {
if tt.task == task {
return tt.task.tick()
}
return nil
}))
})
func (tt *TopologyTransmogrifier) enqueueTick(task topologyTask, tc *targetConfig) {
if !tc.tickEnqueued {
tc.tickEnqueued = true
tc.backoff.After(func() {
tt.enqueueQuery(topologyTransmogrifierMsgExe(func() error {
tc.tickEnqueued = false
if tt.task == task {
return tt.task.tick()
}
return nil
}))
})
}
}

func (tt *TopologyTransmogrifier) migrationReceived(migration topologyTransmogrifierMsgMigration) error {
Expand Down Expand Up @@ -550,13 +554,16 @@ type topologyTask interface {

type targetConfig struct {
*TopologyTransmogrifier
config *configuration.NextConfiguration
sender paxos.ServerConnectionSubscriber
backoff *server.BinaryBackoffEngine
config *configuration.NextConfiguration
sender paxos.ServerConnectionSubscriber
backoff *server.BinaryBackoffEngine
tickEnqueued bool
}

func (task *targetConfig) tick() error {
task.backoff = nil
task.tickEnqueued = false

switch {
case task.active == nil:
log.Println("Topology: Ensuring local topology.")
Expand Down Expand Up @@ -711,7 +718,7 @@ func (task *targetConfig) isInRMs(rmIds common.RMIds) bool {

func (task *targetConfig) createOrAdvanceBackoff() {
if task.backoff == nil {
task.backoff = server.NewBinaryBackoffEngine(task.rng, server.SubmissionMinSubmitDelay, server.SubmissionMaxSubmitDelay)
task.backoff = server.NewBinaryBackoffEngine(task.rng, server.SubmissionMinSubmitDelay, time.Duration(len(task.config.Hosts))*server.SubmissionMaxSubmitDelay)
} else {
task.backoff.Advance()
}
Expand Down Expand Up @@ -910,22 +917,23 @@ func (task *installTargetOld) tick() error {
}
if resubmit {
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
return nil
}
targetTopology.Roots = append(targetTopology.Roots, roots...)
}

targetTopology.SetClusterUUId(task.active.ClusterUUId())
log.Println("Set cluster uuid", targetTopology.ClusterUUId())
server.Log("Set cluster uuid", targetTopology.ClusterUUId())

_, resubmit, err := task.rewriteTopology(task.active, targetTopology, active, passive)
if err != nil {
return task.fatal(err)
}
if resubmit {
server.Log("Topology: Installing to old requires resubmit.")
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
return nil
}
// Must be badread, which means again we should receive the
Expand Down Expand Up @@ -1221,9 +1229,9 @@ func (task *installTargetNew) tick() error {
return task.fatal(err)
}
if resubmit {
server.Log("Topology: Topology extension requires resubmit.")
server.Log("Topology: Installing to new requires resubmit.")
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
}
return nil
}
Expand Down Expand Up @@ -1290,7 +1298,7 @@ func (task *awaitBarrier1) tick() error {
if resubmit {
server.Log("Topology: Barrier1 reached. Requires resubmit.")
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
}

} else if activeNextConfig != task.installing {
Expand Down Expand Up @@ -1400,7 +1408,7 @@ func (task *awaitBarrier2) tick() error {
if resubmit {
server.Log("Topology: Barrier2 reached. Requires resubmit.")
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
}

} else if activeNextConfig != task.installing {
Expand Down Expand Up @@ -1502,7 +1510,7 @@ func (task *migrate) tick() error {
}
if resubmit {
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
return nil
}
// Must be badread, which means again we should receive the
Expand Down Expand Up @@ -1588,7 +1596,7 @@ func (task *installCompletion) tick() error {
}
if resubmit {
task.createOrAdvanceBackoff()
task.enqueueTick(task, task.backoff)
task.enqueueTick(task, task.targetConfig)
return nil
}
// Must be badread, which means again we should receive the
Expand Down Expand Up @@ -1831,7 +1839,7 @@ func (task *targetConfig) attemptCreateRoots(rootCount int) (bool, configuration
}
ctxn.SetActions(actions)
txnReader, result, err := task.localConnection.RunClientTransaction(&ctxn, nil, nil)
log.Println("Create root result", result, err)
server.Log("Create root result", result, err)
if err != nil {
return false, nil, err
}
Expand Down

0 comments on commit e81fd4a

Please sign in to comment.