diff --git a/config/config_test.go b/config/config_test.go
index 07cbb1b56dd..2352b847470 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -278,8 +278,11 @@ var expectedConf = &Config{
 						Server:       "localhost:1234",
 						Token:        "mysecret",
 						Services:     []string{"nginx", "cache", "mysql"},
+						Tag:          "canary",
+						NodeMeta:     map[string]string{"rack": "123"},
 						TagSeparator: consul.DefaultSDConfig.TagSeparator,
 						Scheme:       "https",
+						AllowStale:   true,
 						TLSConfig: config_util.TLSConfig{
 							CertFile: filepath.FromSlash("testdata/valid_cert_file"),
 							KeyFile:  filepath.FromSlash("testdata/valid_key_file"),
diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml
index a282c1ec288..9ad55d5a6dc 100644
--- a/config/testdata/conf.good.yml
+++ b/config/testdata/conf.good.yml
@@ -122,6 +122,10 @@ scrape_configs:
       - server: 'localhost:1234'
         token: mysecret
         services: ['nginx', 'cache', 'mysql']
+        tag: "canary"
+        node_meta:
+          rack: "123"
+        allow_stale: true
         scheme: https
         tls_config:
           ca_file: valid_ca_file
diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go
index de6e4ba6dc0..66831c7f11b 100644
--- a/discovery/consul/consul.go
+++ b/discovery/consul/consul.go
@@ -80,9 +80,11 @@ var (
 	// DefaultSDConfig is the default Consul SD configuration.
 	DefaultSDConfig = SDConfig{
-		TagSeparator: ",",
-		Scheme:       "http",
-		Server:       "localhost:8500",
+		TagSeparator:    ",",
+		Scheme:          "http",
+		Server:          "localhost:8500",
+		AllowStale:      true,
+		RefreshInterval: model.Duration(0 * time.Second),
 	}
 )
@@ -95,9 +97,23 @@ type SDConfig struct {
 	Scheme   string             `yaml:"scheme,omitempty"`
 	Username string             `yaml:"username,omitempty"`
 	Password config_util.Secret `yaml:"password,omitempty"`
+	// See https://www.consul.io/docs/internals/consensus.html#consistency-modes:
+	// stale reads are a lot cheaper and are a necessity if you have more than 5k targets.
+	AllowStale bool `yaml:"allow_stale,omitempty"`
+	// By default use blocking queries, but allow users to delay updates if
+	// necessary. This can be useful because of "bugs" like
+	// https://github.com/hashicorp/consul/issues/3712 which cause an unnecessary
+	// amount of requests to Consul.
+	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
+
+	// See https://www.consul.io/api/catalog.html#list-services
 	// The list of services for which targets are discovered.
 	// Defaults to all services if empty.
 	Services []string `yaml:"services"`
+	// An optional tag used to filter instances inside a service.
+	Tag string `yaml:"tag,omitempty"`
+	// Desired node metadata.
+	NodeMeta map[string]string `yaml:"node_meta,omitempty"`
 	TLSConfig config_util.TLSConfig `yaml:"tls_config,omitempty"`
 	// Catches all undefined fields and must be empty after parsing.
@@ -138,6 +154,10 @@ type Discovery struct {
 	clientDatacenter string
 	tagSeparator     string
 	watchedServices  []string // Set of services which will be discovered.
+	watchedTag       string   // A tag used to filter instances of a service.
+	watchedNodeMeta  map[string]string
+	allowStale       bool
+	refreshInterval  time.Duration
 	logger           log.Logger
 }
@@ -183,6 +203,10 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
 		clientConf:       clientConf,
 		tagSeparator:     conf.TagSeparator,
 		watchedServices:  conf.Services,
+		watchedTag:       conf.Tag,
+		watchedNodeMeta:  conf.NodeMeta,
+		allowStale:       conf.AllowStale,
+		refreshInterval:  time.Duration(conf.RefreshInterval),
 		clientDatacenter: clientConf.Datacenter,
 		logger:           logger,
 	}
@@ -190,11 +214,17 @@
 }
 
 // shouldWatch returns whether the service of the given name should be watched.
-func (d *Discovery) shouldWatch(name string) bool {
+func (d *Discovery) shouldWatch(name string, tags []string) bool {
+	return d.shouldWatchFromName(name) && d.shouldWatchFromTags(tags)
+}
+
+// shouldWatchFromName returns whether the service of the given name should be watched based on its name.
+func (d *Discovery) shouldWatchFromName(name string) bool {
 	// If there's no fixed set of watched services, we watch everything.
 	if len(d.watchedServices) == 0 {
 		return true
 	}
+
 	for _, sn := range d.watchedServices {
 		if sn == name {
 			return true
@@ -203,21 +233,46 @@ func (d *Discovery) shouldWatch(name string) bool {
 	return false
 }
 
+// shouldWatchFromTags returns whether a service carrying the given tags should be watched.
+func (d *Discovery) shouldWatchFromTags(tags []string) bool {
+	// If there's no tag to filter on, we watch everything.
+	if d.watchedTag == "" {
+		return true
+	}
+
+	for _, tag := range tags {
+		if d.watchedTag == tag {
+			return true
+		}
+	}
+	return false
+}
+
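For illustration (not part of the patch; the service names and tag below are invented): with the split above, a service is watched only when both filters pass, i.e. its name is listed in watchedServices (if any were configured) and it carries the configured tag (if one was set). A fragment as if written inside the package:

    d := &Discovery{
        watchedServices: []string{"api"},
        watchedTag:      "prod",
    }
    fmt.Println(d.shouldWatch("api", []string{"prod", "canary"})) // true: name listed and tag present
    fmt.Println(d.shouldWatch("api", []string{"canary"}))         // false: tag "prod" missing
    fmt.Println(d.shouldWatch("web", []string{"prod"}))           // false: name not in watchedServices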
+// getDatacenter gets the local datacenter if it was not explicitly configured.
+func (d *Discovery) getDatacenter() error {
+	// If the datacenter was not set from clientConf, let's get it from the local Consul agent
+	// (Consul's default is to use the local node's datacenter if one isn't given for a query).
+	if d.clientDatacenter != "" {
+		return nil
+	}
+
+	info, err := d.client.Agent().Self()
+	if err != nil {
+		level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
+		return err
+	}
+
+	d.clientDatacenter = info["Config"]["Datacenter"].(string)
+	return nil
+}
+
 // Run implements the Discoverer interface.
 func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
-	// Watched services and their cancelation functions.
+	// Watched services and their cancellation functions.
 	services := map[string]func(){}
 	var lastIndex uint64
 	for {
-		catalog := d.client.Catalog()
-		t0 := time.Now()
-		srvs, meta, err := catalog.Services(&consul.QueryOptions{
-			WaitIndex: lastIndex,
-			WaitTime:  watchTimeout,
-		})
-		rpcDuration.WithLabelValues("catalog", "services").Observe(time.Since(t0).Seconds())
-
 		// We have to check the context at least once. The checks during channel sends
 		// do not guarantee that.
 		select {
 		case <-ctx.Done():
 			return
 		default:
 		}
@@ -226,92 +281,138 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
+		// Get the local datacenter first, if necessary.
+		err := d.getDatacenter()
 		if err != nil {
-			level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
-			rpcFailuresCount.Inc()
 			time.Sleep(retryInterval)
 			continue
 		}
-		// If the index equals the previous one, the watch timed out with no update.
-		if meta.LastIndex == lastIndex {
-			continue
-		}
-		lastIndex = meta.LastIndex
 
-		// If the datacenter was not set from clientConf, let's get it from the local Consul agent
-		// (Consul default is to use local node's datacenter if one isn't given for a query).
-		if d.clientDatacenter == "" {
-			info, err := d.client.Agent().Self()
-			if err != nil {
-				level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
-				time.Sleep(retryInterval)
-				continue
+		if len(d.watchedServices) == 0 || d.watchedTag != "" {
+			// We need to watch the catalog.
+			d.watchServices(ctx, ch, &lastIndex, services)
+		} else {
+			// We only have fully defined services.
+			for _, name := range d.watchedServices {
+				d.watchService(ctx, ch, name)
 			}
-			d.clientDatacenter = info["Config"]["Datacenter"].(string)
+			// Wait for cancellation.
+			<-ctx.Done()
+			break
 		}
+	}
+}
 
-		// Check for new services.
-		for name := range srvs {
-			if !d.shouldWatch(name) {
-				continue
-			}
-			if _, ok := services[name]; ok {
-				continue // We are already watching the service.
-			}
-
-			srv := &consulService{
-				client: d.client,
-				name:   name,
-				labels: model.LabelSet{
-					serviceLabel:    model.LabelValue(name),
-					datacenterLabel: model.LabelValue(d.clientDatacenter),
-				},
-				tagSeparator: d.tagSeparator,
-				logger:       d.logger,
-			}
-
-			wctx, cancel := context.WithCancel(ctx)
-			go srv.watch(wctx, ch)
+// watchServices watches the catalog for services we would like to watch. This is
+// only necessary when no service names are given or when they are filtered by tag;
+// if we already have the names it is more efficient to watch each service directly.
+func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, lastIndex *uint64, services map[string]func()) error {
+	catalog := d.client.Catalog()
+	level.Debug(d.logger).Log("msg", "Watching services", "tag", d.watchedTag)
+
+	t0 := time.Now()
+	srvs, meta, err := catalog.Services(&consul.QueryOptions{
+		WaitIndex:  *lastIndex,
+		WaitTime:   watchTimeout,
+		AllowStale: d.allowStale,
+		NodeMeta:   d.watchedNodeMeta,
+	})
+	rpcDuration.WithLabelValues("catalog", "services").Observe(time.Since(t0).Seconds())
 
-			services[name] = cancel
+	if err != nil {
+		level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
+		rpcFailuresCount.Inc()
+		time.Sleep(retryInterval)
+		return err
+	}
+	// If the index equals the previous one, the watch timed out with no update.
+	if meta.LastIndex == *lastIndex {
+		return nil
+	}
+	*lastIndex = meta.LastIndex
+
+	// Check for new services.
+	for name := range srvs {
+		// catalog.Services() returns a map of service name to tags; we can use that
+		// to watch only the services that have the tag we are looking for (if specified).
+		// In the future Consul will also support server-side filtering on service metadata:
+		// https://github.com/hashicorp/consul/issues/1107
+		if !d.shouldWatch(name, srvs[name]) {
+			continue
+		}
+		if _, ok := services[name]; ok {
+			continue // We are already watching the service.
 		}
 
-		// Check for removed services.
-		for name, cancel := range services {
-			if _, ok := srvs[name]; !ok {
-				// Call the watch cancelation function.
-				cancel()
-				delete(services, name)
-
-				// Send clearing target group.
-				select {
-				case <-ctx.Done():
-					return
-				case ch <- []*targetgroup.Group{{Source: name}}:
-				}
+		wctx, cancel := context.WithCancel(ctx)
+		d.watchService(wctx, ch, name)
+		services[name] = cancel
+	}
+
+	// Check for removed services.
+	for name, cancel := range services {
+		if _, ok := srvs[name]; !ok {
+			// Call the watch cancellation function.
+			cancel()
+			delete(services, name)
+
+			// Send clearing target group.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case ch <- []*targetgroup.Group{{Source: name}}:
 			}
 		}
 	}
+
+	time.Sleep(d.refreshInterval)
+	return nil
 }
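watchServices relies on Consul's blocking queries: a request carrying WaitIndex blocks until the catalog index moves past that value or WaitTime elapses, and the returned meta.LastIndex seeds the next request. A minimal standalone sketch of that contract against github.com/hashicorp/consul/api (assumes a local agent; error handling trimmed):

    package main

    import (
        "fmt"
        "time"

        consul "github.com/hashicorp/consul/api"
    )

    func main() {
        client, err := consul.NewClient(consul.DefaultConfig())
        if err != nil {
            panic(err)
        }
        catalog := client.Catalog()

        var lastIndex uint64
        for {
            // Blocks until the catalog index moves past lastIndex, or 30s elapse.
            srvs, meta, err := catalog.Services(&consul.QueryOptions{
                WaitIndex: lastIndex,
                WaitTime:  30 * time.Second,
            })
            if err != nil {
                time.Sleep(time.Second)
                continue
            }
            if meta.LastIndex == lastIndex {
                continue // The wait timed out and nothing changed.
            }
            lastIndex = meta.LastIndex
            fmt.Printf("catalog changed, %d services\n", len(srvs))
        }
    }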
 
 // consulService contains data belonging to the same service.
 type consulService struct {
 	name         string
+	tag          string
 	labels       model.LabelSet
+	discovery    *Discovery
 	client       *consul.Client
 	tagSeparator string
 	logger       log.Logger
 }
 
+// watchService starts watching one service.
+func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.Group, name string) {
+	srv := &consulService{
+		discovery: d,
+		client:    d.client,
+		name:      name,
+		tag:       d.watchedTag,
+		labels: model.LabelSet{
+			serviceLabel:    model.LabelValue(name),
+			datacenterLabel: model.LabelValue(d.clientDatacenter),
+		},
+		tagSeparator: d.tagSeparator,
+		logger:       d.logger,
+	}
+
+	go srv.watch(ctx, ch)
+}
+
+// watch continuously watches one service for updates.
 func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Group) {
 	catalog := srv.client.Catalog()
 	lastIndex := uint64(0)
 	for {
+		level.Debug(srv.logger).Log("msg", "Watching service", "service", srv.name, "tag", srv.tag)
+
 		t0 := time.Now()
-		nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
-			WaitIndex: lastIndex,
-			WaitTime:  watchTimeout,
+		nodes, meta, err := catalog.Service(srv.name, srv.tag, &consul.QueryOptions{
+			WaitIndex:  lastIndex,
+			WaitTime:   watchTimeout,
+			AllowStale: srv.discovery.allowStale,
+			NodeMeta:   srv.discovery.watchedNodeMeta,
 		})
 		rpcDuration.WithLabelValues("catalog", "service").Observe(time.Since(t0).Seconds())
 
@@ -324,7 +425,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
 		}
 
 		if err != nil {
-			level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "err", err)
+			level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tag", srv.tag, "err", err)
 			rpcFailuresCount.Inc()
 			time.Sleep(retryInterval)
 			continue
@@ -374,7 +475,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
 			tgroup.Targets = append(tgroup.Targets, labels)
 		}
 
-		// Check context twice to ensure we always catch cancelation.
+		// Check context twice to ensure we always catch cancellation.
 		select {
 		case <-ctx.Done():
 			return
@@ -385,5 +486,6 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
 			return
 		case ch <- []*targetgroup.Group{&tgroup}:
 		}
+		time.Sleep(srv.discovery.refreshInterval)
 	}
 }
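For context, this is how a consumer drives the Discoverer; the tests below do essentially the same. A hypothetical, minimal caller (fragment; imports elided, server address invented):

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    d, err := NewDiscovery(&SDConfig{Server: "localhost:8500"}, log.NewNopLogger())
    if err != nil {
        panic(err)
    }

    ch := make(chan []*targetgroup.Group)
    go d.Run(ctx, ch)

    for tgs := range ch {
        for _, tg := range tgs {
            // A group sent with no targets clears the group named by Source.
            fmt.Println(tg.Source, len(tg.Targets))
        }
    }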
diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go
index 03996750485..d9f4927439e 100644
--- a/discovery/consul/consul_test.go
+++ b/discovery/consul/consul_test.go
@@ -15,6 +15,17 @@ package consul
 
 import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
 	"testing"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+
+	"github.com/prometheus/prometheus/discovery/targetgroup"
 )
 
 func TestConfiguredService(t *testing.T) {
@@ -25,14 +36,38 @@ func TestConfiguredService(t *testing.T) {
 	if err != nil {
 		t.Errorf("Unexpected error when initialising discovery %v", err)
 	}
-	if !consulDiscovery.shouldWatch("configuredServiceName") {
+	if !consulDiscovery.shouldWatch("configuredServiceName", []string{""}) {
 		t.Errorf("Expected service %s to be watched", "configuredServiceName")
 	}
-	if consulDiscovery.shouldWatch("nonConfiguredServiceName") {
+	if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) {
 		t.Errorf("Expected service %s to not be watched", "nonConfiguredServiceName")
 	}
 }
 
+func TestConfiguredServiceWithTag(t *testing.T) {
+	conf := &SDConfig{
+		Services: []string{"configuredServiceName"},
+		Tag:      "http",
+	}
+	consulDiscovery, err := NewDiscovery(conf, nil)
+
+	if err != nil {
+		t.Errorf("Unexpected error when initialising discovery %v", err)
+	}
+	if consulDiscovery.shouldWatch("configuredServiceName", []string{""}) {
+		t.Errorf("Expected service %s to not be watched without tag", "configuredServiceName")
+	}
+	if !consulDiscovery.shouldWatch("configuredServiceName", []string{"http"}) {
+		t.Errorf("Expected service %s to be watched with tag %s", "configuredServiceName", "http")
+	}
+	if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) {
+		t.Errorf("Expected service %s to not be watched without tag", "nonConfiguredServiceName")
+	}
+	if consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{"http"}) {
+		t.Errorf("Expected service %s to not be watched with tag %s", "nonConfiguredServiceName", "http")
+	}
+}
+
 func TestNonConfiguredService(t *testing.T) {
 	conf := &SDConfig{}
 	consulDiscovery, err := NewDiscovery(conf, nil)
@@ -40,7 +75,133 @@ func TestNonConfiguredService(t *testing.T) {
 	if err != nil {
 		t.Errorf("Unexpected error when initialising discovery %v", err)
 	}
-	if !consulDiscovery.shouldWatch("nonConfiguredServiceName") {
+	if !consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}) {
 		t.Errorf("Expected service %s to be watched", "nonConfiguredServiceName")
 	}
 }
+
+const (
+	AgentAnswer       = `{"Config": {"Datacenter": "test-dc"}}`
+	ServiceTestAnswer = `[{
+"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e",
+"Node": "node1",
+"Address": "1.1.1.1",
+"Datacenter": "test-dc",
+"NodeMeta": {"rack_name": "2304"},
+"ServiceID": "test",
+"ServiceName": "test",
+"ServiceTags": ["tag1"],
+"ServicePort": 3341,
+"CreateIndex": 1,
+"ModifyIndex": 1
+}]`
+	ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}`
+)
+
+func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
+	// github.com/hashicorp/consul/testutil/ would be nice, but it needs a local consul binary.
+	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		response := ""
+		switch r.URL.String() {
+		case "/v1/agent/self":
+			response = AgentAnswer
+		case "/v1/catalog/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=30000ms":
+			response = ServiceTestAnswer
+		case "/v1/catalog/service/test?wait=30000ms":
+			response = ServiceTestAnswer
+		case "/v1/catalog/service/other?wait=30000ms":
+			response = `[]`
+		case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=30000ms":
+			response = ServicesTestAnswer
+		case "/v1/catalog/services?wait=30000ms":
+			response = ServicesTestAnswer
+		case "/v1/catalog/services?index=1&node-meta=rack_name%3A2304&stale=&wait=30000ms":
+			time.Sleep(5 * time.Second)
+			response = ServicesTestAnswer
+		case "/v1/catalog/services?index=1&wait=30000ms":
+			time.Sleep(5 * time.Second)
+			response = ServicesTestAnswer
+		default:
+			t.Errorf("Unhandled consul call: %s", r.URL)
+		}
+		w.Header().Add("X-Consul-Index", "1")
+		w.Write([]byte(response))
+	}))
+	stuburl, err := url.Parse(stub.URL)
+	require.NoError(t, err)
+
+	config := &SDConfig{
+		Server:          stuburl.Host,
+		Token:           "fake-token",
+		RefreshInterval: model.Duration(1 * time.Second),
+	}
+	return stub, config
+}
+
+func newDiscovery(t *testing.T, config *SDConfig) *Discovery {
+	logger := log.NewNopLogger()
+	d, err := NewDiscovery(config, logger)
+	require.NoError(t, err)
+	return d
+}
+
+func checkOneTarget(t *testing.T, tg []*targetgroup.Group) {
+	require.Equal(t, 1, len(tg))
+	target := tg[0]
+	require.Equal(t, "test-dc", string(target.Labels["__meta_consul_dc"]))
+	require.Equal(t, target.Source, string(target.Labels["__meta_consul_service"]))
+	if target.Source == "test" {
+		// The test service should have one node.
+		require.NotEmpty(t, target.Targets)
+	}
+}
+
+// Watch all the services in the catalog.
+func TestAllServices(t *testing.T) {
+	stub, config := newServer(t)
+	defer stub.Close()
+
+	d := newDiscovery(t, config)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan []*targetgroup.Group)
+	go d.Run(ctx, ch)
+	checkOneTarget(t, <-ch)
+	checkOneTarget(t, <-ch)
+	cancel()
+}
+
+// Watch only the test service.
+func TestOneService(t *testing.T) {
+	stub, config := newServer(t)
+	defer stub.Close()
+
+	config.Services = []string{"test"}
+	d := newDiscovery(t, config)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan []*targetgroup.Group)
+	go d.Run(ctx, ch)
+	checkOneTarget(t, <-ch)
+	cancel()
+}
+
+// Watch the test service with a specific tag and node metadata.
+func TestAllOptions(t *testing.T) {
+	stub, config := newServer(t)
+	defer stub.Close()
+
+	config.Services = []string{"test"}
+	config.NodeMeta = map[string]string{"rack_name": "2304"}
+	config.Tag = "tag1"
+	config.AllowStale = true
+	config.Token = "fake-token"
+
+	d := newDiscovery(t, config)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan []*targetgroup.Group)
+	go d.Run(ctx, ch)
+	checkOneTarget(t, <-ch)
+	cancel()
+}
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 809287f4a53..57cc4ddf468 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -295,7 +295,7 @@ The following meta labels are available on targets during [relabeling](#relabel_
 
 * `__meta_consul_address`: the address of the target
 * `__meta_consul_dc`: the datacenter name for the target
-* `__meta_consul_metadata_<key>`: each metadata key value of the target
+* `__meta_consul_metadata_<key>`: each node metadata key value of the target
 * `__meta_consul_node`: the node name defined for the target
 * `__meta_consul_service_address`: the service address of the target
 * `__meta_consul_service_id`: the service ID of the target
@@ -321,8 +321,22 @@ tls_config:
 services:
   [ - <string> ]
 
+# An optional tag used to filter instances inside services.
+[ tag: <string> ]
+
+# Node metadata used to filter nodes.
+[ node_meta:
+  [ <name>: <value> ... ] ]
+
 # The string by which Consul tags are joined into the tag label.
 [ tag_separator: <string> | default = , ]
+
+# Allow stale Consul results (see https://www.consul.io/api/index.html#consistency-modes). This will reduce the load on Consul.
+[ allow_stale: <boolean> ]
+
+# The time after which the provided names are refreshed. By default, updates happen as soon as new targets are discovered.
+# On large setups it might be a good idea to increase this value, because the catalog tends to change all the time.
+[ refresh_interval: <duration> | default = 0s ]
 ```
 
 Note that the IP number and port used to scrape the targets is assembled as
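Putting the documented options together, a hypothetical scrape configuration (job and service names invented; 30s is an arbitrary throttle, not the default):

    scrape_configs:
      - job_name: 'backend'
        consul_sd_configs:
          - server: 'localhost:8500'
            services: ['backend']
            tag: 'prod'
            node_meta:
              rack: '123'
            allow_stale: true
            # Throttle updates instead of reacting to every catalog change
            # (the default of 0s refreshes as soon as changes arrive).
            refresh_interval: 30s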