Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions cmd/diff/cmd_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"sync"
"time"

"github.com/alecthomas/kong"
Expand All @@ -26,10 +27,15 @@ import (

"github.com/crossplane/crossplane-runtime/v2/pkg/errors"
"github.com/crossplane/crossplane-runtime/v2/pkg/logging"

"github.com/crossplane/crossplane/v2/cmd/crank/render"
)

// globalRenderMutex serializes all render operations globally across all diff processors.
// This prevents concurrent Docker container operations that can overwhelm the Docker daemon
// when processing multiple XRs with the same functions. See issue #59.
//
//nolint:gochecknoglobals // Required for global serialization across processors
var globalRenderMutex sync.Mutex

// CommonCmdFields contains common fields shared by both XR and Comp commands.
type CommonCmdFields struct {
// Configuration options
Expand Down Expand Up @@ -101,6 +107,5 @@ func defaultProcessorOptions() []dp.ProcessorOption {
return []dp.ProcessorOption{
dp.WithColorize(true),
dp.WithCompact(false),
dp.WithRenderFunc(render.Render),
}
}
1 change: 1 addition & 0 deletions cmd/diff/comp.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func makeDefaultCompProc(c *CompCmd, ctx *AppContext, log logging.Logger) dp.Com
dp.WithLogger(log),
dp.WithColorize(!c.NoColor), // Override default if NoColor is set
dp.WithCompact(c.Compact), // Override default if Compact is set
dp.WithRenderMutex(&globalRenderMutex),
)

// Create XR processor first (peer processor)
Expand Down
40 changes: 22 additions & 18 deletions cmd/diff/diff_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
run "runtime"
"strconv"
Expand Down Expand Up @@ -153,26 +152,12 @@ func runIntegrationTest(t *testing.T, testType DiffTestType, tests map[string]In
}

// Set up the test files
tempDir := t.TempDir()

var testFiles []string

// Handle any additional input files
for i, inputFile := range tt.inputFiles {
testFile := filepath.Join(tempDir, fmt.Sprintf("test_%d.yaml", i))

content, err := os.ReadFile(inputFile)
if err != nil {
t.Fatalf("failed to read input file: %v", err)
}

err = os.WriteFile(testFile, content, 0o644)
if err != nil {
t.Fatalf("failed to write test file: %v", err)
}

testFiles = append(testFiles, testFile)
}
// Note: NewCompositeLoader handles both individual files and directories,
// so we can pass paths directly without special handling
testFiles = append(testFiles, tt.inputFiles...)

// Create a buffer to capture the output
var stdout bytes.Buffer
Expand Down Expand Up @@ -1302,6 +1287,25 @@ Summary: 1 added`,
expectedError: false,
noColor: true,
},
"Concurrent rendering with multiple functions and XRs from directory": {
// This test reproduces issue #59 - concurrent function startup failures
// when processing multiple XR files from a directory
inputFiles: []string{
"testdata/diff/concurrent-xrs", // Pass the directory containing all XR files
},
setupFiles: []string{
"testdata/diff/resources/xrd-concurrent.yaml",
"testdata/diff/resources/composition-multi-functions.yaml",
"testdata/diff/resources/functions.yaml",
},
// We expect successful processing of all 5 XRs
// Each XR should produce 3 base resources + 2 additional resources = 5 resources per XR
// Plus the XR itself = 6 additions per XR
// Total: 5 XRs * 6 additions = 30 additions
expectedOutput: "Summary: 30 added",
expectedError: false,
noColor: true,
},
}

runIntegrationTest(t, XRDiffTest, tests)
Expand Down
10 changes: 10 additions & 0 deletions cmd/diff/diffprocessor/diff_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
k8 "github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/kubernetes"
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/renderer"
dt "github.com/crossplane-contrib/crossplane-diff/cmd/diff/renderer/types"
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/serial"
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/types"
un "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -70,6 +71,12 @@ func NewDiffProcessor(k8cs k8.Clients, xpcs xp.Clients, opts ...ProcessorOption)
// Set default factory functions if not provided
config.SetDefaultFactories()

// Wrap the RenderFunc with serialization if a mutex was provided
// This transparently handles serialization without requiring callers to worry about it
if config.RenderMutex != nil {
config.RenderFunc = serial.RenderFunc(config.RenderFunc, config.RenderMutex)
}

// Create the diff options based on configuration
diffOpts := config.GetDiffOptions()

Expand Down Expand Up @@ -210,6 +217,9 @@ func (p *DefaultDiffProcessor) DiffSingleResource(ctx context.Context, res *un.U
return nil, errors.Wrap(err, "cannot get functions from pipeline")
}

// Note: Serialization mutex prevents concurrent Docker operations.
// In e2e tests, named Docker containers (via annotations) reuse containers across renders.

// Apply XRD defaults before rendering
err = p.applyXRDDefaults(ctx, xr, resourceID)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions cmd/diff/diffprocessor/processor_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package diffprocessor

import (
"sync"

xp "github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/crossplane"
k8 "github.com/crossplane-contrib/crossplane-diff/cmd/diff/client/kubernetes"
"github.com/crossplane-contrib/crossplane-diff/cmd/diff/renderer"
Expand All @@ -25,6 +27,9 @@ type ProcessorConfig struct {
// RenderFunc is the function to use for rendering resources
RenderFunc RenderFunc

// RenderMutex is the mutex used to serialize render operations (for internal use)
RenderMutex *sync.Mutex

// Factories provide factory functions for creating components
Factories ComponentFactories
}
Expand Down Expand Up @@ -85,6 +90,13 @@ func WithRenderFunc(renderFn RenderFunc) ProcessorOption {
}
}

// WithRenderMutex sets the mutex for serializing render operations.
func WithRenderMutex(mu *sync.Mutex) ProcessorOption {
return func(config *ProcessorConfig) {
config.RenderMutex = mu
}
}

// WithResourceManagerFactory sets the ResourceManager factory function.
func WithResourceManagerFactory(factory func(k8.ResourceClient, xp.DefinitionClient, logging.Logger) ResourceManager) ProcessorOption {
return func(config *ProcessorConfig) {
Expand Down
73 changes: 73 additions & 0 deletions cmd/diff/serial/serial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2025 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package serial provides utilities for serializing render operations.
package serial

import (
"context"
"sync"
"time"

"github.com/crossplane/crossplane-runtime/v2/pkg/logging"

"github.com/crossplane/crossplane/v2/cmd/crank/render"
)

// RenderFunc wraps a render function to serialize all render calls using the provided mutex.
// This prevents concurrent Docker container operations that can overwhelm the Docker daemon
// when processing many XRs with the same functions. The serialization ensures:
//
// 1. Only one render operation runs at a time globally
// 2. Named Docker containers (via annotations) can be reused safely between renders
// 3. Container startup races are eliminated
//
// For e2e tests, combine this with versioned named container annotations for optimal performance.
// For production, this works without requiring users to annotate their Function resources.
func RenderFunc(
renderFunc func(context.Context, logging.Logger, render.Inputs) (render.Outputs, error),
mu *sync.Mutex,
) func(context.Context, logging.Logger, render.Inputs) (render.Outputs, error) {
renderCount := 0

return func(ctx context.Context, log logging.Logger, in render.Inputs) (render.Outputs, error) {
mu.Lock()
defer mu.Unlock()

renderCount++
log.Debug("Starting serialized render",
"renderNumber", renderCount,
"functionCount", len(in.Functions))

start := time.Now()
result, err := renderFunc(ctx, log, in)
duration := time.Since(start)

if err != nil {
log.Debug("Render completed with error",
"renderNumber", renderCount,
"error", err,
"duration", duration)
} else {
log.Debug("Render completed successfully",
"renderNumber", renderCount,
"duration", duration,
"composedResourceCount", len(result.ComposedResources))
}

return result, err
}
}
125 changes: 125 additions & 0 deletions cmd/diff/serial/serial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright 2025 The Crossplane Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package serial

import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/crossplane/crossplane-runtime/v2/pkg/logging"
"github.com/crossplane/crossplane-runtime/v2/pkg/resource/unstructured/composed"

pkgv1 "github.com/crossplane/crossplane/v2/apis/pkg/v1"
"github.com/crossplane/crossplane/v2/cmd/crank/render"
)

func TestRenderFunc_Passthrough(t *testing.T) {
type ctxKey string
key := ctxKey("test")
ctx := context.WithValue(t.Context(), key, "test-value")
inputs := render.Inputs{Functions: []pkgv1.Function{{}, {}}}

var mu sync.Mutex
mockFunc := func(ctx context.Context, _ logging.Logger, in render.Inputs) (render.Outputs, error) {
// Verify context is passed through
if ctx.Value(key) != "test-value" {
t.Error("context not passed through")
}
// Verify inputs are passed through
if len(in.Functions) != 2 {
t.Errorf("expected 2 functions, got %d", len(in.Functions))
}
return render.Outputs{ComposedResources: []composed.Unstructured{*composed.New(), *composed.New()}}, nil
}

serialized := RenderFunc(mockFunc, &mu)
outputs, err := serialized(ctx, logging.NewNopLogger(), inputs)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Verify outputs are returned
if len(outputs.ComposedResources) != 2 {
t.Errorf("expected 2 composed resources, got %d", len(outputs.ComposedResources))
}
}

func TestRenderFunc_Error(t *testing.T) {
var mu sync.Mutex
expectedErr := errors.New("render failed")

mockFunc := func(_ context.Context, _ logging.Logger, _ render.Inputs) (render.Outputs, error) {
return render.Outputs{}, expectedErr
}

serialized := RenderFunc(mockFunc, &mu)
_, err := serialized(t.Context(), logging.NewNopLogger(), render.Inputs{})

if !errors.Is(err, expectedErr) {
t.Errorf("expected error %v, got %v", expectedErr, err)
}
}

func TestRenderFunc_Serialization(t *testing.T) {
var mu sync.Mutex
var concurrentCount atomic.Int32
var maxConcurrent atomic.Int32

mockFunc := func(_ context.Context, _ logging.Logger, _ render.Inputs) (render.Outputs, error) {
current := concurrentCount.Add(1)

// Update maxConcurrent if needed
for {
maxVal := maxConcurrent.Load()
if current <= maxVal || maxConcurrent.CompareAndSwap(maxVal, current) {
break
}
}

time.Sleep(10 * time.Millisecond)
concurrentCount.Add(-1)

return render.Outputs{}, nil
}

serialized := RenderFunc(mockFunc, &mu)

// Run multiple renders concurrently
const numCalls = 10
var wg sync.WaitGroup
wg.Add(numCalls)

for range numCalls {
go func() {
defer wg.Done()
if _, err := serialized(t.Context(), logging.NewNopLogger(), render.Inputs{}); err != nil {
t.Errorf("unexpected error: %v", err)
}
}()
}

wg.Wait()

// Verify that only one render ran at a time
if maxVal := maxConcurrent.Load(); maxVal != 1 {
t.Errorf("expected max concurrent executions to be 1, got %d", maxVal)
}
}
8 changes: 8 additions & 0 deletions cmd/diff/testdata/diff/concurrent-xrs/concurrent-xr-1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: concurrent.diff.example.org/v1alpha1
kind: XConcurrentTest
metadata:
name: concurrent-test-1
namespace: default
spec:
config: "test-config-1"
dataField: "data-1"
8 changes: 8 additions & 0 deletions cmd/diff/testdata/diff/concurrent-xrs/concurrent-xr-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: concurrent.diff.example.org/v1alpha1
kind: XConcurrentTest
metadata:
name: concurrent-test-2
namespace: default
spec:
config: "test-config-2"
dataField: "data-2"
8 changes: 8 additions & 0 deletions cmd/diff/testdata/diff/concurrent-xrs/concurrent-xr-3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: concurrent.diff.example.org/v1alpha1
kind: XConcurrentTest
metadata:
name: concurrent-test-3
namespace: default
spec:
config: "test-config-3"
dataField: "data-3"
Loading
Loading