-
Notifications
You must be signed in to change notification settings - Fork 3
/
output_console_box.go
145 lines (126 loc) · 4.75 KB
/
output_console_box.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package steps
import (
"flag"
"fmt"
"io"
"sync"
"time"
"github.com/antongulenko/golib"
"github.com/antongulenko/golib/gotermBox"
"github.com/bitflow-stream/go-bitflow/bitflow"
log "github.com/sirupsen/logrus"
)
// ConsoleBoxEndpoint is the endpoint type under which RegisterConsoleBoxOutput
// installs the console box data sink.
const ConsoleBoxEndpoint bitflow.EndpointType = "box"
var (
	// ConsoleBoxSettings is the gotermBox.CliLogBox configuration that is copied
	// into every ConsoleBoxSink created through the registered endpoint.
	// NOTE(review): LogLines and MessageBuffer presumably control the number of
	// displayed log lines and the size of the log message buffer; confirm
	// against the gotermBox documentation.
	ConsoleBoxSettings = gotermBox.CliLogBox{
		LogLines:      10,
		MessageBuffer: 500,
		NoUtf8:        false,
	}

	// ConsoleBoxUpdateInterval is passed to new sinks as the UpdateInterval of
	// their CliLogBoxTask. It is also the interval quoted in the help text of
	// the "slow-screen-updates" flag.
	ConsoleBoxUpdateInterval = 500 * time.Millisecond

	// ConsoleBoxMinUpdateInterval is passed to new sinks as the
	// MinUpdateInterval of their CliLogBoxTask.
	// NOTE(review): presumably a lower bound on the time between two screen
	// refreshes; confirm against gotermBox.
	ConsoleBoxMinUpdateInterval = 50 * time.Millisecond

	// ConsoleBoxOutputTestMode is a flag used by tests to suppress initialization
	// routines that are not testable. It is a hack to keep the EndpointFactory
	// easy to use while making it testable.
	ConsoleBoxOutputTestMode bool
)
// RegisterConsoleBoxOutput makes the console box output available on the given
// EndpointFactory. It stores a sink constructor under ConsoleBoxEndpoint in
// e.CustomDataSinks and appends a flag registration callback to
// e.CustomOutputFlags. Both callbacks are bound to the same consoleBoxFactory
// instance, so the flag value set through one is visible to the other.
func RegisterConsoleBoxOutput(e *bitflow.EndpointFactory) {
	factory := &consoleBoxFactory{}
	e.CustomDataSinks[ConsoleBoxEndpoint] = factory.createConsoleBox
	e.CustomOutputFlags = append(e.CustomOutputFlags, factory.registerFlags)
}
// consoleBoxFactory holds the command line option shared by the callbacks that
// RegisterConsoleBoxOutput installs on an EndpointFactory.
type consoleBoxFactory struct {
// ConsoleBoxNoImmediateScreenUpdate is bound to the "slow-screen-updates"
// flag. When it is true, sinks created by createConsoleBox have
// ImmediateScreenUpdate disabled.
ConsoleBoxNoImmediateScreenUpdate bool
}
// registerFlags defines the boolean "slow-screen-updates" flag on the given
// FlagSet and binds it to factory.ConsoleBoxNoImmediateScreenUpdate. The flag
// defaults to false.
func (factory *consoleBoxFactory) registerFlags(f *flag.FlagSet) {
	// The help text quotes the update interval configured at registration time.
	usage := fmt.Sprintf("For console box output, don't update the screen on every sample, but only in intervals of %v", ConsoleBoxUpdateInterval)
	f.BoolVar(&factory.ConsoleBoxNoImmediateScreenUpdate, "slow-screen-updates", false, usage)
}
// createConsoleBox builds a ConsoleBoxSink for the console box endpoint.
//
// The only accepted target is bitflow.StdTransportTarget; any other value
// yields an error. The new sink is configured from the package-level
// ConsoleBoxSettings, ConsoleBoxUpdateInterval and ConsoleBoxMinUpdateInterval,
// and its ImmediateScreenUpdate field is the negation of the
// "slow-screen-updates" flag. Init() is called on the sink unless
// ConsoleBoxOutputTestMode is set.
func (factory *consoleBoxFactory) createConsoleBox(target string) (bitflow.SampleProcessor, error) {
	if target != bitflow.StdTransportTarget {
		return nil, fmt.Errorf("Transport '%v' can only be defined with target '%v' (received '%v')", ConsoleBoxEndpoint, bitflow.StdTransportTarget, target)
	}

	sink := new(ConsoleBoxSink)
	sink.CliLogBoxTask.CliLogBox = ConsoleBoxSettings
	sink.CliLogBoxTask.UpdateInterval = ConsoleBoxUpdateInterval
	sink.CliLogBoxTask.MinUpdateInterval = ConsoleBoxMinUpdateInterval
	sink.ImmediateScreenUpdate = !factory.ConsoleBoxNoImmediateScreenUpdate

	// Tests skip the initialization because it is not testable.
	if ConsoleBoxOutputTestMode {
		return sink, nil
	}
	sink.Init()
	return sink, nil
}
// ConsoleBoxSink implements the SampleSink interface by printing the received
// samples to the standard out. Contrary to the ConsoleSink, the screen is erased
// before printing a new sample, and the output is embedded in a box that shows
// the last lines of log output at the bottom. ConsoleBoxSink does not implement
// MarshallingSampleSink, because it uses its own, fixed marshaller.
//
// Multiple embedded fields provide access to configuration options.
//
// Init() must be called as early as possible when using ConsoleBoxSink, to make
// sure that all log messages are captured and none are overwritten by the box.
type ConsoleBoxSink struct {
bitflow.AbstractSampleOutput
gotermBox.CliLogBoxTask
// ImmediateScreenUpdate causes the console box to be updated immediately
// whenever a sample is received by this ConsoleBoxSink. Otherwise, the screen
// will be updated in regular intervals based on the settings in CliLogBoxTask.
ImmediateScreenUpdate bool
// lock guards lastSample and lastHeader, which are written by Sample() and
// read by updateBox().
lock sync.Mutex
// lastSample and lastHeader hold the most recently received sample and its
// header. Both are nil until the first call to Sample().
lastSample *bitflow.Sample
lastHeader *bitflow.Header
}
// WritesToConsole implements the bitflow.ConsoleSampleSink interface. It always
// returns true.
func (sink *ConsoleBoxSink) WritesToConsole() bool {
return true
}
// String implements the SampleSink interface. It returns the fixed name
// "ConsoleBoxSink".
func (sink *ConsoleBoxSink) String() string {
return "ConsoleBoxSink"
}
// Start implements the SampleSink interface. It logs a short notice, installs
// updateBox as the Update callback of the embedded CliLogBoxTask and starts
// that task, which regularly refreshes the screen to display the current
// sample values and the latest log output lines. The StopChan returned by the
// task is passed on to the caller.
func (sink *ConsoleBoxSink) Start(wg *sync.WaitGroup) golib.StopChan {
	log.Println("Printing samples to table")

	task := &sink.CliLogBoxTask
	task.Update = sink.updateBox
	return task.Start(wg)
}
// updateBox is installed as the Update callback of the CliLogBoxTask. It writes
// the most recently stored sample to out using a bitflow.TextMarshaller limited
// to textWidth. If no sample or header has been stored yet, nothing is written
// and nil is returned. Otherwise the result of WriteSample is returned.
func (sink *ConsoleBoxSink) updateBox(out io.Writer, textWidth int) error {
	// Take a snapshot of both pointers so the lock is not held while writing.
	sink.lock.Lock()
	sample, header := sink.lastSample, sink.lastHeader
	sink.lock.Unlock()

	if sample == nil || header == nil {
		return nil
	}

	marshaller := bitflow.TextMarshaller{TextWidth: textWidth}
	hasTags := sample.NumTags() > 0
	return marshaller.WriteSample(sample, header, hasTags, out)
}
// Close implements the SampleSink interface. It stops the screen refresh
// goroutine and then calls CloseSink().
func (sink *ConsoleBoxSink) Close() {
// The embedded task is addressed explicitly, because calling Stop() directly
// on the sink resolves to the empty ConsoleBoxSink.Stop() method.
sink.CliLogBoxTask.Stop()
// NOTE(review): CloseSink is presumably promoted from
// bitflow.AbstractSampleOutput; confirm.
sink.CloseSink()
}
// Stop shadows the Stop() method from gotermBox.CliLogBoxTask to make sure
// that this SampleSink is actually closed in the Close() method. It is
// intentionally empty; Close() calls CliLogBoxTask.Stop() explicitly.
func (sink *ConsoleBoxSink) Stop() {
}
// Sample implements the SampleSink interface. It remembers the given sample and
// header as the latest ones, so that they are displayed on the console on the
// next screen refresh. If ImmediateScreenUpdate is set, a refresh is triggered
// right away. Intermediate samples might get lost without being displayed.
// Afterwards the sample is handed to AbstractSampleOutput.Sample, whose result
// is returned.
func (sink *ConsoleBoxSink) Sample(sample *bitflow.Sample, header *bitflow.Header) error {
	sink.lock.Lock()
	sink.lastSample, sink.lastHeader = sample, header
	if sink.ImmediateScreenUpdate {
		// The refresh is requested while the lock is still held.
		sink.TriggerUpdate()
	}
	sink.lock.Unlock()

	// NOTE(review): the nil first argument is presumably an error value
	// indicating that this sink itself did not fail; confirm against bitflow.
	return sink.AbstractSampleOutput.Sample(nil, sample, header)
}