Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

installation controller: reinstall deleted objects #205

Merged
merged 3 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions pkg/clusterclientstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"k8s.io/client-go/rest"
)

type ClientProvider interface {
GetClient(clusterName string, ua string) (kubernetes.Interface, error)
GetConfig(clusterName string) (*rest.Config, error)
}

type Interface interface {
AddSubscriptionCallback(SubscriptionRegisterFunc)
AddEventHandlerCallback(EventHandlerRegisterFunc)
AddSubscriptionCallback(SubscriptionRegisterFunc)
GetClient(clusterName string, ua string) (kubernetes.Interface, error)
GetConfig(clusterName string) (*rest.Config, error)
GetInformerFactory(string) (kubeinformers.SharedInformerFactory, error)
}
17 changes: 6 additions & 11 deletions pkg/controller/capacity/capacity_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand All @@ -25,6 +24,7 @@ import (
listers "github.com/bookingcom/shipper/pkg/client/listers/shipper/v1alpha1"
"github.com/bookingcom/shipper/pkg/clusterclientstore"
"github.com/bookingcom/shipper/pkg/conditions"
shippercontroller "github.com/bookingcom/shipper/pkg/controller"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
"github.com/bookingcom/shipper/pkg/util/replicas"
)
Expand All @@ -43,7 +43,7 @@ const (
// Controller is the controller implementation for CapacityTarget resources
type Controller struct {
shipperclientset clientset.Interface
clusterClientStore clusterClientStoreInterface
clusterClientStore clusterclientstore.Interface
capacityTargetsLister listers.CapacityTargetLister
capacityTargetsSynced cache.InformerSynced
releasesLister listers.ReleaseLister
Expand All @@ -56,7 +56,7 @@ type Controller struct {
func NewController(
shipperclientset clientset.Interface,
shipperInformerFactory informers.SharedInformerFactory,
store clusterClientStoreInterface,
store clusterclientstore.Interface,
recorder record.EventRecorder,
) *Controller {

Expand Down Expand Up @@ -305,21 +305,16 @@ func (c *Controller) enqueueCapacityTarget(obj interface{}) {
}

func (c *Controller) registerDeploymentEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) {
informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(c.NewDeploymentResourceEventHandler(clusterName))
informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(
shippercontroller.NewAppClusterEventHandler(c.enqueueCapacityTargetFromDeployment),
)
}

func (c *Controller) subscribeToDeployments(informerFactory kubeinformers.SharedInformerFactory) {
informerFactory.Apps().V1().Deployments().Informer()
informerFactory.Core().V1().Pods().Informer()
}

type clusterClientStoreInterface interface {
AddSubscriptionCallback(clusterclientstore.SubscriptionRegisterFunc)
AddEventHandlerCallback(clusterclientstore.EventHandlerRegisterFunc)
GetClient(string, string) (kubernetes.Interface, error)
GetInformerFactory(string) (kubeinformers.SharedInformerFactory, error)
}

func (c *Controller) getSadPods(targetDeployment *appsv1.Deployment, clusterStatus *shipper.ClusterCapacityStatus) ([]shipper.PodStatus, bool, error) {
podCount, sadPodsCount, sadPods, err := c.getSadPodsForDeploymentOnCluster(targetDeployment, clusterStatus.Name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/capacity/capacity_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (f *fixture) initializeFixture() {
for i := int32(0); i < numClus; i++ {
clusterNames = append(clusterNames, fmt.Sprintf("cluster_%d", i))
}
f.store = shippertesting.NewFakeClusterClientStore(f.targetClusterClientset, f.targetClusterInformerFactory, clusterNames)
f.store = shippertesting.NewSimpleFakeClusterClientStore(f.targetClusterClientset, f.targetClusterInformerFactory, clusterNames)
}

func (f *fixture) newController() *Controller {
Expand Down
45 changes: 9 additions & 36 deletions pkg/controller/capacity/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

shipper "github.com/bookingcom/shipper/pkg/apis/shipper/v1alpha1"
shippererrors "github.com/bookingcom/shipper/pkg/errors"
)

func (c Controller) NewDeploymentResourceEventHandler(clusterName string) cache.ResourceEventHandler {
return cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
deploy, ok := obj.(*appsv1.Deployment)
if !ok {
klog.Warningf("Received something that's not a appsv1/Deployment: %v", obj)
return false
}

_, ok = deploy.GetLabels()[shipper.ReleaseLabel]

return ok
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueCapacityTargetFromDeployment(obj, clusterName)
},
UpdateFunc: func(old, new interface{}) {
c.enqueueCapacityTargetFromDeployment(new, clusterName)
},
DeleteFunc: func(obj interface{}) {
c.enqueueCapacityTargetFromDeployment(obj, clusterName)
},
},
}
}

func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}, clusterName string) {
func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}) {
deployment, ok := obj.(*appsv1.Deployment)
if !ok {
runtime.HandleError(fmt.Errorf("not a Deployment: %#v", obj))
Expand All @@ -60,7 +31,7 @@ func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}, cluste
rel := deployment.GetLabels()[shipper.ReleaseLabel]
ct, err := c.getCapacityTargetForReleaseAndNamespace(rel, deployment.GetNamespace())
if err != nil {
runtime.HandleError(fmt.Errorf("cannot get capacity target for release '%s/%s': %#v", rel, deployment.GetNamespace(), obj))
runtime.HandleError(fmt.Errorf("cannot get capacity target for release '%s/%s': %#v", rel, deployment.GetNamespace(), err))
return
}

Expand All @@ -69,15 +40,17 @@ func (c *Controller) enqueueCapacityTargetFromDeployment(obj interface{}, cluste

func (c Controller) getCapacityTargetForReleaseAndNamespace(release, namespace string) (*shipper.CapacityTarget, error) {
selector := labels.Set{shipper.ReleaseLabel: release}.AsSelector()
gvk := shipper.SchemeGroupVersion.WithKind("CapacityTarget")

capacityTargets, err := c.capacityTargetsLister.CapacityTargets(namespace).List(selector)
if err != nil {
return nil, shippererrors.NewKubeclientListError(
shipper.SchemeGroupVersion.WithKind("CapacityTarget"),
namespace, selector, err)
return nil, shippererrors.NewKubeclientListError(gvk, namespace, selector, err)
}

if l := len(capacityTargets); l != 1 {
return nil, shippererrors.NewInvalidCapacityTargetError(release, l)
expected := 1
if got := len(capacityTargets); got != 1 {
return nil, shippererrors.NewUnexpectedObjectCountFromSelectorError(
selector, gvk, expected, got)
}

return capacityTargets[0], nil
Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package controller

import (
shipper "github.com/bookingcom/shipper/pkg/apis/shipper/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

func NewAppClusterEventHandler(callback func(obj interface{})) cache.ResourceEventHandler {
return cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
kubeobj, ok := obj.(metav1.Object)
if !ok {
klog.Warningf("Received something that's not a metav1/Object: %v", obj)
return false
}

_, ok = kubeobj.GetLabels()[shipper.ReleaseLabel]

return ok
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
callback(obj)
},
UpdateFunc: func(old, new interface{}) {
callback(new)
},
DeleteFunc: func(obj interface{}) {
callback(obj)
},
},
}
}
68 changes: 61 additions & 7 deletions pkg/controller/installation/installation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -44,7 +47,7 @@ const (
// objects.
type Controller struct {
shipperclientset shipperclient.Interface
clusterClientStore clusterclientstore.ClientProvider
clusterClientStore clusterclientstore.Interface

workqueue workqueue.RateLimitingInterface

Expand All @@ -67,7 +70,7 @@ type Controller struct {
func NewController(
shipperclientset shipperclient.Interface,
shipperInformerFactory shipperinformers.SharedInformerFactory,
store clusterclientstore.ClientProvider,
store clusterclientstore.Interface,
dynamicClientBuilderFunc DynamicClientBuilderFunc,
chartFetcher shipperrepo.ChartFetcher,
recorder record.EventRecorder,
Expand Down Expand Up @@ -102,9 +105,24 @@ func NewController(
},
})

store.AddSubscriptionCallback(controller.subscribeToAppClusterEvents)
store.AddEventHandlerCallback(controller.registerAppClusterEventHandlers)

return controller
}

func (c *Controller) registerAppClusterEventHandlers(informerFactory kubeinformers.SharedInformerFactory, clusterName string) {
handler := shippercontroller.NewAppClusterEventHandler(c.enqueueInstallationTargetFromObject)

informerFactory.Apps().V1().Deployments().Informer().AddEventHandler(handler)
informerFactory.Core().V1().Services().Informer().AddEventHandler(handler)
}

func (c *Controller) subscribeToAppClusterEvents(informerFactory kubeinformers.SharedInformerFactory) {
informerFactory.Apps().V1().Deployments().Informer()
informerFactory.Core().V1().Services().Informer()
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
Expand Down Expand Up @@ -213,14 +231,50 @@ func (c *Controller) enqueueInstallationTarget(obj interface{}) {
c.workqueue.Add(key)
}

func (c *Controller) enqueueInstallationTargetFromObject(obj interface{}) {
kubeobj, ok := obj.(metav1.Object)
if !ok {
runtime.HandleError(fmt.Errorf("not a metav1.Object: %#v", obj))
return
}

// Using ReleaseLabel here instead of the full set of labels because we
// can't guarantee that there isn't extra stuff there that was put
// directly in the chart.
// Also not using ObjectReference here because it would go over cluster
// boundaries. While technically it's probably ok, I feel like it'd be
// abusing the feature.
rel := kubeobj.GetLabels()[shipper.ReleaseLabel]
tt, err := c.getInstallationTargetForReleaseAndNamespace(rel, kubeobj.GetNamespace())
if err != nil {
runtime.HandleError(fmt.Errorf("cannot get installation target for release '%s/%s': %#v", rel, kubeobj.GetNamespace(), err))
return
}

c.enqueueInstallationTarget(tt)
}

func (c *Controller) getInstallationTargetForReleaseAndNamespace(release, namespace string) (*shipper.InstallationTarget, error) {
selector := labels.Set{shipper.ReleaseLabel: release}.AsSelector()
gvk := shipper.SchemeGroupVersion.WithKind("InstallationTarget")

installationTargets, err := c.installationTargetsLister.InstallationTargets(namespace).List(selector)
if err != nil {
return nil, shippererrors.NewKubeclientListError(gvk, namespace, selector, err)
}

expected := 1
if got := len(installationTargets); got != 1 {
return nil, shippererrors.NewUnexpectedObjectCountFromSelectorError(
selector, gvk, expected, got)
}

return installationTargets[0], nil
}

// processInstallation attempts to install the related InstallationTarget on
// all target clusters.
func (c *Controller) processInstallation(it *shipper.InstallationTarget) error {
if !it.Spec.CanOverride {
klog.V(3).Infof("InstallationTarget %q is not allowed to override, skipping", shippercontroller.MetaKey(it))
return nil
}

// Build .status over based on the current .spec.clusters.
newClusterStatuses := make([]*shipper.ClusterInstallationStatus, 0, len(it.Spec.Clusters))

Expand Down
Loading