Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/20251117175436.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[parallelisation]` Added Transform helpers to ease the creation `transform` operations
1 change: 1 addition & 0 deletions changes/20251117175538.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[parallelisation]` Added `BreakOnErrorOrEOF` to help performing actions in parallel but for which `EOF` is considered as termination rather than an error
6 changes: 6 additions & 0 deletions utils/parallelisation/contextual.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package parallelisation

import (
"context"
"io"

"github.com/ARM-software/golang-utils/utils/commonerrors"
)
Expand Down Expand Up @@ -55,3 +56,8 @@ func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextua
group.RegisterFunction(contextualFunc...)
return group.Execute(ctx)
}

// BreakOnErrorOrEOF is similar to BreakOnError but also stops on EOF. However, in this case, no error is returned
func BreakOnErrorOrEOF(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
return commonerrors.Ignore(BreakOnError(ctx, executionOptions, contextualFunc...), commonerrors.ErrEOF, io.EOF)
}
14 changes: 14 additions & 0 deletions utils/parallelisation/contextual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parallelisation
import (
"context"
"errors"
"io"
"testing"
"time"

Expand Down Expand Up @@ -35,6 +36,7 @@ func TestForEach(t *testing.T) {
closeError := commonerrors.ErrUnexpected
errortest.AssertError(t, ForEach(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
errortest.AssertError(t, BreakOnError(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
errortest.AssertError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
})

t.Run("close with cancellation", func(t *testing.T) {
Expand All @@ -43,11 +45,23 @@ func TestForEach(t *testing.T) {
cancel()
errortest.AssertError(t, ForEach(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
errortest.AssertError(t, BreakOnError(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
errortest.AssertError(t, BreakOnErrorOrEOF(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
})

t.Run("break on error with no error", func(t *testing.T) {
require.NoError(t, BreakOnError(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
})
t.Run("break on error or EOF with no error", func(t *testing.T) {
require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
})
t.Run("break on error or EOF with no error", func(t *testing.T) {
require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), func(_ context.Context) error {
return commonerrors.ErrEOF
}))
require.NoError(t, BreakOnErrorOrEOF(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), func(_ context.Context) error {
return io.EOF
}))
})
t.Run("for each with no error", func(t *testing.T) {
require.NoError(t, ForEach(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
})
Expand Down
39 changes: 37 additions & 2 deletions utils/parallelisation/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package parallelisation

import (
"context"
"iter"
"slices"

"go.uber.org/atomic"

Expand Down Expand Up @@ -101,12 +103,17 @@ func (g *TransformGroup[I, O]) appendResult(o *resultElement[O]) {

// Inputs registers inputs to transform.
func (g *TransformGroup[I, O]) Inputs(ctx context.Context, i ...I) error {
for j := range i {
return g.InputSequence(ctx, slices.Values(i))
}

// InputSequence registers inputs to transform.
func (g *TransformGroup[I, O]) InputSequence(ctx context.Context, i iter.Seq[I]) error {
for e := range i {
err := DetermineContextError(ctx)
if err != nil {
return err
}
g.RegisterFunction(i[j])
g.RegisterFunction(e)
}
return nil
}
Expand Down Expand Up @@ -159,3 +166,31 @@ func NewTransformGroup[I any, O any](transform TransformFunc[I, O], options ...S
}, options...)
return g
}

func transformF[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], ordered bool, options ...StoreOption) (result []O, err error) {
grp := NewTransformGroup[I, O](transform, options...)
err = grp.InputSequence(ctx, inputs)
if err != nil {
return
}
err = grp.Transform(ctx)
if err != nil {
return
}
if ordered {
result, err = grp.OrderedOutputs(ctx)
} else {
result, err = grp.Outputs(ctx)
}
return
}

// Transform transforms inputs into outputs using the transform function.
func Transform[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], options ...StoreOption) ([]O, error) {
return transformF[I, O](ctx, inputs, transform, false, options...)
}

// TransformInOrder transforms inputs into outputs using the transform function but returns the output in the same order as the input.
func TransformInOrder[I any, O any](ctx context.Context, inputs iter.Seq[I], transform TransformFunc[I, O], options ...StoreOption) ([]O, error) {
return transformF[I, O](ctx, inputs, transform, true, options...)
}
7 changes: 7 additions & 0 deletions utils/parallelisation/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package parallelisation

import (
"context"
"slices"
"strconv"
"testing"

Expand Down Expand Up @@ -47,6 +48,9 @@ func TestNewTransformGroup(t *testing.T) {
o, err = g.Outputs(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, in, o)
o, err = Transform[string, int](context.Background(), slices.Values(in2), tr, RetainAfterExecution, Parallel)
require.NoError(t, err)
assert.ElementsMatch(t, in, o)
o, err = g.OrderedOutputs(context.Background())
require.NoError(t, err)
assert.Empty(t, o)
Expand All @@ -55,6 +59,9 @@ func TestNewTransformGroup(t *testing.T) {
o, err = g.OrderedOutputs(context.Background())
require.NoError(t, err)
assert.Equal(t, in, o)
o, err = TransformInOrder[string, int](context.Background(), slices.Values(in2), tr, RetainAfterExecution, Parallel)
require.NoError(t, err)
assert.Equal(t, in, o)
err = g.Inputs(context.Background(), in2...)
require.NoError(t, err)
assert.Equal(t, 2*numberOfInput, g.Len())
Expand Down
Loading