/
websocket_sink.go
100 lines (82 loc) · 2.76 KB
/
websocket_sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package websocket
import (
"doppler/sinks"
"log"
"net"
"time"
"truncatingbuffer"
"github.com/cloudfoundry/sonde-go/events"
"github.com/gogo/protobuf/proto"
gorilla "github.com/gorilla/websocket"
)
// remoteMessageWriter is the minimal connection surface WebsocketSink needs:
// it can report an address, accept a write deadline, and write one message.
// Run passes gorilla.BinaryMessage as the message type, so a gorilla
// websocket connection is presumably the production implementation — confirm
// against callers.
type remoteMessageWriter interface {
	// RemoteAddr returns the address associated with the connection; the sink
	// uses it for its identifier and in log lines.
	RemoteAddr() net.Addr

	// SetWriteDeadline sets the deadline that applies to writes issued after
	// the call.
	SetWriteDeadline(deadline time.Time) error

	// WriteMessage writes data as a single message of the given messageType.
	WriteMessage(messageType int, data []byte) error
}
// Counter tallies envelopes by event type. WebsocketSink.Run calls Increment
// once for every envelope it has written to the websocket without error.
type Counter interface {
	// Increment records one envelope of the given eventType.
	Increment(eventType events.Envelope_EventType)
}
// noopCounter is a Counter that discards every increment. NewWebsocketSink
// installs it so a sink always has a usable counter.
type noopCounter struct{}

// Increment satisfies Counter and deliberately does nothing.
func (noopCounter) Increment(_ events.Envelope_EventType) {}
// WebsocketSink writes envelopes for a single application to one websocket
// connection. Construct it with NewWebsocketSink and drive it with Run.
type WebsocketSink struct {
	// appID is returned by AppID and logged as the stream id when Run starts.
	appID string

	// ws is the connection envelopes are written to.
	ws remoteMessageWriter

	// clientAddress is ws.RemoteAddr() captured at construction time; it
	// prefixes the sink's log lines.
	clientAddress net.Addr

	// messageDrainBufferSize is handed to sinks.RunTruncatingBuffer in Run.
	messageDrainBufferSize uint

	// writeTimeout is added to the current time to form the deadline for each
	// write. A zero value means Run sets no write deadline.
	writeTimeout time.Duration

	// dropsondeOrigin is handed to truncatingbuffer.NewDefaultContext in Run.
	dropsondeOrigin string

	// counter is incremented after each successful write.
	counter Counter
}
// NewWebsocketSink returns a sink that writes envelopes for appID to ws.
//
// The address reported by ws.RemoteAddr() is captured once here and reused in
// the sink's log lines. messageDrainBufferSize and dropsondeOrigin are later
// handed to the truncating buffer set up by Run. writeTimeout bounds each
// write; zero disables the write deadline. The sink starts with a counter
// that ignores increments; use SetCounter to install a real one.
func NewWebsocketSink(
	appID string,
	ws remoteMessageWriter,
	messageDrainBufferSize uint,
	writeTimeout time.Duration,
	dropsondeOrigin string,
) *WebsocketSink {
	sink := new(WebsocketSink)
	sink.appID = appID
	sink.ws = ws
	sink.clientAddress = ws.RemoteAddr()
	sink.messageDrainBufferSize = messageDrainBufferSize
	sink.writeTimeout = writeTimeout
	sink.dropsondeOrigin = dropsondeOrigin
	sink.counter = noopCounter{}
	return sink
}
// SetCounter replaces the counter that Run increments after each successful
// write.
//
// A nil counter is replaced with a no-op counter: Run calls
// sink.counter.Increment unconditionally, and invoking a method on a nil
// interface value panics.
func (sink *WebsocketSink) SetCounter(counter Counter) {
	if counter == nil {
		counter = noopCounter{}
	}
	sink.counter = counter
}
// Identifier returns the string form of the connection's current remote
// address. It asks the connection each time rather than using the address
// cached at construction.
func (sink *WebsocketSink) Identifier() string {
	remote := sink.ws.RemoteAddr()
	return remote.String()
}
// AppID returns the application id the sink was constructed with.
func (sink *WebsocketSink) AppID() string {
	id := sink.appID
	return id
}
// ShouldReceiveErrors always reports true for websocket sinks.
// NOTE(review): presumably consulted by whatever manages sinks to decide
// whether error messages are routed here — confirm against the caller.
func (sink *WebsocketSink) ShouldReceiveErrors() bool {
	const receivesErrors = true
	return receivesErrors
}
// Run pumps envelopes from inputChan to the websocket until the buffered
// output channel is closed or a write fails.
//
// Envelopes pass through a truncating buffer sized by
// sink.messageDrainBufferSize. Each envelope read from the buffer is
// protobuf-marshalled and written as one binary websocket message. An
// envelope that fails to marshal is logged and skipped. A failed write is
// logged and ends the loop. After each successful write the sink's counter is
// incremented with the envelope's event type.
//
// Run blocks until it returns. It does not close the websocket itself; the
// interface it holds has no Close method.
func (sink *WebsocketSink) Run(inputChan <-chan *events.Envelope) {
	stopChan := make(chan struct{})
	// stopChan is handed to the truncating buffer below; closing it is
	// presumably the buffer's stop signal. Deferring the close covers every
	// exit path, including a panic, with a single statement.
	defer close(stopChan)

	log.Printf("Websocket Sink %s: Running for streamId [%s]", sink.clientAddress, sink.appID)

	// Named bufferContext rather than context to avoid confusion with the
	// standard library package of that name.
	bufferContext := truncatingbuffer.NewDefaultContext(sink.dropsondeOrigin, sink.Identifier())
	buffer := sinks.RunTruncatingBuffer(inputChan, sink.messageDrainBufferSize, bufferContext, stopChan)

	for {
		messageEnvelope, ok := <-buffer.GetOutputChannel()
		if !ok {
			log.Printf("Websocket Sink %s: Closed listener channel detected. Closing websocket", sink.clientAddress)
			return
		}

		messageBytes, err := proto.Marshal(messageEnvelope)
		if err != nil {
			log.Printf("Websocket Sink %s: Error marshalling %s envelope from origin %s: %s", sink.clientAddress, messageEnvelope.GetEventType(), messageEnvelope.GetOrigin(), err.Error())
			continue
		}

		// A zero writeTimeout means "no deadline", so leave the connection's
		// deadline untouched in that case.
		if sink.writeTimeout != 0 {
			deadline := time.Now().Add(sink.writeTimeout)
			if err := sink.ws.SetWriteDeadline(deadline); err != nil {
				// Best effort: record the failure and still attempt the
				// write, which reports its own error if the connection is
				// unusable.
				log.Printf("Websocket Sink %s: Error setting write deadline: %v", sink.clientAddress, err)
			}
		}

		if err := sink.ws.WriteMessage(gorilla.BinaryMessage, messageBytes); err != nil {
			log.Printf("Websocket Sink %s: Error when trying to send data to sink. Requesting close. Err: %v", sink.clientAddress, err)
			return
		}

		sink.counter.Increment(messageEnvelope.GetEventType())
	}
}