From 10df6d459b5bca7b42471f9409182417fbc3f253 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Wed, 16 Dec 2020 18:58:45 -0500 Subject: [PATCH] Define TestableConfig in k/apiserver/pkg/util/flowcontrol Collect the parameters of newTestableController into a named type. Also tolerate the surprising situation in which a request's user groups include neither `system:authenticated` nor `system:unauthenticated` --- because this is observed to happen in some tests. Also a few other minor fixups. --- .../pkg/util/flowcontrol/apf_controller.go | 56 +++++++------------ .../pkg/util/flowcontrol/apf_filter.go | 52 +++++++++++------ .../pkg/util/flowcontrol/controller_test.go | 16 +++--- 3 files changed, 64 insertions(+), 60 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 8075970127aff..713fb5f7fd9d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -42,7 +42,6 @@ import ( fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" "k8s.io/apiserver/pkg/util/flowcontrol/metrics" - kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -61,7 +60,7 @@ import ( // undesired becomes completely unused, all the config objects are // read and processed as a whole. -// StartFunction begins the process of handlig a request. If the +// StartFunction begins the process of handling a request. If the // request gets queued then this function uses the given hashValue as // the source of entropy as it shuffle-shards the request into a // queue. The descr1 and descr2 values play no role in the logic but @@ -151,34 +150,22 @@ type priorityLevelState struct { } // NewTestableController is extra flexible to facilitate testing -func newTestableController( - informerFactory kubeinformers.SharedInformerFactory, - flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface, - serverConcurrencyLimit int, - requestWaitLimit time.Duration, - obsPairGenerator metrics.TimedObserverPairGenerator, - queueSetFactory fq.QueueSetFactory, -) *configController { +func newTestableController(config TestableConfig) *configController { cfgCtlr := &configController{ - queueSetFactory: queueSetFactory, - obsPairGenerator: obsPairGenerator, - serverConcurrencyLimit: serverConcurrencyLimit, - requestWaitLimit: requestWaitLimit, - flowcontrolClient: flowcontrolClient, + queueSetFactory: config.QueueSetFactory, + obsPairGenerator: config.ObsPairGenerator, + serverConcurrencyLimit: config.ServerConcurrencyLimit, + requestWaitLimit: config.RequestWaitLimit, + flowcontrolClient: config.FlowcontrolClient, priorityLevelStates: make(map[string]*priorityLevelState), } - klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", serverConcurrencyLimit, requestWaitLimit) - cfgCtlr.initializeConfigController(informerFactory) + klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit) + // Start with longish delay because conflicts will be between + // different processes, so take some time to go away. + cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") // ensure the data structure reflects the mandatory config cfgCtlr.lockAndDigestConfigObjects(nil, nil) - return cfgCtlr -} - -// initializeConfigController sets up the controller that processes -// config API objects. -func (cfgCtlr *configController) initializeConfigController(informerFactory kubeinformers.SharedInformerFactory) { - cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue") - fci := informerFactory.Flowcontrol().V1beta1() + fci := config.InformerFactory.Flowcontrol().V1beta1() pli := fci.PriorityLevelConfigurations() fsi := fci.FlowSchemas() cfgCtlr.plLister = pli.Lister() @@ -225,6 +212,7 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube cfgCtlr.configQueue.Add(0) }}) + return cfgCtlr } // MaintainObservations keeps the observers from @@ -330,7 +318,7 @@ type cfgMeal struct { fsStatusUpdates []fsStatusUpdate } -// A buffered set of status updates for a FlowSchema +// A buffered set of status updates for FlowSchemas type fsStatusUpdate struct { flowSchema *flowcontrol.FlowSchema condition flowcontrol.FlowSchemaCondition @@ -349,7 +337,7 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) } klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue)) - _, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: "api-priority-and-fairness-config-consumer-v1"}, "status") + _, err = cfgCtlr.flowcontrolClient.FlowSchemas().Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))), metav1.PatchOptions{FieldManager: ConfigConsumerAsFieldManager}, "status") if err != nil { errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name))) } @@ -654,30 +642,28 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig klog.V(7).Infof("startRequest(%#+v)", rd) cfgCtlr.lock.Lock() defer cfgCtlr.lock.Unlock() - var selectedFlowSchema *flowcontrol.FlowSchema + var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema for _, fs := range cfgCtlr.flowSchemas { if matchesFlowSchema(rd, fs) { selectedFlowSchema = fs break } + if fs.Name == flowcontrol.FlowSchemaNameCatchAll { + catchAllFlowSchema = fs + } } if selectedFlowSchema == nil { // This should never happen. If the requestDigest's User is a part of // system:authenticated or system:unauthenticated, the catch-all flow // schema should match it. However, if that invariant somehow fails, // fallback to the catch-all flow schema anyway. - for _, fs := range cfgCtlr.flowSchemas { - if fs.Name == flowcontrol.FlowSchemaNameCatchAll { - selectedFlowSchema = fs - break - } - } - if selectedFlowSchema == nil { + if catchAllFlowSchema == nil { // This should absolutely never, ever happen! APF guarantees two // undeletable flow schemas at all times: an exempt flow schema and a // catch-all flow schema. panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User)) } + selectedFlowSchema = catchAllFlowSchema klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema)) } plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index 327d50c1482ba..b95c790d07374 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -34,6 +34,10 @@ import ( flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" ) +// ConfigConsumerAsFieldManager is how the config consuminng +// controller appears in an ObjectMeta ManagedFieldsEntry.Manager +const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-v1" + // Interface defines how the API Priority and Fairness filter interacts with the underlying system. type Interface interface { // Handle takes care of queuing and dispatching a request @@ -74,26 +78,40 @@ func New( requestWaitLimit time.Duration, ) Interface { grc := counter.NoOp{} - return NewTestable( - informerFactory, - flowcontrolClient, - serverConcurrencyLimit, - requestWaitLimit, - metrics.PriorityLevelConcurrencyObserverPairGenerator, - fqs.NewQueueSetFactory(&clock.RealClock{}, grc), - ) + return NewTestable(TestableConfig{ + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: serverConcurrencyLimit, + RequestWaitLimit: requestWaitLimit, + ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + QueueSetFactory: fqs.NewQueueSetFactory(&clock.RealClock{}, grc), + }) +} + +// TestableConfig carries the parameters to an implementation that is testable +type TestableConfig struct { + // InformerFactory to use in building the controller + InformerFactory kubeinformers.SharedInformerFactory + + // FlowcontrolClient to use for manipulating config objects + FlowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface + + // ServerConcurrencyLimit for the controller to enforce + ServerConcurrencyLimit int + + // RequestWaitLimit configured on the server + RequestWaitLimit time.Duration + + // ObsPairGenerator for metrics + ObsPairGenerator metrics.TimedObserverPairGenerator + + // QueueSetFactory for the queuing implementation + QueueSetFactory fq.QueueSetFactory } // NewTestable is extra flexible to facilitate testing -func NewTestable( - informerFactory kubeinformers.SharedInformerFactory, - flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface, - serverConcurrencyLimit int, - requestWaitLimit time.Duration, - obsPairGenerator metrics.TimedObserverPairGenerator, - queueSetFactory fq.QueueSetFactory, -) Interface { - return newTestableController(informerFactory, flowcontrolClient, serverConcurrencyLimit, requestWaitLimit, obsPairGenerator, queueSetFactory) +func NewTestable(config TestableConfig) Interface { + return newTestableController(config) } func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest, diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go index 701bfd92e9c99..a2eacf7d8dab2 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/controller_test.go @@ -230,14 +230,14 @@ func TestConfigConsumer(t *testing.T) { heldRequestsMap: map[string][]heldRequest{}, queues: map[string]*ctlrTestQueueSet{}, } - ctlr := newTestableController( - informerFactory, - flowcontrolClient, - 100, // server concurrency limit - time.Minute, // request wait limit - metrics.PriorityLevelConcurrencyObserverPairGenerator, - cts, - ) + ctlr := newTestableController(TestableConfig{ + InformerFactory: informerFactory, + FlowcontrolClient: flowcontrolClient, + ServerConcurrencyLimit: 100, + RequestWaitLimit: time.Minute, + ObsPairGenerator: metrics.PriorityLevelConcurrencyObserverPairGenerator, + QueueSetFactory: cts, + }) cts.cfgCtlr = ctlr persistingPLNames := sets.NewString() trialStep := fmt.Sprintf("trial%d-0", i)