-
Notifications
You must be signed in to change notification settings - Fork 0
/
event.go
105 lines (89 loc) · 1.65 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package do
import (
"context"
)
type (
C = context.Context
E = error
)
type PipeFunc[I, O any] func(C, I) (O, E)
// Pipe is a pipe run the PipeFuncs in order
func Pipe[B, D, A, R any](
ctx C,
b B,
before PipeFunc[B, D],
do PipeFunc[D, A],
after PipeFunc[A, R],
) (r R, err E) {
// 1
d, err := before(ctx, b)
if err != nil {
return
}
// 2
a, err := do(ctx, d)
if err != nil {
return
}
// 3
r, err = after(ctx, a)
if err != nil {
return
}
return r, nil
}
// Event do something with input I, handle result with success or failed
func Event[I, O, R any](
ctx C,
param I,
do PipeFunc[I, O],
success PipeFunc[O, R],
failed PipeFunc[E, R],
) (r R, err E) {
o, berr := do(ctx, param)
if berr != nil {
r, err = failed(ctx, berr)
} else {
r, err = success(ctx, o)
}
return
}
type (
EventFunc[I, O, R any] func(
ctx C,
param I,
do PipeFunc[I, O],
success PipeFunc[O, R],
failed PipeFunc[E, R],
) (r R, err E)
EventEntity[I, O, R any] struct {
Param I
Do PipeFunc[I, O]
Success PipeFunc[O, R]
Failed PipeFunc[E, R]
Handler func(R, E)
}
)
var (
_ EventFunc[int, int, int] = Event[int, int, int]
)
func EventLoop[I, O, R any](ctx C, n int) (chan<- EventEntity[I, O, R], chan<- struct{}) {
innerch := make(chan EventEntity[I, O, R], n)
stopch := make(chan struct{}, 1)
go func() {
defer func() {
close(innerch)
close(stopch)
}()
for {
select {
case event := <-innerch:
r, err := Event(ctx, event.Param, event.Do, event.Success, event.Failed)
event.Handler(r, err)
case <-stopch:
return
}
}
}()
return (chan<- EventEntity[I, O, R])(innerch), (chan<- struct{})(stopch)
}