Skip to content

Commit

Permalink
Merge pull request #564 from fabiolb/issue-558
Browse files Browse the repository at this point in the history
Refactor consul service monitor
  • Loading branch information
magiconair committed Nov 15, 2018
2 parents 44d6dec + b498073 commit b6d5441
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 139 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type Consul struct {
CheckTLSSkipVerify bool
CheckDeregisterCriticalServiceAfter string
ChecksRequired string
ServiceMonitors int
}

type Tracing struct {
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var defaultConfig = &Config{
ServiceAddr: ":9998",
ServiceName: "fabio",
ServiceStatus: []string{"passing"},
ServiceMonitors: 1,
CheckInterval: time.Second,
CheckTimeout: 3 * time.Second,
CheckScheme: "http",
Expand Down
5 changes: 5 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.BoolVar(&cfg.Registry.Consul.CheckTLSSkipVerify, "registry.consul.register.checkTLSSkipVerify", defaultConfig.Registry.Consul.CheckTLSSkipVerify, "service check TLS verification")
f.StringVar(&cfg.Registry.Consul.CheckDeregisterCriticalServiceAfter, "registry.consul.register.checkDeregisterCriticalServiceAfter", defaultConfig.Registry.Consul.CheckDeregisterCriticalServiceAfter, "critical service deregistration timeout")
f.StringVar(&cfg.Registry.Consul.ChecksRequired, "registry.consul.checksRequired", defaultConfig.Registry.Consul.ChecksRequired, "number of checks which must pass: one or all")
f.IntVar(&cfg.Registry.Consul.ServiceMonitors, "registry.consul.serviceMonitors", defaultConfig.Registry.Consul.ServiceMonitors, "concurrency for route updates")
f.IntVar(&cfg.Runtime.GOGC, "runtime.gogc", defaultConfig.Runtime.GOGC, "sets runtime.GOGC")
f.IntVar(&cfg.Runtime.GOMAXPROCS, "runtime.gomaxprocs", defaultConfig.Runtime.GOMAXPROCS, "sets runtime.GOMAXPROCS")
f.StringVar(&cfg.UI.Access, "ui.access", defaultConfig.UI.Access, "access mode, one of [ro, rw]")
Expand Down Expand Up @@ -244,6 +245,10 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
cfg.Registry.Consul.CheckScheme = "https"
}

if cfg.Registry.Consul.ServiceMonitors <= 0 {
cfg.Registry.Consul.ServiceMonitors = 1
}

if gzipContentTypesValue != "" {
cfg.Proxy.GZIPContentTypes, err = regexp.Compile(gzipContentTypesValue)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,13 @@ func TestLoad(t *testing.T) {
return cfg
},
},
{
args: []string{"-registry.consul.serviceMonitors", "5"},
cfg: func(cfg *Config) *Config {
cfg.Registry.Consul.ServiceMonitors = 5
return cfg
},
},
{
args: []string{"-log.access.format", "foobar"},
cfg: func(cfg *Config) *Config {
Expand Down
12 changes: 12 additions & 0 deletions docs/content/ref/registry.consul.serviceMonitors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
title: "registry.consul.serviceMonitors"
---

`registry.consul.serviceMonitors` configures the concurrency for
route updates. Fabio will make up to the configured number of
concurrent calls to Consul to fetch status data for route
updates.

The default is

registry.consul.serviceMonitors = 1
10 changes: 10 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,16 @@
# registry.consul.checksRequired = one


# registry.consul.serviceMonitors configures the concurrency for
# route updates. Fabio will make up to the configured number of
# concurrent calls to Consul to fetch status data for route
# updates.
#
# The default is
#
# registry.consul.serviceMonitors = 1


# glob.matching.disabled disables glob matching on route lookups
# If glob matching is enabled there is a performance decrease
# for every route lookup. At a large number of services (> 500) this
Expand Down
3 changes: 2 additions & 1 deletion registry/consul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func (b *be) WatchServices() chan string {
log.Printf("[INFO] consul: Using dynamic routes")
log.Printf("[INFO] consul: Using tag prefix %q", b.cfg.TagPrefix)

m := NewServiceMonitor(b.c, b.cfg, b.dc)
svc := make(chan string)
go watchServices(b.c, b.cfg, svc)
go m.Watch(svc)
return svc
}

Expand Down
49 changes: 0 additions & 49 deletions registry/consul/parse.go

This file was deleted.

140 changes: 140 additions & 0 deletions registry/consul/routecmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package consul

import (
"fmt"
"log"
"net"
"os"
"runtime"
"strconv"
"strings"

"github.com/hashicorp/consul/api"
)

// routecmd builds a route command.
type routecmd struct {
// svc is the consul service instance.
svc *api.CatalogService

// prefix is the prefix of urlprefix tags. e.g. 'urlprefix-'.
prefix string

env map[string]string
}

func (r routecmd) build() []string {
var svctags, routetags []string
for _, t := range r.svc.ServiceTags {
if strings.HasPrefix(t, r.prefix) {
routetags = append(routetags, t)
} else {
svctags = append(svctags, t)
}
}

// generate route commands
var config []string
for _, tag := range routetags {
if route, opts, ok := parseURLPrefixTag(tag, r.prefix, r.env); ok {
name, addr, port := r.svc.ServiceName, r.svc.ServiceAddress, r.svc.ServicePort

// use consul node address if service address is not set
if addr == "" {
addr = r.svc.Address
}

// add .local suffix on OSX for simple host names w/o domain
if runtime.GOOS == "darwin" && !strings.Contains(addr, ".") && !strings.HasSuffix(addr, ".local") {
addr += ".local"
}

addr = net.JoinHostPort(addr, strconv.Itoa(port))
//tags := strings.Join(r.tags, ",")
dst := "http://" + addr + "/"

var weight string
var ropts []string
for _, o := range strings.Fields(opts) {
switch {
case o == "proto=tcp":
dst = "tcp://" + addr

case o == "proto=https":
dst = "https://" + addr

case strings.HasPrefix(o, "weight="):
weight = o[len("weight="):]

case strings.HasPrefix(o, "redirect="):
redir := strings.Split(o[len("redirect="):], ",")
if len(redir) == 2 {
dst = redir[1]
ropts = append(ropts, fmt.Sprintf("redirect=%s", redir[0]))
} else {
log.Printf("[ERROR] Invalid syntax for redirect: %s. should be redirect=<code>,<url>", o)
continue
}
default:
ropts = append(ropts, o)
}
}

cfg := "route add " + name + " " + route + " " + dst
if weight != "" {
cfg += " weight " + weight
}
if len(svctags) > 0 {
cfg += " tags " + strconv.Quote(strings.Join(svctags, ","))
}
if len(ropts) > 0 {
cfg += " opts " + strconv.Quote(strings.Join(ropts, " "))
}

config = append(config, cfg)
}
}
return config
}

// parseURLPrefixTag expects an input in the form of 'tag-host/path[ opts]'
// and returns the lower cased host and the unaltered path if the
// prefix matches the tag.
func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts string, ok bool) {
// expand $x or ${x} to env[x] or ""
expand := func(s string) string {
return os.Expand(s, func(x string) string {
if env == nil {
return ""
}
return env[x]
})
}

s = strings.TrimSpace(s)
if !strings.HasPrefix(s, prefix) {
return "", "", false
}
s = strings.TrimSpace(s[len(prefix):])

p := strings.SplitN(s, " ", 2)
if len(p) == 2 {
opts = p[1]
}
s = p[0]

// prefix is ":port"
if strings.HasPrefix(s, ":") {
return s, opts, true
}

// prefix is "host/path"
p = strings.SplitN(s, "/", 2)
if len(p) == 1 {
log.Printf("[WARN] consul: Invalid %s tag %q - You need to have a trailing slash!", prefix, s)
return "", "", false
}
host, path := p[0], p[1]

return strings.ToLower(expand(host)) + "/" + expand(path), opts, true
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,58 @@
package consul

import "testing"
import (
"reflect"
"testing"

"github.com/hashicorp/consul/api"
)

func TestRouteCmd(t *testing.T) {
cases := []struct {
name string
r routecmd
cfg []string
}{
{
name: "http",
r: routecmd{
prefix: "p-",
svc: &api.CatalogService{
ServiceName: "svc-1",
ServiceAddress: "1.1.1.1",
ServicePort: 2222,
ServiceTags: []string{`p-foo/bar`},
},
},
cfg: []string{
`route add svc-1 foo/bar http://1.1.1.1:2222/`,
},
},
{
name: "tcp",
r: routecmd{
prefix: "p-",
svc: &api.CatalogService{
ServiceName: "svc-1",
ServiceAddress: "1.1.1.1",
ServicePort: 2222,
ServiceTags: []string{`p-:1234 proto=tcp`},
},
},
cfg: []string{
`route add svc-1 :1234 tcp://1.1.1.1:2222`,
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if got, want := c.r.build(), c.cfg; !reflect.DeepEqual(got, want) {
t.Fatalf("\ngot %#v\nwant %#v", got, want)
}
})
}
}

func TestParseTag(t *testing.T) {
prefix := "p-"
Expand Down
Loading

0 comments on commit b6d5441

Please sign in to comment.