consul: improve consul service discovery
Related to prometheus#3711

- Add the ability to filter by tag and node-meta in an efficient way (`/catalog/services`
  allows filtering by node-meta, and returns a `map[string][]string` of `service`->`tags`).
  Tags and node-meta are also used in `/catalog/service` requests.
- Do not require a call to the catalog if services are specified by name. This is important
  because on a large cluster `/catalog/services` changes all the time.
- Add `allow_stale` configuration option to do stale reads. Non-stale
  reads can be costly, even more so when you are doing them to a remote
  datacenter with 10k+ targets over WAN (which is common for federation).
- Add `refresh_interval` to minimize the strain on the catalog and on the
  service endpoint. This is needed because of behavior such as
  hashicorp/consul#3712 in consul, and because a catalog
  on a large cluster changes basically *all* the time. There is no need to discover
  targets within 1s if we scrape them every minute.
- Added plenty of unit tests.
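
The first two bullets come down to one decision: does the catalog need to be watched at all? Below is a minimal Go sketch of that decision and of the name/tag filter, mirroring the condition used in `Run` in this commit. The `sdFilter` type and the `needsCatalogWatch` name are hypothetical, introduced only for illustration.

```go
package main

import "fmt"

// sdFilter is a hypothetical stand-in for the filtering fields of Discovery.
type sdFilter struct {
	services []string // explicit service names; empty means "all services"
	tag      string   // optional tag; empty means "any tag"
}

// needsCatalogWatch reports whether /catalog/services has to be watched.
// That is only the case when the names are unknown or a tag must be
// resolved to service names.
func (f sdFilter) needsCatalogWatch() bool {
	return len(f.services) == 0 || f.tag != ""
}

// shouldWatch reports whether a service with the given tags passes
// both the name filter and the tag filter.
func (f sdFilter) shouldWatch(name string, tags []string) bool {
	nameOK := len(f.services) == 0
	for _, s := range f.services {
		if s == name {
			nameOK = true
			break
		}
	}
	if !nameOK {
		return false
	}
	if f.tag == "" {
		return true
	}
	for _, t := range tags {
		if t == f.tag {
			return true
		}
	}
	return false
}

func main() {
	byName := sdFilter{services: []string{"observability-cerebro"}}
	byTag := sdFilter{tag: "marathon-user-observability"}
	fmt.Println(byName.needsCatalogWatch(), byTag.needsCatalogWatch())
	fmt.Println(byTag.shouldWatch("grafana", []string{"marathon-user-observability"}))
}
```

With only names configured, the sketch skips the catalog and each service can be watched directly; with a tag configured, the catalog is still needed to find which services carry it.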

Benchmarks
----------

```yaml
scrape_configs:

- job_name: prometheus
  scrape_interval: 60s
  static_configs:
    - targets: ["127.0.0.1:9090"]

- job_name: "observability-by-tag"
  scrape_interval: "60s"
  metrics_path: "/metrics"
  consul_sd_configs:
    - server: consul.service.par.consul.prod.crto.in:8500
      tag: marathon-user-observability  # Used in After
      refresh_interval: 30s             # Used in After+delay
  relabel_configs:
    - source_labels: [__meta_consul_tags]
      regex: ^(.*,)?marathon-user-observability(,.*)?$
      action: keep

- job_name: "observability-by-name"
  scrape_interval: "60s"
  metrics_path: "/metrics"
  consul_sd_configs:
    - server: consul.service.par.consul.prod.crto.in:8500
      services:
        - observability-cerebro
        - observability-portal-web

- job_name: "fake-fake-fake"
  scrape_interval: "15s"
  metrics_path: "/metrics"
  consul_sd_configs:
    - server: consul.service.par.consul.prod.crto.in:8500
      services:
        - fake-fake-fake
```
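
In the `observability-by-tag` job, the `relabel_configs` rule and the `tag` option are meant to select the same services; the difference is whether the filtering happens in Prometheus after discovery or in the Consul query. The Go sketch below compares the two checks on a few tag lists. It assumes that `__meta_consul_tags` is the tag list joined by `,` with a leading and trailing separator; `tagsLabel` and `hasTag` are hypothetical helpers, not Prometheus functions.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// keepRule is the regex from the relabel_configs rule above.
var keepRule = regexp.MustCompile(`^(.*,)?marathon-user-observability(,.*)?$`)

// tagsLabel joins tags the way this sketch assumes __meta_consul_tags is
// built: separated by "," with a leading and a trailing separator.
func tagsLabel(tags []string) string {
	return "," + strings.Join(tags, ",") + ","
}

// hasTag is the exact-match check that a tag filter amounts to.
func hasTag(tags []string, want string) bool {
	for _, t := range tags {
		if t == want {
			return true
		}
	}
	return false
}

func main() {
	cases := [][]string{
		{"marathon-user-observability"},
		{"http", "marathon-user-observability", "canary"},
		{"marathon-user-observability-extra"},
		{},
	}
	for _, tags := range cases {
		byRegex := keepRule.MatchString(tagsLabel(tags))
		byTag := hasTag(tags, "marathon-user-observability")
		fmt.Printf("%v regex=%v tag=%v\n", tags, byRegex, byTag)
	}
}
```

Both checks agree on these inputs, so the selected targets should be the same; only the amount of data fetched before filtering differs.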

Note: tested with ~1200 services, ~5000 nodes.

| Resource | Empty | Before | After | After + delay |
| -------- |:-----:|:------:|:-----:|:-------------:|
|/service-discovery size|5K|85MiB|27k|27k|
|`go_memstats_heap_objects`|100k|1M|120k|110k|
|`go_memstats_heap_alloc_bytes`|24MB|150MB|28MB|27MB|
|`rate(go_memstats_alloc_bytes_total[5m])`|0.2MB/s|28MB/s|2MB/s|0.3MB/s|
|`rate(process_cpu_seconds_total[5m])`|0.1%|15%|2%|0.01%|
|`process_open_fds`|16|*1236*|22|22|
|`rate(prometheus_sd_consul_rpc_duration_seconds_count{call="services"}[5m])`|~0|1|1|*0.03*|
|`rate(prometheus_sd_consul_rpc_duration_seconds_count{call="service"}[5m])`|0.1|*80*|0.5|0.5|
|`prometheus_target_sync_length_seconds{quantile="0.9",scrape_job="observability-by-tag"}`|N/A|200ms|0.2ms|0.2ms|
|Network bandwidth|~10kbps|~2.8Mbps|~1.6Mbps|~10kbps|

Filtering by tag using relabel_configs uses **100kiB and 23kiB/s per service per job** and quite a lot of CPU. It also sends an additional *1Mbps* of traffic to consul.
Being a little bit smarter about this reduces the overhead quite a lot.
Limiting the number of `/catalog/services` queries per second almost removes the overhead of service discovery.
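
The per-service figures are consistent with spreading the difference between the "Before" and "Empty" columns over the ~1200 services. That derivation is an assumption, since the commit message does not say how the numbers were obtained. A small Go sketch of the arithmetic, where `perService` is a hypothetical helper:

```go
package main

import "fmt"

// perService spreads the difference between two measurements over n services.
func perService(before, empty float64, n int) float64 {
	return (before - empty) / float64(n)
}

func main() {
	const services = 1200 // "~1200 services" from the note above

	// go_memstats_heap_alloc_bytes: 150MB before, 24MB empty (values in kB).
	heapKB := perService(150e3, 24e3, services)
	// rate(go_memstats_alloc_bytes_total[5m]): 28MB/s before, 0.2MB/s empty (values in kB/s).
	allocKBs := perService(28e3, 0.2e3, services)

	fmt.Printf("heap: ~%.0f kB per service\n", heapKB)
	fmt.Printf("alloc: ~%.1f kB/s per service\n", allocKBs)
}
```

This gives roughly 105 kB and 23 kB/s per service, in line with the figures quoted above.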
Corentin Chary committed Mar 16, 2018
1 parent bc6058c commit 57b359f
Showing 5 changed files with 358 additions and 74 deletions.
3 changes: 3 additions & 0 deletions config/config_test.go
@@ -278,8 +278,11 @@ var expectedConf = &Config{
Server: "localhost:1234",
Token: "mysecret",
Services: []string{"nginx", "cache", "mysql"},
Tag: "canary",
NodeMeta: map[string]string{"rack": "123"},
TagSeparator: consul.DefaultSDConfig.TagSeparator,
Scheme: "https",
AllowStale: true,
TLSConfig: config_util.TLSConfig{
CertFile: filepath.FromSlash("testdata/valid_cert_file"),
KeyFile: filepath.FromSlash("testdata/valid_key_file"),
4 changes: 4 additions & 0 deletions config/testdata/conf.good.yml
@@ -122,6 +122,10 @@ scrape_configs:
- server: 'localhost:1234'
token: mysecret
services: ['nginx', 'cache', 'mysql']
tag: "canary"
node_meta:
rack: "123"
allow_stale: true
scheme: https
tls_config:
ca_file: valid_ca_file
242 changes: 172 additions & 70 deletions discovery/consul/consul.go
@@ -80,9 +80,11 @@ var (

// DefaultSDConfig is the default Consul SD configuration.
DefaultSDConfig = SDConfig{
-TagSeparator: ",",
-Scheme: "http",
-Server: "localhost:8500",
+TagSeparator: ",",
+Scheme: "http",
+Server: "localhost:8500",
+AllowStale: true,
+RefreshInterval: model.Duration(0 * time.Second),
}
)

@@ -95,9 +97,23 @@ type SDConfig struct {
Scheme string `yaml:"scheme,omitempty"`
Username string `yaml:"username,omitempty"`
Password config_util.Secret `yaml:"password,omitempty"`
// See https://www.consul.io/docs/internals/consensus.html#consistency-modes,
// stale reads are a lot cheaper and are a necessity if you have >5k targets.
AllowStale bool `yaml:"allow_stale,omitempty"`
// By default use blocking queries () but allow users to delay
// updates if necessary. This can be useful because of "bugs" like
// https://github.com/hashicorp/consul/issues/3712 which cause an un-necessary
// amount of requests on consul.
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`

// See https://www.consul.io/api/catalog.html#list-services
// The list of services for which targets are discovered.
// Defaults to all services if empty.
Services []string `yaml:"services"`
// An optional tag used to filter instances inside a service.
Tag string `yaml:"tag,omitempty"`
// Desired node metadata.
NodeMeta map[string]string `yaml:"node_meta,omitempty"`

TLSConfig config_util.TLSConfig `yaml:"tls_config,omitempty"`
// Catches all undefined fields and must be empty after parsing.
@@ -138,6 +154,10 @@ type Discovery struct {
clientDatacenter string
tagSeparator string
watchedServices []string // Set of services which will be discovered.
watchedTag string // A tag used to filter instances of a service.
watchedNodeMeta map[string]string
allowStale bool
refreshInterval time.Duration
logger log.Logger
}

@@ -183,18 +203,28 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
clientConf: clientConf,
tagSeparator: conf.TagSeparator,
watchedServices: conf.Services,
watchedTag: conf.Tag,
watchedNodeMeta: conf.NodeMeta,
allowStale: conf.AllowStale,
refreshInterval: time.Duration(conf.RefreshInterval),
clientDatacenter: clientConf.Datacenter,
logger: logger,
}
return cd, nil
}

// shouldWatch returns whether the service of the given name should be watched.
-func (d *Discovery) shouldWatch(name string) bool {
+func (d *Discovery) shouldWatch(name string, tags []string) bool {
return d.shouldWatchFromName(name) && d.shouldWatchFromTags(tags)
}

// shouldWatchFromName returns whether the service of the given name should be watched based on its name.
func (d *Discovery) shouldWatchFromName(name string) bool {
// If there's no fixed set of watched services, we watch everything.
if len(d.watchedServices) == 0 {
return true
}

for _, sn := range d.watchedServices {
if sn == name {
return true
@@ -203,21 +233,46 @@ func (d *Discovery) shouldWatch(name string) bool {
return false
}

// shouldWatchFromTags returns whether the service of the given name should be watched based on its tags.
func (d *Discovery) shouldWatchFromTags(tags []string) bool {
// If there's no fixed set of watched tags, we watch everything.
if d.watchedTag == "" {
return true
}

for _, tag := range tags {
if d.watchedTag == tag {
return true
}
}
return false
}

// Get the local datacenter if not specified.
func (d *Discovery) getDatacenter() error {
// If the datacenter was not set from clientConf, let's get it from the local Consul agent
// (Consul default is to use local node's datacenter if one isn't given for a query).
if d.clientDatacenter != "" {
return nil
}

info, err := d.client.Agent().Self()
if err != nil {
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
return err
}

d.clientDatacenter = info["Config"]["Datacenter"].(string)
return nil
}

// Run implements the Discoverer interface.
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
-// Watched services and their cancelation functions.
+// Watched services and their cancellation functions.
services := map[string]func(){}

var lastIndex uint64
for {
catalog := d.client.Catalog()
t0 := time.Now()
srvs, meta, err := catalog.Services(&consul.QueryOptions{
WaitIndex: lastIndex,
WaitTime: watchTimeout,
})
rpcDuration.WithLabelValues("catalog", "services").Observe(time.Since(t0).Seconds())

// We have to check the context at least once. The checks during channel sends
// do not guarantee that.
select {
@@ -226,92 +281,138 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
default:
}

// Get the local datacenter first, if necessary.
err := d.getDatacenter()
if err != nil {
level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
continue
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == lastIndex {
continue
}
lastIndex = meta.LastIndex

// If the datacenter was not set from clientConf, let's get it from the local Consul agent
// (Consul default is to use local node's datacenter if one isn't given for a query).
if d.clientDatacenter == "" {
info, err := d.client.Agent().Self()
if err != nil {
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
time.Sleep(retryInterval)
continue
if len(d.watchedServices) == 0 || d.watchedTag != "" {
// We need to watch the catalog.
d.watchServices(ctx, ch, &lastIndex, services)
} else {
// We only have fully defined services.
for _, name := range d.watchedServices {
ctx, _ := context.WithCancel(ctx)
d.watchService(name, ctx, ch)
}
d.clientDatacenter = info["Config"]["Datacenter"].(string)
// Wait for cancellation.
<-ctx.Done()
break
}
}
}

// Check for new services.
for name := range srvs {
if !d.shouldWatch(name) {
continue
}
if _, ok := services[name]; ok {
continue // We are already watching the service.
}

srv := &consulService{
client: d.client,
name: name,
labels: model.LabelSet{
serviceLabel: model.LabelValue(name),
datacenterLabel: model.LabelValue(d.clientDatacenter),
},
tagSeparator: d.tagSeparator,
logger: d.logger,
}

wctx, cancel := context.WithCancel(ctx)
go srv.watch(wctx, ch)
// Watch the catalog for services we would like to watch. This is only necessary
// for services filtered by tag. If we already have the name it is more efficient to
// watch the nodes of the service directly.
func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, lastIndex *uint64, services map[string]func()) error {
catalog := d.client.Catalog()
level.Debug(d.logger).Log("msg", "Watching services", "tag", d.watchedTag)

t0 := time.Now()
srvs, meta, err := catalog.Services(&consul.QueryOptions{
WaitIndex: *lastIndex,
WaitTime: watchTimeout,
AllowStale: d.allowStale,
NodeMeta: d.watchedNodeMeta,
})
rpcDuration.WithLabelValues("catalog", "services").Observe(time.Since(t0).Seconds())

services[name] = cancel
if err != nil {
level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return err
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == *lastIndex {
return nil
}
*lastIndex = meta.LastIndex

// Check for new services.
for name := range srvs {
// catalog.Services() returns a map of service name to tags, we can use that to watch
// only the services that have the tag we are looking for (if specified).
// In the future consul will also support server side for service metadata.
// https://github.com/hashicorp/consul/issues/1107
if !d.shouldWatch(name, srvs[name]) {
continue
}
if _, ok := services[name]; ok {
continue // We are already watching the service.
}

// Check for removed services.
for name, cancel := range services {
if _, ok := srvs[name]; !ok {
// Call the watch cancelation function.
cancel()
delete(services, name)

// Send clearing target group.
select {
case <-ctx.Done():
return
case ch <- []*targetgroup.Group{{Source: name}}:
}
wctx, cancel := context.WithCancel(ctx)
d.watchService(name, wctx, ch)
services[name] = cancel
}

// Check for removed services.
for name, cancel := range services {
if _, ok := srvs[name]; !ok {
// Call the watch cancellation function.
cancel()
delete(services, name)

// Send clearing target group.
select {
case <-ctx.Done():
return ctx.Err()
case ch <- []*targetgroup.Group{{Source: name}}:
}
}
}

time.Sleep(d.refreshInterval)
return nil
}

// consulService contains data belonging to the same service.
type consulService struct {
name string
tag string
labels model.LabelSet
discovery *Discovery
client *consul.Client
tagSeparator string
logger log.Logger
}

// Start watching a service.
func (d *Discovery) watchService(name string, ctx context.Context, ch chan<- []*targetgroup.Group) {
srv := &consulService{
discovery: d,
client: d.client,
name: name,
tag: d.watchedTag,
labels: model.LabelSet{
serviceLabel: model.LabelValue(name),
datacenterLabel: model.LabelValue(d.clientDatacenter),
},
tagSeparator: d.tagSeparator,
logger: d.logger,
}

go srv.watch(ctx, ch)
}

// Continuously watch one service.
func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group) {
catalog := srv.client.Catalog()

lastIndex := uint64(0)
for {
level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tag", srv.tag)

t0 := time.Now()
-nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
-WaitIndex: lastIndex,
-WaitTime: watchTimeout,
+nodes, meta, err := catalog.Service(srv.name, srv.tag, &consul.QueryOptions{
+WaitIndex: lastIndex,
+WaitTime: watchTimeout,
+AllowStale: srv.discovery.allowStale,
+NodeMeta: srv.discovery.watchedNodeMeta,
})
rpcDuration.WithLabelValues("catalog", "service").Observe(time.Since(t0).Seconds())

@@ -324,7 +425,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
}

if err != nil {
-level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "err", err)
+level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tag", srv.tag, "err", err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
continue
@@ -374,7 +475,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr

tgroup.Targets = append(tgroup.Targets, labels)
}
-// Check context twice to ensure we always catch cancelation.
+// Check context twice to ensure we always catch cancellation.
select {
case <-ctx.Done():
return
@@ -385,5 +486,6 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
return
case ch <- []*targetgroup.Group{&tgroup}:
}
time.Sleep(srv.discovery.refreshInterval)
}
}