-
Notifications
You must be signed in to change notification settings - Fork 2k
/
monitor.go
172 lines (145 loc) · 3.92 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package monitor
import (
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor interface {
// Start returns a channel of log messages which are sent
// ever time a log message occurs
Start() <-chan []byte
// Stop de-registers the sink from the InterceptLogger
// and closes the log channels
Stop()
}
// monitor implements the Monitor interface
type monitor struct {
// protects droppedCount and logCh
sync.Mutex
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
// logCh is a buffered chan where we send logs when streaming
logCh chan []byte
// doneCh coordinates the shutdown of logCh
doneCh chan struct{}
// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
// only access under lock
droppedCount int
bufSize int
// droppedDuration is the amount of time we should
// wait to check for dropped messages. Defaults
// to 3 seconds
droppedDuration time.Duration
}
// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) Monitor {
return new(buf, logger, opts)
}
func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor {
sw := &monitor{
logger: logger,
logCh: make(chan []byte, buf),
doneCh: make(chan struct{}, 1),
bufSize: buf,
droppedDuration: 3 * time.Second,
}
opts.Output = sw
sink := log.NewSinkAdapter(opts)
sw.sink = sink
return sw
}
// Stop deregisters the sink and stops the monitoring process
func (d *monitor) Stop() {
d.logger.DeregisterSink(d.sink)
close(d.doneCh)
}
// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte {
// register our sink with the logger
d.logger.RegisterSink(d.sink)
streamCh := make(chan []byte, d.bufSize)
// run a go routine that listens for streamed
// log messages and sends them to streamCh
go func() {
defer close(streamCh)
for {
select {
case log := <-d.logCh:
select {
case <-d.doneCh:
return
case streamCh <- log:
}
case <-d.doneCh:
return
}
}
}()
// run a go routine that periodically checks for
// dropped messages and makes room on the logCh
// to add a dropped message count warning
go func() {
// loop and check for dropped messages
for {
select {
case <-d.doneCh:
return
case <-time.After(d.droppedDuration):
d.Lock()
// Check if there have been any dropped messages.
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
case <-d.doneCh:
d.Unlock()
return
// Try sending dropped message count to logCh in case
// there is room in the buffer now.
case d.logCh <- []byte(dropped):
default:
// Drop a log message to make room for "Monitor dropped.." message
select {
case <-d.logCh:
d.droppedCount++
dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
default:
}
d.logCh <- []byte(dropped)
}
d.droppedCount = 0
}
// unlock after handling dropped message
d.Unlock()
}
}
}()
return streamCh
}
// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *monitor) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()
// ensure logCh is still open
select {
case <-d.doneCh:
return
default:
}
bytes := make([]byte, len(p))
copy(bytes, p)
select {
case d.logCh <- bytes:
default:
d.droppedCount++
}
return len(p), nil
}