-
Notifications
You must be signed in to change notification settings - Fork 3
/
sample_source.go
113 lines (96 loc) · 3.72 KB
/
sample_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package bitflow
import (
"sync"
"github.com/antongulenko/golib"
)
// SampleSource is the interface used for producing Headers and Samples.
// It should start producing samples in a separate goroutine when Start() is
// called, and should stop all goroutines when Close() is called. Before Start()
// is called, SetSink() must be called to inform the SampleSource about the
// SampleProcessor it should output the Headers/Samples into.
//
// After all samples have been generated (for example because the data source is
// finished, like a file, or because Close() has been called) the Close() method
// must be called on the outgoing SampleProcessor. After calling Close(), no more
// headers or samples are allowed to go into the SampleProcessor.
//
// Start() is contributed by the embedded golib.Startable; see the golib.Task
// interface for info about the Start() method.
type SampleSource interface {
	// Startable contributes the Start() method described above.
	golib.Startable

	// String returns a textual description of the source.
	String() string

	// SetSink configures the SampleProcessor that receives the generated
	// Headers/Samples. It must be called before Start().
	SetSink(sink SampleProcessor)

	// GetSink returns the SampleProcessor configured through SetSink.
	GetSink() SampleProcessor

	// Close asks the source to stop all of its goroutines. Once the source has
	// finished, the outgoing SampleProcessor must be closed as well.
	Close()
}
// AbstractSampleSource is a partial implementation of SampleSource. It stores
// the outgoing SampleProcessor and provides helpers for closing that
// SampleProcessor after all samples have been generated.
type AbstractSampleSource struct {
	// out is the outgoing SampleProcessor. It is nil until SetSink has been
	// called with a non-nil value.
	out SampleProcessor
}
// SetSink implements the SampleSource interface. It stores sink as the outgoing
// SampleProcessor, replacing any previously stored value. The field is written
// without synchronization.
func (s *AbstractSampleSource) SetSink(sink SampleProcessor) {
	s.out = sink
}
// GetSink implements the SampleSource interface. It returns the outgoing
// SampleProcessor stored by SetSink, which is nil if no sink has been stored.
func (s *AbstractSampleSource) GetSink() SampleProcessor {
	return s.out
}
// CloseSink synchronously closes the outgoing SampleProcessor, if one has been
// set. Call it once the receiving AbstractSampleSource has finished producing
// samples. Without a configured sink this is a no-op.
func (s *AbstractSampleSource) CloseSink() {
	if s.out == nil {
		// Nothing was configured through SetSink, so there is nothing to close.
		return
	}
	s.out.Close()
}
// CloseSinkParallel closes the subsequent SampleProcessor in a concurrent
// goroutine, which is registered in the given WaitGroup. This can be useful
// compared to CloseSink() in certain cases to avoid deadlocks due to
// long-running Close() invocations. As a general rule of thumb, implementations
// of SampleSource should use CloseSinkParallel(), while SampleProcessors should
// simply use CloseSink().
//
// If no sink has been set, the method does nothing and wg is left untouched.
// If wg is nil, the sink is still closed in a separate goroutine, but that
// goroutine is not registered anywhere, so the caller cannot wait for it.
func (s *AbstractSampleSource) CloseSinkParallel(wg *sync.WaitGroup) {
	// Read the sink exactly once so that the nil check and the Close() call in
	// the goroutine operate on the same value, even if SetSink() is called
	// again before the goroutine gets scheduled.
	out := s.out
	if out == nil {
		return
	}
	if wg != nil {
		// Register before starting the goroutine so that a concurrent
		// wg.Wait() cannot miss it.
		wg.Add(1)
	}
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		out.Close()
	}()
}
// UnmarshallingSampleSource extends SampleSource and adds a configuration setter
// that gives access to the samples that are read by this data source.
type UnmarshallingSampleSource interface {
	SampleSource

	// SetSampleHandler configures the ReadSampleHandler that is given access to
	// the samples read by this source.
	SetSampleHandler(handler ReadSampleHandler)
}
// AbstractUnmarshallingSampleSource extends AbstractSampleSource by adding
// configuration fields required for unmarshalling samples.
type AbstractUnmarshallingSampleSource struct {
	// AbstractSampleSource contributes the sink handling (SetSink, GetSink,
	// CloseSink, CloseSinkParallel).
	AbstractSampleSource

	// Reader configures aspects of parallel reading and parsing. See SampleReader for more info.
	Reader SampleReader
}
// SetSampleHandler implements the UnmarshallingSampleSource interface. It stores
// handler in the Handler field of the embedded Reader configuration, replacing
// any previously stored handler.
func (s *AbstractUnmarshallingSampleSource) SetSampleHandler(handler ReadSampleHandler) {
	s.Reader.Handler = handler
}
// EmptySampleSource implements SampleSource but does not generate any samples.
// It is used in cases where a source is required but no real implementation is available.
type EmptySampleSource struct {
	AbstractSampleSource

	// wg is the WaitGroup received in Start(). Close() hands it to
	// CloseSinkParallel(). It is nil until Start() has been called.
	wg *sync.WaitGroup
}
// Start implements the golib.Task interface. It does not start any goroutine;
// it only remembers wg so that Close() can later register the closing of the
// sink in it. The returned golib.StopChan is the zero value of that type.
func (s *EmptySampleSource) Start(wg *sync.WaitGroup) (_ golib.StopChan) {
	s.wg = wg
	// Naked return: the blank named result is never assigned, so the zero
	// value of golib.StopChan is returned.
	return
}
// Close implements the SampleSource interface. Since this source never produces
// samples, closing it only closes the outgoing sink, which happens in a separate
// goroutine via CloseSinkParallel() using the WaitGroup stored by Start().
func (s *EmptySampleSource) Close() {
	// The WaitGroup is whatever Start() stored; it is nil if Start() was never called.
	registry := s.wg
	s.CloseSinkParallel(registry)
}
// String implements the golib.Task interface. It returns a fixed description
// of this source.
func (s *EmptySampleSource) String() string {
	return "empty sample source"
}
// SetSampleHandler implements the UnmarshallingSampleSource interface. The
// handler is ignored and not stored.
func (s *EmptySampleSource) SetSampleHandler(handler ReadSampleHandler) {
	// Do nothing
}