forked from grafana/loki
/
proxy_backend.go
115 lines (98 loc) · 3.26 KB
/
proxy_backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package querytee
import (
"context"
"io"
"net"
"net/http"
"net/url"
"path"
"time"
"github.com/pkg/errors"
)
// ProxyBackend holds the information of a single backend.
type ProxyBackend struct {
name string
endpoint *url.URL
client *http.Client
timeout time.Duration
// Whether this is the preferred backend from which picking up
// the response and sending it back to the client.
preferred bool
}
// NewProxyBackend makes a new ProxyBackend
func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool) *ProxyBackend {
return &ProxyBackend{
name: name,
endpoint: endpoint,
timeout: timeout,
preferred: preferred,
client: &http.Client{
CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
return errors.New("the query-tee proxy does not follow redirects")
},
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801
IdleConnTimeout: 90 * time.Second,
DisableCompression: true,
},
},
}
}
func (b *ProxyBackend) ForwardRequest(orig *http.Request, body io.ReadCloser) (int, []byte, error) {
req := b.createBackendRequest(orig, body)
return b.doBackendRequest(req)
}
func (b *ProxyBackend) createBackendRequest(orig *http.Request, body io.ReadCloser) *http.Request {
req := orig.Clone(context.Background())
req.Body = body
// RequestURI can't be set on a cloned request. It's only for handlers.
req.RequestURI = ""
// Replace the endpoint with the backend one.
req.URL.Scheme = b.endpoint.Scheme
req.URL.Host = b.endpoint.Host
// Prepend the endpoint path to the request path.
req.URL.Path = path.Join(b.endpoint.Path, req.URL.Path)
// Set the correct host header for the backend
req.Header.Set("Host", b.endpoint.Host)
// Replace the auth:
// - If the endpoint has user and password, use it.
// - If the endpoint has user only, keep it and use the request password (if any).
// - If the endpoint has no user and no password, use the request auth (if any).
clientUser, clientPass, clientAuth := orig.BasicAuth()
endpointUser := b.endpoint.User.Username()
endpointPass, _ := b.endpoint.User.Password()
req.Header.Del("Authorization")
if endpointUser != "" && endpointPass != "" {
req.SetBasicAuth(endpointUser, endpointPass)
} else if endpointUser != "" {
req.SetBasicAuth(endpointUser, clientPass)
} else if clientAuth {
req.SetBasicAuth(clientUser, clientPass)
}
// Remove Accept-Encoding header to avoid sending compressed responses
req.Header.Del("Accept-Encoding")
return req
}
func (b *ProxyBackend) doBackendRequest(req *http.Request) (int, []byte, error) {
// Honor the read timeout.
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
// Execute the request.
res, err := b.client.Do(req.WithContext(ctx))
if err != nil {
return 0, nil, errors.Wrap(err, "executing backend request")
}
// Read the entire response body.
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return 0, nil, errors.Wrap(err, "reading backend response")
}
return res.StatusCode, body, nil
}