-
Notifications
You must be signed in to change notification settings - Fork 3
/
output_tcp_text.go
143 lines (126 loc) · 4.75 KB
/
output_tcp_text.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package steps
import (
"fmt"
"io"
"regexp"
"strings"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
)
func RegisterGraphiteOutput(b reg.ProcessorRegistry) {
factory := &SimpleTextMarshallerFactory{
Description: "graphite",
NameFixer: strings.NewReplacer("/", ".", " ", "_", "\t", "_", "\n", "_").Replace,
WriteValue: func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error {
_, err := fmt.Fprintf(writer, "%v %v %v\n", name, val, sample.Time.Unix())
return err
},
}
b.RegisterStep("graphite", factory.createTcpOutput, "Send metrics and/or tags to the given Graphite endpoint. Required parameter: 'target'. Optional: 'prefix'")
}
func RegisterOpentsdbOutput(b reg.ProcessorRegistry) {
const max_opentsdb_tags = 8
nameReplacer := strings.NewReplacer("/", ".") // Convention for bitflow metric names uses slashes, while OpenTSDB uses dots
illegalChars := regexp.MustCompile("[^\\p{L}\\d-_./]") // \p{L} matches Unicode letters, \d matches digits. The listed characters are legal, and the entire set is negated.
replacementString := "_"
factory := &SimpleTextMarshallerFactory{
Description: "opentsdb",
NameFixer: func(in string) string {
in = nameReplacer.Replace(in)
return illegalChars.ReplaceAllLiteralString(in, replacementString)
},
WriteValue: func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error {
_, err := fmt.Fprintf(writer, "put %v %v %f", name, sample.Time.Unix(), val)
addedTags := 0
for _, tag := range sample.SortedTags() {
key := illegalChars.ReplaceAllLiteralString(tag.Key, replacementString)
val := illegalChars.ReplaceAllLiteralString(tag.Value, replacementString)
_, err = fmt.Fprintf(writer, " %s=%s", key, val)
addedTags++
if err != nil || addedTags >= max_opentsdb_tags {
break
}
}
if err == nil && addedTags == 0 {
_, err = fmt.Fprintf(writer, " bitflow=true") // Add an artificial tag, because at least one tag is required
}
if err == nil {
_, err = writer.Write([]byte{'\n'})
}
return err
},
}
b.RegisterStep("opentsdb", factory.createTcpOutput, "Send metrics and/or tags to the given OpenTSDB endpoint.",
reg.VariableParams())
}
var _ bitflow.Marshaller = new(SimpleTextMarshaller)
type SimpleTextMarshallerFactory struct {
Description string
NameFixer func(string) string
WriteValue func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error
}
func (f *SimpleTextMarshallerFactory) createTcpOutput(p *bitflow.SamplePipeline, params map[string]string) error {
target, hasTarget := params["target"]
if !hasTarget {
return reg.ParameterError("target", fmt.Errorf("Missing required parameter"))
}
prefix := params["prefix"]
delete(params, "target")
delete(params, "prefix")
sink, err := _make_tcp_output(params)
if err == nil {
sink.Endpoint = target
sink.SetMarshaller(&SimpleTextMarshaller{
MetricPrefix: prefix,
Description: f.Description,
NameFixer: f.NameFixer,
WriteValue: f.WriteValue,
})
p.Add(sink)
}
return err
}
func _make_tcp_output(params map[string]string) (*bitflow.TCPSink, error) {
if err := bitflow.DefaultEndpointFactory.ParseParameters(params); err != nil {
return nil, fmt.Errorf("Error parsing parameters: %v", err)
}
output, err := bitflow.DefaultEndpointFactory.CreateOutput("tcp://-") // Create empty TCP output, will only be used as template with configuration values
if err != nil {
return nil, fmt.Errorf("Error creating template TCP output: %v", err)
}
tcpOutput, ok := output.(*bitflow.TCPSink)
if !ok {
return nil, fmt.Errorf("Error creating template file output, received wrong type: %T", output)
}
return tcpOutput, nil
}
type SimpleTextMarshaller struct {
Description string
MetricPrefix string
NameFixer func(string) string
WriteValue func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error
}
// ShouldCloseAfterFirstSample defines that text streams can stream without closing
func (SimpleTextMarshaller) ShouldCloseAfterFirstSample() bool {
return false
}
func (o *SimpleTextMarshaller) String() string {
return fmt.Sprintf("%s(prefix: %v)", o.Description, o.MetricPrefix)
}
func (o *SimpleTextMarshaller) WriteHeader(header *bitflow.Header, hasTags bool, writer io.Writer) error {
// No separate header
return nil
}
func (o *SimpleTextMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, hasTags bool, writer io.Writer) error {
prefix := o.MetricPrefix
if prefix != "" {
prefix = bitflow.ResolveTagTemplate(prefix, "_", sample)
}
for i, value := range sample.Values {
name := o.NameFixer(prefix + header.Fields[i])
if err := o.WriteValue(name, float64(value), sample, writer); err != nil {
return err
}
}
return nil
}