Skip to content
This repository has been archived by the owner on Jul 7, 2023. It is now read-only.

Commit

Permalink
Reorganize files
Browse files Browse the repository at this point in the history
  • Loading branch information
earthboundkid committed Jun 21, 2023
1 parent f4d67a9 commit daa7c70
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 63 deletions.
60 changes: 9 additions & 51 deletions do.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,13 @@
package workgroup

import (
"github.com/carlmjohnson/deque"
)

// MaxProcs means use GOMAXPROCS workers when doing tasks.
const MaxProcs = -1

// Manager is a function that serially examines Task results to see if it produced any new Inputs.
// Returning false will halt the processing of future tasks.
type Manager[Input, Output any] func(Input, Output, error) (tasks []Input, ok bool)

// Task is a function that can concurrently transform an input into an output.
type Task[Input, Output any] func(in Input) (out Output, err error)

// DoTasks does tasks using n concurrent workers (or GOMAXPROCS workers if n < 1)
// which produce output consumed by a serially run manager.
// The manager should return a slice of new task inputs based on prior task results,
// or return false to halt processing.
// If a task panics during execution,
// Do starts n concurrent workers (or GOMAXPROCS workers if n < 1)
// that execute each function.
// Errors returned by a function do not halt execution,
// but are joined into a multierror return value.
// If a function panics during execution,
// the panic will be caught and rethrown in the main Goroutine.
func DoTasks[Input, Output any](n int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input) {
in, out := start(n, task)
defer func() {
close(in)
// drain any waiting tasks
for range out {
}
}()
queue := deque.Of(initial...)
inflight := 0
for inflight > 0 || queue.Len() > 0 {
inch := in
item, ok := queue.Head()
if !ok {
inch = nil
}
select {
case inch <- item:
inflight++
queue.PopHead()
case r := <-out:
inflight--
if r.Panic != nil {
panic(r.Panic)
}
items, ok := manager(r.In, r.Out, r.Err)
if !ok {
return
}
queue.Append(items...)
}
}
func Do(n int, fns ...func() error) error {
return DoAll(n, fns, func(in func() error) error {
return in()
})
}
12 changes: 0 additions & 12 deletions tasks.go → do_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,3 @@ func DoAll[Input any](n int, items []Input, task func(Input) error) error {

return errors.Join(errs...)
}

// Do starts n concurrent workers (or GOMAXPROCS workers if n < 1)
// that execute each function.
// Errors returned by a function do not halt execution,
// but are joined into a multierror return value.
// If a function panics during execution,
// the panic will be caught and rethrown in the main Goroutine.
func Do(n int, fns ...func() error) error {
return DoAll(n, fns, func(in func() error) error {
return in()
})
}
52 changes: 52 additions & 0 deletions do_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package workgroup

import (
"github.com/carlmjohnson/deque"
)

// Manager is a function that serially examines Task results to see if it produced any new Inputs.
// Returning false will halt the processing of future tasks.
type Manager[Input, Output any] func(Input, Output, error) (tasks []Input, ok bool)

// Task is a function that can concurrently transform an input into an output.
type Task[Input, Output any] func(in Input) (out Output, err error)

// DoTasks does tasks using n concurrent workers (or GOMAXPROCS workers if n < 1)
// which produce output consumed by a serially run manager.
// The manager should return a slice of new task inputs based on prior task results,
// or return false to halt processing.
// If a task panics during execution,
// the panic will be caught and rethrown in the main Goroutine.
func DoTasks[Input, Output any](n int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input) {
in, out := start(n, task)
defer func() {
close(in)
// drain any waiting tasks
for range out {
}
}()
queue := deque.Of(initial...)
inflight := 0
for inflight > 0 || queue.Len() > 0 {
inch := in
item, ok := queue.Head()
if !ok {
inch = nil
}
select {
case inch <- item:
inflight++
queue.PopHead()
case r := <-out:
inflight--
if r.Panic != nil {
panic(r.Panic)
}
items, ok := manager(r.In, r.Out, r.Err)
if !ok {
return
}
queue.Append(items...)
}
}
}
3 changes: 3 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
// Package workgroup contains generic concurrent task runners.
package workgroup

// MaxProcs means use GOMAXPROCS workers when doing tasks.
const MaxProcs = -1
File renamed without changes.

0 comments on commit daa7c70

Please sign in to comment.