/
upstream.go
153 lines (124 loc) · 4.06 KB
/
upstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package azugo
import (
"bytes"
"crypto/tls"
"net/url"
"strings"
"azugo.io/azugo/internal/proxy"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
type proxyUpstream struct {
Scheme []byte
Host []byte
Path []byte
BaseURL []byte
}
// Proxy is the proxy handler.
type Proxy struct {
client *fasthttp.Client
options *proxyOptions
upstreamIndex uint
}
// ProxyOption is a proxy option.
type ProxyOption interface {
apply(*proxyOptions)
}
type proxyOptions struct {
BasePath string
InsecureSkipVerify bool
BodyRewriter *proxy.BodyRewriter
Upstream []*proxyUpstream
}
// ProxyUpstreamInsecureSkipVerify skips TLS certificate verification for upstream request.
type ProxyUpstreamInsecureSkipVerify bool
func (o ProxyUpstreamInsecureSkipVerify) apply(opts *proxyOptions) {
opts.InsecureSkipVerify = bool(o)
}
type bodyRewriterRule struct {
from, to string
}
func (o *bodyRewriterRule) apply(opts *proxyOptions) {
opts.BodyRewriter.AddReplace([]byte(o.from), []byte(o.to))
}
// ProxyUpstreamBodyReplaceText replaces text in the response body.
func ProxyUpstreamBodyReplaceText(from, to string) ProxyOption {
return &bodyRewriterRule{from, to}
}
// ProxyUpstreamBodyReplaceURL replaces URL in the response body.
type ProxyUpstreamBodyReplaceURL bool
func (o ProxyUpstreamBodyReplaceURL) apply(opts *proxyOptions) {
opts.BodyRewriter.RewriteBaseURL = bool(o)
}
type proxyUpstreams []*proxyUpstream
func (o proxyUpstreams) apply(opts *proxyOptions) {
opts.Upstream = append(opts.Upstream, o...)
}
// ProxyUpstream adds one or more upstream URLs.
func ProxyUpstream(upstream ...*url.URL) ProxyOption {
upstr := make(proxyUpstreams, len(upstream))
for i, v := range upstream {
upstr[i] = &proxyUpstream{
Scheme: []byte(v.Scheme),
Host: []byte(v.Host),
Path: []byte(strings.TrimRight(v.Path, "/")),
BaseURL: []byte(strings.TrimRight(v.String(), "/")),
}
}
return upstr
}
// newUpstreamProxy creates a new proxy handler.
func (m *mux) newUpstreamProxy(basePath string, options ...ProxyOption) *Proxy {
opt := &proxyOptions{
BasePath: strings.TrimRight(basePath, "/"),
InsecureSkipVerify: true,
BodyRewriter: proxy.NewBodyRewriter(),
}
for _, option := range options {
option.apply(opt)
}
return &Proxy{
client: &fasthttp.Client{
NoDefaultUserAgentHeader: true,
TLSConfig: &tls.Config{
InsecureSkipVerify: opt.InsecureSkipVerify,
},
ReadBufferSize: m.app.ServerOptions.ResponseWriteBufferSize,
WriteBufferSize: m.app.ServerOptions.RequestReadBufferSize,
},
options: opt,
}
}
// Handler implements azugo.Handler to handle incoming request.
func (p *Proxy) Handler(ctx *Context) {
if len(p.options.Upstream) == 0 {
ctx.StatusCode(fasthttp.StatusBadGateway).Text(fasthttp.StatusMessage(fasthttp.StatusBadGateway))
return
}
// This is not thread safe but we don't care if multiple requests goes to the same upstream.
p.upstreamIndex = (p.upstreamIndex + 1) % uint(len(p.options.Upstream))
upstream := p.options.Upstream[p.upstreamIndex]
// Copy request from original
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
ctx.Request().CopyTo(req)
resp := &ctx.Context().Response
req.SetRequestURIBytes(append(upstream.Path, req.RequestURI()[len(p.options.BasePath):]...))
req.URI().SetSchemeBytes(upstream.Scheme)
req.SetHostBytes(upstream.Host)
// Downgrade HTTP/2 to HTTP/1.1
if ctx.IsTLS() && bytes.Equal(req.Header.Protocol(), []byte("HTTP/2")) {
req.Header.SetProtocolBytes([]byte("HTTP/1.1"))
}
proxy.StripHeaders(&req.Header)
if err := p.client.Do(req, resp); err != nil {
ctx.Log().With(zap.Error(err)).Warn("proxy upstream failed")
ctx.StatusCode(fasthttp.StatusBadGateway).Text(fasthttp.StatusMessage(fasthttp.StatusBadGateway))
return
}
proxy.StripHeaders(&resp.Header)
proxy.RewriteCookies(ctx.IsTLS(), ctx.Host(), resp)
if p.options.BodyRewriter != nil && p.options.BodyRewriter.Enabled() {
p.options.BodyRewriter.RewriteResponse(append([]byte(ctx.BaseURL()), []byte(p.options.BasePath)...), []byte(upstream.BaseURL), resp)
}
}