Skip to content

Commit

Permalink
reverseproxy: Fix dial placeholders, SRV, active health checks (#3780)
Browse files Browse the repository at this point in the history
* reverseproxy: Fix dial placeholders, SRV, active health checks

Supercedes #3776
Partially reverts or updates #3756, #3693, and #3695

* reverseproxy: add integration tests

Co-authored-by: Mohammed Al Sahaf <msaa1990@gmail.com>
  • Loading branch information
mholt and mohammed90 committed Oct 13, 2020
1 parent e34d9f1 commit c7efb03
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 33 deletions.
238 changes: 238 additions & 0 deletions caddytest/integration/reverseproxy_test.go
Expand Up @@ -79,6 +79,244 @@ func TestSRVWithDial(t *testing.T) {
`, "json", `upstream: specifying dial address is incompatible with lookup_srv: 0: {\"dial\": \"tcp/address.to.upstream:80\", \"lookup_srv\": \"srv.host.service.consul\"}`)
}

func TestDialWithPlaceholderUnix(t *testing.T) {

if runtime.GOOS == "windows" {
t.SkipNow()
}

f, err := ioutil.TempFile("", "*.sock")
if err != nil {
t.Errorf("failed to create TempFile: %s", err)
return
}
// a hack to get a file name within a valid path to use as socket
socketName := f.Name()
os.Remove(f.Name())

server := http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("Hello, World!"))
}),
}

unixListener, err := net.Listen("unix", socketName)
if err != nil {
t.Errorf("failed to listen on the socket: %s", err)
return
}
go server.Serve(unixListener)
t.Cleanup(func() {
server.Close()
})
runtime.Gosched() // Allow other goroutines to run

tester := caddytest.NewTester(t)
tester.InitServer(`
{
"apps": {
"http": {
"servers": {
"srv0": {
"listen": [
":8080"
],
"routes": [
{
"handle": [
{
"handler": "reverse_proxy",
"upstreams": [
{
"dial": "unix/{http.request.header.X-Caddy-Upstream-Dial}"
}
]
}
]
}
]
}
}
}
}
}
`, "json")

req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil)
if err != nil {
t.Fail()
return
}
req.Header.Set("X-Caddy-Upstream-Dial", socketName)
tester.AssertResponse(req, 200, "Hello, World!")
}

func TestReverseProxyWithPlaceholderDialAddress(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
"apps": {
"http": {
"servers": {
"srv0": {
"listen": [
":8080"
],
"routes": [
{
"match": [
{
"host": [
"localhost"
]
}
],
"handle": [
{
"handler": "static_response",
"body": "Hello, World!"
}
],
"terminal": true
}
],
"automatic_https": {
"skip": [
"localhost"
]
}
},
"srv1": {
"listen": [
":9080"
],
"routes": [
{
"match": [
{
"host": [
"localhost"
]
}
],
"handle": [
{
"handler": "reverse_proxy",
"upstreams": [
{
"dial": "{http.request.header.X-Caddy-Upstream-Dial}"
}
]
}
],
"terminal": true
}
],
"automatic_https": {
"skip": [
"localhost"
]
}
}
}
}
}
}
`, "json")

req, err := http.NewRequest(http.MethodGet, "http://localhost:9080", nil)
if err != nil {
t.Fail()
return
}
req.Header.Set("X-Caddy-Upstream-Dial", "localhost:8080")
tester.AssertResponse(req, 200, "Hello, World!")
}

func TestReverseProxyWithPlaceholderTCPDialAddress(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
"apps": {
"http": {
"servers": {
"srv0": {
"listen": [
":8080"
],
"routes": [
{
"match": [
{
"host": [
"localhost"
]
}
],
"handle": [
{
"handler": "static_response",
"body": "Hello, World!"
}
],
"terminal": true
}
],
"automatic_https": {
"skip": [
"localhost"
]
}
},
"srv1": {
"listen": [
":9080"
],
"routes": [
{
"match": [
{
"host": [
"localhost"
]
}
],
"handle": [
{
"handler": "reverse_proxy",
"upstreams": [
{
"dial": "tcp/{http.request.header.X-Caddy-Upstream-Dial}:8080"
}
]
}
],
"terminal": true
}
],
"automatic_https": {
"skip": [
"localhost"
]
}
}
}
}
}
}
`, "json")

req, err := http.NewRequest(http.MethodGet, "http://localhost:9080", nil)
if err != nil {
t.Fail()
return
}
req.Header.Set("X-Caddy-Upstream-Dial", "localhost")
tester.AssertResponse(req, 200, "Hello, World!")
}

func TestSRVWithActiveHealthcheck(t *testing.T) {
caddytest.AssertLoadError(t, `
{
Expand Down
43 changes: 31 additions & 12 deletions modules/caddyhttp/reverseproxy/healthchecks.go
Expand Up @@ -154,23 +154,42 @@ func (h *Handler) doActiveHealthCheckForAllHosts() {
}
}()

portStr := strconv.Itoa(upstream.activeHealthCheckPort)
hostAddr := net.JoinHostPort(upstream.networkAddress.Host, portStr)
if upstream.networkAddress.IsUnixNetwork() {
networkAddr, err := caddy.NewReplacer().ReplaceOrErr(upstream.Dial, true, true)
if err != nil {
h.HealthChecks.Active.logger.Error("invalid use of placeholders in dial address for active health checks",
zap.String("address", networkAddr),
zap.Error(err),
)
return
}
addr, err := caddy.ParseNetworkAddress(networkAddr)
if err != nil {
h.HealthChecks.Active.logger.Error("bad network address",
zap.String("address", networkAddr),
zap.Error(err),
)
return
}
if hcp := uint(upstream.activeHealthCheckPort); hcp != 0 {
if addr.IsUnixNetwork() {
addr.Network = "tcp" // I guess we just assume TCP since we are using a port??
}
addr.StartPort, addr.EndPort = hcp, hcp
}
if upstream.LookupSRV == "" && addr.PortRangeSize() != 1 {
h.HealthChecks.Active.logger.Error("multiple addresses (upstream must map to only one address)",
zap.String("address", networkAddr),
)
return
}
hostAddr := addr.JoinHostPort(0)
if addr.IsUnixNetwork() {
// this will be used as the Host portion of a http.Request URL, and
// paths to socket files would produce an error when creating URL,
// so use a fake Host value instead; unix sockets are usually local
hostAddr = "localhost"
}

dialInfo := DialInfo{
Upstream: upstream,
Network: upstream.networkAddress.Network,
Host: upstream.networkAddress.Host,
Port: portStr,
Address: hostAddr,
}
err := h.doActiveHealthCheck(dialInfo, hostAddr, upstream.Host)
err = h.doActiveHealthCheck(DialInfo{Network: addr.Network, Address: hostAddr}, hostAddr, upstream.Host)
if err != nil {
h.HealthChecks.Active.logger.Error("active health check failed",
zap.String("address", hostAddr),
Expand Down
1 change: 0 additions & 1 deletion modules/caddyhttp/reverseproxy/hosts.go
Expand Up @@ -92,7 +92,6 @@ type Upstream struct {
// HeaderAffinity string
// IPAffinity string

networkAddress caddy.NetworkAddress
activeHealthCheckPort int
healthCheckPolicy *PassiveHealthChecks
cb CircuitBreaker
Expand Down
3 changes: 0 additions & 3 deletions modules/caddyhttp/reverseproxy/httptransport.go
Expand Up @@ -182,9 +182,6 @@ func (h *HTTPTransport) NewTransport(ctx caddy.Context) (*http.Transport, error)
if dialInfo, ok := GetDialInfo(ctx); ok {
network = dialInfo.Network
address = dialInfo.Address
if dialInfo.Upstream.networkAddress.IsUnixNetwork() {
address = dialInfo.Host
}
}
conn, err := dialer.DialContext(ctx, network, address)
if err != nil {
Expand Down
18 changes: 1 addition & 17 deletions modules/caddyhttp/reverseproxy/reverseproxy.go
Expand Up @@ -129,10 +129,8 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.ctx = ctx
h.logger = ctx.Logger(h)

// get validation out of the way
// verify SRV compatibility
for i, v := range h.Upstreams {
// Having LookupSRV non-empty conflicts with 2 other config parameters: active health checks, and upstream dial address.
// Therefore if LookupSRV is empty, then there are no immediately apparent config conflicts, and the iteration can be skipped.
if v.LookupSRV == "" {
continue
}
Expand Down Expand Up @@ -219,18 +217,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {

// set up upstreams
for _, upstream := range h.Upstreams {
if upstream.LookupSRV == "" {
addr, err := caddy.ParseNetworkAddress(upstream.Dial)
if err != nil {
return err
}

if addr.PortRangeSize() != 1 {
return fmt.Errorf("multiple addresses (upstream must map to only one address): %v", addr)
}

upstream.networkAddress = addr
}
// create or get the host representation for this upstream
var host Host = new(upstreamHost)
existingHost, loaded := hosts.LoadOrStore(upstream.String(), host)
Expand Down Expand Up @@ -292,8 +278,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {
// then use it, otherwise use the port of upstream.
if h.HealthChecks.Active.Port != 0 {
upstream.activeHealthCheckPort = h.HealthChecks.Active.Port
} else {
upstream.activeHealthCheckPort = int(upstream.networkAddress.StartPort)
}
}

Expand Down

0 comments on commit c7efb03

Please sign in to comment.