Skip to content

Commit

Permalink
add consul plugin (#530)
Browse files Browse the repository at this point in the history
* add consul plugin

* update README.md and check address
  • Loading branch information
yangliyl committed Jun 7, 2023
1 parent 03d7034 commit 356adbc
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 0 deletions.
1 change: 1 addition & 0 deletions agent/metrics_agent.go
Expand Up @@ -19,6 +19,7 @@ import (
_ "flashcat.cloud/categraf/inputs/clickhouse"
_ "flashcat.cloud/categraf/inputs/cloudwatch"
_ "flashcat.cloud/categraf/inputs/conntrack"
_ "flashcat.cloud/categraf/inputs/consul"
_ "flashcat.cloud/categraf/inputs/cpu"
_ "flashcat.cloud/categraf/inputs/disk"
_ "flashcat.cloud/categraf/inputs/diskio"
Expand Down
26 changes: 26 additions & 0 deletions conf/input.consul/consul.toml
@@ -0,0 +1,26 @@
# # collect interval
# interval = 15

[[instances]]
## Consul server address
#address = "localhost:8500"

## URI scheme for the Consul server, one of "http", "https"
#scheme = "http"

## ACL token used in every request
# token = ""

## HTTP Basic Authentication username and password.
# username = ""
# password = ""

## Data center to query the health checks from
# datacenter = ""

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
59 changes: 59 additions & 0 deletions inputs/consul/README.md
@@ -0,0 +1,59 @@
# Consul Input Plugin

This plugin will collect statistics about all health checks registered in the
Consul. It uses [Consul API][1] to query the data. It will not report the
[telemetry][2] but Consul can report those stats already using StatsD protocol
if needed.

[1]: https://www.consul.io/docs/agent/http/health.html#health_state

[2]: https://www.consul.io/docs/agent/telemetry.html

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

## Configuration

```toml @sample.conf
# Gather health check statuses from services registered in Consul
[[instances]]
## Consul server address
# address = "localhost:8500"

## URI scheme for the Consul server, one of "http", "https"
# scheme = "http"

## ACL token used in every request
# token = ""

## HTTP Basic Authentication username and password.
# username = ""
# password = ""

## Data center to query the health checks from
# datacenter = ""

## Optional TLS Config
# tls_ca = "/etc/categraf/ca.pem"
# tls_cert = "/etc/categraf/cert.pem"
# tls_key = "/etc/categraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = true
```

## Example Output

```text
consul_up address=localhost:8500 agent_hostname=hostname 1
consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=passing 1
consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=warning 0
consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=critical 0
consul_health_service_status address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo status=maintenance 0
consul_health_checks_service_tag address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo tag=tag1 1
consul_health_checks_service_tag address=localhost:8500 agent_hostname=hostname check_id=service:demo check_name=Service 'demo' check node=localhost.localdomain service_id=demo service_name=demo tag=tag2 1
consul_scrape_use_seconds address=localhost:8500 agent_hostname=hostname 0.015674053
```
173 changes: 173 additions & 0 deletions inputs/consul/consul.go
@@ -0,0 +1,173 @@
package consul

import (
"log"
"net/http"
"time"

"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/tls"
"flashcat.cloud/categraf/types"

"github.com/hashicorp/consul/api"
)

const inputName = "consul"

type Instance struct {
Address string `toml:"address"`
Scheme string `toml:"scheme"`
Token string `toml:"token"`
Username string `toml:"username"`
Password string `toml:"password"`
Datacenter string `toml:"datacenter"`
tls.ClientConfig

// client used to connect to Consul agnet
client *api.Client

config.InstanceConfig
}

func (ins *Instance) Init() error {
conf := api.DefaultConfig()

if ins.Address == "" {
return types.ErrInstancesEmpty
}
conf.Address = ins.Address

if ins.Scheme != "" {
conf.Scheme = ins.Scheme
}

if ins.Datacenter != "" {
conf.Datacenter = ins.Datacenter
}

if ins.Token != "" {
conf.Token = ins.Token
}

if ins.Username != "" {
conf.HttpAuth = &api.HttpBasicAuth{
Username: ins.Username,
Password: ins.Password,
}
}

tlsCfg, err := ins.ClientConfig.TLSConfig()
if err != nil {
return err
}

conf.Transport = &http.Transport{
TLSClientConfig: tlsCfg,
}

client, err := api.NewClient(conf)
if err != nil {
return err
}
ins.client = client

return nil
}

type Consul struct {
config.PluginConfig
Instances []*Instance `toml:"instances"`
}

func init() {
inputs.Add(inputName, func() inputs.Input {
return &Consul{}
})
}

func (c *Consul) Clone() inputs.Input {
return &Consul{}
}

func (c *Consul) Name() string {
return inputName
}

func (c *Consul) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(c.Instances))
for i := 0; i < len(c.Instances); i++ {
ret[i] = c.Instances[i]
}
return ret
}

func (ins *Instance) Gather(slist *types.SampleList) {
tag := map[string]string{"address": ins.Address}
begun := time.Now()

// scrape use seconds
defer func(begun time.Time) {
use := time.Since(begun).Seconds()
slist.PushFront(types.NewSample(inputName, "scrape_use_seconds", use, tag))
}(begun)

checks, _, err := ins.client.Health().State("any", nil)
if err != nil {
slist.PushFront(types.NewSample(inputName, "up", 0, tag))
log.Println("E! failed to gather http target:", ins.Address, "error:", err)
return
}
slist.PushFront(types.NewSample(inputName, "up", 1, tag))

for _, check := range checks {
tags := make(map[string]string)
for k, v := range tag {
tags[k] = v
}
tags["check_id"] = check.CheckID
tags["check_name"] = check.Name
tags["node"] = check.Node

var passing, warning, critical, maintenance float64
switch check.Status {
case api.HealthPassing:
passing = 1
tags["status"] = api.HealthPassing
case api.HealthWarning:
warning = 1
tags["status"] = api.HealthWarning
case api.HealthCritical:
critical = 1
tags["status"] = api.HealthCritical
case api.HealthMaint:
maintenance = 1
tags["status"] = api.HealthMaint
}

if check.ServiceID == "" {
slist.PushFront(types.NewSample(inputName, "health_node_status", passing, tags))
slist.PushFront(types.NewSample(inputName, "health_node_status", warning, tags))
slist.PushFront(types.NewSample(inputName, "health_node_status", critical, tags))
slist.PushFront(types.NewSample(inputName, "health_node_status", maintenance, tags))
} else {
tags["service_id"] = check.ServiceID
tags["service_name"] = check.ServiceName
slist.PushFront(types.NewSample(inputName, "health_service_status", passing, tags))
slist.PushFront(types.NewSample(inputName, "health_service_status", warning, tags))
slist.PushFront(types.NewSample(inputName, "health_service_status", critical, tags))
slist.PushFront(types.NewSample(inputName, "health_service_status", maintenance, tags))
}

delete(tags, "status")

set := make(map[string]struct{})
for _, t := range check.ServiceTags {
if _, ok := set[t]; ok {
continue
}
slist.PushFront(types.NewSample(inputName, "health_checks_service_tag", 1, tags, map[string]string{"tag": t}))
set[t] = struct{}{}
}
}
}

0 comments on commit 356adbc

Please sign in to comment.