-
Notifications
You must be signed in to change notification settings - Fork 13
/
goshim.go
144 lines (125 loc) · 2.94 KB
/
goshim.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package shim
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/circonus-labs/circonus-unified-agent/cua"
"github.com/circonus-labs/circonus-unified-agent/plugins/serializers/influx"
)
type empty struct{}
var (
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
)
const (
// PollIntervalDisabled is used to indicate that you want to disable polling,
// as opposed to duration 0 meaning poll constantly.
PollIntervalDisabled = time.Duration(0)
)
// Shim allows you to wrap your inputs and run them as if they were part of circonus-unified-agent,
// except built externally.
type Shim struct {
Input cua.Input
Processor cua.StreamingProcessor
Output cua.Output
log *Logger
// streams
stdin io.Reader
stdout io.Writer
stderr io.Writer
// outgoing metric channel
metricCh chan cua.Metric
// input only
gatherPromptCh chan empty
}
// New creates a new shim interface
func New() *Shim {
return &Shim{
metricCh: make(chan cua.Metric, 1),
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
log: NewLogger(),
}
}
func (s *Shim) watchForShutdown(cancel context.CancelFunc) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-quit // user-triggered quit
// cancel, but keep looping until the metric channel closes.
cancel()
}()
}
// Run the input plugins..
func (s *Shim) Run(pollInterval time.Duration) error {
switch {
case s.Input != nil:
err := s.RunInput(pollInterval)
if err != nil {
return fmt.Errorf("RunInput error: %w", err)
}
case s.Processor != nil:
err := s.RunProcessor()
if err != nil {
return fmt.Errorf("RunProcessor error: %w", err)
}
case s.Output != nil:
err := s.RunOutput()
if err != nil {
return fmt.Errorf("RunOutput error: %w", err)
}
default:
return fmt.Errorf("Nothing to run")
}
return nil
}
func hasQuit(ctx context.Context) bool {
return ctx.Err() != nil
}
func (s *Shim) writeProcessedMetrics() error {
serializer := influx.NewSerializer()
for m := range s.metricCh {
b, err := serializer.Serialize(m)
if err != nil {
return fmt.Errorf("failed to serialize metric: %w", err)
}
// Write this to stdout
fmt.Fprint(s.stdout, string(b))
}
// for {
// select {
// case m, open := <-s.metricCh:
// if !open {
// return nil
// }
// b, err := serializer.Serialize(m)
// if err != nil {
// return fmt.Errorf("failed to serialize metric: %s", err)
// }
// // Write this to stdout
// fmt.Fprint(s.stdout, string(b))
// }
// }
return nil
}
// LogName satisfies the MetricMaker interface
func (s *Shim) LogName() string {
return ""
}
// MakeMetric satisfies the MetricMaker interface
func (s *Shim) MakeMetric(m cua.Metric) cua.Metric {
return m // don't need to do anything to it.
}
// Log satisfies the MetricMaker interface
func (s *Shim) Log() cua.Logger {
return s.log
}