/
parallel.go
193 lines (169 loc) · 4.63 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Package parallel contains generic, type-safe functions for managing various kinds of concurrent logic.
package parallel
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
"github.com/bobg/go-generics/v3/iter"
)
// Error wraps an error returned from a worker goroutine.
// It records the worker number of the goroutine that produced the error.
type Error struct {
	N   int   // worker number of the goroutine that produced Err
	Err error // the underlying error being wrapped
}

// Error implements the error interface.
// The message names the worker number and then the wrapped error's text.
func (we Error) Error() string {
	num, cause := we.N, we.Err
	return fmt.Sprintf("in goroutine %d: %s", num, cause)
}

// Unwrap returns the wrapped error,
// which lets errors.Is and errors.As see through an Error.
func (we Error) Unwrap() error {
	return we.Err
}
// Values runs n parallel workers, each calling the function f,
// and collects their n results into a slice.
//
// Each worker is passed its worker number (in the range 0 through n-1)
// and a context derived from ctx.
//
// An error from any worker cancels them all.
// The first error is returned to the caller,
// wrapped in an Error carrying the failing worker's number.
//
// The resulting slice has length n.
// The value at position i comes from worker i.
// The slice is returned even when the error is non-nil;
// each position then holds whatever value its worker returned alongside its error.
func Values[F ~func(context.Context, int) (T, error), T any](ctx context.Context, n int, f F) ([]T, error) {
	group, workerCtx := errgroup.WithContext(ctx)
	out := make([]T, n)

	// worker builds the goroutine body for worker number num.
	// Taking num as a parameter gives each goroutine its own copy.
	worker := func(num int) func() error {
		return func() error {
			v, err := f(workerCtx, num)
			out[num] = v // stored unconditionally, before the error is inspected
			if err == nil {
				return nil
			}
			return Error{N: num, Err: err}
		}
	}

	for num := range out {
		group.Go(worker(num))
	}

	waitErr := group.Wait()
	return out, waitErr
}
// Producers starts n parallel workers, each calling the function f.
//
// Each worker is passed a context derived from ctx,
// its worker number
// (in the range 0 through n-1),
// and a callback to use for producing a value.
// If the callback returns an error,
// the worker should exit with that error.
//
// The callback sends the value on an unbuffered channel,
// so it may block until the caller is able to consume the value.
// If the workers' context is canceled first,
// the callback returns an Error wrapping the context's error.
//
// An error from any worker cancels them all.
// Errors returned by f itself are passed along as-is,
// without being wrapped in an Error.
//
// The caller gets an iterator over the values produced.
func Producers[F ~func(context.Context, int, func(T) error) error, T any](ctx context.Context, n int, f F) iter.Of[T] {
	values := make(chan T)
	group, groupCtx := errgroup.WithContext(ctx)

	// emitFor builds the produce-a-value callback handed to worker num.
	emitFor := func(num int) func(T) error {
		return func(v T) error {
			select {
			case values <- v:
				return nil
			case <-groupCtx.Done():
				return Error{N: num, Err: groupCtx.Err()}
			}
		}
	}

	for num := 0; num < n; num++ {
		num := num // per-iteration copy for the closure below
		emit := emitFor(num)
		group.Go(func() error {
			return f(groupCtx, num, emit)
		})
	}

	// The channel is closed only after every worker has returned,
	// and the group's error is recorded before the close.
	// NOTE(review): the error callback below reads waitErr without further synchronization;
	// that is ordered only if iter consults it after observing the channel close — confirm.
	var waitErr error
	go func() {
		waitErr = group.Wait()
		close(values)
	}()

	report := func() error { return waitErr }
	return iter.FromChan(values, iter.WithContext(ctx), iter.WithError(report))
}
// Consumers launches n parallel workers each consuming values supplied by the caller.
//
// When a value is available,
// an available worker calls the function f to consume it.
// This callback receives a context derived from ctx,
// the worker's number
// (in the range 0 through n-1),
// and the value.
//
// The caller receives two callbacks:
// one for sending a value to the workers via an internal channel
// (buffered with capacity n),
// and one for closing that channel,
// signaling the end of input and causing the workers to exit normally.
//
// The value-sending callback may block until there is room in the channel.
// It must not be called after the close callback has been called.
//
// An error from any worker cancels them all.
// This error is returned from the close-channel callback,
// wrapped in an Error carrying the worker's number.
// After any error, the value-sending callback will return an error.
// (Not the original error, however.
// For that, the caller should still invoke the close callback.)
//
// The close callback waits for all workers to exit.
// It may be called more than once;
// the channel is closed only the first time,
// and every call returns the workers' result.
func Consumers[F ~func(context.Context, int, T) error, T any](ctx context.Context, n int, f F) (func(T) error, func() error) {
	ch := make(chan T, n)
	g, ctx := errgroup.WithContext(ctx)

	for i := 0; i < n; i++ {
		i := i // per-iteration copy for the goroutine below
		g.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return Error{N: i, Err: ctx.Err()}
				case val, ok := <-ch:
					if !ok {
						// Channel closed: normal end of input.
						return nil
					}
					if err := f(ctx, i, val); err != nil {
						return Error{N: i, Err: err}
					}
				}
			}
		})
	}

	sendfn := func(val T) error {
		// A select with several ready cases picks one at random,
		// so with free buffer space a send could succeed after cancellation.
		// Checking the context first makes sends fail reliably once it is canceled.
		if err := ctx.Err(); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ch <- val:
			return nil
		}
	}

	// Guard the close so that a repeated call (e.g. one deferred, one explicit)
	// does not panic on closing an already-closed channel.
	var closeOnce sync.Once
	closefn := func() error {
		closeOnce.Do(func() { close(ch) })
		return g.Wait()
	}

	return sendfn, closefn
}
// Pool permits up to n concurrent calls to a function f.
// The caller receives a callback for requesting a worker from this pool.
// When no worker is available,
// the callback blocks until one becomes available.
// Then it invokes f and returns the result.
//
// Each call of the callback is synchronous.
// Any desired concurrency is the responsibility of the caller.
//
// The worker slot is released even if f panics,
// so a panicking call does not permanently reduce the pool's capacity.
// The panic itself still propagates to the caller.
//
// If n is less than 1 no slot is ever available
// and every call of the callback blocks forever.
func Pool[F ~func(T) (U, error), T, U any](n int, f F) func(T) (U, error) {
	var (
		running int // number of calls to f currently in progress; guarded by mu
		mu      sync.Mutex
		cond    = sync.NewCond(&mu)
	)

	return func(val T) (U, error) {
		// Acquire a slot, waiting while all n are in use.
		mu.Lock()
		for running >= n {
			cond.Wait()
		}
		running++
		mu.Unlock()

		// Release the slot in a defer so that it is returned on panic as well as on normal return.
		defer func() {
			mu.Lock()
			running--
			cond.Signal() // one slot was freed, so waking one waiter suffices
			mu.Unlock()
		}()

		return f(val)
	}
}