diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index c707884e133db..526dcbaac1472 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -412,8 +412,8 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta cclOfExempt[plName] = minCurrentCL remainingServerCL -= minCurrentCL } else { - // Lower bound on this priority level's adjusted concurreny limit is the lesser of: - // - its seat demamd high watermark over the last adjustment period, and + // Lower bound on this priority level's adjusted concurrency limit is the lesser of: + // - its seat demand high watermark over the last adjustment period, and // - its configured concurrency limit. // BUT: we do not want this to be lower than the lower bound from configuration. // See KEP-1040 for a more detailed explanation. @@ -552,7 +552,7 @@ func (cfgCtlr *configController) syncOne() (specificDelay time.Duration, err err // cope with the various dependencies between objects. The process of // digestion is done in four passes over config objects --- three // passes over PriorityLevelConfigurations and one pass over the -// FlowSchemas --- with the work dvided among the passes according to +// FlowSchemas --- with the work divided among the passes according to // those dependencies. type cfgMeal struct { cfgCtlr *configController @@ -563,6 +563,10 @@ type cfgMeal struct { // new configuration shareSum float64 + // The sum of the lendable concurrency shares of the priority + // levels in the new configuration + lendableShareSum float64 + // These keep track of which mandatory priority level config // objects have been digested haveExemptPL, haveCatchAllPL bool @@ -700,7 +704,7 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro // The new config has been constructed cfgCtlr.priorityLevelStates = meal.newPLStates - klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutinRequests", meal.maxExecutingRequests) + klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutingRequests", meal.maxExecutingRequests) metrics.GetWaitingReadonlyConcurrency().SetDenominator(float64(meal.maxWaitingRequests)) metrics.GetWaitingMutatingConcurrency().SetDenominator(float64(meal.maxWaitingRequests)) @@ -738,8 +742,11 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name) state.quiescing = false } - nominalConcurrencyShares, _, _ := plSpecCommons(state.pl) + nominalConcurrencyShares, lendablePercent, _ := plSpecCommons(state.pl) meal.shareSum += float64(*nominalConcurrencyShares) + if lendablePercent != nil { + meal.lendableShareSum += float64(*nominalConcurrencyShares) * float64(*lendablePercent) / 100 + } meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll } @@ -843,8 +850,11 @@ func (meal *cfgMeal) processOldPLsLocked() { // priority level continues to get a concurrency // allocation determined by all the share values in the // regular way. - nominalConcurrencyShares, _, _ := plSpecCommons(plState.pl) + nominalConcurrencyShares, lendablePercent, _ := plSpecCommons(plState.pl) meal.shareSum += float64(*nominalConcurrencyShares) + if lendablePercent != nil { + meal.lendableShareSum += float64(*nominalConcurrencyShares) * float64(*lendablePercent) / 100 + } meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll meal.newPLStates[plName] = plState @@ -865,10 +875,17 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() { if lendablePercent != nil { lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*lendablePercent) / 100)) } + // By default, the borrowing concurrency limit is set such + // that it does not exceed the total number of lendable seats + // and the max concurrency limit does not exceed the server + // concurrency limit. If borrowingLimitPercent is defined, + // the borrowing concurrency limit is set accordingly. + borrowingCL = meal.cfgCtlr.serverConcurrencyLimit - concurrencyLimit + if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited { + borrowingCL = min(borrowingCL, int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit)*meal.lendableShareSum/meal.shareSum))-lendableCL) + } if borrowingLimitPercent != nil { - borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*borrowingLimitPercent) / 100)) - } else { - borrowingCL = meal.cfgCtlr.serverConcurrencyLimit + borrowingCL = min(borrowingCL, int(math.Round(float64(concurrencyLimit)*float64(*borrowingLimitPercent)/100))) } metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go index 308c972f7cbfa..4845a8c3c6c2f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/exempt_borrowing_test.go @@ -162,3 +162,66 @@ func TestUpdateBorrowing(t *testing.T) { } } + +func TestMaxCL(t *testing.T) { + startTime := time.Now() + clk, _ := testeventclock.NewFake(startTime, 0, nil) + plcExempt := fcboot.MandatoryPriorityLevelConfigurationExempt + plcHigh := fcboot.SuggestedPriorityLevelConfigurationWorkloadHigh + plcMid := fcboot.SuggestedPriorityLevelConfigurationWorkloadLow + plcLow := fcboot.MandatoryPriorityLevelConfigurationCatchAll + plcs := []*flowcontrol.PriorityLevelConfiguration{plcHigh, plcExempt, plcMid, plcLow} + fses := []*flowcontrol.FlowSchema{} + k8sClient := clientsetfake.NewSimpleClientset(plcLow, plcExempt, plcHigh, plcMid) + informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + flowcontrolClient := k8sClient.FlowcontrolV1() + serverCL := int(*plcHigh.Spec.Limited.NominalConcurrencyShares+ + *plcMid.Spec.Limited.NominalConcurrencyShares+ + *plcLow.Spec.Limited.NominalConcurrencyShares) * 6 + config := TestableConfig{ + Name: "test", + Clock: clk, + AsFieldManager: "testfm", + FoundToDangling: func(found bool) bool { return !found }, + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: serverCL, + ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec, + ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec, + QueueSetFactory: fqs.NewQueueSetFactory(clk), + } + ctlr := newTestableController(config) + _ = ctlr.lockAndDigestConfigObjects(plcs, fses) + if ctlr.nominalCLSum != serverCL { + t.Fatalf("Unexpected rounding: nominalCLSum=%d", ctlr.nominalCLSum) + } + stateExempt := ctlr.priorityLevelStates[plcExempt.Name] + stateHigh := ctlr.priorityLevelStates[plcHigh.Name] + stateMid := ctlr.priorityLevelStates[plcMid.Name] + stateLow := ctlr.priorityLevelStates[plcLow.Name] + + expectedExempt := serverCL + expectedHigh := stateHigh.nominalCL + int(float64(stateMid.nominalCL)*float64(*stateMid.pl.Spec.Limited.LendablePercent)/100) + int(float64(stateLow.nominalCL)*float64(*stateLow.pl.Spec.Limited.LendablePercent)/100) + expectedMid := int(float64(stateHigh.nominalCL)*float64(*stateHigh.pl.Spec.Limited.LendablePercent)/100) + stateMid.nominalCL + int(float64(stateLow.nominalCL)*float64(*stateLow.pl.Spec.Limited.LendablePercent)/100) + expectedLow := int(float64(stateHigh.nominalCL)*float64(*stateHigh.pl.Spec.Limited.LendablePercent)/100) + int(float64(stateMid.nominalCL)*float64(*stateMid.pl.Spec.Limited.LendablePercent)/100) + stateLow.nominalCL + if expected, actual := expectedExempt, stateExempt.maxCL; expected != actual { + t.Errorf("MaxCL: expected %d, got %d for exempt", expected, actual) + } else { + t.Logf("MaxCL: expected and got %d for exempt", expected) + } + if expected, actual := expectedHigh, stateHigh.maxCL; expected != actual { + t.Errorf("MaxCL: expected %d, got %d for hi", expected, actual) + } else { + t.Logf("MaxCL: expected and got %d for hi", expected) + } + if expected, actual := expectedMid, stateMid.maxCL; expected != actual { + t.Errorf("MaxCL: expected %d, got %d for mid", expected, actual) + } else { + t.Logf("MaxCL: expected and got %d for mid", expected) + } + if expected, actual := expectedLow, stateLow.maxCL; expected != actual { + t.Errorf("MaxCL: expected %d, got %d for lo", expected, actual) + } else { + t.Logf("MaxCL: expected and got %d for lo", expected) + } +}