Added rate limiting to pod deleting #6355

Merged
merged 1 commit on Apr 3, 2015
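This change threads a `util.RateLimiter` through `NewNodeController` so that, when nodes fail, the node controller evicts pods at a bounded rate (a token bucket configured by the new `deleting_pods_qps` and `deleting_pods_burst` flags) instead of deleting pods from every failed node at once. Below is a minimal sketch of the gating pattern, assuming only the `RateLimiter` interface implied by `pkg/util/throttle.go` at the end of this diff (`CanAccept`/`Accept`/`Stop`); `maybeEvict`, `alwaysAccept`, and `evictNodePods` are hypothetical stand-ins, not code from this PR:

```go
package main

import "fmt"

// RateLimiter mirrors the interface implied by pkg/util/throttle.go below;
// CanAccept is the non-blocking check the node controller uses.
type RateLimiter interface {
	CanAccept() bool
	Accept()
	Stop()
}

// alwaysAccept plays the role of util.NewFakeRateLimiter for this demo.
type alwaysAccept struct{}

func (alwaysAccept) CanAccept() bool { return true }
func (alwaysAccept) Accept()         {}
func (alwaysAccept) Stop()           {}

// maybeEvict deletes the pods of a failed node only when the limiter has a
// token left, so a mass node outage cannot trigger a pod-deletion storm.
func maybeEvict(limiter RateLimiter, node string, evictNodePods func(string)) {
	if limiter.CanAccept() {
		evictNodePods(node)
		return
	}
	fmt.Printf("no token available, postponing eviction for %s\n", node)
}

func main() {
	maybeEvict(alwaysAccept{}, "node0", func(n string) {
		fmt.Printf("deleting pods on %s\n", n)
	})
}
```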
3 changes: 2 additions & 1 deletion cmd/integration/integration.go
@@ -223,7 +223,8 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
}}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
nodeController := nodeControllerPkg.NewNodeController(
nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter())
nodeController.Run(5*time.Second, true, false)
cadvisorInterface := new(cadvisor.Fake)

6 changes: 5 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -58,6 +58,8 @@ type CMServer struct {
SyncNodeList bool
SyncNodeStatus bool
PodEvictionTimeout time.Duration
DeletingPodsQps float32
DeletingPodsBurst int

// TODO: Discover these by pinging the host machines, and rip out these params.
NodeMilliCPU int64
@@ -104,6 +106,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods can be deleted in a burst in case of node failure. For more details look at RateLimiter.")
fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
@@ -177,7 +181,7 @@ func (s *CMServer) Run(_ []string) error {
}

nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst))
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)

resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
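With the defaults wired in above (`deleting_pods_qps=0.1`, `deleting_pods_burst=10`), the standard token-bucket reading is: pods can be evicted from up to 10 failed nodes back-to-back, after which one further node becomes eligible roughly every 10 seconds. A small sketch of that arithmetic; the exact refill behavior of `util`'s limiter is an assumption here:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults of the new flags: deleting_pods_qps and deleting_pods_burst.
	qps, burst := float32(0.1), 10
	// Once the initial burst is spent, one new token arrives every 1/qps seconds.
	refill := time.Duration(float32(time.Second) / qps)
	fmt.Printf("evict from up to %d nodes immediately, then one node every %v\n",
		burst, refill)
	// Output: evict from up to 10 nodes immediately, then one node every 10s
}
```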
5 changes: 4 additions & 1 deletion cmd/kubernetes/kubernetes.go
@@ -61,6 +61,8 @@ var (
nodeMemory = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods")
enableProfiling = flag.Bool("profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
deletingPodsQps = flag.Float32("deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
deletingPodsBurst = flag.Int("deleting_pods_burst", 10, "Number of nodes on which pods can be deleted in a burst in case of node failure. For more details look at RateLimiter.")
)

type delegateHandler struct {
@@ -128,7 +130,8 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
}
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
nodeController := nodeControllerPkg.NewNodeController(
nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst))
nodeController.Run(10*time.Second, true, true)

endpoints := service.NewEndpointController(cl)
55 changes: 32 additions & 23 deletions pkg/cloudprovider/controller/nodecontroller.go
@@ -71,15 +71,17 @@
)

type NodeController struct {
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
nodes []string
kubeClient client.Interface
kubeletClient client.KubeletClient
recorder record.EventRecorder
registerRetryCount int
podEvictionTimeout time.Duration
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
nodes []string
kubeClient client.Interface
kubeletClient client.KubeletClient
recorder record.EventRecorder
registerRetryCount int
podEvictionTimeout time.Duration
deletingPodsRateLimiter util.RateLimiter

// Methods for easy mocking in unit tests.
lookupIP func(host string) ([]net.IP, error)
now func() util.Time
@@ -94,7 +96,8 @@ func NewNodeController(
kubeClient client.Interface,
kubeletClient client.KubeletClient,
registerRetryCount int,
podEvictionTimeout time.Duration) *NodeController {
podEvictionTimeout time.Duration,
deletingPodsRateLimiter util.RateLimiter) *NodeController {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
if kubeClient != nil {
@@ -104,17 +107,18 @@
glog.Infof("No api server defined - no events will be sent to API server.")
}
return &NodeController{
cloud: cloud,
matchRE: matchRE,
nodes: nodes,
staticResources: staticResources,
kubeClient: kubeClient,
kubeletClient: kubeletClient,
recorder: recorder,
registerRetryCount: registerRetryCount,
podEvictionTimeout: podEvictionTimeout,
lookupIP: net.LookupIP,
now: util.Now,
cloud: cloud,
matchRE: matchRE,
nodes: nodes,
staticResources: staticResources,
kubeClient: kubeClient,
kubeletClient: kubeletClient,
recorder: recorder,
registerRetryCount: registerRetryCount,
podEvictionTimeout: podEvictionTimeout,
deletingPodsRateLimiter: deletingPodsRateLimiter,
lookupIP: net.LookupIP,
now: util.Now,
}
}

@@ -558,13 +562,18 @@ func (nc *NodeController) MonitorNodeStatus() error {
if lastReadyCondition.Status == api.ConditionFalse &&
nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
// Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
nc.deletePods(node.Name)
// Make sure we are not removing pods from too many nodes at the same time.
if nc.deletingPodsRateLimiter.CanAccept() {
nc.deletePods(node.Name)
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) {
// Same as above. Note however that, since the Unknown condition is posted by the node controller
// itself, we need to subtract the monitoring grace period in order to get the real 'podEvictionTimeout'.
nc.deletePods(node.Name)
if nc.deletingPodsRateLimiter.CanAccept() {
nc.deletePods(node.Name)
}
}
}
}
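Note the behavior when `CanAccept()` returns false: the node's pods are simply not deleted on this pass, and because `MonitorNodeStatus` runs on every sync period, the node is reconsidered on a later pass once tokens have refilled. A hedged sketch of that retry shape, where `unhealthyNodes` and `deletePods` are hypothetical stand-ins for the controller's internals:

```go
package sketch

import "time"

// RateLimiter is the one method of util.RateLimiter this loop needs.
type RateLimiter interface {
	CanAccept() bool
}

// monitorLoop mirrors the retry behavior of MonitorNodeStatus: an eviction
// skipped for lack of a token is retried on a later sync period, so the
// limiter spreads evictions out rather than dropping them permanently.
func monitorLoop(limiter RateLimiter, period time.Duration,
	unhealthyNodes func() []string, deletePods func(string)) {
	for {
		for _, node := range unhealthyNodes() {
			if limiter.CanAccept() {
				deletePods(node)
			}
		}
		time.Sleep(period)
	}
}
```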
24 changes: 12 additions & 12 deletions pkg/cloudprovider/controller/nodecontroller_test.go
@@ -247,7 +247,7 @@ func TestRegisterNodes(t *testing.T) {
for _, machine := range item.machines {
nodes.Items = append(nodes.Items, *newNode(machine))
}
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
if !item.expectedFail && err != nil {
t.Errorf("unexpected error: %v", err)
@@ -332,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
},
}
for _, item := range table {
nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute)
nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
nodes, err := nodeController.GetStaticNodesWithSpec()
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -393,7 +393,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
nodes, err := nodeController.GetCloudNodesWithSpec()
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -499,7 +499,7 @@ func TestSyncCloudNodes(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
if err := nodeController.SyncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -585,7 +585,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
if err := nodeController.SyncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -687,7 +687,7 @@ func TestNodeConditionsCheck(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
nodeController.now = func() util.Time { return fakeNow }
conditions := nodeController.DoCheck(item.node)
if !reflect.DeepEqual(item.expectedConditions, conditions) {
@@ -718,7 +718,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
result, err := nodeController.PopulateAddresses(item.nodes)
// In case of an IP lookup error, we should continue.
if err != nil {
@@ -821,7 +821,7 @@ func TestSyncProbedNodeStatus(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.SyncProbedNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -924,7 +924,7 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.SyncProbedNodeStatus(); err != nil {
@@ -1077,7 +1077,7 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute, util.NewFakeRateLimiter())
nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
if err := nodeController.SyncProbedNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -1235,7 +1235,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout)
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout, util.NewFakeRateLimiter())
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.MonitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -1417,7 +1417,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter())
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.MonitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
14 changes: 14 additions & 0 deletions pkg/util/throttle.go
@@ -51,6 +51,12 @@ func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
return rate
}

type fakeRateLimiter struct{}

func NewFakeRateLimiter() RateLimiter {
return &fakeRateLimiter{}
}

func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter {
if burst < 1 {
panic("burst must be a positive integer")
@@ -109,3 +115,11 @@ func (t *tickRateLimiter) increment() {
default:
}
}

func (t *fakeRateLimiter) CanAccept() bool {
return true
}

func (t *fakeRateLimiter) Stop() {}

func (t *fakeRateLimiter) Accept() {}
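The fake exists so the node-controller tests above stay deterministic: `NewFakeRateLimiter` never throttles, so code gated on `CanAccept()` behaves as if rate limiting were disabled, with no dependence on wall-clock token refills. A hedged test sketch; the import path reflects the repository layout of this era and is an assumption:

```go
package util_test

import (
	"testing"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// The fake limiter always has capacity, so eviction logic under test
// runs unconditionally rather than being timing-dependent.
func TestFakeRateLimiterAlwaysAccepts(t *testing.T) {
	limiter := util.NewFakeRateLimiter()
	defer limiter.Stop()
	for i := 0; i < 100; i++ {
		if !limiter.CanAccept() {
			t.Fatal("fake rate limiter should always accept")
		}
	}
}
```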