Skip to content

Commit

Permalink
internal/k8s: Add StatusUpdateHandler to centralize status updates (#…
Browse files Browse the repository at this point in the history
…2530)

This is step 1 in a three phase plan:
- Add a StatusUpdateHandler that takes status updates for Ingress objects and applies them if they are a change to the apiserver.
- Add support for HTTPProxy to StatusUpdateHandler.
- Move the existing `internal/dag` status updates to use `StatusUpdateHandler`

For full details, see #2371.

Also add StatusUpdater as an interface, to allow testing of the IngressStatusUpdater Informer.

Rename `k8s.IngressStatusUpdater` to `k8s.StatusAddressUpdater`, in preparation for it being used for HTTPProxy objects as well.

Updates #2371

Signed-off-by: Nick Young <ynick@vmware.com>
  • Loading branch information
youngnick committed May 22, 2020
1 parent e596934 commit dd51af0
Show file tree
Hide file tree
Showing 7 changed files with 688 additions and 213 deletions.
21 changes: 11 additions & 10 deletions cmd/contour/ingressstatus.go
Expand Up @@ -39,11 +39,12 @@ import (
// is been received, operation restarts at step 3.
// 5. If the worker is stopped, any existing informer is stopped before the worker stops.
type loadBalancerStatusWriter struct {
log logrus.FieldLogger
clients *k8s.Clients
isLeader chan struct{}
lbStatus chan v1.LoadBalancerStatus
ingressClass string
log logrus.FieldLogger
clients *k8s.Clients
isLeader chan struct{}
lbStatus chan v1.LoadBalancerStatus
statusUpdater k8s.StatusUpdater
ingressClass string
}

func (isw *loadBalancerStatusWriter) Start(stop <-chan struct{}) error {
Expand Down Expand Up @@ -87,11 +88,11 @@ func (isw *loadBalancerStatusWriter) Start(stop <-chan struct{}) error {
// Create new informer for the new LoadBalancerStatus
factory := isw.clients.NewInformerFactory()
inf := factory.Networking().V1beta1().Ingresses().Informer()
inf.AddEventHandler(&k8s.IngressStatusUpdater{
Client: isw.clients.ClientSet(),
Logger: log,
Status: lbs,
IngressClass: isw.ingressClass,
inf.AddEventHandler(&k8s.StatusAddressUpdater{
Logger: log,
LBStatus: lbs,
IngressClass: isw.ingressClass,
StatusUpdater: isw.statusUpdater,
})

shutdown = make(chan struct{})
Expand Down
20 changes: 15 additions & 5 deletions cmd/contour/serve.go
Expand Up @@ -316,13 +316,23 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// step 11. register leadership election.
eventHandler.IsLeader = setupLeadershipElection(&g, log, ctx, clients, eventHandler.UpdateNow)

sh := k8s.StatusUpdateHandler{
Log: log.WithField("context", "StatusUpdateWriter"),
Clients: clients,
LeaderElected: eventHandler.IsLeader,
Converter: converter,
}
suw := sh.Writer()
g.Add(sh.Start)

// step 11. set up ingress load balancer status writer
lbsw := loadBalancerStatusWriter{
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
isLeader: eventHandler.IsLeader,
lbStatus: make(chan v1.LoadBalancerStatus, 1),
ingressClass: ctx.ingressClass,
log: log.WithField("context", "loadBalancerStatusWriter"),
clients: clients,
isLeader: eventHandler.IsLeader,
lbStatus: make(chan v1.LoadBalancerStatus, 1),
ingressClass: ctx.ingressClass,
statusUpdater: suw,
}
g.Add(lbsw.Start)

Expand Down
38 changes: 38 additions & 0 deletions internal/k8s/helpers.go
@@ -0,0 +1,38 @@
// Copyright © 2020 VMware
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8s

import (
"github.com/google/go-cmp/cmp"
"k8s.io/api/networking/v1beta1"
)

// IsStatusEqual checks that two objects of supported Kubernetes types
// have equivalent Status structs.
// Currently supports:
// networking.k8s.io/ingress/v1beta1
func IsStatusEqual(objA, objB interface{}) bool {

switch a := objA.(type) {
case *v1beta1.Ingress:
switch b := objB.(type) {
case *v1beta1.Ingress:
if cmp.Equal(a.Status, b.Status) {
return true
}
}
}

return false
}
161 changes: 0 additions & 161 deletions internal/k8s/ingressstatus_test.go

This file was deleted.

75 changes: 38 additions & 37 deletions internal/k8s/ingressstatus.go → internal/k8s/statusaddress.go
Expand Up @@ -14,86 +14,87 @@
package k8s

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/projectcontour/contour/internal/annotation"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
clientset "k8s.io/client-go/kubernetes"
)

// IngressStatusUpdater observes informer OnAdd events and
// StatusAddressUpdater observes informer OnAdd and OnUpdate events and
// updates the ingress.status.loadBalancer field on all Ingress
// objects that match the ingress class (if used).
type IngressStatusUpdater struct {
Client clientset.Interface
Logger logrus.FieldLogger
Status v1.LoadBalancerStatus
IngressClass string
// Note that this is intended to handle updating the status.loadBalancer struct only,
// not more general status updates. That's a job for the StatusUpdater.
type StatusAddressUpdater struct {
Logger logrus.FieldLogger
LBStatus v1.LoadBalancerStatus
IngressClass string
StatusUpdater StatusUpdater
}

func (s *IngressStatusUpdater) OnAdd(obj interface{}) {
func (s *StatusAddressUpdater) OnAdd(obj interface{}) {

ing := obj.(*v1beta1.Ingress).DeepCopy()
ing.SetGroupVersionKind(v1beta1.SchemeGroupVersion.WithKind("ingress"))
if !annotation.MatchesIngressClass(ing, s.IngressClass) {
s.Logger.
WithField("name", ing.GetName()).
WithField("namespace", ing.GetNamespace()).
WithField("ingress-class", annotation.IngressClass(ing)).
WithField("configured-ingress-class", s.IngressClass).
Debug("unmatched ingress class, skip status update")
return
}

ing.Status.LoadBalancer = s.Status
_, err := s.Client.NetworkingV1beta1().Ingresses(ing.GetNamespace()).UpdateStatus(context.TODO(), ing, metav1.UpdateOptions{})
if err != nil {
s.Logger.
WithField("name", ing.GetName()).
WithField("namespace", ing.GetNamespace()).
WithError(err).Error("unable to update status")
}
s.StatusUpdater.Update(
ing.GetName(),
ing.GetNamespace(),
v1beta1.SchemeGroupVersion.WithResource("ingresses"),
StatusMutatorFunc(ingressMutator(s.LBStatus)),
)
}

func (s *IngressStatusUpdater) OnUpdate(oldObj, newObj interface{}) {
func (s *StatusAddressUpdater) OnUpdate(oldObj, newObj interface{}) {

oldIng := oldObj.(*v1beta1.Ingress).DeepCopy()
newIng := newObj.(*v1beta1.Ingress).DeepCopy()

// We need to only act when things come *into* our ingressclass scope. When they fall out, we don't care about them any
// more, and it's the new controller's job to fix things.
// Note that this also handles the case where someone deletes the annotation
if !annotation.MatchesIngressClass(oldIng, s.IngressClass) && annotation.MatchesIngressClass(newIng, s.IngressClass) {
newIng.SetGroupVersionKind(v1beta1.SchemeGroupVersion.WithKind("ingress"))
if annotation.MatchesIngressClass(newIng, s.IngressClass) {
// Add status because we started matching ingress-class.
s.Logger.
WithField("name", newIng.GetName()).
WithField("namespace", newIng.GetNamespace()).
WithField("ingress-class", annotation.IngressClass(newIng)).
WithField("configured-ingress-class", s.IngressClass).
Debug("Updated Ingress is in scope, updating")
newIng.Status.LoadBalancer = s.Status
_, err := s.Client.NetworkingV1beta1().Ingresses(newIng.GetNamespace()).UpdateStatus(context.TODO(), newIng, metav1.UpdateOptions{})
if err != nil {
s.Logger.
WithField("name", newIng.GetName()).
WithField("namespace", newIng.GetNamespace()).
WithError(err).Error("unable to update status")
}
s.StatusUpdater.Update(
newIng.GetName(),
newIng.GetNamespace(),
v1beta1.SchemeGroupVersion.WithResource("ingresses"),
StatusMutatorFunc(ingressMutator(s.LBStatus)),
)
}

// TODO(youngnick): There is a possibility that someone else may have edited the status, and we would then have
// no way to fix the object, because we're only operating on ingress-class change. After consideration, we've decided that
// editing the status subresource is hard enough that if someone does, they must have a reason. We can revisit if required.
// Checking annotation.MatchesIngressClass(newIng, s.IngressClass) && !reflect.DeepEqual(newIng.Status.Loadbalancer, s.Status)
// Checking annotation.MatchesIngressClass(newIng, s.IngressClass) && !reflect.DeepEqual(newIng.Status.Loadbalancer, s.LBStatus)
// would probably do it, but we have no way to verify for now.
}

func (s *IngressStatusUpdater) OnDelete(obj interface{}) {
func (s *StatusAddressUpdater) OnDelete(obj interface{}) {
// we don't need to update the status on resources that
// have been deleted.
}

func ingressMutator(lbstatus v1.LoadBalancerStatus) func(obj interface{}) interface{} {

return func(obj interface{}) interface{} {
ing := obj.(*v1beta1.Ingress).DeepCopy()
ing.Status.LoadBalancer = lbstatus
return ing
}
}

// ServiceStatusLoadBalancerWatcher implements ResourceEventHandler and
// watches for changes to the status.loadbalancer field
// Note that we specifically *don't* inspect inside the struct, as sending empty values
Expand Down

0 comments on commit dd51af0

Please sign in to comment.