Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of inflight requests #84

Merged
merged 5 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func init() {
rootCmd.Flags().StringVarP(&ketallOptions.Selector, constants.FlagSelector, "l", "", "Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2).")
rootCmd.Flags().StringVar(&ketallOptions.FieldSelector, constants.FlagFieldSelector, "", "Selector (field query) to filter on, supports '=', '==', and '!='.(e.g. --field-selector key1=value1,key2=value2). The common field queries for all types are metadata.name and metadata.namespace.")
rootCmd.Flags().StringSliceVar(&ketallOptions.Exclusions, constants.FlagExclude, []string{"Event", "PodMetrics"}, "Filter by resource name (plural form or short name).")
rootCmd.Flags().Int64(constants.FlagConcurrency, 64, "Maximum number of inflight requests.")

ketallOptions.GenericCliFlags.AddFlags(rootCmd.Flags())
ketallOptions.PrintFlags.AddFlags(rootCmd)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
k8s.io/api v0.19.0
k8s.io/apimachinery v0.19.0
k8s.io/cli-runtime v0.19.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,9 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
132 changes: 64 additions & 68 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package client

import (
"context"
"fmt"
"sort"
"strings"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/sync/semaphore"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -37,6 +39,8 @@ import (
"k8s.io/cli-runtime/pkg/resource"
)

var errEmpty = errors.New("no resources found")

// groupResource contains the APIGroup and APIResource
type groupResource struct {
APIGroup string
Expand All @@ -47,21 +51,19 @@ func GetAllServerResources(flags *genericclioptions.ConfigFlags) (runtime.Object
useCache := viper.GetBool(constants.FlagUseCache)
scope := viper.GetString(constants.FlagScope)

grs, err := fetchAvailableGroupResources(useCache, scope, flags)
grs, err := groupResources(useCache, scope, flags)
if err != nil {
return nil, errors.Wrap(err, "fetch available group resources")
}

resources := extractRelevantResources(grs, getExclusions())

start := time.Now()
response, err := fetchResourcesBulk(flags, resources...)
response, err := fetchResourcesBulk(flags, grs...)
logrus.Debugf("Initial fetchResourcesBulk done (%s)", duration.HumanDuration(time.Since(start)))
if err == nil {
return response, nil
}

return fetchResourcesIncremental(flags, resources...)
return fetchResourcesIncremental(context.TODO(), flags, grs...)
}

func getExclusions() []string {
Expand All @@ -77,7 +79,7 @@ func getExclusions() []string {
return exclusions
}

func fetchAvailableGroupResources(cache bool, scope string, flags *genericclioptions.ConfigFlags) ([]groupResource, error) {
func groupResources(cache bool, scope string, flags *genericclioptions.ConfigFlags) ([]groupResource, error) {
client, err := flags.ToDiscoveryClient()
if err != nil {
return nil, errors.Wrap(err, "discovery client")
Expand All @@ -87,7 +89,7 @@ func fetchAvailableGroupResources(cache bool, scope string, flags *genericcliopt
client.Invalidate()
}

skipCluster, skipNamespace, err := getResourceScope(scope)
scopeCluster, scopeNamespace, err := getResourceScope(scope)
if err != nil {
return nil, err
}
Expand All @@ -114,7 +116,7 @@ func fetchAvailableGroupResources(cache bool, scope string, flags *genericcliopt
continue
}

if (r.Namespaced && skipNamespace) || (!r.Namespaced && skipCluster) {
if (r.Namespaced && scopeCluster) || (!r.Namespaced && scopeNamespace) {
continue
}

Expand All @@ -130,42 +132,40 @@ func fetchAvailableGroupResources(cache bool, scope string, flags *genericcliopt
}
}

return grs, nil
}

func extractRelevantResources(grs []groupResource, exclusions []string) []groupResource {
sort.Stable(sortableGroupResource(grs))
forbidden := sets.NewString(exclusions...)
blocked := sets.NewString(getExclusions()...)

var result []groupResource
ret := grs[:0]
for _, r := range grs {
name := r.fullName()
name := r.String()
resourceIds := r.APIResource.ShortNames
resourceIds = append(resourceIds, r.APIResource.Name)
resourceIds = append(resourceIds, r.APIResource.Kind)
resourceIds = append(resourceIds, name)
if forbidden.HasAny(resourceIds...) {
if blocked.HasAny(resourceIds...) {
logrus.Debugf("Excluding %s", name)
continue
}
result = append(result, r)
ret = append(ret, r)
}

return result
return ret, nil
}

// Fetches all objects in bulk. This is much faster than incrementally but may fail due to missing rights
func fetchResourcesBulk(flags resource.RESTClientGetter, resourceTypes ...groupResource) (runtime.Object, error) {
resourceNames := ToResourceTypes(resourceTypes)
logrus.Debugf("Resources to fetch: %s", resourceNames)
func fetchResourcesBulk(flags resource.RESTClientGetter, grs ...groupResource) (runtime.Object, error) {
var resources []string
for _, gr := range grs {
resources = append(resources, gr.String())
}
logrus.Debugf("Resources to fetch: %s", resources)

ns := viper.GetString(constants.FlagNamespace)
selector := viper.GetString(constants.FlagSelector)
fieldSelector := viper.GetString(constants.FlagFieldSelector)

request := resource.NewBuilder(flags).
Unstructured().
ResourceTypes(resourceNames...).
ResourceTypes(resources...).
NamespaceParam(ns).DefaultNamespace().AllNamespaces(ns == "").
LabelSelectorParam(selector).FieldSelectorParam(fieldSelector).SelectAllParam(selector == "" && fieldSelector == "").
Flatten().
Expand All @@ -175,62 +175,66 @@ func fetchResourcesBulk(flags resource.RESTClientGetter, resourceTypes ...groupR
}

// Fetches all objects of the given resources one-by-one. This can be used as a fallback when fetchResourcesBulk fails.
func fetchResourcesIncremental(flags resource.RESTClientGetter, rs ...groupResource) (runtime.Object, error) {
func fetchResourcesIncremental(ctx context.Context, flags resource.RESTClientGetter, grs ...groupResource) (runtime.Object, error) {
// TODO(corneliusweig): this needs to properly pass ctx around
logrus.Debug("Fetch resources incrementally")
group := sync.WaitGroup{}

objsChan := make(chan runtime.Object)
for _, r := range rs {
r := r
group.Add(1)
go func(sendObj chan<- runtime.Object) {
defer group.Done()
if o, e := fetchResourcesBulk(flags, r); e != nil {
logrus.Warnf("Cannot fetch: %s", e)
} else {
sendObj <- o
}
}(objsChan)
}
start := time.Now()

maxInflight := viper.GetInt64(constants.FlagConcurrency)
sem := semaphore.NewWeighted(maxInflight) // restrict parallelism to 64 inflight requests

go func() {
start := time.Now()
group.Wait()
close(objsChan)
logrus.Debugf("Requests done (elapsed %s)", duration.HumanDuration(time.Since(start)))
}()
var mu sync.Mutex // mu guards ret
var ret []runtime.Object

var objs []runtime.Object
for o := range objsChan {
objs = append(objs, o)
var wg sync.WaitGroup
for _, gr := range grs {
wg.Add(1)
go func(gr groupResource) {
defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil {
return // context cancelled
}
defer sem.Release(1)
obj, err := fetchResourcesBulk(flags, gr)
if err != nil {
logrus.Warnf("Cannot fetch: %v", err)
return
}
mu.Lock()
ret = append(ret, obj)
mu.Unlock()
}(gr)
}
wg.Wait()
logrus.Debugf("Requests done (elapsed %s)", duration.HumanDuration(time.Since(start)))

if len(objs) == 0 {
return nil, fmt.Errorf("not authorized to list any resources, try to narrow the scope with --namespace")
if len(ret) == 0 {
logrus.Warnf("No resources found, are you authorized? Try to narrow the scope with --namespace.")
return nil, errEmpty
}

return util.ToV1List(objs), nil
return util.ToV1List(ret), nil
}

func getResourceScope(scope string) (skipCluster, skipNamespace bool, err error) {
func getResourceScope(scope string) (cluster, namespace bool, err error) {
switch scope {
case "":
skipCluster = viper.GetString(constants.FlagNamespace) != ""
skipNamespace = false
cluster = viper.GetString(constants.FlagNamespace) == ""
namespace = false
case "namespace":
skipCluster = true
skipNamespace = false
cluster = false
namespace = true
case "cluster":
skipCluster = false
skipNamespace = true
cluster = true
namespace = false
default:
err = fmt.Errorf("%s is not a valid resource scope (must be one of 'cluster' or 'namespace')", scope)
}
return
}

// Extracts the full name including APIGroup, e.g. 'deployment.apps'
func (g groupResource) fullName() string {
// String returns the canonical full name of the groupResource.
func (g groupResource) String() string {
if g.APIGroup == "" {
return g.APIResource.Name
}
Expand All @@ -239,14 +243,6 @@ func (g groupResource) fullName() string {

type sortableGroupResource []groupResource

func ToResourceTypes(in []groupResource) []string {
var result []string
for _, r := range in {
result = append(result, r.fullName())
}
return result
}

func (s sortableGroupResource) Len() int { return len(s) }
func (s sortableGroupResource) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortableGroupResource) Less(i, j int) bool {
Expand Down
91 changes: 0 additions & 91 deletions internal/client/client_test.go

This file was deleted.

1 change: 1 addition & 0 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import "github.com/sirupsen/logrus"
const (
DefaultLogLevel = logrus.WarnLevel

FlagConcurrency = "max-inflight"
FlagExclude = "exclude"
FlagNamespace = "namespace"
FlagScope = "only-scope"
Expand Down