Skip to content
This repository has been archived by the owner on Jul 7, 2023. It is now read-only.

Commit

Permalink
DoAll: Better panic handling; reorganize tests
Browse files Browse the repository at this point in the history
  • Loading branch information
earthboundkid committed Jun 21, 2023
1 parent daa7c70 commit 04600bb
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 64 deletions.
23 changes: 17 additions & 6 deletions do_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,35 @@ package workgroup

import "errors"

type void = struct{}

// DoAll starts n concurrent workers (or GOMAXPROCS workers if n < 1)
// and processes each initial input as a task.
// Errors returned by a task do not halt execution,
// but are joined into a multierror return value.
// If a task panics during execution,
// the panic will be caught and rethrown in the main Goroutine.
func DoAll[Input any](n int, items []Input, task func(Input) error) error {
var recovered any
errs := make([]error, 0, len(items))
DoTasks(n, func(in Input) (void, error) {
return void{}, task(in)
}, func(_ Input, _ void, err error) ([]Input, bool) {
runner := func(in Input) (r any, err error) {
defer func() {
r = recover()
}()
err = task(in)
return
}
manager := func(_ Input, r any, err error) ([]Input, bool) {
if r != nil {
recovered = r
}
if err != nil {
errs = append(errs, err)
}
return nil, true
}, items...)
}
DoTasks(n, runner, manager, items...)

if recovered != nil {
panic(recovered)
}
return errors.Join(errs...)
}
7 changes: 7 additions & 0 deletions do_all_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package workgroup_test

import "testing"

func TestDoAll_err(t *testing.T) {

}
48 changes: 48 additions & 0 deletions do_tasks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package workgroup_test

import (
"errors"
"fmt"
"testing"
"time"

"github.com/carlmjohnson/workgroup"
)

func TestDoTasks_drainage(t *testing.T) {
const sleepTime = 10 * time.Millisecond
b := false
task := func(n int) (int, error) {
if n == 1 {
return 0, errors.New("text string")
}
time.Sleep(sleepTime)
b = true
return 0, nil
}
start := time.Now()
m := map[int]struct {
int
error
}{}
manager := func(in, out int, err error) ([]int, bool) {
m[in] = struct {
int
error
}{out, err}
if err != nil {
return nil, false
}
return nil, true
}
workgroup.DoTasks(5, task, manager, 0, 1)
if s := fmt.Sprint(m); s != "map[1:text string]" {
t.Fatal(s)
}
if time.Since(start) < sleepTime {
t.Fatal("didn't sleep enough")
}
if !b {
t.Fatal("didn't finish")
}
}
74 changes: 16 additions & 58 deletions do_test.go → panic_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package workgroup_test

import (
"errors"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/carlmjohnson/workgroup"
)

func try(f func()) (r any) {
defer func() {
r = recover()
}()
f()
return
}

func TestDoTasks_panic(t *testing.T) {
task := func(n int) (int, error) {
if n == 3 {
Expand All @@ -22,13 +28,9 @@ func TestDoTasks_panic(t *testing.T) {
triples = append(triples, triple)
return nil, true
}
var r any
func() {
defer func() {
r = recover()
}()
r := try(func() {
workgroup.DoTasks(1, task, manager, 1, 2, 3, 4)
}()
})
if r == nil {
t.Fatal("should have panicked")
}
Expand All @@ -44,12 +46,8 @@ func TestDoAll_panic(t *testing.T) {
var (
n atomic.Int64
err error
r any
)
func() {
defer func() {
r = recover()
}()
r := try(func() {
err = workgroup.DoAll(1, []int64{1, 2, 3},
func(delta int64) error {
if delta == 2 {
Expand All @@ -58,7 +56,7 @@ func TestDoAll_panic(t *testing.T) {
n.Add(delta)
return nil
})
}()
})
if err != nil {
t.Fatal("should have panicked")
}
Expand All @@ -68,7 +66,7 @@ func TestDoAll_panic(t *testing.T) {
if r != "boom" {
t.Fatal(r)
}
if n.Load() != 1 {
if n.Load() != 4 {
t.Fatal(n.Load())
}
}
Expand All @@ -77,10 +75,8 @@ func TestDo_panic(t *testing.T) {
var (
n atomic.Int64
err error
r any
)
func() {
defer func() { r = recover() }()
r := try(func() {
err = workgroup.Do(1,
func() error {
n.Add(1)
Expand All @@ -93,7 +89,7 @@ func TestDo_panic(t *testing.T) {
n.Add(1)
return nil
})
}()
})
if err != nil {
t.Fatal("should have panicked")
}
Expand All @@ -103,45 +99,7 @@ func TestDo_panic(t *testing.T) {
if r != "boom" {
t.Fatal(r)
}
if n.Load() != 1 {
if n.Load() != 2 {
t.Fatal(n.Load())
}
}

func TestDoTasks_drainage(t *testing.T) {
const sleepTime = 10 * time.Millisecond
b := false
task := func(n int) (int, error) {
if n == 1 {
return 0, errors.New("text string")
}
time.Sleep(sleepTime)
b = true
return 0, nil
}
start := time.Now()
m := map[int]struct {
int
error
}{}
manager := func(in, out int, err error) ([]int, bool) {
m[in] = struct {
int
error
}{out, err}
if err != nil {
return nil, false
}
return nil, true
}
workgroup.DoTasks(5, task, manager, 0, 1)
if s := fmt.Sprint(m); s != "map[1:text string]" {
t.Fatal(s)
}
if time.Since(start) < sleepTime {
t.Fatal("didn't sleep enough")
}
if !b {
t.Fatal("didn't finish")
}
}

0 comments on commit 04600bb

Please sign in to comment.