Skip to content

Commit

Permalink
Merge pull request kubernetes#97353 from MikeSpreitzer/testable-config
Browse files Browse the repository at this point in the history
Define TestableConfig in k/apiserver/pkg/util/flowcontrol
  • Loading branch information
k8s-ci-robot committed Dec 17, 2020
2 parents e11e9d4 + 10df6d4 commit 8b8de03
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 60 deletions.
56 changes: 21 additions & 35 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand All @@ -61,7 +60,7 @@ import (
// undesired becomes completely unused, all the config objects are
// read and processed as a whole.

// StartFunction begins the process of handlig a request. If the
// StartFunction begins the process of handling a request. If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
// queue. The descr1 and descr2 values play no role in the logic but
Expand Down Expand Up @@ -151,34 +150,22 @@ type priorityLevelState struct {
}

// NewTestableController is extra flexible to facilitate testing
func newTestableController(
informerFactory kubeinformers.SharedInformerFactory,
flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
obsPairGenerator metrics.TimedObserverPairGenerator,
queueSetFactory fq.QueueSetFactory,
) *configController {
func newTestableController(config TestableConfig) *configController {
cfgCtlr := &configController{
queueSetFactory: queueSetFactory,
obsPairGenerator: obsPairGenerator,
serverConcurrencyLimit: serverConcurrencyLimit,
requestWaitLimit: requestWaitLimit,
flowcontrolClient: flowcontrolClient,
queueSetFactory: config.QueueSetFactory,
obsPairGenerator: config.ObsPairGenerator,
serverConcurrencyLimit: config.ServerConcurrencyLimit,
requestWaitLimit: config.RequestWaitLimit,
flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState),
}
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit)
cfgCtlr.initializeConfigController(informerFactory)
klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit)
// Start with longish delay because conflicts will be between
// different processes, so take some time to go away.
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
// ensure the data structure reflects the mandatory config
cfgCtlr.lockAndDigestConfigObjects(nil, nil)
return cfgCtlr
}

// initializeConfigController sets up the controller that processes
// config API objects.
func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) {
cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
fci := informerFactory.Flowcontrol().V1beta1()
fci := config.InformerFactory.Flowcontrol().V1beta1()
pli := fci.PriorityLevelConfigurations()
fsi := fci.FlowSchemas()
cfgCtlr.plLister = pli.Lister()
Expand Down Expand Up @@ -225,6 +212,7 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
cfgCtlr.configQueue.Add(0)

}})
return cfgCtlr
}

// MaintainObservations keeps the observers from
Expand Down Expand Up @@ -330,7 +318,7 @@ type cfgMeal struct {
fsStatusUpdates []fsStatusUpdate
}

// A buffered set of status updates for a FlowSchema
// A buffered set of status updates for FlowSchemas
type fsStatusUpdate struct {
flowSchema *flowcontrol.FlowSchema
condition flowcontrol.FlowSchemaCondition
Expand All @@ -349,7 +337,7 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior
panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
}
klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
_, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status")
_, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: ConfigConsumerAsFieldManager}, "status")
if err != nil {
errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
}
Expand Down Expand Up @@ -654,30 +642,28 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
var selectedFlowSchema *flowcontrol.FlowSchema
var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
for _, fs := range cfgCtlr.flowSchemas {
if matchesFlowSchema(rd, fs) {
selectedFlowSchema = fs
break
}
if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
catchAllFlowSchema = fs
}
}
if selectedFlowSchema == nil {
// This should never happen. If the requestDigest's User is a part of
// system:authenticated or system:unauthenticated, the catch-all flow
// schema should match it. However, if that invariant somehow fails,
// fallback to the catch-all flow schema anyway.
for _, fs := range cfgCtlr.flowSchemas {
if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
selectedFlowSchema = fs
break
}
}
if selectedFlowSchema == nil {
if catchAllFlowSchema == nil {
// This should absolutely never, ever happen! APF guarantees two
// undeletable flow schemas at all times: an exempt flow schema and a
// catch-all flow schema.
panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User))
}
selectedFlowSchema = catchAllFlowSchema
klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema))
}
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
Expand Down
52 changes: 35 additions & 17 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
)

// ConfigConsumerAsFieldManager is how the config consuminng
// controller appears in an ObjectMeta ManagedFieldsEntry.Manager
const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-v1"

// Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface {
// Handle takes care of queuing and dispatching a request
Expand Down Expand Up @@ -74,26 +78,40 @@ func New(
requestWaitLimit time.Duration,
) Interface {
grc := counter.NoOp{}
return NewTestable(
informerFactory,
flowcontrolClient,
serverConcurrencyLimit,
requestWaitLimit,
metrics.PriorityLevelConcurrencyObserverPairGenerator,
fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
)
return NewTestable(TestableConfig{
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: serverConcurrencyLimit,
RequestWaitLimit: requestWaitLimit,
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
})
}

// TestableConfig carries the parameters to an implementation that is testable
type TestableConfig struct {
// InformerFactory to use in building the controller
InformerFactory kubeinformers.SharedInformerFactory

// FlowcontrolClient to use for manipulating config objects
FlowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface

// ServerConcurrencyLimit for the controller to enforce
ServerConcurrencyLimit int

// RequestWaitLimit configured on the server
RequestWaitLimit time.Duration

// ObsPairGenerator for metrics
ObsPairGenerator metrics.TimedObserverPairGenerator

// QueueSetFactory for the queuing implementation
QueueSetFactory fq.QueueSetFactory
}

// NewTestable is extra flexible to facilitate testing
func NewTestable(
informerFactory kubeinformers.SharedInformerFactory,
flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface,
serverConcurrencyLimit int,
requestWaitLimit time.Duration,
obsPairGenerator metrics.TimedObserverPairGenerator,
queueSetFactory fq.QueueSetFactory,
) Interface {
return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, obsPairGenerator, queueSetFactory)
func NewTestable(config TestableConfig) Interface {
return newTestableController(config)
}

func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,14 @@ func TestConfigConsumer(t *testing.T) {
heldRequestsMap: map[string][]heldRequest{},
queues: map[string]*ctlrTestQueueSet{},
}
ctlr := newTestableController(
informerFactory,
flowcontrolClient,
100, // server concurrency limit
time.Minute, // request wait limit
metrics.PriorityLevelConcurrencyObserverPairGenerator,
cts,
)
ctlr := newTestableController(TestableConfig{
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
ServerConcurrencyLimit: 100,
RequestWaitLimit: time.Minute,
ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator,
QueueSetFactory: cts,
})
cts.cfgCtlr = ctlr
persistingPLNames := sets.NewString()
trialStep := fmt.Sprintf("trial%d-0", i)
Expand Down

0 comments on commit 8b8de03

Please sign in to comment.