-
Notifications
You must be signed in to change notification settings - Fork 0
/
serializer_async.go
53 lines (42 loc) · 892 Bytes
/
serializer_async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package iter
import "github.com/friedenberg/zit/src/alfa/schnittstellen"
// AsyncSerializer feeds elements of type T to the single goroutine started by
// MakeAsyncSerializer, which applies the wrapped function to them one at a
// time.
type AsyncSerializer[T any] struct {
	// chError yields the non-nil errors returned by the wrapped function.
	chError <-chan error
	// chE carries elements to the worker goroutine; closing it ends the
	// worker's loop.
	chE chan<- T
	// chDone receives one value when the worker goroutine exits.
	chDone <-chan struct{}
}
// MakeAsyncSerializer starts one worker goroutine and returns the
// AsyncSerializer connected to it.
//
// The worker calls wf on each element it receives, in arrival order. Every
// non-nil error from wf is sent on the (unbuffered) error channel, so the
// worker stalls until that error is received. Once the element channel is
// closed and drained, the worker sends a single value on the done channel and
// exits.
func MakeAsyncSerializer[T any](
	wf schnittstellen.FuncIter[T],
) AsyncSerializer[T] {
	var (
		errs  = make(chan error)
		items = make(chan T)
		done  = make(chan struct{})
	)

	go func() {
		// Signal completion on every exit path of the worker.
		defer func() { done <- struct{}{} }()

		for item := range items {
			err := wf(item)
			if err == nil {
				continue
			}

			errs <- err
		}
	}()

	return AsyncSerializer[T]{chError: errs, chE: items, chDone: done}
}
// Do blocks until it can either hand e to the worker or receive an error the
// worker has reported, whichever becomes possible first (select chooses
// between them when both are ready).
//
// It returns nil when e was handed over. When it returns a non-nil error, e
// was NOT sent to the worker.
func (s AsyncSerializer[T]) Do(e T) (err error) {
	select {
	case s.chE <- e:
		return nil

	case reported := <-s.chError:
		return reported
	}
}
// Wait closes the element channel, waits for the worker goroutine to finish,
// and returns the first error the worker reported in the meantime, or nil if
// it reported none.
//
// The element channel is closed before anything is received: the error
// channel is never closed, so receiving from it unconditionally would block
// forever whenever the worker has no error left to report.
//
// Wait must be called at most once, and Do must not be called afterwards,
// because the element channel is closed here.
func (s AsyncSerializer[T]) Wait() (err error) {
	close(s.chE)

	for {
		select {
		case reported := <-s.chError:
			// Keep draining so the worker is never left blocked on its
			// error send, but only report the first error.
			if err == nil {
				err = reported
			}

		case <-s.chDone:
			return err
		}
	}
}