diff --git a/agent/metrics_agent.go b/agent/metrics_agent.go index f9c9cb11..546bab9e 100644 --- a/agent/metrics_agent.go +++ b/agent/metrics_agent.go @@ -19,6 +19,7 @@ import ( _ "flashcat.cloud/categraf/inputs/clickhouse" _ "flashcat.cloud/categraf/inputs/cloudwatch" _ "flashcat.cloud/categraf/inputs/conntrack" + _ "flashcat.cloud/categraf/inputs/consul" _ "flashcat.cloud/categraf/inputs/cpu" _ "flashcat.cloud/categraf/inputs/disk" _ "flashcat.cloud/categraf/inputs/diskio" diff --git a/conf/input.consul/consul.toml b/conf/input.consul/consul.toml new file mode 100644 index 00000000..645dd5a7 --- /dev/null +++ b/conf/input.consul/consul.toml @@ -0,0 +1,26 @@ +# # collect interval +# interval = 15 + +[[instances]] + ## Consul server address + # address = "localhost:8500" + + ## URI scheme for the Consul server, one of "http", "https" + # scheme = "http" + + ## ACL token used in every request + # token = "" + + ## HTTP Basic Authentication username and password. + # username = "" + # password = "" + + ## Data center to query the health checks from + # datacenter = "" + + ## Optional TLS Config + # tls_ca = "/etc/categraf/ca.pem" + # tls_cert = "/etc/categraf/cert.pem" + # tls_key = "/etc/categraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = true diff --git a/inputs/consul/README.md b/inputs/consul/README.md new file mode 100644 index 00000000..45035da7 --- /dev/null +++ b/inputs/consul/README.md @@ -0,0 +1,59 @@ +# Consul Input Plugin + +This plugin collects statistics about all health checks registered in +Consul. It uses the [Consul API][1] to query the data. It does not report +[telemetry][2], but Consul can already report those stats itself using the StatsD protocol +if needed. 
+ +[1]: https://www.consul.io/docs/agent/http/health.html#health_state + +[2]: https://www.consul.io/docs/agent/telemetry.html + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and fields, or to create aliases and configure ordering. +See [CONFIGURATION.md][CONFIGURATION.md] for more details. + +## Configuration + +```toml @sample.conf +# Gather health check statuses from services registered in Consul +[[instances]] + ## Consul server address + # address = "localhost:8500" + + ## URI scheme for the Consul server, one of "http", "https" + # scheme = "http" + + ## ACL token used in every request + # token = "" + + ## HTTP Basic Authentication username and password. + # username = "" + # password = "" + + ## Data center to query the health checks from + # datacenter = "" + + ## Optional TLS Config + # tls_ca = "/etc/categraf/ca.pem" + # tls_cert = "/etc/categraf/cert.pem" + # tls_key = "/etc/categraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = true +``` + +## Example Output + +```text +consul_up address=localhost:8500 agent_hostname=hostname 1 +consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=passing 1 +consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=warning 0 +consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=critical 0 +consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo 
check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=maintenance 0 +consul_health_checks_service_tag address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo tag=tag1 1 +consul_health_checks_service_tag address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo tag=tag2 1 +consul_scrape_use_seconds address=localhost:8500 agent_hostname=hostname 0.015674053 +``` diff --git a/inputs/consul/consul.go b/inputs/consul/consul.go new file mode 100644 index 00000000..8b54431e --- /dev/null +++ b/inputs/consul/consul.go @@ -0,0 +1,173 @@ +package consul + +import ( + "log" + "net/http" + "time" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/inputs" + "flashcat.cloud/categraf/pkg/tls" + "flashcat.cloud/categraf/types" + + "github.com/hashicorp/consul/api" +) + +const inputName = "consul" + +type Instance struct { + Address string `toml:"address"` + Scheme string `toml:"scheme"` + Token string `toml:"token"` + Username string `toml:"username"` + Password string `toml:"password"` + Datacenter string `toml:"datacenter"` + tls.ClientConfig + + // client used to connect to Consul agent + client *api.Client + + config.InstanceConfig +} + +func (ins *Instance) Init() error { + conf := api.DefaultConfig() + + if ins.Address == "" { + return types.ErrInstancesEmpty + } + conf.Address = ins.Address + + if ins.Scheme != "" { + conf.Scheme = ins.Scheme + } + + if ins.Datacenter != "" { + conf.Datacenter = ins.Datacenter + } + + if ins.Token != "" { + conf.Token = ins.Token + } + + if ins.Username != "" { + conf.HttpAuth = &api.HttpBasicAuth{ + Username: ins.Username, + Password: ins.Password, + } + } + + tlsCfg, err := ins.ClientConfig.TLSConfig() + if err != nil { + return err + } + + conf.Transport = &http.Transport{ 
+ TLSClientConfig: tlsCfg, + } + + client, err := api.NewClient(conf) + if err != nil { + return err + } + ins.client = client + + return nil +} + +type Consul struct { + config.PluginConfig + Instances []*Instance `toml:"instances"` +} + +func init() { + inputs.Add(inputName, func() inputs.Input { + return &Consul{} + }) +} + +func (c *Consul) Clone() inputs.Input { + return &Consul{} +} + +func (c *Consul) Name() string { + return inputName +} + +func (c *Consul) GetInstances() []inputs.Instance { + ret := make([]inputs.Instance, len(c.Instances)) + for i := 0; i < len(c.Instances); i++ { + ret[i] = c.Instances[i] + } + return ret +} + +func (ins *Instance) Gather(slist *types.SampleList) { + tag := map[string]string{"address": ins.Address} + begun := time.Now() + + // scrape use seconds + defer func(begun time.Time) { + use := time.Since(begun).Seconds() + slist.PushFront(types.NewSample(inputName, "scrape_use_seconds", use, tag)) + }(begun) + + checks, _, err := ins.client.Health().State("any", nil) + if err != nil { + slist.PushFront(types.NewSample(inputName, "up", 0, tag)) + log.Println("E! 
failed to gather http target:", ins.Address, "error:", err) + return + } + slist.PushFront(types.NewSample(inputName, "up", 1, tag)) + + for _, check := range checks { + tags := make(map[string]string) + for k, v := range tag { + tags[k] = v + } + tags["check_id"] = check.CheckID + tags["check_name"] = check.Name + tags["node"] = check.Node + + var passing, warning, critical, maintenance float64 + switch check.Status { + case api.HealthPassing: + passing = 1 + tags["status"] = api.HealthPassing + case api.HealthWarning: + warning = 1 + tags["status"] = api.HealthWarning + case api.HealthCritical: + critical = 1 + tags["status"] = api.HealthCritical + case api.HealthMaint: + maintenance = 1 + tags["status"] = api.HealthMaint + } + + if check.ServiceID == "" { + slist.PushFront(types.NewSample(inputName, "health_node_status", passing, tags)) + slist.PushFront(types.NewSample(inputName, "health_node_status", warning, tags)) + slist.PushFront(types.NewSample(inputName, "health_node_status", critical, tags)) + slist.PushFront(types.NewSample(inputName, "health_node_status", maintenance, tags)) + } else { + tags["service_id"] = check.ServiceID + tags["service_name"] = check.ServiceName + slist.PushFront(types.NewSample(inputName, "health_service_status", passing, tags)) + slist.PushFront(types.NewSample(inputName, "health_service_status", warning, tags)) + slist.PushFront(types.NewSample(inputName, "health_service_status", critical, tags)) + slist.PushFront(types.NewSample(inputName, "health_service_status", maintenance, tags)) + } + + delete(tags, "status") + + set := make(map[string]struct{}) + for _, t := range check.ServiceTags { + if _, ok := set[t]; ok { + continue + } + slist.PushFront(types.NewSample(inputName, "health_checks_service_tag", 1, tags, map[string]string{"tag": t})) + set[t] = struct{}{} + } + } +}