-
Notifications
You must be signed in to change notification settings - Fork 166
/
wavefront.go
111 lines (92 loc) · 3.23 KB
/
wavefront.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// SPDX-License-Identifier: MIT OR Apache-2.0
package outputs
import (
"fmt"
"log"
"strings"
"github.com/DataDog/datadog-go/statsd"
"github.com/falcosecurity/falcosidekick/types"
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
)
// NewWavefrontClient returns a new output.Client for accessing the Wavefront API.
func NewWavefrontClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {
var sender wavefront.Sender
var err error
batchSize := config.Wavefront.BatchSize
if batchSize < 1 {
batchSize = 10000 // Defaults to 10000
}
flushInterval := config.Wavefront.FlushIntervalSeconds
if flushInterval < 1 {
flushInterval = 1 // Defaults to 1s
}
switch config.Wavefront.EndpointType {
case "direct":
server := fmt.Sprintf("https://%s@%s", config.Wavefront.EndpointToken, config.Wavefront.EndpointHost)
sender, err = wavefront.NewSender(
server,
wavefront.BatchSize(batchSize),
wavefront.FlushIntervalSeconds(flushInterval),
)
case "proxy":
sender, err = wavefront.NewSender(
config.Wavefront.EndpointHost,
wavefront.MetricsPort(config.Wavefront.EndpointMetricPort),
)
default:
return nil, fmt.Errorf("failed to configure wavefront sender: invalid type %s", config.Wavefront.EndpointType)
}
if err != nil {
return nil, fmt.Errorf("failed to configure wavefront %s sender: %s", config.Wavefront.EndpointType, err)
}
return &Client{
OutputType: "Wavefront",
Config: config,
WavefrontSender: &sender,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
}, nil
}
// WavefrontPost sends metrics to WaveFront.
func (c *Client) WavefrontPost(falcopayload types.FalcoPayload) {
tags := make(map[string]string)
tags["severity"] = falcopayload.Priority.String()
tags["rule"] = falcopayload.Rule
tags["source"] = falcopayload.Source
if falcopayload.Hostname != "" {
tags[Hostname] = falcopayload.Hostname
}
for tag, value := range falcopayload.OutputFields {
switch v := value.(type) {
case string:
tags[tag] = v
default:
continue
}
}
if len(falcopayload.Tags) != 0 {
tags["tags"] = strings.Join(falcopayload.Tags, ", ")
}
c.Stats.Wavefront.Add(Total, 1)
if c.WavefrontSender != nil {
sender := *c.WavefrontSender
// TODO: configurable metric name
if err := sender.SendMetric(c.Config.Wavefront.MetricName, 1, falcopayload.Time.UnixNano(), "falco-exporter", tags); err != nil {
c.Stats.Wavefront.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "wavefront", "status": Error}).Inc()
log.Printf("[ERROR] : Wavefront - Unable to send event %s: %s\n", falcopayload.Rule, err)
return
}
if err := sender.Flush(); err != nil {
c.Stats.Wavefront.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "wavefront", "status": Error}).Inc()
log.Printf("[ERROR] : Wavefront - Unable to flush event %s: %s\n", falcopayload.Rule, err)
return
}
c.Stats.Wavefront.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "wavefront", "status": OK}).Inc()
log.Printf("[INFO] : Wavefront - Send Event OK %s\n", falcopayload.Rule)
}
}