From 8bc9f5fef75f509014915ee07f03e3076290ac59 Mon Sep 17 00:00:00 2001
From: Piotr Szczesniak
Date: Thu, 2 Apr 2015 17:13:13 +0200
Subject: [PATCH] Added rate limiting to pod deletion

Fixes #6228
---
 cmd/integration/integration.go          |  3 +-
 .../app/controllermanager.go            |  6 +-
 cmd/kubernetes/kubernetes.go            |  5 +-
 .../controller/nodecontroller.go        | 55 +++++++++++--------
 .../controller/nodecontroller_test.go   | 24 ++++----
 pkg/util/throttle.go                    | 14 +++++
 6 files changed, 69 insertions(+), 38 deletions(-)

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 6b0f877f4e07..5e0dcd2f10a1 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -223,7 +223,8 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 			api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
 		}}
 
-	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
+	nodeController := nodeControllerPkg.NewNodeController(
+		nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter())
 	nodeController.Run(5*time.Second, true, false)
 
 	cadvisorInterface := new(cadvisor.Fake)
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 13a94d8c2fa1..90d0b2611d60 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -58,6 +58,8 @@ type CMServer struct {
 	SyncNodeList       bool
 	SyncNodeStatus     bool
 	PodEvictionTimeout time.Duration
+	DeletingPodsQps    float32
+	DeletingPodsBurst  int
 
 	// TODO: Discover these by pinging the host machines, and rip out these params.
 	NodeMilliCPU int64
@@ -104,6 +106,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
 	fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
 	fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
+	fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
+	fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes from which pods can be deleted in a single burst in case of node failure. For more details look into RateLimiter.")
 	fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
 		"The number of retries for initial node registration. Retry interval equals node_sync_period.")
 	fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
@@ -177,7 +181,7 @@ func (s *CMServer) Run(_ []string) error {
 	}
 
 	nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
-		kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
+		kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst))
 	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)
 
 	resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index 4d0ef7804302..6c0b169aba7e 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -61,6 +61,8 @@ var (
 	nodeMemory             = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
 	masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods")
 	enableProfiling        = flag.Bool("profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
+	deletingPodsQps        = flag.Float32("deleting_pods_qps", 0.1, "")
+	deletingPodsBurst      = flag.Int("deleting_pods_burst", 10, "")
 )
 
 type delegateHandler struct {
@@ -128,7 +130,8 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 	}
 	kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
 
-	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
+	nodeController := nodeControllerPkg.NewNodeController(
+		nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst))
 	nodeController.Run(10*time.Second, true, true)
 
 	endpoints := service.NewEndpointController(cl)
diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go
index ba3e087e62ac..b02ba1d5e7df 100644
--- a/pkg/cloudprovider/controller/nodecontroller.go
+++ b/pkg/cloudprovider/controller/nodecontroller.go
@@ -71,15 +71,17 @@ var (
 )
 
 type NodeController struct {
-	cloud              cloudprovider.Interface
-	matchRE            string
-	staticResources    *api.NodeResources
-	nodes              []string
-	kubeClient         client.Interface
-	kubeletClient      client.KubeletClient
-	recorder           record.EventRecorder
-	registerRetryCount int
-	podEvictionTimeout time.Duration
+	cloud                   cloudprovider.Interface
+	matchRE                 string
+	staticResources         *api.NodeResources
+	nodes                   []string
+	kubeClient              client.Interface
+	kubeletClient           client.KubeletClient
+	recorder                record.EventRecorder
+	registerRetryCount      int
+	podEvictionTimeout      time.Duration
+	deletingPodsRateLimiter util.RateLimiter
+	// Methods for easy mocking in unit tests.
 	lookupIP func(host string) ([]net.IP, error)
 	now      func() util.Time
 
@@ -94,7 +96,8 @@ func NewNodeController(
 	kubeClient client.Interface,
 	kubeletClient client.KubeletClient,
 	registerRetryCount int,
-	podEvictionTimeout time.Duration) *NodeController {
+	podEvictionTimeout time.Duration,
+	deletingPodsRateLimiter util.RateLimiter) *NodeController {
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
 	if kubeClient != nil {
@@ -104,17 +107,18 @@ func NewNodeController(
 		glog.Infof("No api server defined - no events will be sent to API server.")
 	}
 	return &NodeController{
-		cloud:              cloud,
-		matchRE:            matchRE,
-		nodes:              nodes,
-		staticResources:    staticResources,
-		kubeClient:         kubeClient,
-		kubeletClient:      kubeletClient,
-		recorder:           recorder,
-		registerRetryCount: registerRetryCount,
-		podEvictionTimeout: podEvictionTimeout,
-		lookupIP:           net.LookupIP,
-		now:                util.Now,
+		cloud:                   cloud,
+		matchRE:                 matchRE,
+		nodes:                   nodes,
+		staticResources:         staticResources,
+		kubeClient:              kubeClient,
+		kubeletClient:           kubeletClient,
+		recorder:                recorder,
+		registerRetryCount:      registerRetryCount,
+		podEvictionTimeout:      podEvictionTimeout,
+		deletingPodsRateLimiter: deletingPodsRateLimiter,
+		lookupIP:                net.LookupIP,
+		now:                     util.Now,
 	}
 }
 
@@ -558,13 +562,18 @@ func (nc *NodeController) MonitorNodeStatus() error {
 			if lastReadyCondition.Status == api.ConditionFalse &&
 				nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
 				// Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
-				nc.deletePods(node.Name)
+				// Make sure we are not removing pods from too many nodes at the same time.
+				if nc.deletingPodsRateLimiter.CanAccept() {
+					nc.deletePods(node.Name)
+				}
 			}
 			if lastReadyCondition.Status == api.ConditionUnknown &&
 				nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) {
 				// Same as above. Note however, since condition unknown is posted by node controller, which means we
 				// need to substract monitoring grace period in order to get the real 'podEvictionTimeout'.
-				nc.deletePods(node.Name)
+				if nc.deletingPodsRateLimiter.CanAccept() {
+					nc.deletePods(node.Name)
+				}
 			}
 		}
 	}
diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go
index 6ff3c58dc9e7..d398916c185d 100644
--- a/pkg/cloudprovider/controller/nodecontroller_test.go
+++ b/pkg/cloudprovider/controller/nodecontroller_test.go
@@ -247,7 +247,7 @@ func TestRegisterNodes(t *testing.T) {
 		for _, machine := range item.machines {
 			nodes.Items = append(nodes.Items, *newNode(machine))
 		}
-		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
 		if !item.expectedFail && err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -332,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
 		},
 	}
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		nodes, err := nodeController.GetStaticNodesWithSpec()
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -393,7 +393,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		nodes, err := nodeController.GetCloudNodesWithSpec()
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -499,7 +499,7 @@ func TestSyncCloudNodes(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		if err := nodeController.SyncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -585,7 +585,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		if err := nodeController.SyncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -687,7 +687,7 @@ func TestNodeConditionsCheck(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		conditions := nodeController.DoCheck(item.node)
 		if !reflect.DeepEqual(item.expectedConditions, conditions) {
@@ -718,7 +718,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		result, err := nodeController.PopulateAddresses(item.nodes)
 		// In case of IP querying error, we should continue.
 		if err != nil {
@@ -821,7 +821,7 @@ func TestSyncProbedNodeStatus(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.SyncProbedNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -924,7 +924,7 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
 		nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.SyncProbedNodeStatus(); err != nil {
@@ -1077,7 +1077,7 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute, util.NewFakeRateLimiter())
 		nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
 		if err := nodeController.SyncProbedNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -1235,7 +1235,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.MonitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -1417,7 +1417,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.MonitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
diff --git a/pkg/util/throttle.go b/pkg/util/throttle.go
index c961811faa85..34cba6a47833 100644
--- a/pkg/util/throttle.go
+++ b/pkg/util/throttle.go
@@ -51,6 +51,12 @@ func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
 	return rate
 }
 
+type fakeRateLimiter struct{}
+
+func NewFakeRateLimiter() RateLimiter {
+	return &fakeRateLimiter{}
+}
+
 func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter {
 	if burst < 1 {
 		panic("burst must be a positive integer")
@@ -109,3 +115,11 @@ func (t *tickRateLimiter) increment() {
 	default:
 	}
 }
+
+func (t *fakeRateLimiter) CanAccept() bool {
+	return true
+}
+
+func (t *fakeRateLimiter) Stop() {}
+
+func (t *fakeRateLimiter) Accept() {}
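
The call sites above pass the limiter to NewNodeController as a util.RateLimiter. The interface declaration itself is not part of this diff; the sketch below is inferred from the three methods fakeRateLimiter implements here, and the package name is hypothetical. It only illustrates the contract the node controller depends on, it is not the pkg/util source.

package ratelimitsketch

// RateLimiter, as the node controller uses it in this patch (inferred).
type RateLimiter interface {
	// CanAccept returns true and consumes a token if the caller may proceed;
	// otherwise it returns false without blocking. MonitorNodeStatus uses it
	// to skip eviction for the current sync period when the budget is spent.
	CanAccept() bool
	// Accept blocks until a token is available (not used by the node controller here).
	Accept()
	// Stop releases any background resources held by the limiter.
	Stop()
}

// The fake used by the tests admits every caller, so rate limiting never
// interferes with the eviction assertions.
type fakeRateLimiter struct{}

func NewFakeRateLimiter() RateLimiter    { return &fakeRateLimiter{} }
func (*fakeRateLimiter) CanAccept() bool { return true }
func (*fakeRateLimiter) Accept()         {}
func (*fakeRateLimiter) Stop()           {}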
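
The eviction gate itself relies on the existing token bucket limiter in pkg/util: CanAccept() consumes a token when one is available, so pods are evicted from at most `burst` nodes back-to-back and afterwards from roughly one node per 1/qps seconds. With the defaults added above (deleting_pods_qps=0.1, deleting_pods_burst=10) that means at most 10 nodes in a burst, then about one node every 10 seconds. The following self-contained program is an approximation of that behavior for illustration only; the tokenBucket and newTokenBucket names are made up for the example and are not the pkg/util implementation.

package main

import (
	"fmt"
	"time"
)

// tokenBucket is a minimal stand-in for a ticker-driven token bucket:
// a buffered channel holds up to `burst` tokens and a background goroutine
// refills one token per 1/qps seconds.
type tokenBucket struct {
	tokens chan struct{}
	stop   chan struct{}
}

func newTokenBucket(qps float32, burst int) *tokenBucket {
	tb := &tokenBucket{
		tokens: make(chan struct{}, burst),
		stop:   make(chan struct{}),
	}
	// Start with a full bucket so the first `burst` calls succeed immediately.
	for i := 0; i < burst; i++ {
		tb.tokens <- struct{}{}
	}
	interval := time.Duration(float32(time.Second) / qps)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				select {
				case tb.tokens <- struct{}{}: // refill one token
				default: // bucket already full
				}
			case <-tb.stop:
				return
			}
		}
	}()
	return tb
}

// CanAccept consumes a token if one is available; otherwise it returns false
// without blocking, which is how MonitorNodeStatus defers eviction.
func (tb *tokenBucket) CanAccept() bool {
	select {
	case <-tb.tokens:
		return true
	default:
		return false
	}
}

func (tb *tokenBucket) Stop() { close(tb.stop) }

func main() {
	// With qps=0.1 and burst=2, two evictions pass immediately and the third
	// is deferred until a token is refilled (about 10 seconds later).
	limiter := newTokenBucket(0.1, 2)
	defer limiter.Stop()
	for node := 0; node < 3; node++ {
		fmt.Printf("evict pods on node%d: %v\n", node, limiter.CanAccept())
	}
}

The real limiter is driven the same way by a ticker (see newTokenBucketRateLimiterFromTicker above), which is why a skipped node is simply retried on a later MonitorNodeStatus pass rather than queued.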