Skip to content
Permalink
Browse files

TrafficController enqueues all TrafficTarget objects on changes

The mechanics of the bug indicated by the unit test boils down to the
following: traffic controller ensures to bring traffic targets to the
desired state but once a contender tt has been scaled up, the incumbent
tt is being scaled down. This implies on rebalancing the weights of tt's
but none of the observant objects (neither tt nor pods) are effectively
changing on contender side: only on incumbent.

What this patch promotes is: traffic controller reacts to traffic target
changes and enqueues all known traffic targets for this application.
This guarantees converging on the weight balance under conditions of
non-changing objects.

Signed-off-by: Oleg Sidorov <oleg.sidorov@booking.com>
  • Loading branch information
icanhazbroccoli authored and parhamdoustdar committed Nov 19, 2019
1 parent 2b8f241 commit f026f163142f53fca080bdaadd6f4e7b11d39de8
Showing with 35 additions and 6 deletions.
  1. +35 −6 pkg/controller/traffic/traffic_controller.go
@@ -76,12 +76,12 @@ func NewController(
klog.Info("Setting up event handlers")
// Set up an event handler for when TrafficTarget resources change.
trafficTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueTrafficTarget,
AddFunc: controller.enqueueAllTrafficTargets,
UpdateFunc: func(old, new interface{}) {
controller.enqueueTrafficTarget(new)
controller.enqueueAllTrafficTargets(new)
},
// The sync handler needs to cope with the case where the object was deleted.
DeleteFunc: controller.enqueueTrafficTarget,
DeleteFunc: controller.enqueueAllTrafficTargets,
})

store.AddSubscriptionCallback(controller.subscribeToAppClusterEvents)
@@ -378,14 +378,43 @@ func (c *Controller) enqueueTrafficTarget(obj interface{}) {
c.workqueue.Add(key)
}

func (c *Controller) enqueueAllTrafficTargets(obj interface{}) {
trafficTarget, ok := obj.(*shipper.TrafficTarget)
if !ok {
runtime.HandleError(fmt.Errorf("not a shipper.TrafficTarget: %#v", obj))
return
}

namespace := trafficTarget.Namespace

appName, ok := trafficTarget.Labels[shipper.AppLabel]
if !ok {
runtime.HandleError(fmt.Errorf("TrafficTarget %s/%s is missing app label",
namespace, trafficTarget.Name))
return
}

selector := labels.Set{shipper.AppLabel: appName}.AsSelector()
trafficTargets, err := c.trafficTargetsLister.TrafficTargets(namespace).List(selector)
if err != nil {
runtime.HandleError(fmt.Errorf(
"cannot list traffic targets for app '%s/%s': %s",
namespace, appName, err))
}

for _, tt := range trafficTargets {
c.enqueueTrafficTarget(tt)
}
}

func (c *Controller) enqueueTrafficTargetsFromPod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
runtime.HandleError(fmt.Errorf("not a corev1.Pod: %#v", obj))
return
}

app, ok := pod.GetLabels()[shipper.AppLabel]
appName, ok := pod.GetLabels()[shipper.AppLabel]
if !ok {
runtime.HandleError(fmt.Errorf(
"object %q does not have label %s. FilterFunc not working?",
@@ -394,15 +423,15 @@ func (c *Controller) enqueueTrafficTargetsFromPod(obj interface{}) {
}

namespace := pod.Namespace
selector := labels.Set{shipper.AppLabel: app}.AsSelector()
selector := labels.Set{shipper.AppLabel: appName}.AsSelector()
trafficTargets, err := c.trafficTargetsLister.TrafficTargets(namespace).List(selector)
if err != nil {
err = shippererrors.NewKubeclientListError(
shipper.SchemeGroupVersion.WithKind("TrafficTarget"),
namespace, selector, err)
runtime.HandleError(fmt.Errorf(
"cannot list traffic targets for app '%s/%s': %s",
namespace, app, err))
namespace, appName, err))
}

for _, tt := range trafficTargets {

0 comments on commit f026f16

Please sign in to comment.
You can’t perform that action at this time.