-
Notifications
You must be signed in to change notification settings - Fork 724
/
observer.go
120 lines (100 loc) · 3.16 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package connection
import (
"net"
"strings"
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/cloudflare/cloudflared/management"
)
// Structured-log field keys used when a tunnel connection is registered, plus
// the buffer size shared by the Observer's channels.
const (
// LogFieldConnectionID is the log field key for the connection's UUID string.
LogFieldConnectionID = "connection"
// LogFieldLocation is the log field key for the location string.
LogFieldLocation = "location"
// LogFieldIPAddress is the log field key for the IP address.
LogFieldIPAddress = "ip"
// LogFieldProtocol is the log field key for the protocol's string form.
LogFieldProtocol = "protocol"
// observerChannelBufferSize is the capacity NewObserver gives to both the
// event channel and the sink-registration channel.
observerChannelBufferSize = 16
)
// Observer collects tunnel connection events and hands them to registered
// EventSinks; it also holds the loggers and metrics used by the logging
// helpers defined on it.
type Observer struct {
// log receives the connection-registered info line and the warning emitted
// when the event buffer is full.
log *zerolog.Logger
// logTransport is stored by NewObserver; none of the methods visible in this
// part of the file read it.
logTransport *zerolog.Logger
// metrics is updated with server locations and per-URL hostname counts.
metrics *tunnelMetrics
// tunnelEventChan buffers events from sendEvent until dispatchEvents reads them.
tunnelEventChan chan Event
// addSinkChan buffers sinks from RegisterSink until dispatchEvents reads them.
addSinkChan chan EventSink
}
// EventSink is implemented by anything that wants to receive the events an
// Observer dispatches.
type EventSink interface {
// OnTunnelEvent is invoked once per dispatched event. The Observer calls its
// sinks one after another from a single goroutine, so a slow implementation
// delays the sinks registered after it.
OnTunnelEvent(event Event)
}
// NewObserver builds an Observer around the two given loggers, allocates its
// metrics and its two buffered channels (capacity observerChannelBufferSize),
// and starts the goroutine that delivers events to sinks.
//
// The goroutine runs dispatchEvents, which loops without an exit path, so it
// is never stopped once started.
func NewObserver(log, logTransport *zerolog.Logger) *Observer {
	observer := new(Observer)
	observer.log = log
	observer.logTransport = logTransport
	observer.metrics = newTunnelMetrics()
	observer.tunnelEventChan = make(chan Event, observerChannelBufferSize)
	observer.addSinkChan = make(chan EventSink, observerChannelBufferSize)

	go observer.dispatchEvents()

	return observer
}
// RegisterSink hands sink to the dispatch goroutine so that it receives the
// events dispatched after it has been picked up. The hand-off goes through a
// buffered channel, so this call blocks only while that buffer is full.
func (o *Observer) RegisterSink(sink EventSink) {
	pending := o.addSinkChan
	pending <- sink
}
// logConnected records a successful tunnel registration. It publishes a
// Connected event to the sinks, writes an info-level log line with the
// connection ID, index, location, IP address and protocol, and passes the
// connection index (as a string) and location to the metrics.
//
// The published event carries the protocol as well as the location, matching
// the event built by sendConnectedEvent; without it sinks would observe the
// zero value of Protocol for this event.
func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) {
	o.sendEvent(Event{
		Index:     connIndex,
		EventType: Connected,
		Protocol:  protocol,
		Location:  location,
	})

	o.log.Info().
		Int(management.EventTypeKey, int(management.Cloudflared)).
		Str(LogFieldConnectionID, connectionID.String()).
		Uint8(LogFieldConnIndex, connIndex).
		Str(LogFieldLocation, location).
		IPAddr(LogFieldIPAddress, address).
		Str(LogFieldProtocol, protocol.String()).
		Msg("Registered tunnel connection")

	o.metrics.registerServerLocation(uint8ToString(connIndex), location)
}
// sendRegisteringEvent publishes a RegisteringTunnel event for the connection
// identified by connIndex.
func (o *Observer) sendRegisteringEvent(connIndex uint8) {
	registering := Event{
		Index:     connIndex,
		EventType: RegisteringTunnel,
	}
	o.sendEvent(registering)
}
// sendConnectedEvent publishes a Connected event for the connection identified
// by connIndex, carrying the protocol in use and the location.
func (o *Observer) sendConnectedEvent(connIndex uint8, protocol Protocol, location string) {
	connected := Event{
		Index:     connIndex,
		EventType: Connected,
		Protocol:  protocol,
		Location:  location,
	}
	o.sendEvent(connected)
}
// SendURL publishes a SetURL event carrying url exactly as given, then
// increments the user-hostname counter labelled with the URL. The counter
// label always starts with "https://": the scheme is prepended when url does
// not already begin with it.
func (o *Observer) SendURL(url string) {
	o.sendEvent(Event{EventType: SetURL, URL: url})

	const httpsScheme = "https://"

	label := url
	if !strings.HasPrefix(label, httpsScheme) {
		// We add https:// in the prefix for backwards compatibility as we used to do that with the old free tunnels
		// and some tools (like `wrangler tail`) are regexp-ing for that specifically.
		label = httpsScheme + label
	}

	o.metrics.userHostnamesCounts.WithLabelValues(label).Inc()
}
// SendReconnect publishes a Reconnecting event for the connection identified
// by connIndex.
func (o *Observer) SendReconnect(connIndex uint8) {
	reconnecting := Event{
		Index:     connIndex,
		EventType: Reconnecting,
	}
	o.sendEvent(reconnecting)
}
// sendUnregisteringEvent publishes an Unregistering event for the connection
// identified by connIndex.
func (o *Observer) sendUnregisteringEvent(connIndex uint8) {
	unregistering := Event{
		Index:     connIndex,
		EventType: Unregistering,
	}
	o.sendEvent(unregistering)
}
// SendDisconnect publishes a Disconnected event for the connection identified
// by connIndex.
func (o *Observer) SendDisconnect(connIndex uint8) {
	disconnected := Event{
		Index:     connIndex,
		EventType: Disconnected,
	}
	o.sendEvent(disconnected)
}
// sendEvent queues e for delivery to the registered sinks without ever
// blocking the caller. If the event channel's buffer is full, the event is
// dropped and a warning is logged instead.
func (o *Observer) sendEvent(e Event) {
	select {
	case o.tunnelEventChan <- e:
		// Queued; nothing more to do. A select case ends on its own, so no
		// explicit break is needed.
	default:
		o.log.Warn().Msg("observer channel buffer is full")
	}
}
// dispatchEvents is the Observer's delivery loop. It owns the list of sinks:
// sinks arriving on addSinkChan are appended to it, and every event arriving
// on tunnelEventChan is passed to each sink in registration order. An event
// that arrives while no sink is registered is discarded.
//
// Sinks are called synchronously, one after another, so a sink that blocks
// stalls both further deliveries and new registrations. The loop has no exit
// path; it runs for as long as the goroutine exists.
func (o *Observer) dispatchEvents() {
	var registered []EventSink

	for {
		select {
		case newSink := <-o.addSinkChan:
			registered = append(registered, newSink)

		case event := <-o.tunnelEventChan:
			for i := range registered {
				registered[i].OnTunnelEvent(event)
			}
		}
	}
}
// EventSinkFunc adapts a plain function so that it can be used wherever an
// EventSink is expected.
type EventSinkFunc func(event Event)
// OnTunnelEvent satisfies EventSink by calling f with the event.
func (f EventSinkFunc) OnTunnelEvent(event Event) {
f(event)
}