From 5974c0eddfb5be88238a506d058c8f229b7d4895 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Wed, 12 Nov 2025 11:03:14 +0000 Subject: [PATCH 1/2] :bug: `parallelisation` Ensure that execution options are propagated to compount execution group when creating a PriorityExecutionGroup --- changes/20251112110252.bugfix | 1 + utils/parallelisation/priority_group.go | 4 ++- utils/parallelisation/priority_group_test.go | 28 ++++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 changes/20251112110252.bugfix diff --git a/changes/20251112110252.bugfix b/changes/20251112110252.bugfix new file mode 100644 index 0000000000..0598704399 --- /dev/null +++ b/changes/20251112110252.bugfix @@ -0,0 +1 @@ +:bug: `parallelisation` Ensure that execution options are propagated to compount execution group when creating a PriorityExecutionGroup diff --git a/utils/parallelisation/priority_group.go b/utils/parallelisation/priority_group.go index 6c8049d562..c7b9f1e8f7 100644 --- a/utils/parallelisation/priority_group.go +++ b/utils/parallelisation/priority_group.go @@ -95,7 +95,9 @@ func (g *PriorityExecutionGroup[T]) executors() (executor *CompoundExecutionGrou g.mu.RLock() defer g.mu.RUnlock() - executor = NewCompoundExecutionGroup(DefaultOptions().MergeWithOptions(Sequential).Options()...) + opts := DefaultOptions().MergeWithOptions(g.options...) + opts.MergeWithOptions(Sequential) + executor = NewCompoundExecutionGroup(opts.Options()...) for _, key := range slices.Sorted(maps.Keys(g.groups)) { executor.RegisterExecutor(g.groups[key]) } diff --git a/utils/parallelisation/priority_group_test.go b/utils/parallelisation/priority_group_test.go index 5d5968d5e2..2507c3e5de 100644 --- a/utils/parallelisation/priority_group_test.go +++ b/utils/parallelisation/priority_group_test.go @@ -392,4 +392,32 @@ func TestPriority(t *testing.T) { err := priorityGroup.Execute(ctx) errortest.AssertError(t, err, commonerrors.ErrTimeout) }) + + t.Run("stop on first error propagates properly across priorities", func(t *testing.T) { + defer goleak.VerifyNone(t) + + var lowerPriorityCalled, samePriorityCalledAfter atomic.Bool + + priorityGroup := NewPriorityExecutionGroup(Sequential, StopOnFirstError) + + priorityGroup.RegisterFunctionWithPriority(0, testExecutorFunc(func(ctx context.Context) (err error) { + err = commonerrors.ErrConflict + return + })) + + priorityGroup.RegisterFunctionWithPriority(0, testExecutorFunc(func(ctx context.Context) (err error) { + samePriorityCalledAfter.Store(true) + return + })) + + priorityGroup.RegisterFunctionWithPriority(1, testExecutorFunc(func(ctx context.Context) (err error) { + lowerPriorityCalled.Store(true) + return + })) + + err := priorityGroup.Execute(context.Background()) + errortest.AssertError(t, err, commonerrors.ErrConflict) + assert.False(t, samePriorityCalledAfter.Load()) + assert.False(t, lowerPriorityCalled.Load()) + }) } From e81254a1222de3a6430ce30006c87bcac5a9e684 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Wed, 12 Nov 2025 11:04:20 +0000 Subject: [PATCH 2/2] spelling --- changes/20251112110252.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/20251112110252.bugfix b/changes/20251112110252.bugfix index 0598704399..772569e03f 100644 --- a/changes/20251112110252.bugfix +++ b/changes/20251112110252.bugfix @@ -1 +1 @@ -:bug: `parallelisation` Ensure that execution options are propagated to compount execution group when creating a PriorityExecutionGroup +:bug: `parallelisation` Ensure that execution options are propagated to compound execution group when creating a PriorityExecutionGroup