Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for labels/filters from go-metrics #3369

Merged
merged 6 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/systemd"
Expand Down Expand Up @@ -94,6 +95,9 @@ type Agent struct {
// Used for streaming logs to
LogWriter *logger.LogWriter

// In-memory sink used for collecting metrics
MemSink *metrics.InmemSink

// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate delegate
Expand Down
32 changes: 32 additions & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ type Telemetry struct {
// DisableHostname will disable hostname prefixing for all metrics
DisableHostname bool `mapstructure:"disable_hostname"`

// PrefixFilter is a list of filter rules to apply for allowing/blocking metrics
// by prefix.
PrefixFilter []string `mapstructure:"prefix_filter"`
AllowedPrefixes []string `mapstructure:"-" json:"-"`
BlockedPrefixes []string `mapstructure:"-" json:"-"`

// FilterDefault is the default for whether to allow a metric that's not
// covered by the filter.
FilterDefault *bool `mapstructure:"filter_default"`

// DogStatsdAddr is the address of a dogstatsd instance. If provided,
// metrics will be sent to that instance
DogStatsdAddr string `mapstructure:"dogstatsd_addr"`
Expand Down Expand Up @@ -937,6 +947,7 @@ func DefaultConfig() *Config {
},
Telemetry: Telemetry{
StatsitePrefix: "consul",
FilterDefault: Bool(true),
},
Meta: make(map[string]string),
SyslogFacility: "LOCAL0",
Expand Down Expand Up @@ -1461,6 +1472,21 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.EnableACLReplication = true
}

// Parse the metric filters
for _, rule := range result.Telemetry.PrefixFilter {
if rule == "" {
return nil, fmt.Errorf("Cannot have empty filter rule in prefix_filter")
}
switch rule[0] {
case '+':
result.Telemetry.AllowedPrefixes = append(result.Telemetry.AllowedPrefixes, rule[1:])
case '-':
result.Telemetry.BlockedPrefixes = append(result.Telemetry.BlockedPrefixes, rule[1:])
default:
return nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %s", rule)
}
}

return &result, nil
}

Expand Down Expand Up @@ -1755,6 +1781,12 @@ func MergeConfig(a, b *Config) *Config {
if b.Telemetry.DisableHostname == true {
result.Telemetry.DisableHostname = true
}
if len(b.Telemetry.PrefixFilter) != 0 {
result.Telemetry.PrefixFilter = append(result.Telemetry.PrefixFilter, b.Telemetry.PrefixFilter...)
}
if b.Telemetry.FilterDefault != nil {
result.Telemetry.FilterDefault = b.Telemetry.FilterDefault
}
if b.Telemetry.StatsdAddr != "" {
result.Telemetry.StatsdAddr = b.Telemetry.StatsdAddr
}
Expand Down
12 changes: 12 additions & 0 deletions agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,18 @@ func TestDecodeConfig(t *testing.T) {
in: `{"telemetry":{"dogstatsd_tags":["a","b"]}}`,
c: &Config{Telemetry: Telemetry{DogStatsdTags: []string{"a", "b"}}},
},
{
in: `{"telemetry":{"filter_default":true}}`,
c: &Config{Telemetry: Telemetry{FilterDefault: Bool(true)}},
},
{
in: `{"telemetry":{"prefix_filter":["+consul.metric","-consul.othermetric"]}}`,
c: &Config{Telemetry: Telemetry{
PrefixFilter: []string{"+consul.metric", "-consul.othermetric"},
AllowedPrefixes: []string{"consul.metric"},
BlockedPrefixes: []string{"consul.othermetric"},
}},
},
{
in: `{"telemetry":{"statsd_address":"a"}}`,
c: &Config{Telemetry: Telemetry{StatsdAddr: "a"}},
Expand Down
9 changes: 6 additions & 3 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru

// Provide some metrics
if err == nil {
metrics.IncrCounter([]string{"consul", "catalog", "service", "query", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounter([]string{"consul", "catalog", "service", "query-tag", args.ServiceName, args.ServiceTag}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounter([]string{"consul", "catalog", "service", "not-found", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
Expand Down
15 changes: 10 additions & 5 deletions agent/consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case api.KVSet:
return c.state.KVSSet(index, &req.DirEnt)
Expand Down Expand Up @@ -216,7 +217,8 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
Expand All @@ -236,7 +238,8 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "acl", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.ACLBootstrapInit:
enabled, err := c.state.ACLBootstrapInit(index)
Expand Down Expand Up @@ -267,7 +270,8 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
Expand Down Expand Up @@ -301,7 +305,8 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
panic(fmt.Errorf("failed to decode request: %v", err))
}

defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
return c.state.PreparedQuerySet(index, req.Query)
Expand Down
9 changes: 6 additions & 3 deletions agent/consul/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc

// Provide some metrics
if err == nil {
metrics.IncrCounter([]string{"consul", "health", "service", "query", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounter([]string{"consul", "health", "service", "query-tag", args.ServiceName, args.ServiceTag}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.Nodes) == 0 {
metrics.IncrCounter([]string{"consul", "health", "service", "not-found", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
Expand Down
3 changes: 2 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
return structs.ErrNoDCPath
}

metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}})
if err := s.connPool.RPC(dc, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
Expand Down
6 changes: 4 additions & 2 deletions agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ START:
func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
q := req.Question[0]
defer func(s time.Time) {
metrics.MeasureSince([]string{"consul", "dns", "ptr_query", d.agent.config.NodeName}, s)
metrics.MeasureSinceWithLabels([]string{"consul", "dns", "ptr_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)",
q, time.Now().Sub(s), resp.RemoteAddr().String(),
resp.RemoteAddr().Network())
Expand Down Expand Up @@ -187,7 +188,8 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
func (d *DNSServer) handleQuery(resp dns.ResponseWriter, req *dns.Msg) {
q := req.Question[0]
defer func(s time.Time) {
metrics.MeasureSince([]string{"consul", "dns", "domain_query", d.agent.config.NodeName}, s)
metrics.MeasureSinceWithLabels([]string{"consul", "dns", "domain_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)",
q, time.Now().Sub(s), resp.RemoteAddr().String(),
resp.RemoteAddr().Network())
Expand Down
2 changes: 2 additions & 0 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {

// Register the wrapper, which will close over the expensive-to-compute
// parts from above.
// TODO (kyhavlov): Convert this to utilize metric labels in a major release
wrapper := func(resp http.ResponseWriter, req *http.Request) {
start := time.Now()
handler(resp, req)
Expand Down Expand Up @@ -97,6 +98,7 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {
handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload))
handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
handleFuncMetrics("/v1/agent/metrics", s.wrap(s.agent.MemSink.DisplayMetrics))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to make an intermediate wrapper that applies the ACL policy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What sort of ACL policy is required to view metrics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll require agent:read.

handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
Expand Down
26 changes: 26 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type AgentToken struct {
Token string
}

// MetricsInfo is used to store different types of metric values from the agent.
type MetricsInfo struct {
Timestamp string
Gauges []map[string]interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These would be more useful with concrete types. The structs can be defined in the API package.

Points []map[string]interface{}
Counters []map[string]interface{}
Samples []map[string]interface{}
}

// Agent can be used to query the Agent endpoints
type Agent struct {
c *Client
Expand Down Expand Up @@ -126,6 +135,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil
}

// Metrics fetches the current internal metric data from the agent this
// client is talking to by issuing a GET against /v1/agent/metrics.
//
// The response body is decoded into a MetricsInfo. An error from the request
// (as reported through requireOK) or from decoding the body is returned
// as-is, with a nil result.
func (a *Agent) Metrics() (*MetricsInfo, error) {
	req := a.c.newRequest("GET", "/v1/agent/metrics")

	_, resp, reqErr := requireOK(a.c.doRequest(req))
	if reqErr != nil {
		return nil, reqErr
	}
	// Release the response body once decoding has finished.
	defer resp.Body.Close()

	var info *MetricsInfo
	decodeErr := decodeBody(resp, &info)
	if decodeErr != nil {
		return nil, decodeErr
	}
	return info, nil
}

// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
Expand Down
22 changes: 22 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,28 @@ func TestAPI_AgentSelf(t *testing.T) {
}
}

// TestAPI_AgentMetrics verifies that the agent's metrics can be queried
// through the API client and that the first gauge reported is
// "consul.runtime.alloc_bytes".
func TestAPI_AgentMetrics(t *testing.T) {
	t.Parallel()
	c, s := makeClient(t)
	defer s.Stop()

	agent := c.Agent()

	metrics, err := agent.Metrics()
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// len() is never negative, so the emptiness check has to compare against
	// zero; otherwise an empty gauge list would panic on the index below
	// instead of failing the test with a useful message.
	if len(metrics.Gauges) == 0 {
		t.Fatalf("bad: %v", metrics)
	}

	if name := metrics.Gauges[0]["Name"]; name != "consul.runtime.alloc_bytes" {
		t.Fatalf("bad: %v", metrics.Gauges[0])
	}
}

func TestAPI_AgentReload(t *testing.T) {
t.Parallel()

Expand Down
17 changes: 10 additions & 7 deletions command/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,14 +601,15 @@ func circonusSink(config *agent.Config, hostname string) (metrics.MetricSink, er
return sink, nil
}

func startupTelemetry(config *agent.Config) error {
func startupTelemetry(config *agent.Config) (*metrics.InmemSink, error) {
// Setup telemetry
// Aggregate on 10 second intervals for 1 minute. Expose the
// metrics over stderr when there is a SIGUSR1 received.
memSink := metrics.NewInmemSink(10*time.Second, time.Minute)
metrics.DefaultInmemSignal(memSink)
metricsConf := metrics.DefaultConfig(config.Telemetry.StatsitePrefix)
metricsConf.EnableHostname = !config.Telemetry.DisableHostname
metricsConf.FilterDefault = *config.Telemetry.FilterDefault

var sinks metrics.FanoutSink
addSink := func(name string, fn func(*agent.Config, string) (metrics.MetricSink, error)) error {
Expand All @@ -623,16 +624,16 @@ func startupTelemetry(config *agent.Config) error {
}

if err := addSink("statsite", statsiteSink); err != nil {
return err
return nil, err
}
if err := addSink("statsd", statsdSink); err != nil {
return err
return nil, err
}
if err := addSink("dogstatd", dogstatdSink); err != nil {
return err
return nil, err
}
if err := addSink("circonus", circonusSink); err != nil {
return err
return nil, err
}

if len(sinks) > 0 {
Expand All @@ -642,7 +643,7 @@ func startupTelemetry(config *agent.Config) error {
metricsConf.EnableHostname = false
metrics.NewGlobal(metricsConf, memSink)
}
return nil
return memSink, nil
}

func (cmd *AgentCommand) Run(args []string) int {
Expand Down Expand Up @@ -682,7 +683,8 @@ func (cmd *AgentCommand) run(args []string) int {
cmd.logOutput = logOutput
cmd.logger = log.New(logOutput, "", log.LstdFlags)

if err := startupTelemetry(config); err != nil {
memSink, err := startupTelemetry(config)
if err != nil {
cmd.UI.Error(err.Error())
return 1
}
Expand All @@ -696,6 +698,7 @@ func (cmd *AgentCommand) run(args []string) int {
}
agent.LogOutput = logOutput
agent.LogWriter = logWriter
agent.MemSink = memSink

if err := agent.Start(); err != nil {
cmd.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
Expand Down
Loading