/
leader_forwarder.go
124 lines (105 loc) · 3.49 KB
/
leader_forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
// Package api contains the telemetry of the Cluster Agent API and implements
// the forwarding of queries from Cluster Agent followers to the leader.
package api
import (
"crypto/tls"
"fmt"
stdLog "log"
"net"
"net/http"
"net/http/httputil"
"strconv"
"sync"
"time"
"github.com/cihub/seelog"
"github.com/DataDog/datadog-agent/pkg/config"
)
const (
	// respForwarded is the response header that Forward sets to "true" on
	// every reply it writes.
	respForwarded = "X-DCA-Forwarded"
	// forwardHeader is the request header added to queries proxied to the
	// leader. Forward answers with StatusLoopDetected when an incoming
	// request already carries it.
	forwardHeader = "X-DCA-Follower-Forwarded"
)
// globalLeaderForwarder is the package-level LeaderForwarder instance.
// In this file it starts as nil, is assigned once by NewGlobalLeaderForwarder
// and is read through GetGlobalLeaderForwarder; no lock protects it here.
var globalLeaderForwarder *LeaderForwarder
// LeaderForwarder forwards HTTP queries received by a follower to the leader
// through a reverse proxy that is rebuilt whenever the leader IP changes.
type LeaderForwarder struct {
	// apiPort is the port of the leader API, stored as a string so that it
	// can be appended to the leader IP when building the target host.
	apiPort string
	// transport is the round tripper given to every reverse proxy created
	// by SetLeaderIP.
	transport http.RoundTripper
	// logger is given to the reverse proxy as its ErrorLog.
	logger *stdLog.Logger

	// proxyLock guards proxy.
	proxyLock sync.RWMutex
	// proxy is the reverse proxy targeting the current leader; it is nil
	// when no leader IP is set.
	proxy *httputil.ReverseProxy
}
// NewLeaderForwarder builds a new, independent LeaderForwarder (it is used
// directly for test purposes and by NewGlobalLeaderForwarder).
//
// apiPort is the port of the leader API. maxConnections bounds both the total
// and the idle connections kept per host by the underlying transport.
// The returned forwarder has no proxy yet: one is created by SetLeaderIP.
func NewLeaderForwarder(apiPort, maxConnections int) *LeaderForwarder {
	// Use a stack depth of 4 on top of the default one to get a relevant filename in the stdlib
	// NOTE(review): the error returned by NewLogWriter is deliberately dropped here;
	// confirm that the writer is always usable in that case.
	errorWriter, _ := config.NewLogWriter(4, seelog.DebugLvl)

	dialer := &net.Dialer{
		Timeout:   1 * time.Second,
		KeepAlive: 20 * time.Second,
	}

	leaderTransport := &http.Transport{
		Proxy:             http.ProxyFromEnvironment,
		DialContext:       dialer.DialContext,
		ForceAttemptHTTP2: false,
		// NOTE(review): certificate verification is disabled for the
		// connection to the leader; confirm this is intended.
		TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
		TLSHandshakeTimeout:   5 * time.Second,
		MaxConnsPerHost:       maxConnections,
		MaxIdleConnsPerHost:   maxConnections,
		MaxIdleConns:          0,
		IdleConnTimeout:       120 * time.Second,
		ResponseHeaderTimeout: 5 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}

	// Proxy errors are written to seelog through the std logger; flags are 0
	// so the std logger adds nothing but the prefix.
	proxyLogger := stdLog.New(errorWriter, "Error while forwarding to leader DCA: ", 0)

	forwarder := &LeaderForwarder{
		apiPort:   strconv.Itoa(apiPort),
		transport: leaderTransport,
		logger:    proxyLogger,
	}
	return forwarder
}
// NewGlobalLeaderForwarder creates the global LeaderForwarder instance the
// first time it is called. When the global instance already exists the call
// does nothing and its arguments are ignored.
// The check and the assignment are not protected by any lock.
func NewGlobalLeaderForwarder(apiPort, maxConnections int) {
	if globalLeaderForwarder == nil {
		globalLeaderForwarder = NewLeaderForwarder(apiPort, maxConnections)
	}
}
// GetGlobalLeaderForwarder returns the global LeaderForwarder instance.
// The result is nil when the global instance has not been created yet
// (see NewGlobalLeaderForwarder).
func GetGlobalLeaderForwarder() *LeaderForwarder {
return globalLeaderForwarder
}
// Forward proxies req to the current leader and writes the answer to rw.
//
// The respForwarded header is always set on the response. The query is not
// proxied, and an error is written to rw instead, in two cases:
//   - req already carries forwardHeader, meaning it was already forwarded
//     once: the reply is 508 Loop Detected;
//   - no leader proxy is currently set (see SetLeaderIP): the reply is
//     503 Service Unavailable with an empty body.
func (lf *LeaderForwarder) Forward(rw http.ResponseWriter, req *http.Request) {
	// Always set Forwarded header in reply
	rw.Header().Set(respForwarded, "true")

	if req.Header.Get(forwardHeader) != "" {
		http.Error(rw, fmt.Sprintf("Query was already forwarded from: %s", req.RemoteAddr), http.StatusLoopDetected)
		// http.Error does not end the request: return explicitly, otherwise
		// the looping query would still be proxied and the response written twice.
		return
	}

	// Take a snapshot of the proxy under the read lock so that the lock is
	// not held for the whole duration of the proxied request.
	lf.proxyLock.RLock()
	currentProxy := lf.proxy
	lf.proxyLock.RUnlock()

	if currentProxy == nil {
		http.Error(rw, "", http.StatusServiceUnavailable)
		return
	}

	currentProxy.ServeHTTP(rw, req)
}
// SetLeaderIP changes the leader targeted by Forward.
//
// An empty leaderIP removes the current proxy, so that Forward answers
// 503 Service Unavailable until a new leader IP is set. Otherwise a new
// reverse proxy is installed that sends queries over https to leaderIP on
// the forwarder's API port and marks them with forwardHeader.
// leaderIP may be an IPv4 address, or an IPv6 address with or without
// surrounding brackets.
func (lf *LeaderForwarder) SetLeaderIP(leaderIP string) {
	lf.proxyLock.Lock()
	defer lf.proxyLock.Unlock()

	if leaderIP == "" {
		lf.proxy = nil
		return
	}

	// net.JoinHostPort adds the brackets required around IPv6 literals, so
	// strip them first when the caller already provided them.
	host := leaderIP
	if n := len(host); n > 2 && host[0] == '[' && host[n-1] == ']' {
		host = host[1 : n-1]
	}

	lf.proxy = &httputil.ReverseProxy{
		Director: func(req *http.Request) {
			req.URL.Scheme = "https"
			// Plain concatenation would build an invalid authority for
			// IPv6 addresses ("fd00::1:5005" instead of "[fd00::1]:5005").
			req.URL.Host = net.JoinHostPort(host, lf.apiPort)
			req.Header.Add(forwardHeader, "true")
		},
		Transport: lf.transport,
		ErrorLog:  lf.logger,
	}
}