Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
installation controller: reinstall deleted objects
Browse files Browse the repository at this point in the history
One of the biggest complaints we get from users not being able to finish
their rollouts is that their incumbents are unhealthy, and one of the
biggest causes for that is when Deployments disappear[1] from the target
clusters. This happens because the installation controller does not even
try to replace objects after its first pass is completed, and
CanOverride is set to false.

Now, the installation controller will do a full run of the installer on
every sync, reinstalling any objects that are gone, whatever their kind.
Additionally, we'll start to listen to Service and Deployment events, so
that we can sync the InstallationTarget whenever someone messes with
these objects in the target clusters. We do this only for Deployments
and Services because they're the only ones that can cause shipper to
hang, as it depends on them in the capacity and traffic controller,
respectively.

[1] Users swear they haven't deleted them. We have yet to find another
cause for this mystery.
  • Loading branch information
juliogreff committed Sep 30, 2019
1 parent 58734d3 commit 2076bd6
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 73 deletions.
5 changes: 4 additions & 1 deletion pkg/controller/capacity/capacity_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
listers "github.com/bookingcom/shipper/pkg/client/listers/shipper/v1alpha1"
"github.com/bookingcom/shipper/pkg/clusterclientstore"
"github.com/bookingcom/shipper/pkg/conditions"
shippercontroller "github.com/bookingcom/shipper/pkg/controller"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
"github.com/bookingcom/shipper/pkg/util/replicas"
)
Expand Down Expand Up @@ -304,7 +305,9 @@ func (c *Controller) enqueueCapacityTarget(obj interface{}) {
}

func (c *Controller) registerDeploymentEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) {
informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(c.NewDeploymentResourceEventHandler(clusterName))
informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(
shippercontroller.NewAppClusterEventHandler(c.enqueueCapacityTargetFromDeployment),
)
}

func (c *Controller) subscribeToDeployments(informerFactory kubeinformers.SharedInformerFactory) {
Expand Down
45 changes: 9 additions & 36 deletions pkg/controller/capacity/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

shipper "github.com/bookingcom/shipper/pkg/apis/shipper/v1alpha1"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
)

// NewDeploymentResourceEventHandler builds an event handler that only lets
// through Deployments carrying shipper.ReleaseLabel, and enqueues the related
// capacity target for every add, update and delete of such a Deployment.
func (c Controller) NewDeploymentResourceEventHandler(clusterName string) cache.ResourceEventHandler {
	// hasReleaseLabel reports whether obj is a Deployment with the release
	// label set; anything that is not a Deployment is logged and rejected.
	hasReleaseLabel := func(obj interface{}) bool {
		deploy, isDeployment := obj.(*appsv1.Deployment)
		if !isDeployment {
			klog.Warningf("Received something that's not a appsv1/Deployment: %v", obj)
			return false
		}

		_, labelled := deploy.GetLabels()[shipper.ReleaseLabel]
		return labelled
	}

	// All three event kinds end up doing the same thing.
	enqueue := func(obj interface{}) {
		c.enqueueCapacityTargetFromDeployment(obj, clusterName)
	}

	return cache.FilteringResourceEventHandler{
		FilterFunc: hasReleaseLabel,
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: enqueue,
			UpdateFunc: func(_, updated interface{}) {
				enqueue(updated)
			},
			DeleteFunc: enqueue,
		},
	}
}

func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}, clusterName string) {
func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}) {
deployment, ok := obj.(*appsv1.Deployment)
if !ok {
runtime.HandleError(fmt.Errorf("not a Deployment: %#v", obj))
Expand All @@ -60,7 +31,7 @@ func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}, cluste
rel := deployment.GetLabels()[shipper.ReleaseLabel]
ct, err := c.getCapacityTargetForReleaseAndNamespace(rel, deployment.GetNamespace())
if err != nil {
runtime.HandleError(fmt.Errorf("cannot get capacity target for release '%s/%s': %#v", rel, deployment.GetNamespace(), obj))
runtime.HandleError(fmt.Errorf("cannot get capacity target for release '%s/%s': %#v", rel, deployment.GetNamespace(), err))
return
}

Expand All @@ -69,15 +40,17 @@ func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}, cluste

func (c Controller) getCapacityTargetForReleaseAndNamespace(release, namespace string) (*shipper.CapacityTarget, error) {
selector := labels.Set{shipper.ReleaseLabel: release}.AsSelector()
gvk := shipper.SchemeGroupVersion.WithKind("CapacityTarget")

capacityTargets, err := c.capacityTargetsLister.CapacityTargets(namespace).List(selector)
if err != nil {
return nil, shippererrors.NewKubeclientListError(
shipper.SchemeGroupVersion.WithKind("CapacityTarget"),
namespace, selector, err)
return nil, shippererrors.NewKubeclientListError(gvk, namespace, selector, err)
}

if l := len(capacityTargets); l != 1 {
return nil, shippererrors.NewInvalidCapacityTargetError(release, l)
expected := 1
if got := len(capacityTargets); got != 1 {
return nil, shippererrors.NewUnexpectedObjectCountFromSelectorError(
selector, gvk, expected, got)
}

return capacityTargets[0], nil
Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package controller

import (
shipper "github.com/bookingcom/shipper/pkg/apis/shipper/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

// NewAppClusterEventHandler returns an event handler for objects living in
// application clusters. Only objects carrying shipper.ReleaseLabel pass the
// filter; for those, callback is invoked with the added object, the new
// version of an updated object, or the deleted object.
//
// A deletion can be delivered as a cache.DeletedFinalStateUnknown tombstone
// instead of the object itself. The tombstone is unwrapped both for filtering
// and before invoking callback, so deletions are not dropped by the filter
// and callback always receives the object.
func NewAppClusterEventHandler(callback func(obj interface{})) cache.ResourceEventHandler {
	// unwrap returns the object stored inside a tombstone, or obj itself when
	// it is not a tombstone.
	unwrap := func(obj interface{}) interface{} {
		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
			return tombstone.Obj
		}
		return obj
	}

	return cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			kubeobj, ok := unwrap(obj).(metav1.Object)
			if !ok {
				klog.Warningf("Received something that's not a metav1/Object: %v", obj)
				return false
			}

			_, ok = kubeobj.GetLabels()[shipper.ReleaseLabel]

			return ok
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				callback(obj)
			},
			UpdateFunc: func(old, new interface{}) {
				callback(new)
			},
			DeleteFunc: func(obj interface{}) {
				callback(unwrap(obj))
			},
		},
	}
}
64 changes: 59 additions & 5 deletions pkg/controller/installation/installation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -102,9 +105,24 @@ func NewController(
},
})

store.AddSubscriptionCallback(controller.subscribeToAppClusterEvents)
store.AddEventHandlerCallback(controller.registerAppClusterEventHandlers)

return controller
}

// registerAppClusterEventHandlers attaches one shared event handler to the
// Deployment and Service informers of informerFactory, so that events on
// release-labelled objects enqueue the matching InstallationTarget.
// clusterName is not used here.
func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) {
	onEvent := shippercontroller.NewAppClusterEventHandler(c.enqueueInstallationTargetFromObject)

	deployments := informerFactory.Apps().V1().Deployments().Informer()
	deployments.AddEventHandler(onEvent)

	services := informerFactory.Core().V1().Services().Informer()
	services.AddEventHandler(onEvent)
}

// subscribeToAppClusterEvents requests the Deployment and Service informers
// from informerFactory and discards the returned values.
// NOTE(review): presumably requesting an informer is what makes the shared
// factory track and later start it for the cluster — confirm against
// client-go's SharedInformerFactory.
func (c *Controller) subscribeToAppClusterEvents(informerFactory kubeinformers.SharedInformerFactory) {
informerFactory.Apps().V1().Deployments().Informer()
informerFactory.Core().V1().Services().Informer()
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
Expand Down Expand Up @@ -213,14 +231,50 @@ func (c *Controller) enqueueInstallationTarget(obj interface{}) {
c.workqueue.Add(key)
}

// enqueueInstallationTargetFromObject enqueues the InstallationTarget related
// to obj, looked up through the object's release label and namespace. Objects
// that are not a metav1.Object, and lookup failures, are reported through
// runtime.HandleError and nothing is enqueued.
func (c *Controller) enqueueInstallationTargetFromObject(obj interface{}) {
	kubeobj, ok := obj.(metav1.Object)
	if !ok {
		runtime.HandleError(fmt.Errorf("not a metav1.Object: %#v", obj))
		return
	}

	// Using ReleaseLabel here instead of the full set of labels because we
	// can't guarantee that there isn't extra stuff there that was put
	// directly in the chart.
	// Also not using ObjectReference here because it would go over cluster
	// boundaries. While technically it's probably ok, I feel like it'd be
	// abusing the feature.
	rel := kubeobj.GetLabels()[shipper.ReleaseLabel]
	it, err := c.getInstallationTargetForReleaseAndNamespace(rel, kubeobj.GetNamespace())
	if err != nil {
		// Format the error with %s so the log carries its message rather
		// than a Go-syntax dump of the error value.
		runtime.HandleError(fmt.Errorf("cannot get installation target for release '%s/%s': %s", rel, kubeobj.GetNamespace(), err))
		return
	}

	c.enqueueInstallationTarget(it)
}

// getInstallationTargetForReleaseAndNamespace lists the InstallationTargets in
// namespace whose release label equals release and returns the single match.
// A lister failure is wrapped in a kubeclient list error; any number of
// matches other than one yields an unexpected-object-count error.
func (c *Controller) getInstallationTargetForReleaseAndNamespace(release, namespace string) (*shipper.InstallationTarget, error) {
	// Exactly one InstallationTarget is expected per release.
	const wantCount = 1

	kind := shipper.SchemeGroupVersion.WithKind("InstallationTarget")
	releaseSelector := labels.Set{shipper.ReleaseLabel: release}.AsSelector()

	found, err := c.installationTargetsLister.InstallationTargets(namespace).List(releaseSelector)
	if err != nil {
		return nil, shippererrors.NewKubeclientListError(kind, namespace, releaseSelector, err)
	}

	if len(found) != wantCount {
		return nil, shippererrors.NewUnexpectedObjectCountFromSelectorError(
			releaseSelector, kind, wantCount, len(found))
	}

	return found[0], nil
}

// processInstallation attempts to install the related InstallationTarget on
// all target clusters.
func (c *Controller) processInstallation(it *shipper.InstallationTarget) error {
if !it.Spec.CanOverride {
klog.V(3).Infof("InstallationTarget %q is not allowed to override, skipping", shippercontroller.MetaKey(it))
return nil
}

// Build .status over based on the current .spec.clusters.
newClusterStatuses := make([]*shipper.ClusterInstallationStatus, 0, len(it.Spec.Clusters))

Expand Down
37 changes: 30 additions & 7 deletions pkg/controller/installation/installation_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package installation

import (
"fmt"
"testing"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -306,7 +308,7 @@ func TestTargetClusterMissesGVK(t *testing.T) {
installationTarget := buildInstallationTarget(testNs, appName, []string{cluster.Name}, &chart)

clientsPerCluster, shipperclientset, fakeDynamicClientBuilder, shipperInformerFactory :=
initializeClients([]*v1.APIResourceList{}, []runtime.Object{cluster, installationTarget}, objectsPerClusterMap{cluster.Name: nil})
initializeClients([]*metav1.APIResourceList{}, []runtime.Object{cluster, installationTarget}, objectsPerClusterMap{cluster.Name: nil})

fakeRecorder := record.NewFakeRecorder(42)
fakeClientProvider := shippertesting.NewFakeClusterClientStore(clientsPerCluster, nil)
Expand Down Expand Up @@ -446,7 +448,7 @@ func TestManagementServerMissesCluster(t *testing.T) {
}

// TestInstallNoOverride verifies that an InstallationTarget with disabled
// overrides does not get processed.
// overrides does not update any existing objects.
func TestInstallNoOverride(t *testing.T) {
cluster := buildCluster("minikube-a")
appName := "reviews-api"
Expand All @@ -456,9 +458,20 @@ func TestInstallNoOverride(t *testing.T) {
it := buildInstallationTarget(testNs, appName, []string{cluster.Name}, &chart)
it.Spec.CanOverride = false

labels := map[string]string{shipper.InstallationTargetOwnerLabel: appName}
meta := metav1.ObjectMeta{
Namespace: testNs,
Name: fmt.Sprintf("%s-%s", appName, appName),
Labels: labels,
}

existingObjs := objectsPerClusterMap{cluster.Name: []runtime.Object{
&appsv1.Deployment{ObjectMeta: meta},
&corev1.Service{ObjectMeta: meta},
}}

clientsPerCluster, shipperclientset, fakeDynamicClientBuilder, shipperInformerFactory :=
initializeClients(apiResourceList, []runtime.Object{cluster, it},
objectsPerClusterMap{cluster.Name: []runtime.Object{}})
initializeClients(apiResourceList, []runtime.Object{cluster, it}, existingObjs)

clusterPair := clientsPerCluster[cluster.Name]

Expand All @@ -472,7 +485,17 @@ func TestInstallNoOverride(t *testing.T) {
t.Fatal("Could not process work item")
}

expectedActions := []kubetesting.Action{}
shippertesting.ShallowCheckActions(expectedActions, clusterPair.DynamicClient.Actions(), t)
expectedDynamicActions := []kubetesting.Action{
kubetesting.NewGetAction(schema.GroupVersionResource{Resource: "services", Version: "v1"}, testNs, "0.0.1-reviews-api"),
kubetesting.NewGetAction(schema.GroupVersionResource{Resource: "deployments", Version: "v1", Group: "apps"}, testNs, "0.0.1-reviews-api"),
}
shippertesting.ShallowCheckActions(expectedDynamicActions, clusterPair.DynamicClient.Actions(), t)

expectedActions := []kubetesting.Action{
kubetesting.NewGetAction(schema.GroupVersionResource{Resource: "configmaps", Version: "v1"}, testNs, "0.0.1-anchor"),
kubetesting.NewCreateAction(schema.GroupVersionResource{Resource: "configmaps", Version: "v1"}, testNs, nil),
shippertesting.NewDiscoveryAction("services"),
shippertesting.NewDiscoveryAction("deployments"),
}
shippertesting.ShallowCheckActions(expectedActions, clusterPair.Client.Actions(), t)
}
24 changes: 0 additions & 24 deletions pkg/errors/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,6 @@ import (
"fmt"
)

// InvalidCapacityTargetError reports that the number of capacity targets
// found for a release label was not exactly one.
type InvalidCapacityTargetError struct {
	releaseName string
	count       int
}

// NewInvalidCapacityTargetError builds an InvalidCapacityTargetError for the
// given release label and the number of capacity targets that were found.
func NewInvalidCapacityTargetError(releaseName string, count int) InvalidCapacityTargetError {
	return InvalidCapacityTargetError{releaseName: releaseName, count: count}
}

// Error describes the problem: a missing capacity target when fewer than one
// was found, an unexpected count otherwise.
func (e InvalidCapacityTargetError) Error() string {
	if e.count >= 1 {
		return fmt.Sprintf("expected one capacity target for release label %q, got %d instead", e.releaseName, e.count)
	}

	return fmt.Sprintf("missing capacity target with release label %q", e.releaseName)
}

// ShouldRetry always reports true for this error.
func (e InvalidCapacityTargetError) ShouldRetry() bool {
	return true
}

type TargetDeploymentCountError struct {
cluster string
namespace string
Expand Down
32 changes: 32 additions & 0 deletions pkg/errors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,41 @@ package errors

import (
"fmt"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)

// UnexpectedObjectCountFromSelectorError reports that listing objects of a
// given kind with a label selector returned a different number of objects
// than expected.
type UnexpectedObjectCountFromSelectorError struct {
	selector labels.Selector
	gvk      schema.GroupVersionKind
	expected int
	got      int
}

// NewUnexpectedObjectCountFromSelectorError builds the error from the selector
// that was used, the kind that was listed, and the expected and actual counts.
func NewUnexpectedObjectCountFromSelectorError(
	selector labels.Selector,
	gvk schema.GroupVersionKind,
	expected, got int,
) UnexpectedObjectCountFromSelectorError {
	var e UnexpectedObjectCountFromSelectorError
	e.selector = selector
	e.gvk = gvk
	e.expected = expected
	e.got = got
	return e
}

// Error renders the expected count, kind, selector and actual count.
func (e UnexpectedObjectCountFromSelectorError) Error() string {
	kind := e.gvk.String()
	sel := e.selector.String()

	return fmt.Sprintf("expected %d %s for selector %q, got %d instead",
		e.expected, kind, sel, e.got)
}

// ShouldRetry always reports false for this error.
func (e UnexpectedObjectCountFromSelectorError) ShouldRetry() bool {
	return false
}

type MultipleOwnerReferencesError string

func (e MultipleOwnerReferencesError) Error() string {
Expand Down
Loading

0 comments on commit 2076bd6

Please sign in to comment.