Skip to content

Commit

Permalink
feat(perf): Add a flag to configure the number of workers to run
Browse files Browse the repository at this point in the history
This enables users to control how many webhook requests get processed in
parallel.

Signed-off-by: Marwan Aljubeh <maljubeh@snapchat.com>
  • Loading branch information
maljub01 authored and grzesuav committed Dec 14, 2020
1 parent 64f081c commit 3f07022
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 16 deletions.
8 changes: 5 additions & 3 deletions controller/composite/controller.go
Expand Up @@ -64,10 +64,12 @@ type parentController struct {
updateStrategy updateStrategyMap
childInformers common.InformerMap

numWorkers int

finalizer *finalizer.Manager
}

func newParentController(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, mcClient mcclientset.Interface, revisionLister mclisters.ControllerRevisionLister, cc *v1alpha1.CompositeController) (pc *parentController, newErr error) {
func newParentController(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, mcClient mcclientset.Interface, revisionLister mclisters.ControllerRevisionLister, cc *v1alpha1.CompositeController, numWorkers int) (pc *parentController, newErr error) {
// Make a dynamic client for the parent resource.
parentClient, err := dynClient.Resource(cc.Spec.ParentResource.APIVersion, cc.Spec.ParentResource.Resource)
if err != nil {
Expand Down Expand Up @@ -118,6 +120,7 @@ func newParentController(resources *dynamicdiscovery.ResourceMap, dynClient *dyn
revisionLister: revisionLister,
updateStrategy: updateStrategy,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CompositeController-"+cc.Name),
numWorkers: numWorkers,
finalizer: &finalizer.Manager{
Name: "metacontroller.io/compositecontroller-" + cc.Name,
Enabled: cc.Spec.Hooks.Finalize != nil,
Expand Down Expand Up @@ -178,9 +181,8 @@ func (pc *parentController) Start() {
return
}

// 5 workers ought to be enough for anyone.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
for i := 0; i < pc.numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
8 changes: 6 additions & 2 deletions controller/composite/metacontroller.go
Expand Up @@ -54,9 +54,11 @@ type Metacontroller struct {
parentControllers map[string]*parentController

stopCh, doneCh chan struct{}

numWorkers int
}

func NewMetacontroller(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, mcInformerFactory mcinformers.SharedInformerFactory, mcClient mcclientset.Interface) *Metacontroller {
func NewMetacontroller(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, mcInformerFactory mcinformers.SharedInformerFactory, mcClient mcclientset.Interface, numWorkers int) *Metacontroller {
mc := &Metacontroller{
resources: resources,
mcClient: mcClient,
Expand All @@ -70,6 +72,8 @@ func NewMetacontroller(resources *dynamicdiscovery.ResourceMap, dynClient *dynam

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CompositeController"),
parentControllers: make(map[string]*parentController),

numWorkers: numWorkers,
}

mc.ccInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -175,7 +179,7 @@ func (mc *Metacontroller) syncCompositeController(cc *v1alpha1.CompositeControll
delete(mc.parentControllers, cc.Name)
}

pc, err := newParentController(mc.resources, mc.dynClient, mc.dynInformers, mc.mcClient, mc.revisionLister, cc)
pc, err := newParentController(mc.resources, mc.dynClient, mc.dynInformers, mc.mcClient, mc.revisionLister, cc, mc.numWorkers)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions controller/decorator/controller.go
Expand Up @@ -66,10 +66,12 @@ type decoratorController struct {
parentInformers common.InformerMap
childInformers common.InformerMap

numWorkers int

finalizer *finalizer.Manager
}

func newDecoratorController(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, dc *v1alpha1.DecoratorController) (controller *decoratorController, newErr error) {
func newDecoratorController(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, dc *v1alpha1.DecoratorController, numWorkers int) (controller *decoratorController, newErr error) {
c := &decoratorController{
dc: dc,
resources: resources,
Expand All @@ -78,7 +80,8 @@ func newDecoratorController(resources *dynamicdiscovery.ResourceMap, dynClient *
parentInformers: make(common.InformerMap),
childInformers: make(common.InformerMap),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DecoratorController-"+dc.Name),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DecoratorController-"+dc.Name),
numWorkers: numWorkers,
finalizer: &finalizer.Manager{
Name: "metacontroller.io/decoratorcontroller-" + dc.Name,
Enabled: dc.Spec.Hooks.Finalize != nil,
Expand Down Expand Up @@ -198,9 +201,8 @@ func (c *decoratorController) Start() {
return
}

// 5 workers ought to be enough for anyone.
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
for i := 0; i < c.numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
8 changes: 6 additions & 2 deletions controller/decorator/metacontroller.go
Expand Up @@ -50,9 +50,11 @@ type Metacontroller struct {
decoratorControllers map[string]*decoratorController

stopCh, doneCh chan struct{}

numWorkers int
}

func NewMetacontroller(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, mcInformerFactory mcinformers.SharedInformerFactory) *Metacontroller {
func NewMetacontroller(resources *dynamicdiscovery.ResourceMap, dynClient *dynamicclientset.Clientset, dynInformers *dynamicinformer.SharedInformerFactory, mcInformerFactory mcinformers.SharedInformerFactory, numWorkers int) *Metacontroller {
mc := &Metacontroller{
resources: resources,
dynClient: dynClient,
Expand All @@ -63,6 +65,8 @@ func NewMetacontroller(resources *dynamicdiscovery.ResourceMap, dynClient *dynam

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DecoratorController"),
decoratorControllers: make(map[string]*decoratorController),

numWorkers: numWorkers,
}

mc.dcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -168,7 +172,7 @@ func (mc *Metacontroller) syncDecoratorController(dc *v1alpha1.DecoratorControll
delete(mc.decoratorControllers, dc.Name)
}

c, err := newDecoratorController(mc.resources, mc.dynClient, mc.dynInformers, dc)
c, err := newDecoratorController(mc.resources, mc.dynClient, mc.dynInformers, dc, mc.numWorkers)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions docs/src/guide/install.md
Expand Up @@ -55,3 +55,4 @@ in `manifests/metacontroller.yaml`):
| `--client-config-path` | Path to kubeconfig file (same format as used by kubectl); if not specified, use in-cluster config (e.g. `--client-config-path=/path/to/kubeconfig`). |
| `--client-go-qps` | Number of queries per second client-go is allowed to make (default 5, e.g. `--client-go-qps=100`) |
| `--client-go-burst` |Allowed burst queries for client-go (default 10, e.g. `--client-go-burst=200`) |
| `--workers` | Number of sync workers to run (default 5, e.g. `--workers=100`) |
3 changes: 2 additions & 1 deletion main.go
Expand Up @@ -45,6 +45,7 @@ var (
clientConfigPath = flag.String("client-config-path", "", "Path to kubeconfig file (same format as used by kubectl); if not specified, use in-cluster config")
clientGoQPS = flag.Float64("client-go-qps", 5, "Number of queries per second client-go is allowed to make (default 5)")
clientGoBurst = flag.Int("client-go-burst", 10, "Allowed burst queries for client-go (default 10)")
workers = flag.Int("workers", 5, "Number of sync workers to run (default 5)")
)

func main() {
Expand All @@ -71,7 +72,7 @@ func main() {
config.QPS = float32(*clientGoQPS)
config.Burst = *clientGoBurst

stopServer, err := server.Start(config, *discoveryInterval, *informerRelist)
stopServer, err := server.Start(config, *discoveryInterval, *informerRelist, *workers)
if err != nil {
glog.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Expand Up @@ -41,7 +41,7 @@ type controller interface {
Stop()
}

func Start(config *rest.Config, discoveryInterval, informerRelist time.Duration) (stop func(), err error) {
func Start(config *rest.Config, discoveryInterval, informerRelist time.Duration, numWorkers int) (stop func(), err error) {
// Periodically refresh discovery to pick up newly-installed resources.
dc := discovery.NewDiscoveryClientForConfigOrDie(config)
resources := dynamicdiscovery.NewResourceMap(dc)
Expand All @@ -66,8 +66,8 @@ func Start(config *rest.Config, discoveryInterval, informerRelist time.Duration)
// Start metacontrollers (controllers that spawn controllers).
// Each one requests the informers it needs from the factory.
controllers := []controller{
composite.NewMetacontroller(resources, dynClient, dynInformers, mcInformerFactory, mcClient),
decorator.NewMetacontroller(resources, dynClient, dynInformers, mcInformerFactory),
composite.NewMetacontroller(resources, dynClient, dynInformers, mcInformerFactory, mcClient, numWorkers),
decorator.NewMetacontroller(resources, dynClient, dynInformers, mcInformerFactory, numWorkers),
}

// Start all requested informers.
Expand Down
2 changes: 1 addition & 1 deletion test/integration/framework/main.go
Expand Up @@ -108,7 +108,7 @@ func testMain(tests func() int) error {
// metacontroller StatefulSet will not actually run anything.
// Instead, we start the Metacontroller server locally inside the test binary,
// since that's part of the code under test.
stopServer, err := server.Start(ApiserverConfig(), 500*time.Millisecond, 30*time.Minute)
stopServer, err := server.Start(ApiserverConfig(), 500*time.Millisecond, 30*time.Minute, 5)
if err != nil {
return fmt.Errorf("cannot start metacontroller server: %v", err)
}
Expand Down

0 comments on commit 3f07022

Please sign in to comment.