-
Notifications
You must be signed in to change notification settings - Fork 0
/
spooler.go
151 lines (126 loc) · 4.18 KB
/
spooler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package beater
import (
"sync"
"time"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
)
var debugf = logp.MakeDebug("spooler")
// channelSize is the number of events Channel can buffer before blocking will occur.
const channelSize = 16
// Spooler aggregates the events and sends the aggregated data to the publisher.
type Spooler struct {
Channel chan *input.FileEvent // Channel is the input to the Spooler.
// Config
idleTimeout time.Duration // How often to flush the spooler if spoolSize is not reached.
spoolSize uint64 // Maximum number of events that are stored before a flush occurs.
exit chan struct{} // Channel used to signal shutdown.
nextFlushTime time.Time // Scheduled time of the next flush.
publisher chan<- []*input.FileEvent // Channel used to publish events.
spool []*input.FileEvent // FileEvents being held by the Spooler.
wg sync.WaitGroup // WaitGroup used to control the shutdown.
}
// NewSpooler creates and returns a new Spooler. The returned Spooler must be
// started by calling Start before it can be used.
func NewSpooler(
config cfg.FilebeatConfig,
publisher chan<- []*input.FileEvent,
) *Spooler {
spoolSize := config.SpoolSize
if spoolSize <= 0 {
spoolSize = cfg.DefaultSpoolSize
debugf("Spooler will use the default spool_size of %d", spoolSize)
}
idleTimeout := config.IdleTimeout
if idleTimeout <= 0 {
idleTimeout = cfg.DefaultIdleTimeout
debugf("Spooler will use the default idle_timeout of %s", idleTimeout)
}
return &Spooler{
Channel: make(chan *input.FileEvent, channelSize),
idleTimeout: idleTimeout,
spoolSize: spoolSize,
exit: make(chan struct{}),
nextFlushTime: time.Now().Add(idleTimeout),
publisher: publisher,
spool: make([]*input.FileEvent, 0, spoolSize),
}
}
// Start starts the Spooler. Stop must be called to stop the Spooler.
func (s *Spooler) Start() {
s.wg.Add(1)
go s.run()
}
// run queues events that it reads from Channel and flushes them when either the
// queue reaches its capacity (which is spoolSize) or a timeout period elapses.
func (s *Spooler) run() {
defer s.wg.Done()
ticker := time.NewTicker(s.idleTimeout / 2)
logp.Info("Starting spooler: spool_size: %v; idle_timeout: %s",
s.spoolSize, s.idleTimeout)
loop:
for {
select {
case <-s.exit:
ticker.Stop()
break loop
case event := <-s.Channel:
if event != nil {
s.queue(event)
}
case <-ticker.C:
s.timedFlush()
}
}
// Drain any events that may remain in Channel.
for e := range s.Channel {
s.queue(e)
}
debugf("Flushing events from spooler at shutdown")
s.flush()
}
// Stop stops this Spooler. This method blocks until all events have been
// flushed to the publisher. The method should only be invoked one time after
// Start has been invoked.
func (s *Spooler) Stop() {
logp.Info("Stopping spooler")
// Signal to the run method that it should stop.
close(s.exit)
// Stop accepting writes. Any events in the channel will be flushed.
close(s.Channel)
// Wait for the flush to complete.
s.wg.Wait()
debugf("Spooler has stopped")
}
// queue queues a single event to be spooled. If the queue reaches spoolSize
// while calling this method then all events in the queue will be flushed to
// the publisher.
func (s *Spooler) queue(event *input.FileEvent) {
s.spool = append(s.spool, event)
if len(s.spool) == cap(s.spool) {
debugf("Flushing spooler because spooler full. Events flushed: %v", len(s.spool))
s.flush()
}
}
// timedFlush flushes the events in the queue if a flush has not occurred
// for a period of time greater than idleTimeout.
func (s *Spooler) timedFlush() {
if time.Now().After(s.nextFlushTime) {
debugf("Flushing spooler because of timeout. Events flushed: %v", len(s.spool))
s.flush()
}
}
// flush flushes all events to the publisher.
func (s *Spooler) flush() {
if len(s.spool) > 0 {
// copy buffer
tmpCopy := make([]*input.FileEvent, len(s.spool))
copy(tmpCopy, s.spool)
// clear buffer
s.spool = s.spool[:0]
// send
s.publisher <- tmpCopy
}
s.nextFlushTime = time.Now().Add(s.idleTimeout)
}