Skip to content

Commit

Permalink
try reducing locks
Browse files Browse the repository at this point in the history
  • Loading branch information
razor-87 committed Nov 2, 2023
1 parent d818876 commit 4d61fd3
Showing 1 changed file with 41 additions and 22 deletions.
63 changes: 41 additions & 22 deletions errsizedgroup.go
Expand Up @@ -2,6 +2,7 @@ package syncs

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand All @@ -15,13 +16,12 @@ type ErrSizedGroup struct {
wg sync.WaitGroup
sema Locker

termCancel func()
terminated func() bool
canceled func() bool
terminated func() bool
termCancel func()

err *MultiError
errLock sync.RWMutex
errOnce sync.Once
err *MultiError
errCh chan error
}

// NewErrSizedGroup makes wait group with limited size alive goroutines.
Expand All @@ -32,22 +32,17 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {
res := ErrSizedGroup{
sema: NewSemaphore(size),
err: new(MultiError),
errCh: make(chan error, size),
terminated: func() bool { return false },
}

for _, opt := range options {
opt(&res.options)
}
if res.ctx == nil {
res.ctx = context.Background()
}
if res.termOnError {
res.ctx, res.termCancel = context.WithCancel(res.ctx)
res.terminated = func() bool { // terminated will be true if any error happened before
res.errLock.RLock()
defer res.errLock.RUnlock()
return res.err.ErrorOrNil() != nil
}
}

res.canceled = func() bool {
select {
case <-res.ctx.Done():
Expand All @@ -56,6 +51,24 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {
return false
}
}
if res.termOnError {
res.terminated = func() bool { // terminated will be true if any error happened before
return !res.err.isEmpty() || len(res.errCh) != 0
}
res.ctx, res.termCancel = context.WithCancel(res.ctx)
}

go func() {
var ctxError bool
for err := range res.errCh {
if !ctxError {
res.err.append(err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
ctxError = true // don't repeat this error
}
}
}
}()

return &res
}
Expand All @@ -64,11 +77,10 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {
// The first call to return a non-nil error cancels the group if termOnError; its error will be
// returned by Wait. If no termOnError all errors will be collected in multierror.
func (g *ErrSizedGroup) Go(f func(ctx context.Context) error) {
if g.canceled() && (!g.termOnError || g.err.len() == 0) {
g.errOnce.Do(func() {
// don't repeat this error
g.err.append(g.ctx.Err())
})
if g.canceled() {
if !g.terminated() {
g.errCh <- g.ctx.Err()
}
return
}

Expand Down Expand Up @@ -110,8 +122,10 @@ func (g *ErrSizedGroup) Go(f func(ctx context.Context) error) {
isLocked = true
}

if err := f(g.ctx); err != nil && !g.canceled() {
g.err.append(err)
if !g.canceled() && !g.terminated() {
if err := f(g.ctx); err != nil {
g.errCh <- err
}
}
}()
}
Expand All @@ -120,6 +134,7 @@ func (g *ErrSizedGroup) Go(f func(ctx context.Context) error) {
// returns all errors (if any) wrapped with multierror from them.
func (g *ErrSizedGroup) Wait() error {
g.wg.Wait()
close(g.errCh)
return g.err.ErrorOrNil()
}

Expand All @@ -132,7 +147,7 @@ type MultiError struct {

// ErrorOrNil returns nil if no errors or multierror if errors occurred
func (m *MultiError) ErrorOrNil() error {
if m.len() == 0 {
if m.isEmpty() {
return nil
}
m.makeStr()
Expand All @@ -155,14 +170,18 @@ func (m *MultiError) append(err error) {
m.lock.Unlock()
}

func (m *MultiError) isEmpty() bool {
return m.len() == 0
}

func (m *MultiError) len() int {
m.lock.RLock()
defer m.lock.RUnlock()
return len(m.errors)
}

func (m *MultiError) makeStr() {
lenErrors := len(m.errors)
lenErrors := m.len()
m.str = fmt.Sprintf("%d error(s) occurred: ", lenErrors)
errs := make([]string, lenErrors)
for i := range m.errors {
Expand Down

0 comments on commit 4d61fd3

Please sign in to comment.