forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
signal.go
178 lines (148 loc) · 4.2 KB
/
signal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package outputs
import (
"github.com/elastic/beats/libbeat/logp"
"sync/atomic"
)
// Signaler signals the completion of potentially asynchronous output operation.
// Completed is called by the output plugin when all events have been sent. On
// failure or if only a subset of the data is published then Failed will be
// invoked.
type Signaler interface {
Completed()
Failed()
}
// ChanSignal will send outputer signals on configurable channel.
type ChanSignal struct {
ch chan bool
}
// SyncSignal blocks waiting for a signal.
type SyncSignal struct {
ch chan bool
}
// SplitSignal guards one output signaler from multiple calls
// by using a simple reference counting scheme. If one Signaler consumer
// reports a Failed event, the Failed event will be send to the guarded Signaler
// once the reference count becomes zero.
//
// Example use cases:
// - Push signaler to multiple outputers
// - split data to be send in smaller batches
type SplitSignal struct {
count int32
failed bool
signaler Signaler
}
// CompositeSignal combines multiple signalers into one Signaler forwarding an event to
// to all signalers.
type CompositeSignal struct {
signalers []Signaler
}
// NewChanSignal create a new ChanSignal forwarding signals to a channel.
func NewChanSignal(ch chan bool) *ChanSignal { return &ChanSignal{ch} }
// Completed sends true to the confiugred channel.
func (c *ChanSignal) Completed() { c.ch <- true }
// Failed sends false to the confiugred channel.
func (c *ChanSignal) Failed() { c.ch <- false }
// NewSyncSignal create a new SyncSignal signaler. Use Wait() method to wait for
// a signal from the publisher
func NewSyncSignal() *SyncSignal { return &SyncSignal{make(chan bool, 1)} }
// Wait blocks waiting for a signal from the outputer. Wait return true if
// Completed was signaled and false if a Failed signal was received
func (s *SyncSignal) Wait() bool { return <-s.ch }
// Completed sends true to the process waiting for a signal.
func (s *SyncSignal) Completed() { s.ch <- true }
// Failed sends false to the process waiting for a signal.
func (s *SyncSignal) Failed() { s.ch <- false }
// NewSplitSignaler creates a new SplitSignal if s is not nil.
// If s is nil, nil will be returned. The count is the number of events to be
// received before publishing the final event to the guarded Signaler.
func NewSplitSignaler(
s Signaler,
count int,
) Signaler {
if s == nil {
return nil
}
return &SplitSignal{
count: int32(count),
signaler: s,
}
}
// Completed signals a Completed event to s.
func (s *SplitSignal) Completed() {
s.onEvent()
}
// Failed signals a Failed event to s.
func (s *SplitSignal) Failed() {
s.failed = true
s.onEvent()
}
func (s *SplitSignal) onEvent() {
res := atomic.AddInt32(&s.count, -1)
if res == 0 {
if s.failed {
s.signaler.Failed()
} else {
s.signaler.Completed()
}
}
}
// NewCompositeSignaler creates a new composite signaler.
func NewCompositeSignaler(signalers ...Signaler) Signaler {
if len(signalers) == 0 {
return nil
}
return &CompositeSignal{signalers}
}
// Completed sends the Completed signal to all signalers.
func (cs *CompositeSignal) Completed() {
for _, s := range cs.signalers {
if s != nil {
s.Completed()
}
}
}
// Failed sends the Failed signal to all signalers.
func (cs *CompositeSignal) Failed() {
for _, s := range cs.signalers {
if s != nil {
s.Failed()
}
}
}
// SignalCompleted sends the Completed event to s if s is not nil.
func SignalCompleted(s Signaler) {
if s != nil {
s.Completed()
}
}
// SignalFailed sends the Failed event to s if s is not nil
func SignalFailed(s Signaler, err error) {
if err != nil {
logp.Err("Error sending/writing event: %s", err)
}
if s != nil {
s.Failed()
}
}
// Signal will send the Completed or Failed event to s depending
// on err being set if s is not nil.
func Signal(s Signaler, err error) {
if err != nil {
logp.Info("Failed to send event %s", err)
}
if s != nil {
if err == nil {
s.Completed()
} else {
s.Failed()
}
}
}
// SignalAll send the Completed or Failed event to all given signalers
// depending on err being set.
func SignalAll(signalers []Signaler, err error) {
if signalers != nil {
Signal(NewCompositeSignaler(signalers...), err)
}
}