Skip to content

Commit

Permalink
internal/contour: only write status updates if we're the leader
Browse files Browse the repository at this point in the history
Fixes projectcontour#1425
Fixes projectcontour#1385
Updates projectcontour#499

This PR threads the leader elected signal throught to
contour.EventHandler allowing it to skip writing status back to the API
unless it is currently the leader.

This should fixes projectcontour#1425 by removing the condition where several Contours
would fight to update status. This updates projectcontour#499 by continuing to reduce
the number of updates that Contour generates, thereby processes.

This PR does create a condition where during startup no Contour may be
the leader and the xDS tables reach steady state before anyone is
elected. This would mean the status of an object would be stale until
the next update from the API server after leadership was established.
To address this a mechanism to force a rebuild of the dag is added to
the EventHandler and wired to election success.

Signed-off-by: Dave Cheney <dave@cheney.net>
  • Loading branch information
davecheney committed Oct 21, 2019
1 parent c05b03d commit ff2c82d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
30 changes: 28 additions & 2 deletions cmd/contour/serve.go
Expand Up @@ -37,6 +37,7 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
coreinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/leaderelection"
)

// registerServe registers the serve subcommand and flags
Expand Down Expand Up @@ -242,8 +243,9 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

// step 11. if enabled, register leader election
if !ctx.DisableLeaderElection {
log := log.WithField("context", "leaderelection")
le, _, deposed := newLeaderElector(log, ctx, client, coordinationClient)
var le *leaderelection.LeaderElector
var deposed chan struct{}
le, eh.IsLeader, deposed = newLeaderElector(log, ctx, client, coordinationClient)

g.AddContext(func(electionCtx context.Context) {
log.WithFields(logrus.Fields{
Expand All @@ -255,6 +257,25 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
log.Info("stopped")
})

g.Add(func(stop <-chan struct{}) error {
log := log.WithField("context", "leaderelection-elected")
leader := eh.IsLeader
for {
select {
case <-stop:
// shut down
log.Info("stopped")
return nil
case <-leader:
log.Info("elected as leader, triggering rebuild")
eh.UpdateNow()

// disable this case
leader = nil
}
}
})

g.Add(func(stop <-chan struct{}) error {
// If we get deposed as leader, shut it down.
log := log.WithField("context", "leaderelection-deposer")
Expand All @@ -269,6 +290,11 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
})
} else {
log.Info("Leader election disabled")

// leadership election disabled, hardwire IsLeader to be always readable.
leader := make(chan struct{})
close(leader)
eh.IsLeader = leader
}

// step 12. register our custom metrics and plumb into cache handler
Expand Down
27 changes: 23 additions & 4 deletions internal/contour/handler.go
Expand Up @@ -46,6 +46,11 @@ type EventHandler struct {

logrus.FieldLogger

// IsLeader will become ready to read when this EventHandler becomes
// the leader. If IsLeader is not readable, or nil, status events will
// be suppressed.
IsLeader chan struct{}

update chan interface{}

// last holds the last time CacheHandler.OnUpdate was called.
Expand Down Expand Up @@ -86,6 +91,11 @@ func (e *EventHandler) OnDelete(obj interface{}) {
e.update <- opDelete{obj: obj}
}

// UpdateNow enqueues a DAG update subject to the holdoff timer.
func (e *EventHandler) UpdateNow() {
e.update <- true
}

// Start initializes the EventHandler and returns a function suitable
// for registration with a workgroup.Group.
func (e *EventHandler) Start() func(<-chan struct{}) error {
Expand Down Expand Up @@ -195,6 +205,8 @@ func (e *EventHandler) onUpdate(op interface{}) bool {
return remove || insert
case opDelete:
return e.Builder.Source.Remove(op.obj)
case bool:
return op
default:
return false
}
Expand All @@ -216,11 +228,18 @@ func (e *EventHandler) incSequence() {
func (e *EventHandler) updateDAG() {
dag := e.Builder.Build()
e.CacheHandler.OnChange(dag)
statuses := dag.Statuses()
e.setStatus(statuses)

metrics := calculateIngressRouteMetric(statuses)
e.Metrics.SetIngressRouteMetric(metrics)
select {
case <-e.IsLeader:
// we're the leader, update status and metrics
statuses := dag.Statuses()
e.setStatus(statuses)

metrics := calculateIngressRouteMetric(statuses)
e.Metrics.SetIngressRouteMetric(metrics)
default:
e.Debug("skipping status update: not the leader")
}

e.last = time.Now()
}
Expand Down

0 comments on commit ff2c82d

Please sign in to comment.