diff --git a/network/topologytransmogrifier.go b/network/topologytransmogrifier.go index 718d08a..b8b72c2 100644 --- a/network/topologytransmogrifier.go +++ b/network/topologytransmogrifier.go @@ -428,15 +428,19 @@ func (tt *TopologyTransmogrifier) selectGoal(goal *configuration.NextConfigurati } } -func (tt *TopologyTransmogrifier) enqueueTick(task topologyTask, backoff *server.BinaryBackoffEngine) { - backoff.After(func() { - tt.enqueueQuery(topologyTransmogrifierMsgExe(func() error { - if tt.task == task { - return tt.task.tick() - } - return nil - })) - }) +func (tt *TopologyTransmogrifier) enqueueTick(task topologyTask, tc *targetConfig) { + if !tc.tickEnqueued { + tc.tickEnqueued = true + tc.backoff.After(func() { + tt.enqueueQuery(topologyTransmogrifierMsgExe(func() error { + tc.tickEnqueued = false + if tt.task == task { + return tt.task.tick() + } + return nil + })) + }) + } } func (tt *TopologyTransmogrifier) migrationReceived(migration topologyTransmogrifierMsgMigration) error { @@ -550,13 +554,16 @@ type topologyTask interface { type targetConfig struct { *TopologyTransmogrifier - config *configuration.NextConfiguration - sender paxos.ServerConnectionSubscriber - backoff *server.BinaryBackoffEngine + config *configuration.NextConfiguration + sender paxos.ServerConnectionSubscriber + backoff *server.BinaryBackoffEngine + tickEnqueued bool } func (task *targetConfig) tick() error { task.backoff = nil + task.tickEnqueued = false + switch { case task.active == nil: log.Println("Topology: Ensuring local topology.") @@ -711,7 +718,7 @@ func (task *targetConfig) isInRMs(rmIds common.RMIds) bool { func (task *targetConfig) createOrAdvanceBackoff() { if task.backoff == nil { - task.backoff = server.NewBinaryBackoffEngine(task.rng, server.SubmissionMinSubmitDelay, server.SubmissionMaxSubmitDelay) + task.backoff = server.NewBinaryBackoffEngine(task.rng, server.SubmissionMinSubmitDelay, time.Duration(len(task.config.Hosts))*server.SubmissionMaxSubmitDelay) } else { task.backoff.Advance() } @@ -910,22 +917,23 @@ func (task *installTargetOld) tick() error { } if resubmit { task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) return nil } targetTopology.Roots = append(targetTopology.Roots, roots...) } targetTopology.SetClusterUUId(task.active.ClusterUUId()) - log.Println("Set cluster uuid", targetTopology.ClusterUUId()) + server.Log("Set cluster uuid", targetTopology.ClusterUUId()) _, resubmit, err := task.rewriteTopology(task.active, targetTopology, active, passive) if err != nil { return task.fatal(err) } if resubmit { + server.Log("Topology: Installing to old requires resubmit.") task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) return nil } // Must be badread, which means again we should receive the @@ -1221,9 +1229,9 @@ func (task *installTargetNew) tick() error { return task.fatal(err) } if resubmit { - server.Log("Topology: Topology extension requires resubmit.") + server.Log("Topology: Installing to new requires resubmit.") task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) } return nil } @@ -1290,7 +1298,7 @@ func (task *awaitBarrier1) tick() error { if resubmit { server.Log("Topology: Barrier1 reached. Requires resubmit.") task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) } } else if activeNextConfig != task.installing { @@ -1400,7 +1408,7 @@ func (task *awaitBarrier2) tick() error { if resubmit { server.Log("Topology: Barrier2 reached. Requires resubmit.") task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) } } else if activeNextConfig != task.installing { @@ -1502,7 +1510,7 @@ func (task *migrate) tick() error { } if resubmit { task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) return nil } // Must be badread, which means again we should receive the @@ -1588,7 +1596,7 @@ func (task *installCompletion) tick() error { } if resubmit { task.createOrAdvanceBackoff() - task.enqueueTick(task, task.backoff) + task.enqueueTick(task, task.targetConfig) return nil } // Must be badread, which means again we should receive the @@ -1831,7 +1839,7 @@ func (task *targetConfig) attemptCreateRoots(rootCount int) (bool, configuration } ctxn.SetActions(actions) txnReader, result, err := task.localConnection.RunClientTransaction(&ctxn, nil, nil) - log.Println("Create root result", result, err) + server.Log("Create root result", result, err) if err != nil { return false, nil, err }