Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
traffic controller: watch Pod events
Browse files Browse the repository at this point in the history
The traffic controller needs to be notified of changes to pods, so it
can update the status of TrafficTargets, and make changes to other pods
if necessary. Currently, it relies on a resync of the TrafficTarget to
trigger any changes to pods that need to receive traffic.

If we run shipper without resyncs for long enough, it's possible that
pods will die and be re-scheduled without ever receiving traffic again.

Now, we apply the same technique we used in the other controllers to
watch for events on objects in the application clusters.
  • Loading branch information
juliogreff committed Oct 1, 2019
1 parent 2076bd6 commit 7ed9c74
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 8 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/installation/installation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,13 @@ func (c *Controller) enqueueInstallationTargetFromObject(obj interface{}) {
// boundaries. While technically it's probably ok, I feel like it'd be
// abusing the feature.
rel := kubeobj.GetLabels()[shipper.ReleaseLabel]
tt, err := c.getInstallationTargetForReleaseAndNamespace(rel, kubeobj.GetNamespace())
it, err := c.getInstallationTargetForReleaseAndNamespace(rel, kubeobj.GetNamespace())
if err != nil {
runtime.HandleError(fmt.Errorf("cannot get installation target for release '%s/%s': %#v", rel, kubeobj.GetNamespace(), err))
return
}

c.enqueueInstallationTarget(tt)
c.enqueueInstallationTarget(it)
}

func (c *Controller) getInstallationTargetForReleaseAndNamespace(release, namespace string) (*shipper.InstallationTarget, error) {
Expand Down
56 changes: 53 additions & 3 deletions pkg/controller/traffic/traffic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
listers "github.com/bookingcom/shipper/pkg/client/listers/shipper/v1alpha1"
"github.com/bookingcom/shipper/pkg/clusterclientstore"
"github.com/bookingcom/shipper/pkg/conditions"
shippercontroller "github.com/bookingcom/shipper/pkg/controller"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
)

Expand Down Expand Up @@ -79,13 +80,21 @@ func NewController(
DeleteFunc: controller.enqueueTrafficTarget,
})

store.AddSubscriptionCallback(func(informerFactory kubeinformers.SharedInformerFactory) {
informerFactory.Core().V1().Pods().Informer()
})
store.AddSubscriptionCallback(controller.subscribeToPodEvents)
store.AddEventHandlerCallback(controller.registerPodEventHandlers)

return controller
}

// registerPodEventHandlers attaches a Pod event handler to the Pod informer
// obtained from informerFactory. The handler is built by
// shippercontroller.NewAppClusterEventHandler around
// c.enqueueTrafficTargetFromPod. clusterName is not used here.
func (c *Controller) registerPodEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) {
	onPodEvent := shippercontroller.NewAppClusterEventHandler(c.enqueueTrafficTargetFromPod)

	podInformer := informerFactory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(onPodEvent)
}

// subscribeToPodEvents requests the Pod informer from informerFactory and
// discards the result.
// NOTE(review): presumably requesting the informer is what makes the factory
// track Pods for this cluster -- confirm against the informer factory docs.
func (c *Controller) subscribeToPodEvents(informerFactory kubeinformers.SharedInformerFactory) {
	pods := informerFactory.Core().V1().Pods()
	_ = pods.Informer()
}

// Run will set up the event handlers for types we are interested in, as well as
// syncing informer caches and starting workers. It will block until stopCh is
// closed, at which point it will shutdown the workqueue and wait for workers to
Expand Down Expand Up @@ -382,3 +391,44 @@ func (c *Controller) enqueueTrafficTarget(obj interface{}) {

c.workqueue.Add(key)
}

// enqueueTrafficTargetFromPod is the callback used for Pod events: it looks
// up the TrafficTarget that shares the Pod's release label and namespace and
// enqueues it. An obj that is not a *corev1.Pod, or a failed lookup, is
// reported through runtime.HandleError and the event is dropped.
func (c *Controller) enqueueTrafficTargetFromPod(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		runtime.HandleError(fmt.Errorf("not a corev1.Pod: %#v", obj))
		return
	}

	// Only the release label is used for the lookup, rather than the Pod's
	// full label set, because the chart may have put extra labels on the
	// Pod that the TrafficTarget does not carry.
	// ObjectReference is deliberately not used either: it would have to
	// point across cluster boundaries, which is probably technically fine
	// but feels like abusing the feature.
	rel := pod.GetLabels()[shipper.ReleaseLabel]
	tt, err := c.getTrafficTargetForReleaseAndNamespace(rel, pod.GetNamespace())
	if err != nil {
		// Format the error with %s so the log carries its message instead of
		// a Go-syntax dump of the error value's internals.
		runtime.HandleError(fmt.Errorf("cannot get traffic target for release '%s/%s': %s", rel, pod.GetNamespace(), err))
		return
	}

	c.enqueueTrafficTarget(tt)
}

// getTrafficTargetForReleaseAndNamespace lists the TrafficTargets in
// namespace whose release label equals release and returns the single match.
//
// It returns a kubeclient list error when the lister fails, and an
// unexpected-object-count error when the selector matches anything other
// than exactly one TrafficTarget.
func (c *Controller) getTrafficTargetForReleaseAndNamespace(release, namespace string) (*shipper.TrafficTarget, error) {
	wantCount := 1

	kind := shipper.SchemeGroupVersion.WithKind("TrafficTarget")
	byRelease := labels.Set{shipper.ReleaseLabel: release}.AsSelector()

	found, listErr := c.trafficTargetsLister.TrafficTargets(namespace).List(byRelease)
	if listErr != nil {
		return nil, shippererrors.NewKubeclientListError(kind, namespace, byRelease, listErr)
	}

	// Zero matches and multiple matches are both rejected: the caller needs
	// one unambiguous TrafficTarget.
	if gotCount := len(found); gotCount != wantCount {
		return nil, shippererrors.NewUnexpectedObjectCountFromSelectorError(
			byRelease, kind, wantCount, gotCount)
	}

	return found[0], nil
}
6 changes: 3 additions & 3 deletions pkg/controller/traffic/traffic_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (f *fixture) run() {
},
}

informer.Start(stopCh)
informer.WaitForCacheSync(stopCh)

go store.Run(stopCh)

wait.PollUntil(
Expand All @@ -203,9 +206,6 @@ func (f *fixture) run() {
stopCh,
)

informer.Start(stopCh)
informer.WaitForCacheSync(stopCh)

wait.PollUntil(
10*time.Millisecond,
func() (bool, error) { return controller.workqueue.Len() >= f.trafficTargetCount, nil },
Expand Down

0 comments on commit 7ed9c74

Please sign in to comment.