Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi input support for http provider #480

Merged
merged 16 commits into from Apr 26, 2023
95 changes: 75 additions & 20 deletions agent/metrics_agent.go
Expand Up @@ -4,10 +4,12 @@ import (
"errors"
"log"
"strings"
"sync"

"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs"
"flashcat.cloud/categraf/pkg/cfg"
"flashcat.cloud/categraf/pkg/checksum"
"flashcat.cloud/categraf/types"

// auto registry
Expand Down Expand Up @@ -74,15 +76,61 @@ import (

type MetricsAgent struct {
InputFilters map[string]struct{}
InputReaders map[string]*InputReader
InputReaders *Readers
InputProvider inputs.Provider
}

type Readers struct {
lock *sync.RWMutex
record map[string]map[checksum.Checksum]*InputReader
}

func NewReaders() *Readers {
return &Readers{
lock: new(sync.RWMutex),
record: make(map[string]map[checksum.Checksum]*InputReader),
}
}

func (r *Readers) Add(name string, sum checksum.Checksum, reader *InputReader) {
r.lock.Lock()
defer r.lock.Unlock()
if _, ok := r.record[name]; !ok {
r.record[name] = make(map[checksum.Checksum]*InputReader)
}
r.record[name][sum] = reader
}

func (r *Readers) Del(name string, sum checksum.Checksum) {
r.lock.Lock()
defer r.lock.Unlock()
if sum == 0 {
delete(r.record, name)
return
}
if _, ok := r.record[name]; ok {
delete(r.record[name], sum)
}
}

func (r *Readers) GetInput(name string) (map[checksum.Checksum]*InputReader, bool) {
r.lock.RLock()
defer r.lock.RUnlock()
m, has := r.record[name]
return m, has
}

func (r *Readers) Iter() map[string]map[checksum.Checksum]*InputReader {
r.lock.RLock()
defer r.lock.RUnlock()
return r.record
}

func NewMetricsAgent() AgentModule {
c := config.Config
agent := &MetricsAgent{
InputFilters: parseFilter(c.InputFilters),
InputReaders: make(map[string]*InputReader),
InputReaders: NewReaders(),
}

provider, err := inputs.NewProvider(c, agent)
Expand Down Expand Up @@ -119,7 +167,6 @@ func (ma *MetricsAgent) Start() error {
log.Println("I! no inputs")
return nil
}
ma.InputReaders = make(map[string]*InputReader)

for _, name := range names {
_, inputKey := inputs.ParseInputName(name)
Expand All @@ -140,8 +187,12 @@ func (ma *MetricsAgent) Start() error {

func (ma *MetricsAgent) Stop() error {
ma.InputProvider.StopReloader()
for name := range ma.InputReaders {
ma.InputReaders[name].Stop()
for name := range ma.InputReaders.Iter() {
inputs, _ := ma.InputReaders.GetInput(name)
for sum, r := range inputs {
r.Stop()
ma.InputReaders.Del(name, sum)
}
}
return nil
}
Expand All @@ -158,15 +209,19 @@ func (ma *MetricsAgent) RegisterInput(name string, configs []cfg.ConfigWithForma
return
}

// construct input instance
input := creator()

err := cfg.LoadConfigs(configs, input)
newInputs, err := ma.InputProvider.LoadInputConfig(configs, creator())
if err != nil {
log.Println("E! failed to load configuration of plugin:", name, "error:", err)
return
}

for sum, nInput := range newInputs {
ma.inputGo(name, sum, nInput)
}
}

func (ma *MetricsAgent) inputGo(name string, sum checksum.Checksum, input inputs.Input) {
var err error
if err = input.InitInternalConfig(); err != nil {
log.Println("E! failed to init input:", name, "error:", err)
return
Expand Down Expand Up @@ -199,6 +254,7 @@ func (ma *MetricsAgent) RegisterInput(name string, configs []cfg.ConfigWithForma

if empty {
if config.Config.DebugMode {
_, inputKey := inputs.ParseInputName(name)
log.Printf("W! no instances for input:%s", inputKey)
}
return
Expand All @@ -207,25 +263,24 @@ func (ma *MetricsAgent) RegisterInput(name string, configs []cfg.ConfigWithForma

reader := newInputReader(name, input)
go reader.startInput()
ma.InputReaders[name] = reader
ma.InputReaders.Add(name, sum, reader)
log.Println("I! input:", name, "started")
}

func (ma *MetricsAgent) DeregisterInput(name string) {
if reader, has := ma.InputReaders[name]; has {
reader.Stop()
delete(ma.InputReaders, name)
log.Println("I! input:", name, "stopped")
func (ma *MetricsAgent) DeregisterInput(name string, sum checksum.Checksum) {
if inputs, has := ma.InputReaders.GetInput(name); has {
for isum, input := range inputs {
if sum == 0 || sum == isum {
input.Stop()
}
}
ma.InputReaders.Del(name, sum)
log.Printf("I! input: %s[%d] stopped", name, sum)
} else {
log.Printf("W! dereigster input name [%s] not found", name)
}
}

func (ma *MetricsAgent) ReregisterInput(name string, configs []cfg.ConfigWithFormat) {
ma.DeregisterInput(name)
ma.RegisterInput(name, configs)
}

func parseFilter(filterStr string) map[string]struct{} {
filters := strings.Split(filterStr, ":")
filtermap := make(map[string]struct{})
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/heartbeat.go
Expand Up @@ -173,7 +173,7 @@ func memUsage(ps *system.SystemPS) float64 {
return 0
}

return 100 - 100*float64(vm.Available)/float64(vm.Total)
return vm.UsedPercent
}

func cpuUsage(ps *system.SystemPS) float64 {
Expand Down
8 changes: 8 additions & 0 deletions inputs/aliyun/cloud.go
Expand Up @@ -95,6 +95,14 @@ func init() {
})
}

func (a *Aliyun) Clone() inputs.Input {
return &Aliyun{}
}

func (a *Aliyun) Name() string {
return inputName
}

var _ inputs.SampleGatherer = new(Instance)
var _ inputs.Input = new(Aliyun)
var _ inputs.InstancesGetter = new(Aliyun)
Expand Down
8 changes: 8 additions & 0 deletions inputs/arp_packet/arp_packet.go
Expand Up @@ -32,6 +32,14 @@ func init() {
})
}

func (r *ArpPacket) Clone() inputs.Input {
return &ArpPacket{}
}

func (r *ArpPacket) Name() string {
return inputName
}

func (r *ArpPacket) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(r.Instances))
for i := 0; i < len(r.Instances); i++ {
Expand Down
8 changes: 8 additions & 0 deletions inputs/cloudwatch/cloudwatch.go
Expand Up @@ -552,6 +552,14 @@ func init() {
})
}

func (c *CloudWatch) Clone() inputs.Input {
return &CloudWatch{}
}

func (c *CloudWatch) Name() string {
return inputName
}

func sanitizeMeasurement(namespace string) string {
namespace = strings.ReplaceAll(namespace, "/", "_")
namespace = snakeCase(namespace)
Expand Down
12 changes: 11 additions & 1 deletion inputs/conntrack/conntrack.go
Expand Up @@ -15,6 +15,8 @@ import (
"flashcat.cloud/categraf/types"
)

const inputName = "conntrack"

type Conntrack struct {
config.PluginConfig
Dirs []string `toml:"dirs"`
Expand All @@ -35,11 +37,19 @@ var dfltFiles = []string{
}

func init() {
inputs.Add("conntrack", func() inputs.Input {
inputs.Add(inputName, func() inputs.Input {
return &Conntrack{}
})
}

func (c *Conntrack) Clone() inputs.Input {
return &Conntrack{}
}

func (c *Conntrack) Name() string {
return inputName
}

func (c *Conntrack) setDefaults() {
if len(c.Dirs) == 0 {
c.Dirs = dfltDirs
Expand Down
17 changes: 14 additions & 3 deletions inputs/cpu/cpu.go
Expand Up @@ -11,6 +11,8 @@ import (
"flashcat.cloud/categraf/types"
)

const inputName = "cpu"

type CPUStats struct {
ps system.PS
lastStats map[string]cpuUtil.TimesStat
Expand All @@ -20,14 +22,23 @@ type CPUStats struct {
}

func init() {
ps := system.NewSystemPS()
inputs.Add("cpu", func() inputs.Input {
inputs.Add(inputName, func() inputs.Input {
return &CPUStats{
ps: ps,
ps: system.NewSystemPS(),
}
})
}

func (c *CPUStats) Clone() inputs.Input {
return &CPUStats{
ps: system.NewSystemPS(),
}
}

func (c *CPUStats) Name() string {
return inputName
}

func (c *CPUStats) Gather(slist *types.SampleList) {
times, err := c.ps.CPUTimes(c.CollectPerCPU, true)
if err != nil {
Expand Down
17 changes: 14 additions & 3 deletions inputs/disk/disk.go
Expand Up @@ -11,6 +11,8 @@ import (
"flashcat.cloud/categraf/types"
)

const inputName = "disk"

type DiskStats struct {
ps system.PS

Expand All @@ -21,14 +23,23 @@ type DiskStats struct {
}

func init() {
ps := system.NewSystemPS()
inputs.Add("disk", func() inputs.Input {
inputs.Add(inputName, func() inputs.Input {
return &DiskStats{
ps: ps,
ps: system.NewSystemPS(),
}
})
}

func (s *DiskStats) Clone() inputs.Input {
return &DiskStats{
ps: system.NewSystemPS(),
}
}

func (s *DiskStats) Name() string {
return inputName
}

func (s *DiskStats) Gather(slist *types.SampleList) {
disks, partitions, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS)
if err != nil {
Expand Down
17 changes: 14 additions & 3 deletions inputs/diskio/diskio.go
Expand Up @@ -11,6 +11,8 @@ import (
"flashcat.cloud/categraf/types"
)

const inputName = "diskio"

type DiskIO struct {
ps system.PS

Expand All @@ -20,14 +22,23 @@ type DiskIO struct {
}

func init() {
ps := system.NewSystemPS()
inputs.Add("diskio", func() inputs.Input {
inputs.Add(inputName, func() inputs.Input {
return &DiskIO{
ps: ps,
ps: system.NewSystemPS(),
}
})
}

func (d *DiskIO) Clone() inputs.Input {
return &DiskIO{
ps: system.NewSystemPS(),
}
}

func (c *DiskIO) Name() string {
return inputName
}

func (d *DiskIO) Init() error {
for _, device := range d.Devices {
if filter.HasMeta(device) {
Expand Down
12 changes: 11 additions & 1 deletion inputs/dns_query/dns_query.go
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/miekg/dns"
)

const inputName = "dns_query"

type ResultType uint64

const (
Expand All @@ -29,11 +31,19 @@ type DnsQuery struct {
}

func init() {
inputs.Add("dns_query", func() inputs.Input {
inputs.Add(inputName, func() inputs.Input {
return &DnsQuery{}
})
}

func (dq *DnsQuery) Clone() inputs.Input {
return &DnsQuery{}
}

func (c *DnsQuery) Name() string {
return inputName
}

func (dq *DnsQuery) GetInstances() []inputs.Instance {
ret := make([]inputs.Instance, len(dq.Instances))
for i := 0; i < len(dq.Instances); i++ {
Expand Down