-
Notifications
You must be signed in to change notification settings - Fork 718
/
service.go
336 lines (305 loc) · 10.6 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
package management
import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"net/http/pprof"
	"os"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/cors"
	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog"
	"nhooyr.io/websocket"
)
// Websocket close codes and their accompanying close reasons used by the log streaming handler.
const (
	// StatusInvalidCommand is the close code sent when, in the current state, an invalid command was
	// provided by the client.
	StatusInvalidCommand websocket.StatusCode = 4001
	// reasonInvalidCommand is the close reason sent together with StatusInvalidCommand.
	reasonInvalidCommand = "expected start streaming as first event"
	// StatusSessionLimitExceeded is the close code sent when a new streaming request is rejected.
	// There are a limited number of available streaming log sessions that cloudflared will service;
	// exceeding this value will return this error to incoming requests.
	StatusSessionLimitExceeded websocket.StatusCode = 4002
	// reasonSessionLimitExceeded is the close reason sent together with StatusSessionLimitExceeded.
	reasonSessionLimitExceeded = "limit exceeded for streaming sessions"
	// StatusIdleLimitExceeded is the close code sent when the idle timer fires. There is a limited idle
	// time while not actively serving a session for a request before dropping the connection.
	StatusIdleLimitExceeded websocket.StatusCode = 4003
	// reasonIdleLimitExceeded is the close reason sent together with StatusIdleLimitExceeded.
	reasonIdleLimitExceeded = "session was idle for too long"
)
var (
	// corsHandler is the CORS middleware required to allow dash to access management.argotunnel.com
	// requests. It is attached per-route in New to the plain HTTP endpoints.
	corsHandler = cors.Handler(cors.Options{
		// Allows for any subdomain of cloudflare.com
		AllowedOrigins: []string{"https://*.cloudflare.com"},
		// Required to present cookies or other authentication across origin boundaries
		AllowCredentials: true,
		MaxAge: 300, // Maximum value not ignored by any of major browsers
	})
)
// ManagementService is the http.Handler for the management endpoints. It carries the host details
// reported by /host_details, the router assembled in New, and the listener used to track log
// streaming sessions.
type ManagementService struct {
	// The management tunnel hostname
	Hostname string

	// Host details related configurations
	// serviceIP is the address dialed by getPrivateIP to discover the local IP reported to clients.
	serviceIP string
	// clientID is reported as "connector_id" in the /host_details response.
	clientID uuid.UUID
	// label, when non-empty, is reported (prefixed with "custom:") instead of the system hostname.
	label string

	// Additional Handlers
	// metricsHandler serves /metrics when diagnostic services are enabled.
	metricsHandler http.Handler

	// log is the logger used for the service's own diagnostics.
	log *zerolog.Logger
	// router dispatches every request received by ServeHTTP.
	router chi.Router

	// streamingMut is a lock to prevent concurrent requests to start streaming. Utilizing the atomic.Bool is not
	// sufficient to complete this operation since many other checks during an incoming new request are needed
	// to validate this before setting streaming to true.
	streamingMut sync.Mutex
	// logger tracks the streaming sessions; this file calls Listen, Remove, ActiveSessions and
	// ActiveSession on it.
	logger LoggerListener
}
// New constructs a ManagementService and wires up its routes.
//
// Every route goes through ValidateAccessTokenQueryMiddleware. The plain HTTP endpoints (/ping,
// /host_details and the diagnostic ones) are additionally wrapped with the CORS middleware; /logs is
// registered without it. When enableDiagServices is true the Prometheus /metrics endpoint and the
// heap/goroutine pprof profiles are exposed as well.
func New(managementHostname string,
	enableDiagServices bool,
	serviceIP string,
	clientID uuid.UUID,
	label string,
	log *zerolog.Logger,
	logger LoggerListener,
) *ManagementService {
	svc := &ManagementService{
		Hostname:       managementHostname,
		serviceIP:      serviceIP,
		clientID:       clientID,
		label:          label,
		metricsHandler: promhttp.Handler(),
		log:            log,
		logger:         logger,
	}

	router := chi.NewRouter()
	router.Use(ValidateAccessTokenQueryMiddleware)

	// Routes registered through this sub-router share the CORS middleware.
	withCORS := router.With(corsHandler)

	// Default management services
	withCORS.Get("/ping", ping)
	withCORS.Head("/ping", ping)
	router.Get("/logs", svc.logs)
	withCORS.Get("/host_details", svc.getHostDetails)

	// Diagnostic management services
	if enableDiagServices {
		// Prometheus endpoint
		withCORS.Get("/metrics", svc.metricsHandler.ServeHTTP)
		// Supports only heap and goroutine
		withCORS.Get("/debug/pprof/{profile:heap|goroutine}", pprof.Index)
	}

	svc.router = router
	return svc
}
// ServeHTTP implements http.Handler by delegating the request to the router built in New.
func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	router := m.router
	router.ServeHTTP(w, r)
}
// ping is the management liveness handler: it replies with an empty 200 response.
func ping(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}
// getHostDetailsResponse is the JSON response provided by the /host_details endpoint.
type getHostDetailsResponse struct {
	// ClientID is the string form of the service's client UUID, serialized as "connector_id".
	ClientID string `json:"connector_id"`
	// IP is the local IP found by getPrivateIP; omitted from the JSON when empty.
	IP string `json:"ip,omitempty"`
	// HostName is the label returned by getLabel; omitted from the JSON when empty.
	HostName string `json:"hostname,omitempty"`
}
// getHostDetails handles /host_details. It responds with a JSON document containing the connector
// id, the label from getLabel and, when it can be determined, the local IP used to reach serviceIP.
// A failure to determine the IP is not an error: the field is simply omitted.
func (m *ManagementService) getHostDetails(w http.ResponseWriter, r *http.Request) {
	details := getHostDetailsResponse{
		ClientID: m.clientID.String(),
	}
	if ip, err := getPrivateIP(m.serviceIP); err == nil {
		details.IP = ip
	}
	details.HostName = m.getLabel()

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// The status line is already written, so an encoding/write failure can only be logged.
	if err := json.NewEncoder(w).Encode(details); err != nil {
		m.log.Err(err).Msg("management host_details: unable to write response")
	}
}
// getLabel returns the name reported for this host: the configured label prefixed with "custom:"
// when one was provided, otherwise the system hostname (not a fqdn), or "unknown" if the hostname
// cannot be read.
func (m *ManagementService) getLabel() string {
	if label := m.label; label != "" {
		return fmt.Sprintf("custom:%s", label)
	}

	if systemName, err := os.Hostname(); err == nil {
		return systemName
	}
	return "unknown"
}
// getPrivateIP returns the local IP address the machine uses to reach addr. It opens a TCP
// connection to addr (one second timeout), reads the local side of that connection and strips the
// port. The dial error is returned when the connection cannot be established.
func getPrivateIP(addr string) (string, error) {
	const dialTimeout = 1 * time.Second

	conn, dialErr := net.DialTimeout("tcp", addr, dialTimeout)
	if dialErr != nil {
		return "", dialErr
	}
	defer conn.Close()

	ip, _, splitErr := net.SplitHostPort(conn.LocalAddr().String())
	return ip, splitErr
}
// readEvents will loop through all incoming websocket messages from a client, read them with
// ReadClientEvent and pass them through to the events channel. Any read error terminates the loop;
// unless the error shows the connection was already closed, the connection is closed with
// StatusUnsupportedData.
//
// The loop exits as soon as ctx is cancelled, including while waiting for the receiver of events:
// the handler may stop receiving (and cancel ctx) at any time, and an unconditional send on the
// unbuffered channel would otherwise block this goroutine forever.
func (m *ManagementService) readEvents(c *websocket.Conn, ctx context.Context, events chan<- *ClientEvent) {
	for {
		event, err := ReadClientEvent(c, ctx)

		// A cancelled context takes priority over whatever the read returned.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if err != nil {
			// If the client (or the server) already closed the connection, don't attempt to close it again
			if !IsClosed(err, m.log) {
				m.log.Err(err).Send()
				m.log.Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
			}
			// Any errors when reading the messages from the client will close the connection
			return
		}

		select {
		case events <- event:
		case <-ctx.Done():
			// Nobody is listening anymore; drop the event instead of leaking the goroutine.
			return
		}
	}
}
// streamLogs reads log entries from the session listener and writes each one to the client as an
// EventLog, for as long as the session reports itself active. It stops the session and returns when
// ctx is cancelled or when a write fails; on a write failure the connection is also closed with
// StatusInternalError unless the error shows it was already closed.
//
// NOTE(review): the empty default case means this loop polls without blocking while no log is
// pending. That is also what lets session.Active() be re-checked promptly, so it is kept as is.
func (m *ManagementService) streamLogs(c *websocket.Conn, ctx context.Context, session *session) {
	for {
		if !session.Active() {
			return
		}

		select {
		case <-ctx.Done():
			session.Stop()
			return
		case entry := <-session.listener:
			payload := &EventLog{
				ServerEvent: ServerEvent{Type: Logs},
				Logs:        []*Log{entry},
			}
			writeErr := WriteEvent(c, ctx, payload)
			if writeErr == nil {
				continue
			}
			// If the client (or the server) already closed the connection, don't attempt to close it again
			if !IsClosed(writeErr, m.log) {
				m.log.Err(writeErr).Send()
				m.log.Err(c.Close(websocket.StatusInternalError, writeErr.Error())).Send()
			}
			// A failed write ends streaming for this session.
			session.Stop()
			return
		default:
			// No messages to send
		}
	}
}
// canStartStream reports whether the given session is allowed to begin streaming. The check runs
// under streamingMut. Streaming is limited to one actor: if a session is already active it is only
// replaced when it belongs to the same actor, in which case the existing session is stopped, removed
// from the listener and cancelled. Otherwise the request is refused.
func (m *ManagementService) canStartStream(session *session) bool {
	m.streamingMut.Lock()
	defer m.streamingMut.Unlock()

	// Nothing is being streamed, so there is nothing to preempt or reject.
	if m.logger.ActiveSessions() <= 0 {
		return true
	}

	previous := m.logger.ActiveSession(session.actor)
	if previous == nil {
		// A different actor holds the streaming slot.
		m.log.Warn().
			Msgf("Another management session request was attempted but one session already being served; there is a limit of streaming log sessions to reduce overall performance impact.")
		return false
	}

	// Allow the same user to preempt their existing session: disconnect the old one and let the new
	// session stream instead.
	m.log.Info().
		Msgf("Another management session request for the same actor was requested; the other session will be disconnected to handle the new request.")
	previous.Stop()
	m.logger.Remove(previous)
	previous.cancel()
	return true
}
// parseFilters converts the ClientEvent into a start_streaming event and, when the conversion
// succeeds, applies the filters it carries to the session. It returns false when the event is not a
// start_streaming event. The websocket connection argument is not used.
func (m *ManagementService) parseFilters(c *websocket.Conn, event *ClientEvent, session *session) bool {
	if start, isStart := IntoClientEvent[EventStartStreaming](event, StartStreaming); isStart {
		session.Filters(start.Filters)
		return true
	}
	return false
}
// logs is the Management Streaming Logs accept handler. It upgrades the request to a websocket
// (accepting only *.cloudflare.com origins) and then serves the connection until it ends:
//
//   - client events are read by a readEvents goroutine and handled here: start_streaming validates
//     the request, registers the session with the listener and starts a streamLogs goroutine;
//     stop_streaming stops the session and removes it from the listener;
//   - a ping is sent every 15 seconds to hold the connection open;
//   - the connection is closed with StatusIdleLimitExceeded when the idle timer (5 minutes) fires;
//     the timer is stopped by start_streaming and re-armed by stop_streaming.
//
// Any unexpected event type closes the connection with StatusUnsupportedData.
func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) {
	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		OriginPatterns: []string{
			"*.cloudflare.com",
		},
	})
	if err != nil {
		m.log.Debug().Msgf("management handshake: %s", err.Error())
		return
	}
	// Make sure the connection is closed if other go routines fail to close the connection after completing.
	defer c.Close(websocket.StatusInternalError, "")

	// Cancelling ctx on return stops the readEvents and streamLogs goroutines started below.
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	events := make(chan *ClientEvent)
	go m.readEvents(c, ctx, events)

	// Send a heartbeat ping to hold the connection open even if not streaming.
	heartbeat := time.NewTicker(15 * time.Second)
	defer heartbeat.Stop()

	// Close the connection if no operation has occurred after the idle timeout. The timeout is halted
	// when streaming logs is active.
	idleTimeout := 5 * time.Minute
	idle := time.NewTimer(idleTimeout)
	defer idle.Stop()

	// Fetch the claims from the request context to acquire the actor
	claims, ok := ctx.Value(accessClaimsCtxKey).(*managementTokenClaims)
	if !ok || claims == nil {
		// Typically should never happen as it is provided in the context from the middleware
		m.log.Err(c.Close(websocket.StatusInternalError, "missing access_token")).Send()
		return
	}

	session := newSession(logWindow, claims.Actor, cancel)
	defer m.logger.Remove(session)

	for {
		select {
		case <-ctx.Done():
			m.log.Debug().Msgf("management logs: context cancelled")
			c.Close(websocket.StatusNormalClosure, "context closed")
			return
		case event := <-events:
			switch event.Type {
			case StartStreaming:
				idle.Stop()
				// Expect the first incoming request
				startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
				if !ok {
					m.log.Warn().Msgf("expected start_streaming as first recieved event")
					m.log.Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Send()
					return
				}
				// Make sure the session can start
				if !m.canStartStream(session) {
					m.log.Err(c.Close(StatusSessionLimitExceeded, reasonSessionLimitExceeded)).Send()
					return
				}
				session.Filters(startEvent.Filters)
				m.logger.Listen(session)
				m.log.Debug().Msgf("Streaming logs")
				go m.streamLogs(c, ctx, session)
				continue
			case StopStreaming:
				idle.Reset(idleTimeout)
				// Stop the current session for the current actor who requested it
				session.Stop()
				m.logger.Remove(session)
			case UnknownClientEventType:
				fallthrough
			default:
				// Drop unknown events and close connection. There is no error value on this path: the
				// only err in scope is the handshake error, which is known to be nil here, so the
				// connection is closed with a fixed reason instead of dereferencing it.
				m.log.Debug().Msgf("unexpected management message received: %s", event.Type)
				m.log.Err(c.Close(websocket.StatusUnsupportedData, "unexpected management message received")).Send()
				return
			}
		case <-heartbeat.C:
			// Ping blocks until the pong arrives, so it runs outside of the event loop.
			go c.Ping(ctx)
		case <-idle.C:
			c.Close(StatusIdleLimitExceeded, reasonIdleLimitExceeded)
			return
		}
	}
}