This repository has been archived by the owner on Jan 21, 2022. It is now read-only.
/
datadog_firehose_nozzle.go
144 lines (124 loc) · 4.06 KB
/
datadog_firehose_nozzle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package datadogfirehosenozzle
import (
"crypto/tls"
"time"
"code.cloudfoundry.org/localip"
"github.com/cloudfoundry-incubator/datadog-firehose-nozzle/datadogclient"
"github.com/cloudfoundry-incubator/datadog-firehose-nozzle/nozzleconfig"
"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/noaa/consumer"
noaaerrors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gorilla/websocket"
)
// DatadogFirehoseNozzle carries the state used to read envelopes from the
// firehose and pass them to the Datadog client.
type DatadogFirehoseNozzle struct {
	// Dependencies handed in through NewDatadogFirehoseNozzle.
	config           *nozzleconfig.NozzleConfig
	authTokenFetcher AuthTokenFetcher
	log              *gosteno.Logger

	// Collaborators built while Start runs (createClient / consumeFirehose).
	client   *datadogclient.Client
	consumer *consumer.Consumer

	// Receive-only channels returned by consumer.Firehose.
	messages <-chan *events.Envelope
	errs     <-chan error
}
// AuthTokenFetcher supplies the token string that Start passes on to the
// firehose subscription when access control is not disabled.
type AuthTokenFetcher interface {
	// FetchAuthToken returns the token as a string.
	FetchAuthToken() string
}
// NewDatadogFirehoseNozzle returns a nozzle that remembers the given
// configuration, token fetcher and logger. The Datadog client and the firehose
// consumer are not created here; Start sets them up.
func NewDatadogFirehoseNozzle(config *nozzleconfig.NozzleConfig, tokenFetcher AuthTokenFetcher, log *gosteno.Logger) *DatadogFirehoseNozzle {
	nozzle := new(DatadogFirehoseNozzle)
	nozzle.config = config
	nozzle.authTokenFetcher = tokenFetcher
	nozzle.log = log
	return nozzle
}
// Start runs the nozzle: it obtains an auth token (skipped when
// DisableAccessControl is set, leaving the token empty), creates the Datadog
// client, subscribes to the firehose and then processes traffic until
// postToDatadog returns. The error from postToDatadog is returned to the
// caller after the shutdown message is logged.
func (d *DatadogFirehoseNozzle) Start() error {
	token := ""
	if accessControlEnabled := !d.config.DisableAccessControl; accessControlEnabled {
		token = d.authTokenFetcher.FetchAuthToken()
	}

	d.log.Info("Starting DataDog Firehose Nozzle...")
	d.createClient()
	d.consumeFirehose(token)

	runErr := d.postToDatadog()
	d.log.Info("DataDog Firehose Nozzle shutting down...")
	return runErr
}
// createClient builds the Datadog client from the nozzle configuration and the
// address reported by localip.LocalIP, and stores it on the nozzle.
// It panics when the local IP cannot be determined.
func (d *DatadogFirehoseNozzle) createClient() {
	hostIP, ipErr := localip.LocalIP()
	if ipErr != nil {
		panic(ipErr)
	}

	cfg := d.config
	// The configured timeout is expressed in whole seconds.
	timeout := time.Duration(cfg.DataDogTimeoutSeconds) * time.Second

	d.client = datadogclient.New(
		cfg.DataDogURL,
		cfg.DataDogAPIKey,
		cfg.MetricPrefix,
		cfg.Deployment,
		hostIP,
		timeout,
		cfg.FlushMaxBytes,
		d.log,
	)
}
// consumeFirehose creates the traffic-controller consumer, applies the
// configured idle timeout and opens the firehose subscription with authToken.
// The resulting message and error channels are stored on the nozzle.
func (d *DatadogFirehoseNozzle) consumeFirehose(authToken string) {
	cfg := d.config

	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSSLSkipVerify}
	d.consumer = consumer.New(cfg.TrafficControllerURL, tlsConfig, nil)

	// The configured idle timeout is expressed in whole seconds.
	idleTimeout := time.Duration(cfg.IdleTimeoutSeconds) * time.Second
	d.consumer.SetIdleTimeout(idleTimeout)

	msgs, errs := d.consumer.Firehose(cfg.FirehoseSubscriptionID, authToken)
	d.messages = msgs
	d.errs = errs
}
// postToDatadog is the nozzle's main loop. It multiplexes three sources:
//
//   - a ticker firing every FlushDurationSeconds, which triggers postMetrics;
//   - the firehose message channel, whose envelopes are filtered by
//     keepMessage, inspected by handleMessage and added to the client;
//   - the firehose error channel, whose first value is passed to handleError
//     and then returned, ending the loop.
//
// The ticker is stopped when the function returns so that it does not outlive
// the loop.
func (d *DatadogFirehoseNozzle) postToDatadog() error {
	ticker := time.NewTicker(time.Duration(d.config.FlushDurationSeconds) * time.Second)
	// Release the ticker's resources once the loop exits on an error.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			d.postMetrics()
		case envelope := <-d.messages:
			if !d.keepMessage(envelope) {
				continue
			}
			d.handleMessage(envelope)
			d.client.AddMetric(envelope)
		case err := <-d.errs:
			d.handleError(err)
			return err
		}
	}
}
// postMetrics asks the Datadog client to post its metrics and reports any
// returned error through the logger's Fatalf.
func (d *DatadogFirehoseNozzle) postMetrics() {
	if postErr := d.client.PostMetrics(); postErr != nil {
		d.log.Fatalf("FATAL ERROR: %s\n\n", postErr)
	}
}
// handleError reports an error received from the firehose, then closes the
// consumer and posts the pending metrics.
//
// A noaa RetryError is unwrapped first so the checks below see the underlying
// error. Websocket close errors are treated by close code: a normal closure is
// not logged as an error, a policy violation is logged together with a
// slow-consumer alert, and every other error is simply logged.
func (d *DatadogFirehoseNozzle) handleError(err error) {
	if wrapped, isRetry := err.(noaaerrors.RetryError); isRetry {
		err = wrapped.Err
	}

	wsErr, isClose := err.(*websocket.CloseError)
	switch {
	case isClose && wsErr.Code == websocket.CloseNormalClosure:
		// Normal closure: nothing to report.
	case isClose && wsErr.Code == websocket.ClosePolicyViolation:
		d.log.Errorf("Error while reading from the firehose: %v", err)
		d.log.Errorf("Disconnected because nozzle couldn't keep up. Please try scaling up the nozzle.")
		d.client.AlertSlowConsumerError()
	default:
		// Any other close code, or a non-websocket error.
		d.log.Errorf("Error while reading from the firehose: %v", err)
	}

	d.log.Infof("Closing connection with traffic controller due to %v", err)
	d.consumer.Close()
	d.postMetrics()
}
// keepMessage reports whether envelope passes the deployment filter: every
// envelope passes when DeploymentFilter is empty, otherwise only envelopes
// whose deployment equals the filter.
func (d *DatadogFirehoseNozzle) keepMessage(envelope *events.Envelope) bool {
	filter := d.config.DeploymentFilter
	if filter == "" {
		return true
	}
	return filter == envelope.GetDeployment()
}
// handleMessage watches for the "TruncatingBuffer.DroppedMessages" counter
// event with origin "doppler". When one is seen it logs a notice and raises a
// slow-consumer alert on the client; all other envelopes are ignored here.
func (d *DatadogFirehoseNozzle) handleMessage(envelope *events.Envelope) {
	if envelope.GetEventType() != events.Envelope_CounterEvent {
		return
	}
	if envelope.CounterEvent.GetName() != "TruncatingBuffer.DroppedMessages" {
		return
	}
	if envelope.GetOrigin() != "doppler" {
		return
	}

	d.log.Infof("We've intercepted an upstream message which indicates that the nozzle or the TrafficController is not keeping up. Please try scaling up the nozzle.")
	d.client.AlertSlowConsumerError()
}