-
Notifications
You must be signed in to change notification settings - Fork 29
/
splunk_metric.go
89 lines (77 loc) · 2.14 KB
/
splunk_metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package eventwriter
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"code.cloudfoundry.org/cfhttp"
"code.cloudfoundry.org/lager"
)
// splunkMetric is the Writer implementation returned by NewSplunkMetric. It
// posts batches of events to the collector endpoint of the configured host.
type splunkMetric struct {
	// config supplies the host, token, index, version and logger that the
	// methods of this type read.
	config *SplunkConfig
	// httpClient executes the POST requests built by send.
	httpClient *http.Client
}
// NewSplunkMetric returns a Writer that sends events using a cfhttp client
// whose transport has been replaced by a plain http.Transport carrying only a
// TLS configuration.
//
// config.SkipSSL is copied into InsecureSkipVerify, so a true value disables
// verification of the server certificate for every request the writer makes.
// config is retained by the returned writer and read on each call.
func NewSplunkMetric(config *SplunkConfig) Writer {
	client := cfhttp.NewClient()

	// Swap in a transport that only differs from the zero value by its TLS settings.
	tlsSettings := &tls.Config{InsecureSkipVerify: config.SkipSSL}
	client.Transport = &http.Transport{TLSClientConfig: tlsSettings}

	writer := &splunkMetric{
		config:     config,
		httpClient: client,
	}
	return writer
}
// Write marshals every event to JSON and posts the whole batch in a single
// request via send.
//
// Each event map is modified in place: its "index" key is set to the
// configured index before marshalling. An event that cannot be marshalled is
// logged and omitted from the payload; the remaining events are still sent.
//
// The returned error is whatever send reports. The returned count is always
// len(events), independent of marshalling failures and of the request outcome.
func (s *splunkMetric) Write(events []map[string]interface{}) (error, uint64) {
	total := uint64(len(events))

	var payload bytes.Buffer
	for _, evt := range events {
		evt["index"] = s.config.Index

		encoded, err := json.Marshal(evt)
		if err != nil {
			s.config.Logger.Error("Error marshalling event", err,
				lager.Data{
					"event": fmt.Sprintf("%+v", evt),
				},
			)
			continue
		}

		payload.Write(encoded)
		// Every encoded event is followed by a blank line in the request body.
		payload.WriteString("\n\n")
	}

	body := payload.Bytes()
	return s.send(&body), total
}
// send POSTs the bytes behind postBody to "<host>/services/collector" using
// the writer's HTTP client.
//
// It returns an error when the request cannot be constructed, when the client
// fails to execute it, or when the response status code is greater than 299;
// in the last case the error text carries the status code and the response
// body. For any other status the response body is drained and nil is returned.
func (s *splunkMetric) send(postBody *[]byte) error {
	url := fmt.Sprintf("%s/services/collector", s.config.Host)

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(*postBody))
	if err != nil {
		return err
	}

	headers := [][2]string{
		{"Content-Type", "application/json"},
		{"Connection", "keep-alive"},
		{"Authorization", fmt.Sprintf("Splunk %s", s.config.Token)},
		// App headers for HEC telemetry.
		{"__splunk_app_name", "Splunk Firehose Nozzle"},
		{"__splunk_app_version", s.config.Version},
	}
	for _, h := range headers {
		req.Header.Set(h[0], h[1])
	}

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode <= 299 {
		// Drain the response so the same connection can be reused next time.
		// A failure here is only logged; the send is still reported as successful.
		if _, drainErr := io.Copy(ioutil.Discard, resp.Body); drainErr != nil {
			s.config.Logger.Error("Error discarding response body", drainErr)
		}
		return nil
	}

	// The read error is ignored: whatever part of the body was read is
	// included in the error message.
	responseBody, _ := ioutil.ReadAll(resp.Body)
	message := fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody)
	return errors.New(message)
}