Skip to content

Commit

Permalink
Ratelimit replica creation
Browse files Browse the repository at this point in the history
  • Loading branch information
bprashanth committed May 6, 2015
1 parent 9939f92 commit 64277e3
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/cloudprovider/nodecontroller/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
// (regardless of its current value) in the master, without contacting kubelet.
if readyCondition == nil {
glog.V(2).Infof("node %v is never updated by kubelet")
glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionUnknown,
Expand Down
10 changes: 6 additions & 4 deletions pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *RCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bo
if podExp.Fulfilled() {
return true
} else {
glog.V(4).Infof("Controller %v still waiting on expectations %#v", podExp)
glog.V(4).Infof("Controller still waiting on expectations %#v", podExp)
return false
}
} else if err != nil {
Expand Down Expand Up @@ -124,9 +124,6 @@ func (r *RCExpectations) ExpectDeletions(rc *api.ReplicationController, dels int
// Decrements the expectation counts of the given rc.
func (r *RCExpectations) lowerExpectations(rc *api.ReplicationController, add, del int) {
if podExp, exists, err := r.GetExpectations(rc); err == nil && exists {
if podExp.add > 0 && podExp.del > 0 {
glog.V(2).Infof("Controller has both add and del expectations %+v", podExp)
}
podExp.Seen(int64(add), int64(del))
// The expectations might've been modified since the update on the previous line.
glog.V(4).Infof("Lowering expectations %+v", podExp)
Expand Down Expand Up @@ -167,6 +164,11 @@ func (e *PodExpectations) Fulfilled() bool {
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

// getExpectations atomically loads the add and del counters of the pod
// expectations and returns them in that order. Each counter is read with its
// own atomic load, so the pair is not a single consistent snapshot.
func (e *PodExpectations) getExpectations() (int64, int64) {
	adds := atomic.LoadInt64(&e.add)
	dels := atomic.LoadInt64(&e.del)
	return adds, dels
}

// NewRCExpectations returns a store for PodExpectations.
func NewRCExpectations() *RCExpectations {
return &RCExpectations{cache.NewTTLStore(expKeyFunc, ExpectationsTimeout)}
Expand Down
12 changes: 10 additions & 2 deletions pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ const (
// 3000 pods. Just creation is limited to 30qps, and watching happens with
// ~10-30s latency/pod at scale.
ExpectationsTimeout = 6 * time.Minute

// An rc is temporarily suspended after creating/deleting this many replicas.
// It resumes normal action after observing the watch events for them.
BurstReplicas = 500
)

// ReplicationManager is responsible for synchronizing ReplicationController objects stored
Expand Down Expand Up @@ -277,24 +281,27 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
diff := len(filteredPods) - controller.Spec.Replicas
if diff < 0 {
diff *= -1
diff = util.Min(diff, BurstReplicas)
rm.expectations.ExpectCreations(controller, diff)
wait := sync.WaitGroup{}
wait.Add(diff)
glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff)
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
if err := rm.podControl.createReplica(controller.Namespace, controller); err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %v", controller.Name)
rm.expectations.CreationObserved(controller)
util.HandleError(err)
}
}()
}
wait.Wait()
} else if diff > 0 {
diff = util.Min(diff, BurstReplicas)
rm.expectations.ExpectDeletions(controller, diff)
glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff)
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff)
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
Expand All @@ -307,6 +314,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
defer wait.Done()
if err := rm.podControl.deletePod(controller.Namespace, filteredPods[ix].Name); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed deletion, decrementing expectations for controller %v", controller.Name)
rm.expectations.DeletionObserved(controller)
}
}(i)
Expand Down
62 changes: 60 additions & 2 deletions pkg/controller/replication_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ func TestRCExpectations(t *testing.T) {

// Expectations have been surpassed
if podExp, exists, err := e.GetExpectations(rc); err == nil && exists {
if podExp.add != -1 || podExp.del != -1 {
add, del := podExp.getExpectations()
if add != -1 || del != -1 {
t.Errorf("Unexpected pod expectations %#v", podExp)
}
} else {
Expand All @@ -500,7 +501,8 @@ func TestRCExpectations(t *testing.T) {
// Next round of rc sync, old expectations are cleared
e.setExpectations(rc, 1, 2)
if podExp, exists, err := e.GetExpectations(rc); err == nil && exists {
if podExp.add != 1 || podExp.del != 2 {
add, del := podExp.getExpectations()
if add != 1 || del != 2 {
t.Errorf("Unexpected pod expectations %#v", podExp)
}
} else {
Expand Down Expand Up @@ -844,3 +846,59 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
}
}

// TestControllerBurstReplicas verifies that the replication manager creates at
// most BurstReplicas pods in a single sync, and that once the creations of a
// burst have all been observed, the next sync creates the following burst,
// until the requested number of replicas exists.
func TestControllerBurstReplicas(t *testing.T) {
	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
	fakePodControl := FakePodControl{}
	manager := NewReplicationManager(client)
	manager.podControl = &fakePodControl

	scaleFactor := 6
	numReplicas := scaleFactor*BurstReplicas - 600
	controllerSpec := newReplicationController(numReplicas)
	manager.controllerStore.Store.Add(controllerSpec)

	// Since the manager is not watching rcs we need to kick off the first sync.
	manager.syncReplicationController(getKey(controllerSpec, t))
	validateSyncReplication(t, &fakePodControl, BurstReplicas, 0)
	fakePodControl.clear()

	// Make sure we automatically start another sync at the end of each BurstReplicas
	expectedPods := BurstReplicas
	pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec)
	for i := 0; i < scaleFactor-1; i++ {

		// None of these should wake the controller because it has expectations==BurstReplicas.
		// Index into the slice rather than taking the address of a range
		// variable: before Go 1.22 the range variable is shared across
		// iterations, so every stored pointer would alias the same pod.
		for j := 0; j < expectedPods-1; j++ {
			manager.podStore.Store.Add(&pods.Items[j])
			manager.addPod(&pods.Items[j])
		}
		podExp, exists, err := manager.expectations.GetExpectations(controllerSpec)
		if !exists || err != nil {
			t.Fatalf("Did not find expectations for rc.")
		}
		// Read the counter through the atomic accessor instead of touching
		// the field directly, matching how the other expectation tests read it.
		if add, _ := podExp.getExpectations(); add != 1 {
			t.Fatalf("Expectations are wrong %v", podExp)
		}

		// The last add pod will decrease the expectation of the rc to 0,
		// which will cause it to create the remaining replicas up to BurstReplicas.
		manager.podStore.Store.Add(&pods.Items[expectedPods-1])
		manager.addPod(&pods.Items[expectedPods-1])

		// The store accrues pods created in past iterations, so count them all as active.
		// Note that the same store is checked by the sync to decide if replicas are needed.
		activePods := len(manager.podStore.Store.List())
		pods.Items = pods.Items[expectedPods:]

		// The sync should create the remaining replicas
		manager.syncReplicationController(getKey(controllerSpec, t))
		expectedPods = util.Min(numReplicas-activePods, BurstReplicas)
		validateSyncReplication(t, &fakePodControl, expectedPods, 0)

		fakePodControl.clear()
	}

	// After all bursts, every requested replica must be present in the store.
	activePods := len(manager.podStore.Store.List())
	if activePods != numReplicas {
		t.Fatalf("Unexpected number of active pods, expected %d, got %d", numReplicas, activePods)
	}
}
8 changes: 8 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ func logPanic(r interface{}) {
glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
}

// Min returns whichever of the two integers a and b is smaller. When both are
// equal, that shared value is returned.
func Min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}

// ErrorHandlers is a list of functions which will be invoked when an unreturnable
// error occurs.
var ErrorHandlers = []func(error){logError}
Expand Down

0 comments on commit 64277e3

Please sign in to comment.