Skip to content

Commit

Permalink
improve trace client performance using concurrent resource load
Browse files Browse the repository at this point in the history
fixes crossplane#5707

Add a loader with configurable concurrency to load resources in concurrent manner.
The xrm client delegates to the loader for resource load and supports a functional
option to set the concurrency.

Add a `--concurrency` flag for the `crank beta trace` command and configure the
xrm client appropriately.

Signed-off-by: gotwarlost <kananthmail-github@yahoo.com>
  • Loading branch information
gotwarlost committed May 22, 2024
1 parent 6292789 commit b5d8a44
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 20 deletions.
44 changes: 25 additions & 19 deletions cmd/crank/beta/trace/internal/resource/xrm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ import (
"github.com/crossplane/crossplane/cmd/crank/beta/trace/internal/resource"
)

const defaultConcurrency = 5

// Client to get a Resource with all its children.
type Client struct {
getConnectionSecrets bool

client client.Client
client client.Client
concurrency int
}

// ResourceClientOption is a functional option for a Client.
Expand All @@ -50,12 +53,20 @@ func WithConnectionSecrets(v bool) ResourceClientOption {
}
}

// WithConcurrency is a functional option that sets the concurrency for the resource load.
func WithConcurrency(n int) ResourceClientOption {
return func(c *Client) {
c.concurrency = n
}
}

// NewClient returns a new Client.
func NewClient(in client.Client, opts ...ResourceClientOption) (*Client, error) {
uClient := xpunstructured.NewClient(in)

c := &Client{
client: uClient,
client: uClient,
concurrency: defaultConcurrency,
}

for _, o := range opts {
Expand All @@ -67,25 +78,20 @@ func NewClient(in client.Client, opts ...ResourceClientOption) (*Client, error)

// GetResourceTree returns the requested Crossplane Resource and all its children.
func (kc *Client) GetResourceTree(ctx context.Context, root *resource.Resource) (*resource.Resource, error) {
// Set up a FIFO queue to traverse the resource tree breadth first.
queue := []*resource.Resource{root}

for len(queue) > 0 {
// Pop the first element from the queue.
res := queue[0]
queue = queue[1:]

refs := getResourceChildrenRefs(res, kc.getConnectionSecrets)

for i := range refs {
child := resource.GetResource(ctx, kc.client, &refs[i])
q := newLoader(root, kc)
q.load(ctx, kc.concurrency)
return root, nil
}

res.Children = append(res.Children, child)
queue = append(queue, child)
}
}
// loadResource returns the resource for the specified object reference.
func (kc *Client) loadResource(ctx context.Context, ref *v1.ObjectReference) *resource.Resource {
return resource.GetResource(ctx, kc.client, ref)
}

return root, nil
// getResourceChildrenRefs returns the references to the children for the given
// Resource, assuming it's a Crossplane resource, XR or XRC.
func (kc *Client) getResourceChildrenRefs(_ context.Context, r *resource.Resource) []v1.ObjectReference {
return getResourceChildrenRefs(r, kc.getConnectionSecrets)
}

// getResourceChildrenRefs returns the references to the children for the given
Expand Down
129 changes: 129 additions & 0 deletions cmd/crank/beta/trace/internal/resource/xrm/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2023 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xrm

import (
"context"
"sync"

v1 "k8s.io/api/core/v1"

"github.com/crossplane/crossplane/cmd/crank/beta/trace/internal/resource"
)

// channelCapacity is the buffer size of the processing channel, should be a high value
// so that there is no blocking. Correctness of processing does not depend on the channel capacity.
var channelCapacity = 1000 //nolint:gochecknoglobals // we treat this as constant only overrideable for tests.

// workItem maintains the relationship of a resource to be loaded with its parent
// such that the resource that is loaded can be added as a child.
type workItem struct {
parent *resource.Resource
child v1.ObjectReference
}

// resourceLoader is a delegate that loads resources and returns child resource refs.
type resourceLoader interface {
loadResource(ctx context.Context, ref *v1.ObjectReference) *resource.Resource
getResourceChildrenRefs(_ context.Context, r *resource.Resource) []v1.ObjectReference
}

// loader loads resources concurrently.
type loader struct {
root *resource.Resource // the root resource for which the tree is loaded
l resourceLoader // the resource loader
resourceLock sync.Mutex // lock when updating the children of any resource
processing sync.WaitGroup // "counter" to track requests in flight
ch chan workItem // processing channel
done chan struct{} // done channel, signaled when all resources are loaded
}

// newLoader creates a loader for the root resource.
func newLoader(root *resource.Resource, rl resourceLoader) *loader {
l := &loader{
l: rl,
ch: make(chan workItem, channelCapacity),
done: make(chan struct{}),
root: root,
}
return l
}

// load loads the full resource tree in a concurrent manner.
func (l *loader) load(ctx context.Context, concurrency int) {
// make sure counters are incremented for root child refs before starting concurrent processing
refs := l.l.getResourceChildrenRefs(ctx, l.root)
l.addRefs(l.root, refs)

// signal the done channel after all items are processed
go func() {
l.processing.Wait()
close(l.done)
}()

if concurrency < 1 {
concurrency = defaultConcurrency
}
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-l.done:
return
case item := <-l.ch:
l.processItem(ctx, item)
}
}
}()
}
wg.Wait()
}

// addRefs adds work items to the queue.
func (l *loader) addRefs(parent *resource.Resource, refs []v1.ObjectReference) {
// ensure counters are updated synchronously
l.processing.Add(len(refs))
// free up the current processing routine even if the channel blocks.
go func() {
for _, ref := range refs {
l.ch <- workItem{
parent: parent,
child: ref,
}
}
}()
}

// processItem processes a single work item in the queue and decrements the in-process counter
// after adding child references.
func (l *loader) processItem(ctx context.Context, item workItem) {
defer l.processing.Done()
res := l.l.loadResource(ctx, &item.child)
refs := l.l.getResourceChildrenRefs(ctx, res)
l.updateChild(item, res)
l.addRefs(res, refs)
}

// updateChild adds the supplied child resource to its parent.
func (l *loader) updateChild(item workItem, res *resource.Resource) {
l.resourceLock.Lock()
item.parent.Children = append(item.parent.Children, res)
l.resourceLock.Unlock()
}
151 changes: 151 additions & 0 deletions cmd/crank/beta/trace/internal/resource/xrm/loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package xrm

import (
"context"
"fmt"
"math/rand"
"regexp"
"strconv"
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/crossplane/crossplane/cmd/crank/beta/trace/internal/resource"
)

var reNum = regexp.MustCompile(`-(\d+)$`)

type simpleGenerator struct {
childDepth int
numItems int
}

func (d *simpleGenerator) createResource(apiVersion, kind, name string) *resource.Resource {
obj := map[string]any{
"apiVersion": apiVersion,
"kind": kind,
"metadata": map[string]any{
"name": name,
},
}
return &resource.Resource{Unstructured: unstructured.Unstructured{Object: obj}}
}

func (d *simpleGenerator) createRefAtDepth(depth int) v1.ObjectReference {
prefix := "comp-res"
if depth == d.childDepth {
prefix = "managed-res"
}
return v1.ObjectReference{
Kind: fmt.Sprintf("Depth%d", depth),
Name: fmt.Sprintf("%s-%d-%d", prefix, rand.Int(), depth),
APIVersion: "example.com/v1",
}
}

func (d *simpleGenerator) createResourceFromRef(ref *v1.ObjectReference) *resource.Resource {
return d.createResource(ref.APIVersion, ref.Kind, ref.Name)
}

func (d *simpleGenerator) loadResource(_ context.Context, ref *v1.ObjectReference) *resource.Resource {
return d.createResourceFromRef(ref)
}

func (d *simpleGenerator) depthFromResource(res *resource.Resource) int {
ret := 0
matches := reNum.FindStringSubmatch(res.Unstructured.GetName())
if len(matches) > 0 {
n, err := strconv.Atoi(matches[1])
if err != nil {
panic(err)
}
ret = n
}
return ret
}

func (d *simpleGenerator) getResourceChildrenRefs(_ context.Context, r *resource.Resource) []v1.ObjectReference {
depth := d.depthFromResource(r)
if depth == d.childDepth {
return nil
}
var ret []v1.ObjectReference
for i := 0; i < d.numItems; i++ {
ret = append(ret, d.createRefAtDepth(depth+1))
}
return ret
}

var _ resourceLoader = &simpleGenerator{}

func countItems(root *resource.Resource) int {
ret := 1
for _, child := range root.Children {
ret += countItems(child)
}
return ret
}

func TestLoader(t *testing.T) {
tests := []struct {
name string
childDepth int
numItems int
channelCapacity int
concurrency int
expectedResources int
}{
{
name: "simple",
childDepth: 3,
numItems: 3,
expectedResources: 1 + 3 + 9 + 27,
},
{
name: "blocking buffer",
channelCapacity: 1,
concurrency: 1,
childDepth: 3,
numItems: 10,
expectedResources: 1 + 10 + 100 + 1000,
},
{
name: "no children at root",
childDepth: 0,
numItems: 0,
expectedResources: 1,
},
{
name: "uses default concurrency",
concurrency: -1,
childDepth: 3,
numItems: 3,
expectedResources: 1 + 3 + 9 + 27,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
orig := channelCapacity
defer func() { channelCapacity = orig }()

if test.channelCapacity > 0 {
channelCapacity = test.channelCapacity
}
concurrency := defaultConcurrency
if test.concurrency != 0 {
concurrency = test.concurrency
}
sg := &simpleGenerator{childDepth: test.childDepth, numItems: test.numItems}
rootRef := sg.createRefAtDepth(0)
root := sg.createResourceFromRef(&rootRef)
l := newLoader(root, sg)
l.load(context.Background(), concurrency)
n := countItems(root)
if test.expectedResources != n {
t.Errorf("resource count mismatch: want %d, got %d", test.expectedResources, n)
}
})
}
}
11 changes: 10 additions & 1 deletion cmd/crank/beta/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Cmd struct {
ShowPackageDependencies string `default:"unique" enum:"unique,all,none" help:"Show package dependencies in the output. One of: unique, all, none." name:"show-package-dependencies"`
ShowPackageRevisions string `default:"active" enum:"active,all,none" help:"Show package revisions in the output. One of: active, all, none." name:"show-package-revisions"`
ShowPackageRuntimeConfigs bool `default:"false" help:"Show package runtime configs in the output." name:"show-package-runtime-configs"`
Concurrency int `default:"5" help:"load concurrency" name:"concurrency"`
}

// Help returns help message for the trace command.
Expand Down Expand Up @@ -126,6 +127,11 @@ func (c *Cmd) Run(k *kong.Context, logger logging.Logger) error {
}
logger.Debug("Found kubeconfig")

// XXX: this needs to be made configurable - see TODO on line 64
// I used the values below for checking timing as concurrency increases
// kubeconfig.QPS = 50
// kubeconfig.Burst = 100

client, err := client.New(kubeconfig, client.Options{
Scheme: scheme.Scheme,
})
Expand Down Expand Up @@ -195,7 +201,10 @@ func (c *Cmd) Run(k *kong.Context, logger logging.Logger) error {
}
default:
logger.Debug("Requested resource is not a package, assumed to be an XR, XRC or MR")
treeClient, err = xrm.NewClient(client, xrm.WithConnectionSecrets(c.ShowConnectionSecrets))
treeClient, err = xrm.NewClient(client,
xrm.WithConnectionSecrets(c.ShowConnectionSecrets),
xrm.WithConcurrency(c.Concurrency),
)
if err != nil {
return errors.Wrap(err, errInitKubeClient)
}
Expand Down

0 comments on commit b5d8a44

Please sign in to comment.