/
firehoseclient.go
130 lines (114 loc) · 4 KB
/
firehoseclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package firehoseclient
import (
"crypto/tls"
"time"
"github.com/cloudfoundry-community/go-cfclient"
"github.com/cloudfoundry/noaa/consumer"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gorilla/websocket"
"github.com/SumoLogic/sumologic-cloudfoundry-nozzle/eventRouting"
"github.com/SumoLogic/sumologic-cloudfoundry-nozzle/logging"
)
type FirehoseNozzle struct {
errs <-chan error
messages <-chan *events.Envelope
consumer *consumer.Consumer
eventRouting *eventRouting.EventRouting
config *FirehoseConfig
cfClient *cfclient.Client
}
type FirehoseConfig struct {
TrafficControllerURL string
InsecureSSLSkipVerify bool
IdleTimeoutSeconds time.Duration
FirehoseSubscriptionID string
}
type CfClientTokenRefresh struct {
cfClient *cfclient.Client
}
func NewFirehoseNozzle(cfClient *cfclient.Client, eventRouting *eventRouting.EventRouting, firehoseconfig *FirehoseConfig) *FirehoseNozzle {
return &FirehoseNozzle{
errs: make(<-chan error),
messages: make(<-chan *events.Envelope),
eventRouting: eventRouting,
config: firehoseconfig,
cfClient: cfClient,
}
}
func (f *FirehoseNozzle) Start() error {
logging.Info.Printf("Started the Nozzle... \n")
f.consumeFirehose()
logging.Info.Printf("consume the firehose... \n")
err := f.routeEvent()
return err
}
func (f *FirehoseNozzle) consumeFirehose() {
f.consumer = consumer.New(
f.config.TrafficControllerURL,
&tls.Config{InsecureSkipVerify: f.config.InsecureSSLSkipVerify},
nil)
refresher := CfClientTokenRefresh{cfClient: f.cfClient}
f.consumer.RefreshTokenFrom(&refresher)
f.consumer.SetIdleTimeout(time.Duration(f.config.IdleTimeoutSeconds) * time.Second)
f.messages, f.errs = f.consumer.Firehose(f.config.FirehoseSubscriptionID, "")
}
func (f *FirehoseNozzle) routeEvent() error {
for {
select {
case envelope := <-f.messages:
f.eventRouting.RouteEvent(envelope)
case err := <-f.errs:
f.handleError(err)
return err
}
}
}
func (f *FirehoseNozzle) handleError(err error) {
switch {
case websocket.IsCloseError(err, websocket.CloseNormalClosure):
logging.Error.Printf("Normal Websocket Closure: %v ", err)
logging.Error.Printf("Closing connection with traffic controller due to error: %v", err)
f.consumer.Close()
case websocket.IsCloseError(err, websocket.ClosePolicyViolation):
logging.Error.Printf("Error while reading from the firehose: %v ", err)
logging.Error.Println("Disconnected because nozzle couldn't keep up. Please try scaling up the nozzle.")
logging.Trace.Println("Waiting for 60 seconds")
time.Sleep(60000 * time.Millisecond)
logging.Trace.Println("Trying to re-start firehose Client after fault...")
f.ResetCfClient()
f.Start()
default:
logging.Error.Printf("Error while reading from the firehose: %v", err)
logging.Trace.Println("Waiting for 60 seconds")
time.Sleep(60000 * time.Millisecond)
logging.Trace.Println("Trying to re-start firehose Client after fault...")
f.ResetCfClient()
f.Start()
}
}
func (f *FirehoseNozzle) handleMessage(envelope *events.Envelope) {
if envelope.GetEventType() == events.Envelope_CounterEvent && envelope.CounterEvent.GetName() == "TruncatingBuffer.DroppedMessages" && envelope.GetOrigin() == "doppler" {
logging.Info.Println("We've intercepted an upstream message which indicates that the nozzle or the TrafficController is not keeping up. Please try scaling up the nozzle.")
}
}
func (ct *CfClientTokenRefresh) RefreshAuthToken() (string, error) {
return ct.cfClient.GetToken()
}
func (f *FirehoseNozzle) ResetCfClient() {
logging.Info.Printf("Resetting cfClient...")
client, err := cfclient.NewClient(cleanCfConfig(f.cfClient.Config))
if err != nil {
logging.Error.Printf("Failed to reset cfClient: %v", err)
return
}
f.cfClient = client
}
func cleanCfConfig(config cfclient.Config) (*cfclient.Config) {
c := cfclient.Config{
ApiAddress: config.ApiAddress,
Username: config.Username,
Password: config.Password,
SkipSslValidation: config.SkipSslValidation,
}
return &c
}