diff --git a/cmd/crank/beta/trace/internal/resource/xrm/client.go b/cmd/crank/beta/trace/internal/resource/xrm/client.go index 336c808609b..3bffc15d590 100644 --- a/cmd/crank/beta/trace/internal/resource/xrm/client.go +++ b/cmd/crank/beta/trace/internal/resource/xrm/client.go @@ -33,11 +33,14 @@ import ( "github.com/crossplane/crossplane/cmd/crank/beta/trace/internal/resource" ) +const defaultConcurrency = 5 + // Client to get a Resource with all its children. type Client struct { getConnectionSecrets bool - client client.Client + client client.Client + concurrency int } // ResourceClientOption is a functional option for a Client. @@ -50,12 +53,20 @@ func WithConnectionSecrets(v bool) ResourceClientOption { } } +// WithConcurrency is a functional option that sets the concurrency for the resource load. +func WithConcurrency(n int) ResourceClientOption { + return func(c *Client) { + c.concurrency = n + } +} + // NewClient returns a new Client. func NewClient(in client.Client, opts ...ResourceClientOption) (*Client, error) { uClient := xpunstructured.NewClient(in) c := &Client{ - client: uClient, + client: uClient, + concurrency: defaultConcurrency, } for _, o := range opts { @@ -67,25 +78,20 @@ func NewClient(in client.Client, opts ...ResourceClientOption) (*Client, error) // GetResourceTree returns the requested Crossplane Resource and all its children. func (kc *Client) GetResourceTree(ctx context.Context, root *resource.Resource) (*resource.Resource, error) { - // Set up a FIFO queue to traverse the resource tree breadth first. - queue := []*resource.Resource{root} - - for len(queue) > 0 { - // Pop the first element from the queue. - res := queue[0] - queue = queue[1:] - - refs := getResourceChildrenRefs(res, kc.getConnectionSecrets) - - for i := range refs { - child := resource.GetResource(ctx, kc.client, &refs[i]) + q := newLoader(root, kc) + q.load(ctx, kc.concurrency) + return root, nil +} - res.Children = append(res.Children, child) - queue = append(queue, child) - } - } +// loadResource returns the resource for the specified object reference. +func (kc *Client) loadResource(ctx context.Context, ref *v1.ObjectReference) *resource.Resource { + return resource.GetResource(ctx, kc.client, ref) +} - return root, nil +// getResourceChildrenRefs returns the references to the children for the given +// Resource, assuming it's a Crossplane resource, XR or XRC. +func (kc *Client) getResourceChildrenRefs(_ context.Context, r *resource.Resource) []v1.ObjectReference { + return getResourceChildrenRefs(r, kc.getConnectionSecrets) } // getResourceChildrenRefs returns the references to the children for the given diff --git a/cmd/crank/beta/trace/internal/resource/xrm/loader.go b/cmd/crank/beta/trace/internal/resource/xrm/loader.go new file mode 100644 index 00000000000..d6686f40211 --- /dev/null +++ b/cmd/crank/beta/trace/internal/resource/xrm/loader.go @@ -0,0 +1,129 @@ +/* +Copyright 2023 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package xrm + +import ( + "context" + "sync" + + v1 "k8s.io/api/core/v1" + + "github.com/crossplane/crossplane/cmd/crank/beta/trace/internal/resource" +) + +// channelCapacity is the buffer size of the processing channel, should be a high value +// so that there is no blocking. Correctness of processing does not depend on the channel capacity. +var channelCapacity = 1000 //nolint:gochecknoglobals // we treat this as constant only overrideable for tests. + +// workItem maintains the relationship of a resource to be loaded with its parent +// such that the resource that is loaded can be added as a child. +type workItem struct { + parent *resource.Resource + child v1.ObjectReference +} + +// resourceLoader is a delegate that loads resources and returns child resource refs. +type resourceLoader interface { + loadResource(ctx context.Context, ref *v1.ObjectReference) *resource.Resource + getResourceChildrenRefs(_ context.Context, r *resource.Resource) []v1.ObjectReference +} + +// loader loads resources concurrently. +type loader struct { + root *resource.Resource // the root resource for which the tree is loaded + l resourceLoader // the resource loader + resourceLock sync.Mutex // lock when updating the children of any resource + processing sync.WaitGroup // "counter" to track requests in flight + ch chan workItem // processing channel + done chan struct{} // done channel, signaled when all resources are loaded +} + +// newLoader creates a loader for the root resource. +func newLoader(root *resource.Resource, rl resourceLoader) *loader { + l := &loader{ + l: rl, + ch: make(chan workItem, channelCapacity), + done: make(chan struct{}), + root: root, + } + return l +} + +// load loads the full resource tree in a concurrent manner. +func (l *loader) load(ctx context.Context, concurrency int) { + // make sure counters are incremented for root child refs before starting concurrent processing + refs := l.l.getResourceChildrenRefs(ctx, l.root) + l.addRefs(l.root, refs) + + // signal the done channel after all items are processed + go func() { + l.processing.Wait() + close(l.done) + }() + + if concurrency < 1 { + concurrency = defaultConcurrency + } + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-l.done: + return + case item := <-l.ch: + l.processItem(ctx, item) + } + } + }() + } + wg.Wait() +} + +// addRefs adds work items to the queue. +func (l *loader) addRefs(parent *resource.Resource, refs []v1.ObjectReference) { + // ensure counters are updated synchronously + l.processing.Add(len(refs)) + // free up the current processing routine even if the channel blocks. + go func() { + for _, ref := range refs { + l.ch <- workItem{ + parent: parent, + child: ref, + } + } + }() +} + +// processItem processes a single work item in the queue and decrements the in-process counter +// after adding child references. +func (l *loader) processItem(ctx context.Context, item workItem) { + defer l.processing.Done() + res := l.l.loadResource(ctx, &item.child) + refs := l.l.getResourceChildrenRefs(ctx, res) + l.updateChild(item, res) + l.addRefs(res, refs) +} + +// updateChild adds the supplied child resource to its parent. +func (l *loader) updateChild(item workItem, res *resource.Resource) { + l.resourceLock.Lock() + item.parent.Children = append(item.parent.Children, res) + l.resourceLock.Unlock() +} diff --git a/cmd/crank/beta/trace/internal/resource/xrm/loader_test.go b/cmd/crank/beta/trace/internal/resource/xrm/loader_test.go new file mode 100644 index 00000000000..a57e74318d0 --- /dev/null +++ b/cmd/crank/beta/trace/internal/resource/xrm/loader_test.go @@ -0,0 +1,151 @@ +package xrm + +import ( + "context" + "fmt" + "math/rand" + "regexp" + "strconv" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/crossplane/crossplane/cmd/crank/beta/trace/internal/resource" +) + +var reNum = regexp.MustCompile(`-(\d+)$`) + +type simpleGenerator struct { + childDepth int + numItems int +} + +func (d *simpleGenerator) createResource(apiVersion, kind, name string) *resource.Resource { + obj := map[string]any{ + "apiVersion": apiVersion, + "kind": kind, + "metadata": map[string]any{ + "name": name, + }, + } + return &resource.Resource{Unstructured: unstructured.Unstructured{Object: obj}} +} + +func (d *simpleGenerator) createRefAtDepth(depth int) v1.ObjectReference { + prefix := "comp-res" + if depth == d.childDepth { + prefix = "managed-res" + } + return v1.ObjectReference{ + Kind: fmt.Sprintf("Depth%d", depth), + Name: fmt.Sprintf("%s-%d-%d", prefix, rand.Int(), depth), + APIVersion: "example.com/v1", + } +} + +func (d *simpleGenerator) createResourceFromRef(ref *v1.ObjectReference) *resource.Resource { + return d.createResource(ref.APIVersion, ref.Kind, ref.Name) +} + +func (d *simpleGenerator) loadResource(_ context.Context, ref *v1.ObjectReference) *resource.Resource { + return d.createResourceFromRef(ref) +} + +func (d *simpleGenerator) depthFromResource(res *resource.Resource) int { + ret := 0 + matches := reNum.FindStringSubmatch(res.Unstructured.GetName()) + if len(matches) > 0 { + n, err := strconv.Atoi(matches[1]) + if err != nil { + panic(err) + } + ret = n + } + return ret +} + +func (d *simpleGenerator) getResourceChildrenRefs(_ context.Context, r *resource.Resource) []v1.ObjectReference { + depth := d.depthFromResource(r) + if depth == d.childDepth { + return nil + } + var ret []v1.ObjectReference + for i := 0; i < d.numItems; i++ { + ret = append(ret, d.createRefAtDepth(depth+1)) + } + return ret +} + +var _ resourceLoader = &simpleGenerator{} + +func countItems(root *resource.Resource) int { + ret := 1 + for _, child := range root.Children { + ret += countItems(child) + } + return ret +} + +func TestLoader(t *testing.T) { + tests := []struct { + name string + childDepth int + numItems int + channelCapacity int + concurrency int + expectedResources int + }{ + { + name: "simple", + childDepth: 3, + numItems: 3, + expectedResources: 1 + 3 + 9 + 27, + }, + { + name: "blocking buffer", + channelCapacity: 1, + concurrency: 1, + childDepth: 3, + numItems: 10, + expectedResources: 1 + 10 + 100 + 1000, + }, + { + name: "no children at root", + childDepth: 0, + numItems: 0, + expectedResources: 1, + }, + { + name: "uses default concurrency", + concurrency: -1, + childDepth: 3, + numItems: 3, + expectedResources: 1 + 3 + 9 + 27, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + orig := channelCapacity + defer func() { channelCapacity = orig }() + + if test.channelCapacity > 0 { + channelCapacity = test.channelCapacity + } + concurrency := defaultConcurrency + if test.concurrency != 0 { + concurrency = test.concurrency + } + sg := &simpleGenerator{childDepth: test.childDepth, numItems: test.numItems} + rootRef := sg.createRefAtDepth(0) + root := sg.createResourceFromRef(&rootRef) + l := newLoader(root, sg) + l.load(context.Background(), concurrency) + n := countItems(root) + if test.expectedResources != n { + t.Errorf("resource count mismatch: want %d, got %d", test.expectedResources, n) + } + }) + } +} diff --git a/cmd/crank/beta/trace/trace.go b/cmd/crank/beta/trace/trace.go index 943f6d732cb..aad5c0c26c9 100644 --- a/cmd/crank/beta/trace/trace.go +++ b/cmd/crank/beta/trace/trace.go @@ -69,6 +69,7 @@ type Cmd struct { ShowPackageDependencies string `default:"unique" enum:"unique,all,none" help:"Show package dependencies in the output. One of: unique, all, none." name:"show-package-dependencies"` ShowPackageRevisions string `default:"active" enum:"active,all,none" help:"Show package revisions in the output. One of: active, all, none." name:"show-package-revisions"` ShowPackageRuntimeConfigs bool `default:"false" help:"Show package runtime configs in the output." name:"show-package-runtime-configs"` + Concurrency int `default:"5" help:"load concurrency" name:"concurrency"` } // Help returns help message for the trace command. @@ -126,6 +127,11 @@ func (c *Cmd) Run(k *kong.Context, logger logging.Logger) error { } logger.Debug("Found kubeconfig") + // XXX: this needs to be made configurable - see TODO on line 64 + // I used the values below for checking timing as concurrency increases + // kubeconfig.QPS = 50 + // kubeconfig.Burst = 100 + client, err := client.New(kubeconfig, client.Options{ Scheme: scheme.Scheme, }) @@ -195,7 +201,10 @@ func (c *Cmd) Run(k *kong.Context, logger logging.Logger) error { } default: logger.Debug("Requested resource is not a package, assumed to be an XR, XRC or MR") - treeClient, err = xrm.NewClient(client, xrm.WithConnectionSecrets(c.ShowConnectionSecrets)) + treeClient, err = xrm.NewClient(client, + xrm.WithConnectionSecrets(c.ShowConnectionSecrets), + xrm.WithConcurrency(c.Concurrency), + ) if err != nil { return errors.Wrap(err, errInitKubeClient) }