From 7b495018d6775583fed886a8dffff96c222908d2 Mon Sep 17 00:00:00 2001 From: Adrien CABARBAYE Date: Mon, 17 Nov 2025 17:56:21 +0000 Subject: [PATCH] :sparkles: `[parallelisation]` few helpers to ease parallel processing --- changes/20251117175436.feature | 1 + changes/20251117175538.feature | 1 + utils/parallelisation/contextual.go | 6 ++++ utils/parallelisation/contextual_test.go | 14 +++++++++ utils/parallelisation/transform.go | 39 ++++++++++++++++++++++-- utils/parallelisation/transform_test.go | 7 +++++ 6 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 changes/20251117175436.feature create mode 100644 changes/20251117175538.feature diff --git a/changes/20251117175436.feature b/changes/20251117175436.feature new file mode 100644 index 0000000000..f35cbbdae9 --- /dev/null +++ b/changes/20251117175436.feature @@ -0,0 +1 @@ +:sparkles: `[parallelisation]` Added Transform helpers to ease the creation `transform` operations diff --git a/changes/20251117175538.feature b/changes/20251117175538.feature new file mode 100644 index 0000000000..db6756968d --- /dev/null +++ b/changes/20251117175538.feature @@ -0,0 +1 @@ +:sparkles: `[parallelisation]` Added `BreakOnErrorOrEOF` to help performing actions in parallel but for which `EOF` is considered as termination rather than an error diff --git a/utils/parallelisation/contextual.go b/utils/parallelisation/contextual.go index f5b559d735..988a5c6c0e 100644 --- a/utils/parallelisation/contextual.go +++ b/utils/parallelisation/contextual.go @@ -2,6 +2,7 @@ package parallelisation import ( "context" + "io" "github.com/ARM-software/golang-utils/utils/commonerrors" ) @@ -55,3 +56,8 @@ func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextua group.RegisterFunction(contextualFunc...) return group.Execute(ctx) } + +// BreakOnErrorOrEOF is similar to BreakOnError but also stops on EOF. However, in this case, no error is returned +func BreakOnErrorOrEOF(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error { + return commonerrors.Ignore(BreakOnError(ctx, executionOptions, contextualFunc...), commonerrors.ErrEOF, io.EOF) +} diff --git a/utils/parallelisation/contextual_test.go b/utils/parallelisation/contextual_test.go index cfcc62afdb..1832c6ade6 100644 --- a/utils/parallelisation/contextual_test.go +++ b/utils/parallelisation/contextual_test.go @@ -3,6 +3,7 @@ package parallelisation import ( "context" "errors" + "io" "testing" "time" @@ -35,6 +36,7 @@ func TestForEach(t *testing.T) { closeError := commonerrors.ErrUnexpected errortest.AssertError(t, ForEach(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError) errortest.AssertError(t, BreakOnError(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError) + errortest.AssertError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError) }) t.Run("close with cancellation", func(t *testing.T) { @@ -43,11 +45,23 @@ func TestForEach(t *testing.T) { cancel() errortest.AssertError(t, ForEach(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled) errortest.AssertError(t, BreakOnError(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled) + errortest.AssertError(t, BreakOnErrorOrEOF(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled) }) t.Run("break on error with no error", func(t *testing.T) { require.NoError(t, BreakOnError(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc))) }) + t.Run("break on error or EOF with no error", func(t *testing.T) { + require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc))) + }) + t.Run("break on error or EOF with no error", func(t *testing.T) { + require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), func(_ context.Context) error { + return commonerrors.ErrEOF + })) + require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), func(_ context.Context) error { + return io.EOF + })) + }) t.Run("for each with no error", func(t *testing.T) { require.NoError(t, ForEach(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc))) }) diff --git a/utils/parallelisation/transform.go b/utils/parallelisation/transform.go index d12fa94294..96aef32143 100644 --- a/utils/parallelisation/transform.go +++ b/utils/parallelisation/transform.go @@ -2,6 +2,8 @@ package parallelisation import ( "context" + "iter" + "slices" "go.uber.org/atomic" @@ -101,12 +103,17 @@ func (g *TransformGroup[I, O]) appendResult(o *resultElement[O]) { // Inputs registers inputs to transform. func (g *TransformGroup[I, O]) Inputs(ctx context.Context, i ...I) error { - for j := range i { + return g.InputSequence(ctx, slices.Values(i)) +} + +// InputSequence registers inputs to transform. +func (g *TransformGroup[I, O]) InputSequence(ctx context.Context, i iter.Seq[I]) error { + for e := range i { err := DetermineContextError(ctx) if err != nil { return err } - g.RegisterFunction(i[j]) + g.RegisterFunction(e) } return nil } @@ -159,3 +166,31 @@ func NewTransformGroup[I any, O any](transform TransformFunc[I, O], options ...S }, options...) return g } + +func transformF[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], ordered bool, options ...StoreOption) (result []O, err error) { + grp := NewTransformGroup[I, O](transform, options...) + err = grp.InputSequence(ctx, inputs) + if err != nil { + return + } + err = grp.Transform(ctx) + if err != nil { + return + } + if ordered { + result, err = grp.OrderedOutputs(ctx) + } else { + result, err = grp.Outputs(ctx) + } + return +} + +// Transform transforms inputs into outputs using the transform function. +func Transform[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], options ...StoreOption) ([]O, error) { + return transformF[I, O](ctx, inputs, transform, false, options...) +} + +// TransformInOrder transforms inputs into outputs using the transform function but returns the output in the same order as the input. +func TransformInOrder[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], options ...StoreOption) ([]O, error) { + return transformF[I, O](ctx, inputs, transform, true, options...) +} diff --git a/utils/parallelisation/transform_test.go b/utils/parallelisation/transform_test.go index 457822581d..c7121a150d 100644 --- a/utils/parallelisation/transform_test.go +++ b/utils/parallelisation/transform_test.go @@ -2,6 +2,7 @@ package parallelisation import ( "context" + "slices" "strconv" "testing" @@ -47,6 +48,9 @@ func TestNewTransformGroup(t *testing.T) { o, err = g.Outputs(context.Background()) require.NoError(t, err) assert.ElementsMatch(t, in, o) + o, err = Transform[string, int](context.Background(), slices.Values(in2), tr, RetainAfterExecution, Parallel) + require.NoError(t, err) + assert.ElementsMatch(t, in, o) o, err = g.OrderedOutputs(context.Background()) require.NoError(t, err) assert.Empty(t, o) @@ -55,6 +59,9 @@ func TestNewTransformGroup(t *testing.T) { o, err = g.OrderedOutputs(context.Background()) require.NoError(t, err) assert.Equal(t, in, o) + o, err = TransformInOrder[string, int](context.Background(), slices.Values(in2), tr, RetainAfterExecution, Parallel) + require.NoError(t, err) + assert.Equal(t, in, o) err = g.Inputs(context.Background(), in2...) require.NoError(t, err) assert.Equal(t, 2*numberOfInput, g.Len())