-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
monitor.go
128 lines (104 loc) · 2.73 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package monitor
import (
"errors"
log "github.com/hashicorp/go-hclog"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor interface {
// Start returns a channel of log messages which are sent
// every time a log message occurs.
Start() <-chan []byte
// Stop deregisters the sink from the InterceptLogger and shuts down
// streaming. It returns a count of the number of log messages that were
// dropped during streaming.
Stop() int
}
// monitor implements the Monitor interface. New also installs it as the
// Output of its own sink, so log data arrives through its Write method.
type monitor struct {
// sink is the adapter built by New; Start registers it on logger and
// Stop deregisters it.
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
// logCh is a buffered chan where Write queues logs when streaming;
// the goroutine launched by Start drains it.
logCh chan []byte
// droppedCount is the current count of messages
// that were dropped because the logCh buffer was full.
droppedCount int
// doneCh coordinates shutdown: Stop closes it, which ends the
// forwarding goroutine and makes Write return an error.
doneCh chan struct{}
// bufSize is the capacity of logCh and of the channel returned by Start.
// Defaults to 512.
bufSize int
}
// Config holds the settings New uses to build a Monitor.
type Config struct {
// BufferSize is the capacity of the monitor's log channels.
// New substitutes 512 when it is zero.
BufferSize int
// Logger is the InterceptLogger the monitor's sink is registered on
// by Start and deregistered from by Stop.
Logger log.InterceptLogger
// LoggerOptions is passed to log.NewSinkAdapter to create the sink.
// New overwrites its Output field with the monitor itself.
LoggerOptions *log.LoggerOptions
}
// New creates a new Monitor from cfg. Start must be called in order to
// actually start streaming logs.
//
// A BufferSize of zero or less selects the default of 512. A nil
// LoggerOptions is treated as an empty set of options. When LoggerOptions is
// non-nil its Output field is overwritten so that the sink writes into the
// returned monitor.
func New(cfg Config) Monitor {
	bufSize := cfg.BufferSize
	if bufSize <= 0 {
		// make panics at run time on a negative channel size, so every
		// non-positive value falls back to the default.
		bufSize = 512
	}

	opts := cfg.LoggerOptions
	if opts == nil {
		// Tolerate a Config without LoggerOptions instead of dereferencing
		// a nil pointer when setting Output below.
		opts = &log.LoggerOptions{}
	}

	sw := &monitor{
		logger:  cfg.Logger,
		logCh:   make(chan []byte, bufSize),
		doneCh:  make(chan struct{}, 1),
		bufSize: bufSize,
	}

	// Route the sink's output to the monitor's Write method.
	opts.Output = sw
	sw.sink = log.NewSinkAdapter(opts)

	return sw
}
// Stop deregisters the sink and stops the monitoring process by closing
// doneCh. It returns the number of log messages dropped so far.
//
// Calling Stop again after it has returned is a no-op that reports the same
// count; previously a second call panicked on closing an already closed
// channel. Concurrent calls to Stop are still not synchronized with each
// other.
//
// NOTE(review): droppedCount is read here without any visible
// synchronization against Write — confirm callers tolerate that.
func (d *monitor) Stop() int {
	// Nothing is ever sent on doneCh, so a successful receive can only mean
	// that it has already been closed by an earlier Stop.
	select {
	case <-d.doneCh:
		return d.droppedCount
	default:
	}

	d.logger.DeregisterSink(d.sink)
	close(d.doneCh)
	return d.droppedCount
}
// Start attaches the monitor's sink to its logger and hands back a buffered
// channel on which queued log messages are delivered. The returned channel is
// closed once doneCh has been closed.
func (d *monitor) Start() <-chan []byte {
	// Hook the sink into the logger so that log output reaches the monitor.
	d.logger.RegisterSink(d.sink)

	out := make(chan []byte, d.bufSize)

	// pump relays entries from logCh to out until doneCh is closed, then
	// closes out so that receivers observe the end of the stream.
	pump := func() {
		defer close(out)
		for {
			select {
			case <-d.doneCh:
				return
			case msg := <-d.logCh:
				// Keep watching doneCh while handing the entry over so a
				// full out channel cannot block shutdown.
				select {
				case out <- msg:
				case <-d.doneCh:
					return
				}
			}
		}
	}
	go pump()

	return out
}
// Write queues a copy of p on logCh without blocking. When logCh cannot
// accept the entry, the entry is discarded and droppedCount is incremented;
// the call still reports len(p) bytes written. After doneCh has been closed
// Write returns 0 and a "monitor stopped" error.
func (d *monitor) Write(p []byte) (int, error) {
	// Refuse new data once the monitor has been stopped.
	select {
	case <-d.doneCh:
		return 0, errors.New("monitor stopped")
	default:
		// Still running.
	}

	// An io.Writer must not retain p, so queue a private copy.
	n := len(p)
	entry := make([]byte, n)
	copy(entry, p)

	select {
	case d.logCh <- entry:
		return n, nil
	default:
	}

	// logCh was full: record the drop instead of blocking the logger.
	d.droppedCount++
	return n, nil
}