From 4d61fd3cb1ab0f2750d47a7fd15e330db6f9e0db Mon Sep 17 00:00:00 2001 From: Sergei Razorenov Date: Mon, 30 Oct 2023 10:34:51 +0300 Subject: [PATCH] try reducing locks --- errsizedgroup.go | 63 +++++++++++++++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/errsizedgroup.go b/errsizedgroup.go index 0317cc2..6b637e2 100644 --- a/errsizedgroup.go +++ b/errsizedgroup.go @@ -2,6 +2,7 @@ package syncs import ( "context" + "errors" "fmt" "strings" "sync" @@ -15,13 +16,12 @@ type ErrSizedGroup struct { wg sync.WaitGroup sema Locker - termCancel func() - terminated func() bool canceled func() bool + terminated func() bool + termCancel func() - err *MultiError - errLock sync.RWMutex - errOnce sync.Once + err *MultiError + errCh chan error } // NewErrSizedGroup makes wait group with limited size alive goroutines. @@ -32,22 +32,17 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup { res := ErrSizedGroup{ sema: NewSemaphore(size), err: new(MultiError), + errCh: make(chan error, size), terminated: func() bool { return false }, } + for _, opt := range options { opt(&res.options) } if res.ctx == nil { res.ctx = context.Background() } - if res.termOnError { - res.ctx, res.termCancel = context.WithCancel(res.ctx) - res.terminated = func() bool { // terminated will be true if any error happened before - res.errLock.RLock() - defer res.errLock.RUnlock() - return res.err.ErrorOrNil() != nil - } - } + res.canceled = func() bool { select { case <-res.ctx.Done(): @@ -56,6 +51,24 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup { return false } } + if res.termOnError { + res.terminated = func() bool { // terminated will be true if any error happened before + return !res.err.isEmpty() || len(res.errCh) != 0 + } + res.ctx, res.termCancel = context.WithCancel(res.ctx) + } + + go func() { + var ctxError bool + for err := range res.errCh { + if !ctxError { + res.err.append(err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + ctxError = true // don't repeat this error + } + } + } + }() return &res } @@ -64,11 +77,10 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup { // The first call to return a non-nil error cancels the group if termOnError; its error will be // returned by Wait. If no termOnError all errors will be collected in multierror. func (g *ErrSizedGroup) Go(f func(ctx context.Context) error) { - if g.canceled() && (!g.termOnError || g.err.len() == 0) { - g.errOnce.Do(func() { - // don't repeat this error - g.err.append(g.ctx.Err()) - }) + if g.canceled() { + if !g.terminated() { + g.errCh <- g.ctx.Err() + } return } @@ -110,8 +122,10 @@ func (g *ErrSizedGroup) Go(f func(ctx context.Context) error) { isLocked = true } - if err := f(g.ctx); err != nil && !g.canceled() { - g.err.append(err) + if !g.canceled() && !g.terminated() { + if err := f(g.ctx); err != nil { + g.errCh <- err + } } }() } @@ -120,6 +134,7 @@ func (g *ErrSizedGroup) Go(f func(ctx context.Context) error) { // returns all errors (if any) wrapped with multierror from them. func (g *ErrSizedGroup) Wait() error { g.wg.Wait() + close(g.errCh) return g.err.ErrorOrNil() } @@ -132,7 +147,7 @@ type MultiError struct { // ErrorOrNil returns nil if no errors or multierror if errors occurred func (m *MultiError) ErrorOrNil() error { - if m.len() == 0 { + if m.isEmpty() { return nil } m.makeStr() @@ -155,6 +170,10 @@ func (m *MultiError) append(err error) { m.lock.Unlock() } +func (m *MultiError) isEmpty() bool { + return m.len() == 0 +} + func (m *MultiError) len() int { m.lock.RLock() defer m.lock.RUnlock() @@ -162,7 +181,7 @@ func (m *MultiError) len() int { } func (m *MultiError) makeStr() { - lenErrors := len(m.errors) + lenErrors := m.len() m.str = fmt.Sprintf("%d error(s) occurred: ", lenErrors) errs := make([]string, lenErrors) for i := range m.errors {