Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
cclOfExempt[plName] = minCurrentCL
remainingServerCL -= minCurrentCL
} else {
// Lower bound on this priority level's adjusted concurreny limit is the lesser of:
// - its seat demamd high watermark over the last adjustment period, and
// Lower bound on this priority level's adjusted concurrency limit is the lesser of:
// - its seat demand high watermark over the last adjustment period, and
// - its configured concurrency limit.
// BUT: we do not want this to be lower than the lower bound from configuration.
// See KEP-1040 for a more detailed explanation.
Expand Down Expand Up @@ -552,7 +552,7 @@ func (cfgCtlr *configController) syncOne() (specificDelay time.Duration, err err
// cope with the various dependencies between objects. The process of
// digestion is done in four passes over config objects --- three
// passes over PriorityLevelConfigurations and one pass over the
// FlowSchemas --- with the work dvided among the passes according to
// FlowSchemas --- with the work divided among the passes according to
// those dependencies.
type cfgMeal struct {
cfgCtlr *configController
Expand All @@ -563,6 +563,10 @@ type cfgMeal struct {
// new configuration
shareSum float64

// The sum of the lendable concurrency shares of the priority
// levels in the new configuration
lendableShareSum float64

// These keep track of which mandatory priority level config
// objects have been digested
haveExemptPL, haveCatchAllPL bool
Expand Down Expand Up @@ -700,7 +704,7 @@ func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontro

// The new config has been constructed
cfgCtlr.priorityLevelStates = meal.newPLStates
klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutinRequests", meal.maxExecutingRequests)
klog.V(5).InfoS("Switched to new API Priority and Fairness configuration", "maxWaitingRequests", meal.maxWaitingRequests, "maxExecutingRequests", meal.maxExecutingRequests)

metrics.GetWaitingReadonlyConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
metrics.GetWaitingMutatingConcurrency().SetDenominator(float64(meal.maxWaitingRequests))
Expand Down Expand Up @@ -738,8 +742,11 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
state.quiescing = false
}
nominalConcurrencyShares, _, _ := plSpecCommons(state.pl)
nominalConcurrencyShares, lendablePercent, _ := plSpecCommons(state.pl)
meal.shareSum += float64(*nominalConcurrencyShares)
if lendablePercent != nil {
meal.lendableShareSum += float64(*nominalConcurrencyShares) * float64(*lendablePercent) / 100
}
meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
}
Expand Down Expand Up @@ -843,8 +850,11 @@ func (meal *cfgMeal) processOldPLsLocked() {
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
nominalConcurrencyShares, _, _ := plSpecCommons(plState.pl)
nominalConcurrencyShares, lendablePercent, _ := plSpecCommons(plState.pl)
meal.shareSum += float64(*nominalConcurrencyShares)
if lendablePercent != nil {
meal.lendableShareSum += float64(*nominalConcurrencyShares) * float64(*lendablePercent) / 100
}
meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
meal.newPLStates[plName] = plState
Expand All @@ -865,10 +875,17 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
if lendablePercent != nil {
lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*lendablePercent) / 100))
}
// By default, the borrowing concurrency limit is set such
// that it does not exceed the total number of lendable seats
// and the max concurrency limit does not exceed the server
// concurrency limit. If borrowingLimitPercent is defined,
// the borrowing concurrency limit is set accordingly.
borrowingCL = meal.cfgCtlr.serverConcurrencyLimit - concurrencyLimit
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited {
borrowingCL = min(borrowingCL, int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit)*meal.lendableShareSum/meal.shareSum))-lendableCL)
}
if borrowingLimitPercent != nil {
borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*borrowingLimitPercent) / 100))
} else {
borrowingCL = meal.cfgCtlr.serverConcurrencyLimit
borrowingCL = min(borrowingCL, int(math.Round(float64(concurrencyLimit)*float64(*borrowingLimitPercent)/100)))
}

metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,66 @@ func TestUpdateBorrowing(t *testing.T) {
}

}

func TestMaxCL(t *testing.T) {
startTime := time.Now()
clk, _ := testeventclock.NewFake(startTime, 0, nil)
plcExempt := fcboot.MandatoryPriorityLevelConfigurationExempt
plcHigh := fcboot.SuggestedPriorityLevelConfigurationWorkloadHigh
plcMid := fcboot.SuggestedPriorityLevelConfigurationWorkloadLow
plcLow := fcboot.MandatoryPriorityLevelConfigurationCatchAll
plcs := []*flowcontrol.PriorityLevelConfiguration{plcHigh, plcExempt, plcMid, plcLow}
fses := []*flowcontrol.FlowSchema{}
k8sClient := clientsetfake.NewSimpleClientset(plcLow, plcExempt, plcHigh, plcMid)
informerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
flowcontrolClient := k8sClient.FlowcontrolV1()
serverCL := int(*plcHigh.Spec.Limited.NominalConcurrencyShares+
*plcMid.Spec.Limited.NominalConcurrencyShares+
*plcLow.Spec.Limited.NominalConcurrencyShares) * 6
config := TestableConfig{
Name: "test",
Clock: clk,
AsFieldManager: "testfm",
FoundToDangling: func(found bool) bool { return !found },
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: serverCL,
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqs.NewQueueSetFactory(clk),
}
ctlr := newTestableController(config)
_ = ctlr.lockAndDigestConfigObjects(plcs, fses)
if ctlr.nominalCLSum != serverCL {
t.Fatalf("Unexpected rounding: nominalCLSum=%d", ctlr.nominalCLSum)
}
stateExempt := ctlr.priorityLevelStates[plcExempt.Name]
stateHigh := ctlr.priorityLevelStates[plcHigh.Name]
stateMid := ctlr.priorityLevelStates[plcMid.Name]
stateLow := ctlr.priorityLevelStates[plcLow.Name]

expectedExempt := serverCL
expectedHigh := stateHigh.nominalCL + int(float64(stateMid.nominalCL)*float64(*stateMid.pl.Spec.Limited.LendablePercent)/100) + int(float64(stateLow.nominalCL)*float64(*stateLow.pl.Spec.Limited.LendablePercent)/100)
expectedMid := int(float64(stateHigh.nominalCL)*float64(*stateHigh.pl.Spec.Limited.LendablePercent)/100) + stateMid.nominalCL + int(float64(stateLow.nominalCL)*float64(*stateLow.pl.Spec.Limited.LendablePercent)/100)
expectedLow := int(float64(stateHigh.nominalCL)*float64(*stateHigh.pl.Spec.Limited.LendablePercent)/100) + int(float64(stateMid.nominalCL)*float64(*stateMid.pl.Spec.Limited.LendablePercent)/100) + stateLow.nominalCL
if expected, actual := expectedExempt, stateExempt.maxCL; expected != actual {
t.Errorf("MaxCL: expected %d, got %d for exempt", expected, actual)
} else {
t.Logf("MaxCL: expected and got %d for exempt", expected)
}
if expected, actual := expectedHigh, stateHigh.maxCL; expected != actual {
t.Errorf("MaxCL: expected %d, got %d for hi", expected, actual)
} else {
t.Logf("MaxCL: expected and got %d for hi", expected)
}
if expected, actual := expectedMid, stateMid.maxCL; expected != actual {
t.Errorf("MaxCL: expected %d, got %d for mid", expected, actual)
} else {
t.Logf("MaxCL: expected and got %d for mid", expected)
}
if expected, actual := expectedLow, stateLow.maxCL; expected != actual {
t.Errorf("MaxCL: expected %d, got %d for lo", expected, actual)
} else {
t.Logf("MaxCL: expected and got %d for lo", expected)
}
}
Loading