From ff2c82d4176acd1df3f2400d741f4ca6f12ec0a7 Mon Sep 17 00:00:00 2001 From: Dave Cheney Date: Mon, 21 Oct 2019 12:48:39 +1100 Subject: [PATCH] internal/contour: only write status updates if we're the leader Fixes #1425 Fixes #1385 Updates #499 This PR threads the leader elected signal throught to contour.EventHandler allowing it to skip writing status back to the API unless it is currently the leader. This should fixes #1425 by removing the condition where several Contours would fight to update status. This updates #499 by continuing to reduce the number of updates that Contour generates, thereby processes. This PR does create a condition where during startup no Contour may be the leader and the xDS tables reach steady state before anyone is elected. This would mean the status of an object would be stale until the next update from the API server after leadership was established. To address this a mechanism to force a rebuild of the dag is added to the EventHandler and wired to election success. Signed-off-by: Dave Cheney --- cmd/contour/serve.go | 30 ++++++++++++++++++++++++++++-- internal/contour/handler.go | 27 +++++++++++++++++++++++---- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index db2a309eb1c..c9e28b6213d 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -37,6 +37,7 @@ import ( "gopkg.in/alecthomas/kingpin.v2" "gopkg.in/yaml.v2" coreinformers "k8s.io/client-go/informers" + "k8s.io/client-go/tools/leaderelection" ) // registerServe registers the serve subcommand and flags @@ -242,8 +243,9 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { // step 11. if enabled, register leader election if !ctx.DisableLeaderElection { - log := log.WithField("context", "leaderelection") - le, _, deposed := newLeaderElector(log, ctx, client, coordinationClient) + var le *leaderelection.LeaderElector + var deposed chan struct{} + le, eh.IsLeader, deposed = newLeaderElector(log, ctx, client, coordinationClient) g.AddContext(func(electionCtx context.Context) { log.WithFields(logrus.Fields{ @@ -255,6 +257,25 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { log.Info("stopped") }) + g.Add(func(stop <-chan struct{}) error { + log := log.WithField("context", "leaderelection-elected") + leader := eh.IsLeader + for { + select { + case <-stop: + // shut down + log.Info("stopped") + return nil + case <-leader: + log.Info("elected as leader, triggering rebuild") + eh.UpdateNow() + + // disable this case + leader = nil + } + } + }) + g.Add(func(stop <-chan struct{}) error { // If we get deposed as leader, shut it down. log := log.WithField("context", "leaderelection-deposer") @@ -269,6 +290,11 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { }) } else { log.Info("Leader election disabled") + + // leadership election disabled, hardwire IsLeader to be always readable. + leader := make(chan struct{}) + close(leader) + eh.IsLeader = leader } // step 12. register our custom metrics and plumb into cache handler diff --git a/internal/contour/handler.go b/internal/contour/handler.go index 3ad35790b8f..5fe06002980 100644 --- a/internal/contour/handler.go +++ b/internal/contour/handler.go @@ -46,6 +46,11 @@ type EventHandler struct { logrus.FieldLogger + // IsLeader will become ready to read when this EventHandler becomes + // the leader. If IsLeader is not readable, or nil, status events will + // be suppressed. + IsLeader chan struct{} + update chan interface{} // last holds the last time CacheHandler.OnUpdate was called. @@ -86,6 +91,11 @@ func (e *EventHandler) OnDelete(obj interface{}) { e.update <- opDelete{obj: obj} } +// UpdateNow enqueues a DAG update subject to the holdoff timer. +func (e *EventHandler) UpdateNow() { + e.update <- true +} + // Start initializes the EventHandler and returns a function suitable // for registration with a workgroup.Group. func (e *EventHandler) Start() func(<-chan struct{}) error { @@ -195,6 +205,8 @@ func (e *EventHandler) onUpdate(op interface{}) bool { return remove || insert case opDelete: return e.Builder.Source.Remove(op.obj) + case bool: + return op default: return false } @@ -216,11 +228,18 @@ func (e *EventHandler) incSequence() { func (e *EventHandler) updateDAG() { dag := e.Builder.Build() e.CacheHandler.OnChange(dag) - statuses := dag.Statuses() - e.setStatus(statuses) - metrics := calculateIngressRouteMetric(statuses) - e.Metrics.SetIngressRouteMetric(metrics) + select { + case <-e.IsLeader: + // we're the leader, update status and metrics + statuses := dag.Statuses() + e.setStatus(statuses) + + metrics := calculateIngressRouteMetric(statuses) + e.Metrics.SetIngressRouteMetric(metrics) + default: + e.Debug("skipping status update: not the leader") + } e.last = time.Now() }