-
Notifications
You must be signed in to change notification settings - Fork 3
/
sample_processor.go
269 lines (235 loc) · 9.85 KB
/
sample_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
package bitflow
import (
"fmt"
"sync"
"github.com/antongulenko/golib"
log "github.com/sirupsen/logrus"
)
// SampleProcessor is the basic interface to receive and process samples.
// It combines the SampleSource and SampleSink interfaces: it receives Samples
// through the Sample method and sends samples to the subsequent SampleProcessor
// configured over SetSink.
//
// The forwarded Samples can be the same as received, completely newly generated
// samples, and also a different number of Samples from the incoming ones.
// The Header can also be changed, but then the SampleProcessor implementation
// must take care to adjust the outgoing Samples accordingly.
//
// All required goroutines must be started in Start() and stopped when Close()
// is called. When Start() is called, it can be assumed that SetSink() has
// already been called to configure a non-nil subsequent SampleProcessor.
//
// As a special case, some SampleProcessor implementations output samples to
// external sinks like files or network connections. In this case, the incoming
// samples should usually be forwarded to the subsequent SampleProcessor without
// changes.
type SampleProcessor interface {
	SampleSource
	SampleSink
}
// AbstractSampleProcessor provides a few basic methods for implementations of
// SampleProcessor. It currently simply embeds the AbstractSampleSource type
// and adds no fields or methods of its own, but it should be used instead of
// AbstractSampleSource to make the purpose more clear.
type AbstractSampleProcessor struct {
	AbstractSampleSource
}
// AbstractSampleOutput is a partial implementation of SampleProcessor intended for
// processors that output samples to an external data sink (e.g. console, file, ...).
// Configuration variables are provided for controlling the forwarding of samples
// and the error handling. Both flags are evaluated by the Sample method.
type AbstractSampleOutput struct {
	AbstractSampleProcessor

	// DontForwardSamples can be set to true to disable forwarding of received samples
	// to the subsequent SampleProcessor.
	DontForwardSamples bool

	// DropOutputErrors can be set to true to make this AbstractSampleOutput ignore
	// errors that occurred from outputting samples to byte streams like files or
	// network connections. In that case, such errors will be logged and the samples
	// will be forwarded to subsequent processing steps.
	DropOutputErrors bool
}
// Sample forwards the received header and sample to the subsequent SampleProcessor,
// unless the DontForwardSamples flag has been set. Actual implementations of SampleOutput
// should provide an implementation that writes the samples to some destination and
// then call this method with the outcome.
//
// The err parameter should be the error (possibly nil) that resulted from previously
// writing the sample to some byte stream output (like a file or network connection).
// A non-nil err is returned immediately, unless DropOutputErrors is set: in that case
// it is only logged and the sample is still handled as if the write had succeeded.
//
// The result is err (when it is not dropped), nil when forwarding is disabled, or
// otherwise the result of the Sample method of the subsequent sink.
func (out *AbstractSampleOutput) Sample(err error, sample *Sample, header *Header) error {
	if err != nil {
		if !out.DropOutputErrors {
			return err
		}
		// The output error is deliberately dropped: log it and keep going.
		log.Errorln(err)
	}
	if !out.DontForwardSamples {
		return out.GetSink().Sample(sample, header)
	}
	return nil
}
// MarshallingSampleOutput is a SampleProcessor that outputs the received samples to a
// byte stream that is generated by a Marshaller instance.
type MarshallingSampleOutput interface {
	SampleProcessor

	// SetMarshaller must configure a valid instance of Marshaller before Start() is called.
	// All received samples will be converted to a byte stream using the configured marshaller.
	SetMarshaller(marshaller Marshaller)
}
// AbstractMarshallingSampleOutput is a partial implementation of MarshallingSampleOutput
// with a simple implementation of SetMarshaller(). It embeds AbstractSampleOutput and
// adds the fields that configure the marshalling.
type AbstractMarshallingSampleOutput struct {
	AbstractSampleOutput

	// Marshaller will be used when converting Samples to byte buffers before
	// writing them to the given output stream. It is set by SetMarshaller().
	Marshaller Marshaller

	// Writer contains variables that control the marshalling and writing process.
	// They must be configured before calling Start() on this AbstractSampleOutput.
	Writer SampleWriter
}
// SetMarshaller implements the MarshallingSampleOutput interface. It stores the
// given marshaller in the Marshaller field, replacing any previously set value.
func (out *AbstractMarshallingSampleOutput) SetMarshaller(marshaller Marshaller) {
	out.Marshaller = marshaller
}
// DroppingSampleProcessor implements the SampleProcessor interface by dropping any incoming
// samples: its Sample method returns nil without forwarding anything.
type DroppingSampleProcessor struct {
	AbstractSampleProcessor
}
// Start implements the golib.Task interface. A DroppingSampleProcessor starts
// no goroutines, so the wg parameter is not used and the zero value of
// golib.StopChan is returned.
func (s *DroppingSampleProcessor) Start(wg *sync.WaitGroup) golib.StopChan {
	// Spell out the zero value instead of relying on a blank named result
	// and a naked return.
	var noStopChan golib.StopChan
	return noStopChan
}
// Close implements the SampleProcessor interface. The only action taken is
// calling CloseSink(), which is provided by the embedded AbstractSampleProcessor.
func (s *DroppingSampleProcessor) Close() {
	s.CloseSink()
}
// String implements the golib.Task interface. It returns a fixed description
// of this processing step.
func (s *DroppingSampleProcessor) String() string {
	return "dropping samples"
}
// Sample implements the SampleProcessor interface. The received sample and
// header are ignored and not forwarded to any sink; the result is always nil.
func (s *DroppingSampleProcessor) Sample(sample *Sample, header *Header) error {
	return nil
}
// NoopProcessor is an empty implementation of SampleProcessor. It can be
// directly added to a SamplePipeline and will behave as a no-op processing step.
// Other implementations of SampleProcessor can embed this and override parts of
// the methods as required. No initialization is needed for this type, but an
// instance can only be used once, in one pipeline.
type NoopProcessor struct {
	AbstractSampleProcessor

	// StopChan is created by Start() and stopped by CloseSink() or Error().
	StopChan golib.StopChan
}
// Sample implements the SampleProcessor interface. The sample and header are
// handed unchanged to the subsequent processor obtained from GetSink(), and the
// result of that processor's Sample method is returned.
func (p *NoopProcessor) Sample(sample *Sample, header *Header) error {
	next := p.GetSink()
	return next.Sample(sample, header)
}
// Start implements the SampleProcessor interface. It creates a fresh
// golib.StopChan, stores it in the StopChan field and returns it. Calling
// CloseSink() or Error() later stops that StopChan to signal that this
// NoopProcessor is finished. The wg parameter is not used.
func (p *NoopProcessor) Start(wg *sync.WaitGroup) golib.StopChan {
	stopper := golib.NewStopChan()
	p.StopChan = stopper
	return stopper
}
// CloseSink reports that this NoopProcessor is finished processing.
// All goroutines must be stopped, and all Headers and Samples must be already
// forwarded to the outgoing sink, when this is called. CloseSink first stops
// the internal StopChan and then forwards the call to the CloseSink() method
// of the embedded AbstractSampleProcessor.
func (p *NoopProcessor) CloseSink() {
	// If there was no error, make sure to signal that this task is done.
	// NOTE(review): presumably Stop() has no effect if Error() already stopped
	// the StopChan — confirm against golib.StopChan.
	p.StopChan.Stop()
	p.AbstractSampleProcessor.CloseSink()
}
// Error reports that NoopProcessor has encountered an error and has stopped
// operation. The error is passed to the internal StopChan through StopErr().
// After calling this, no more Headers and Samples can be forwarded
// to the outgoing sink. Ultimately, p.Close() will be called for cleaning up.
func (p *NoopProcessor) Error(err error) {
	p.StopChan.StopErr(err)
}
// Close implements the SampleProcessor interface by calling CloseSink(), which
// stops the internal golib.StopChan and closes the outgoing sink. Other types
// that embed NoopProcessor can override this to perform specific actions when
// closing, but CloseSink() should always be called in the end.
func (p *NoopProcessor) Close() {
	p.CloseSink()
}
// String implements the SampleProcessor interface and returns the fixed name
// "NoopProcessor". This should be overridden by types that are embedding
// NoopProcessor.
func (p *NoopProcessor) String() string {
	return "NoopProcessor"
}
// MergeableProcessor is an extension of SampleProcessor that also allows
// merging two processor instances of the same type into one. Merging is only allowed
// when the result of the merge would have exactly the same functionality as using the
// two separate instances. This can be used as an optional optimization.
type MergeableProcessor interface {
	SampleProcessor

	// MergeProcessor attempts to merge the other processor into the receiver.
	// NOTE(review): the boolean result presumably reports whether the merge was
	// performed — confirm against the implementations.
	MergeProcessor(other SampleProcessor) bool
}
// ResizingSampleProcessor is a helper interface that can be implemented by SampleProcessors
// in order to make RequiredValues() more reliable. The result of
// the OutputSampleSize() method should give a worst-case estimation of the number of values
// that will be present in Samples after this SampleProcessor is done processing a sample.
// This allows the optimization of pre-allocating a value array large enough to hold the final
// amount of metrics.
// The optimization works best when all samples are processed in a one-to-one fashion,
// i.e. no samples are split into multiple samples.
type ResizingSampleProcessor interface {
	SampleProcessor

	// OutputSampleSize receives the number of values in an incoming sample and
	// returns the estimated number of values in the resulting outgoing sample.
	OutputSampleSize(sampleSize int) int
}
// RequiredValues returns the number of Values that should be large enough to hold
// the end-result after a Sample has been processed by all intermediate SampleProcessors.
//
// Starting at the given sink, the chain of processing steps is followed through
// GetSink() for as long as the current step is non-nil and implements SampleSource.
// Every step that implements ResizingSampleProcessor is asked for its
// OutputSampleSize(), and the running result is raised whenever a step reports a
// larger size. Steps that do not implement the ResizingSampleProcessor interface
// are assumed to not increase the number of metrics.
//
// numFields is the number of values in the incoming samples; the result is never
// smaller than numFields.
func RequiredValues(numFields int, sink SampleSink) int {
	for sink != nil {
		if resizer, isResizer := sink.(ResizingSampleProcessor); isResizer {
			if size := resizer.OutputSampleSize(numFields); size > numFields {
				numFields = size
			}
		}

		// Only steps that are also sources have a successor to continue with.
		source, isSource := sink.(SampleSource)
		if !isSource {
			break
		}
		sink = source.GetSink()
	}
	return numFields
}
// SimpleProcessor is a processing step whose behavior is configured through
// function fields instead of a dedicated type. It embeds NoopProcessor and
// overrides Sample(), Close() and String(), and additionally provides
// OutputSampleSize().
type SimpleProcessor struct {
	NoopProcessor

	// Description is returned by String(). If it is empty, String() returns
	// "SimpleProcessor" instead.
	Description string

	// Process is called by Sample() for every received sample and must be set:
	// Sample() returns an error if it is nil. If Process returns a nil error,
	// a non-nil sample and a non-nil header, the returned sample and header are
	// forwarded to the subsequent processing step. Otherwise nothing is
	// forwarded and the returned error (possibly nil) is the result of Sample().
	Process func(sample *Sample, header *Header) (*Sample, *Header, error)

	// OnClose, if not nil, is called by Close() before the embedded
	// NoopProcessor is closed.
	OnClose func()

	// OutputSampleSizeFunc, if not nil, computes the result of
	// OutputSampleSize(). If it is nil, OutputSampleSize() returns its input
	// unchanged.
	OutputSampleSizeFunc func(sampleSize int) int
}
// Sample implements the SampleProcessor interface. The received sample and
// header are passed to the configured Process function. If that function
// returns a nil error, a non-nil sample and a non-nil header, the returned
// sample and header are forwarded to the subsequent processing step and the
// result of that forwarding is returned. In all other cases nothing is
// forwarded and the error returned by Process (possibly nil) is the result,
// which allows Process to silently drop a sample by returning a nil sample or
// header.
//
// An error is returned if the Process field is not set.
func (p *SimpleProcessor) Sample(sample *Sample, header *Header) error {
	process := p.Process
	if process == nil {
		return fmt.Errorf("%s: Process function is not set", p)
	}

	// Use distinct names for the results so the parameters are not shadowed.
	outSample, outHeader, err := process(sample, header)
	if err != nil || outSample == nil || outHeader == nil {
		return err
	}
	return p.NoopProcessor.Sample(outSample, outHeader)
}
// Close implements the SampleProcessor interface. The OnClose callback is
// invoked first, if one is configured, and afterwards the embedded
// NoopProcessor is closed.
func (p *SimpleProcessor) Close() {
	onClose := p.OnClose
	if onClose != nil {
		onClose()
	}
	p.NoopProcessor.Close()
}
// OutputSampleSize implements the ResizingSampleProcessor interface. The
// result is computed by OutputSampleSizeFunc, if that field is set. Otherwise
// the given sampleSize is returned unchanged.
func (p *SimpleProcessor) OutputSampleSize(sampleSize int) int {
	sizeFunc := p.OutputSampleSizeFunc
	if sizeFunc == nil {
		return sampleSize
	}
	return sizeFunc(sampleSize)
}
// String implements the SampleProcessor interface. It returns the configured
// Description, or "SimpleProcessor" if the Description is empty.
func (p *SimpleProcessor) String() string {
	if desc := p.Description; desc != "" {
		return desc
	}
	return "SimpleProcessor"
}