Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/20250820002654.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[parallelisation]` Added new groups (ContextualFunctionGroup) and new Store options to configure the execution (number of workers, single execution, etc.)
1 change: 1 addition & 0 deletions changes/20250820140853.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[parallelisation]` Added new compound execution group to support nested execution groups
249 changes: 13 additions & 236 deletions utils/parallelisation/cancel_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,245 +5,23 @@

package parallelisation

import (
"context"
import "context"

"github.com/sasha-s/go-deadlock"
"golang.org/x/sync/errgroup"

"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/reflection"
)

type StoreOptions struct {
clearOnExecution bool
stopOnFirstError bool
sequential bool
reverse bool
joinErrors bool
}
type StoreOption func(*StoreOptions) *StoreOptions

// StopOnFirstError stops store execution on first error.
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.stopOnFirstError = true
o.joinErrors = false
return o
}

// JoinErrors will collate any errors which happened when executing functions in store.
// This option should not be used in combination to StopOnFirstError.
var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.stopOnFirstError = false
o.joinErrors = true
return o
}

// ExecuteAll executes all functions in the store even if an error is raised. the first error raised is then returned.
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.stopOnFirstError = false
return o
}

// ClearAfterExecution clears the store after execution.
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.clearOnExecution = true
return o
}

// RetainAfterExecution keep the store intact after execution (no reset).
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.clearOnExecution = false
return o
}

// Parallel ensures every function registered in the store is executed concurrently in the order they were registered.
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.sequential = false
return o
}

// Sequential ensures every function registered in the store is executed sequentially in the order they were registered.
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.sequential = true
return o
}

// SequentialInReverse ensures every function registered in the store is executed sequentially but in the reverse order they were registered.
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.sequential = true
o.reverse = true
return o
}

func newFunctionStore[T any](executeFunc func(context.Context, T) error, options ...StoreOption) *store[T] {

opts := &StoreOptions{}

for i := range options {
opts = options[i](opts)
}
return &store[T]{
mu: deadlock.RWMutex{},
functions: make([]T, 0),
executeFunc: executeFunc,
options: *opts,
}
}

type store[T any] struct {
mu deadlock.RWMutex
functions []T
executeFunc func(ctx context.Context, element T) error
options StoreOptions
}

func (s *store[T]) RegisterFunction(function ...T) {
defer s.mu.Unlock()
s.mu.Lock()
s.functions = append(s.functions, function...)
}

func (s *store[T]) Len() int {
defer s.mu.RUnlock()
s.mu.RLock()
return len(s.functions)
}

func (s *store[T]) Execute(ctx context.Context) (err error) {
defer s.mu.Unlock()
s.mu.Lock()
if reflection.IsEmpty(s.executeFunc) {
return commonerrors.New(commonerrors.ErrUndefined, "the store was not initialised correctly")
}

if s.options.sequential {
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse, s.options.joinErrors)
} else {
err = s.executeConcurrently(ctx, s.options.stopOnFirstError, s.options.joinErrors)
}

if err == nil && s.options.clearOnExecution {
s.functions = make([]T, 0, len(s.functions))
}
return
}

func (s *store[T]) executeConcurrently(ctx context.Context, stopOnFirstError bool, collateErrors bool) error {
g, gCtx := errgroup.WithContext(ctx)
if !stopOnFirstError {
gCtx = ctx
}
funcNum := len(s.functions)
errCh := make(chan error, funcNum)
g.SetLimit(funcNum)
for i := range s.functions {
g.Go(func() error {
_, subErr := s.executeFunction(gCtx, s.functions[i])
errCh <- subErr
return subErr
})
}
err := g.Wait()
close(errCh)
if collateErrors {
collateErr := make([]error, funcNum)
i := 0
for subErr := range errCh {
collateErr[i] = subErr
i++
}
err = commonerrors.Join(collateErr...)
}

return err
type CancelFunctionStore struct {
ExecutionGroup[context.CancelFunc]
}

func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse, collateErrors bool) (err error) {
err = DetermineContextError(ctx)
if err != nil {
return
}
funcNum := len(s.functions)
collateErr := make([]error, funcNum)
if reverse {
for i := funcNum - 1; i >= 0; i-- {
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
collateErr[funcNum-i-1] = subErr
if shouldBreak {
err = subErr
return
}
if subErr != nil && err == nil {
err = subErr
if stopOnFirstError {
return
}
}
}
} else {
for i := range s.functions {
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
collateErr[i] = subErr
if shouldBreak {
err = subErr
return
}
if subErr != nil && err == nil {
err = subErr
if stopOnFirstError {
return
}
}
}
}

if collateErrors {
err = commonerrors.Join(collateErr...)
}
return
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
s.ExecutionGroup.RegisterFunction(cancel...)
}

func (s *store[T]) executeFunction(ctx context.Context, element T) (mustBreak bool, err error) {
err = DetermineContextError(ctx)
if err != nil {
mustBreak = true
func (s *CancelFunctionStore) RegisterCancelStore(store *CancelFunctionStore) {
if store == nil {
return
}
err = s.executeFunc(ctx, element)
return
}

type CancelFunctionStore struct {
store[context.CancelFunc]
}

func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
s.store.RegisterFunction(cancel...)
s.RegisterCancelFunction(func() {
store.Cancel()
})
}

// Cancel will execute the cancel functions in the store. Any errors will be ignored and Execute() is recommended if you need to know if a cancellation failed
Expand All @@ -252,15 +30,14 @@ func (s *CancelFunctionStore) Cancel() {
}

func (s *CancelFunctionStore) Len() int {
return s.store.Len()
return s.ExecutionGroup.Len()
}

// NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed and cleared. In other words, options `RetainAfterExecution` and `StopOnFirstError` would be discarded if selected to create the Cancel store
func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore {
return &CancelFunctionStore{
store: *newFunctionStore[context.CancelFunc](func(_ context.Context, cancelFunc context.CancelFunc) error {
cancelFunc()
return nil
ExecutionGroup: *NewExecutionGroup[context.CancelFunc](func(ctx context.Context, cancelFunc context.CancelFunc) error {
return WrapCancelToContextualFunc(cancelFunc)(ctx)
}, append(options, ClearAfterExecution, ExecuteAll)...),
}
}
30 changes: 21 additions & 9 deletions utils/parallelisation/cancel_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
Expand All @@ -19,24 +20,35 @@ func testCancelStore(t *testing.T, store *CancelFunctionStore) {
t.Helper()
require.NotNil(t, store)
// Set up some fake CancelFuncs to make sure they are called
called1 := false
called2 := false
called1 := atomic.NewBool(false)
called2 := atomic.NewBool(false)
called3 := atomic.NewBool(false)

cancelFunc1 := func() {
called1 = true
called1.Store(true)
}
cancelFunc2 := func() {
called2 = true
called2.Store(true)
}
cancelFunc3 := func() {
called3.Store(true)
}
subStore := NewCancelFunctionsStore()
subStore.RegisterCancelFunction(cancelFunc3)

store.RegisterCancelFunction(cancelFunc1, cancelFunc2)
store.RegisterCancelStore(subStore)
store.RegisterCancelStore(nil)

assert.Equal(t, 2, store.Len())
assert.False(t, called1)
assert.False(t, called2)
assert.Equal(t, 3, store.Len())
assert.False(t, called1.Load())
assert.False(t, called2.Load())
assert.False(t, called3.Load())
store.Cancel()

assert.True(t, called1)
assert.True(t, called2)
assert.True(t, called1.Load())
assert.True(t, called2.Load())
assert.True(t, called3.Load())
}

// Given a CancelFunctionsStore
Expand Down
41 changes: 41 additions & 0 deletions utils/parallelisation/contextual.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package parallelisation

import (
"context"

"github.com/ARM-software/golang-utils/utils/commonerrors"
)

// DetermineContextError determines what the context error is if any.
func DetermineContextError(ctx context.Context) error {
return commonerrors.ConvertContextError(ctx.Err())
}

type ContextualFunc func(ctx context.Context) error

type ContextualFunctionGroup struct {
ExecutionGroup[ContextualFunc]
}

// NewContextualGroup returns a group executing contextual functions.
func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup {
return &ContextualFunctionGroup{
ExecutionGroup: *NewExecutionGroup[ContextualFunc](func(ctx context.Context, contextualF ContextualFunc) error {
return contextualF(ctx)
}, options...),
}
}

// ForEach executes all the contextual functions according to the store options and returns an error if one occurred.
func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
group := NewContextualGroup(ExecuteAll(executionOptions).Options()...)
group.RegisterFunction(contextualFunc...)
return group.Execute(ctx)
}

// BreakOnError executes each functions in the group until an error is found or the context gets cancelled.
func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
group := NewContextualGroup(StopOnFirstError(executionOptions).Options()...)
group.RegisterFunction(contextualFunc...)
return group.Execute(ctx)
}
Loading
Loading