Skip to content

Commit

Permalink
Merge pull request #41214 from ncdc/shared-informers-06-hpa
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 41248, 41214)

Switch hpa controller to shared informer

**What this PR does / why we need it**: switch the hpa controller to use a shared informer

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**: Only the last commit is relevant. The others are from #40759, #41114, #41148  

**Release note**:

```release-note
```

cc @smarterclayton @deads2k @sttts @liggitt @DirectXMan12 @timothysc @kubernetes/sig-scalability-pr-reviews @jszczepkowski @mwielgus @piosz
  • Loading branch information
Kubernetes Submit Queue committed Feb 10, 2017
2 parents fa93bab + d11aa98 commit 03bde62
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 242 deletions.
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/autoscaling.go
Expand Up @@ -44,6 +44,7 @@ func startHPAController(ctx ControllerContext) (bool, error) {
hpaClient.Extensions(),
hpaClient.Autoscaling(),
replicaCalc,
ctx.NewInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration,
).Run(ctx.Stop)
return true, nil
Expand Down
7 changes: 6 additions & 1 deletion cmd/kube-controller-manager/app/policy.go
Expand Up @@ -30,7 +30,12 @@ func startDisruptionController(ctx ControllerContext) (bool, error) {
return false, nil
}
go disruption.NewDisruptionController(
ctx.InformerFactory.Pods().Informer(),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
ctx.NewInformerFactory.Core().V1().ReplicationControllers(),
ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.NewInformerFactory.Extensions().V1beta1().Deployments(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("disruption-controller"),
).Run(ctx.Stop)
return true, nil
Expand Down
18 changes: 10 additions & 8 deletions pkg/controller/disruption/BUILD
Expand Up @@ -15,21 +15,25 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/policy/v1beta1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/extensions/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/policy/v1beta1:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//pkg/client/listers/policy/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
Expand All @@ -49,14 +53,12 @@ go_test(
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
],
)
Expand Down
181 changes: 66 additions & 115 deletions pkg/controller/disruption/disruption.go
Expand Up @@ -21,26 +21,30 @@ import (
"reflect"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
policyclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/policy/v1beta1"
"k8s.io/kubernetes/pkg/client/legacylisters"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/extensions/v1beta1"
policyinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/policy/v1beta1"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
policylisters "k8s.io/kubernetes/pkg/client/listers/policy/v1beta1"
"k8s.io/kubernetes/pkg/controller"

"github.com/golang/glog"
Expand All @@ -64,28 +68,23 @@ type updater func(*policy.PodDisruptionBudget) error
type DisruptionController struct {
kubeClient clientset.Interface

pdbStore cache.Store
pdbController cache.Controller
pdbLister listers.StoreToPodDisruptionBudgetLister
pdbLister policylisters.PodDisruptionBudgetLister
pdbListerSynced cache.InformerSynced

podController cache.Controller
podLister listers.StoreToPodLister
podLister corelisters.PodLister
podListerSynced cache.InformerSynced

rcIndexer cache.Indexer
rcController cache.Controller
rcLister listers.StoreToReplicationControllerLister
rcLister corelisters.ReplicationControllerLister
rcListerSynced cache.InformerSynced

rsStore cache.Store
rsController cache.Controller
rsLister listers.StoreToReplicaSetLister
rsLister extensionslisters.ReplicaSetLister
rsListerSynced cache.InformerSynced

dIndexer cache.Indexer
dController cache.Controller
dLister listers.StoreToDeploymentLister
dLister extensionslisters.DeploymentLister
dListerSynced cache.InformerSynced

ssStore cache.Store
ssController cache.Controller
ssLister listers.StoreToStatefulSetLister
ssLister appslisters.StatefulSetLister
ssListerSynced cache.InformerSynced

// PodDisruptionBudget keys that need to be synced.
queue workqueue.RateLimitingInterface
Expand All @@ -108,108 +107,55 @@ type controllerAndScale struct {
// controllers and their scale.
type podControllerFinder func(*v1.Pod) ([]controllerAndScale, error)

func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *DisruptionController {
func NewDisruptionController(
podInformer coreinformers.PodInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
rcInformer coreinformers.ReplicationControllerInformer,
rsInformer extensionsinformers.ReplicaSetInformer,
dInformer extensionsinformers.DeploymentInformer,
ssInformer appsinformers.StatefulSetInformer,
kubeClient clientset.Interface,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
podController: podInformer.GetController(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"),
broadcaster: record.NewBroadcaster(),
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"),
broadcaster: record.NewBroadcaster(),
}
dc.recorder = dc.broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"})

dc.getUpdater = func() updater { return dc.writePdbStatus }

dc.podLister.Indexer = podInformer.GetIndexer()

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPod,
UpdateFunc: dc.updatePod,
DeleteFunc: dc.deletePod,
})
dc.podLister = podInformer.Lister()
dc.podListerSynced = podInformer.Informer().HasSynced

dc.pdbStore, dc.pdbController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).Watch(options)
},
},
&policy.PodDisruptionBudget{},
30*time.Second,
pdbInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDb,
UpdateFunc: dc.updateDb,
DeleteFunc: dc.removeDb,
},
)
dc.pdbLister.Store = dc.pdbStore

dc.rcIndexer, dc.rcController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options)
},
},
&v1.ReplicationController{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.pdbLister = pdbInformer.Lister()
dc.pdbListerSynced = pdbInformer.Informer().HasSynced

dc.rcLister.Indexer = dc.rcIndexer
dc.rcLister = rcInformer.Lister()
dc.rcListerSynced = rcInformer.Informer().HasSynced

dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options)
},
},
&extensions.ReplicaSet{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.rsStore = dc.rsLister.Indexer

dc.dIndexer, dc.dController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).Watch(options)
},
},
&extensions.Deployment{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.dLister.Indexer = dc.dIndexer

dc.ssStore, dc.ssController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).Watch(options)
},
},
&apps.StatefulSet{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
)
dc.ssLister.Store = dc.ssStore
dc.rsLister = rsInformer.Lister()
dc.rsListerSynced = rsInformer.Informer().HasSynced

dc.dLister = dInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced

dc.ssLister = ssInformer.Lister()
dc.ssListerSynced = ssInformer.Informer().HasSynced

return dc
}
Expand Down Expand Up @@ -317,19 +263,22 @@ func (dc *DisruptionController) getPodReplicationControllers(pod *v1.Pod) ([]con
}

func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()

glog.V(0).Infof("Starting disruption controller")

if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}

if dc.kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
}
go dc.pdbController.Run(stopCh)
go dc.podController.Run(stopCh)
go dc.rcController.Run(stopCh)
go dc.rsController.Run(stopCh)
go dc.dController.Run(stopCh)
go dc.ssController.Run(stopCh)
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)

Expand Down Expand Up @@ -442,7 +391,7 @@ func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionB
glog.Warning(msg)
dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
}
return &pdbs[0]
return pdbs[0]
}

func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
Expand Down Expand Up @@ -510,17 +459,19 @@ func (dc *DisruptionController) sync(key string) error {
glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime))
}()

obj, exists, err := dc.pdbLister.Store.GetByKey(key)
if !exists {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
return nil
}
if err != nil {
return err
}

pdb := obj.(*policy.PodDisruptionBudget)

if err := dc.trySync(pdb); err != nil {
glog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
return dc.failSafe(pdb)
Expand Down

0 comments on commit 03bde62

Please sign in to comment.