diff --git a/agent/metrics_agent.go b/agent/metrics_agent.go index 70fb553f..66ef4604 100644 --- a/agent/metrics_agent.go +++ b/agent/metrics_agent.go @@ -4,10 +4,12 @@ import ( "errors" "log" "strings" + "sync" "flashcat.cloud/categraf/config" "flashcat.cloud/categraf/inputs" "flashcat.cloud/categraf/pkg/cfg" + "flashcat.cloud/categraf/pkg/checksum" "flashcat.cloud/categraf/types" // auto registry @@ -74,15 +76,61 @@ import ( type MetricsAgent struct { InputFilters map[string]struct{} - InputReaders map[string]*InputReader + InputReaders *Readers InputProvider inputs.Provider } +type Readers struct { + lock *sync.RWMutex + record map[string]map[checksum.Checksum]*InputReader +} + +func NewReaders() *Readers { + return &Readers{ + lock: new(sync.RWMutex), + record: make(map[string]map[checksum.Checksum]*InputReader), + } +} + +func (r *Readers) Add(name string, sum checksum.Checksum, reader *InputReader) { + r.lock.Lock() + defer r.lock.Unlock() + if _, ok := r.record[name]; !ok { + r.record[name] = make(map[checksum.Checksum]*InputReader) + } + r.record[name][sum] = reader +} + +func (r *Readers) Del(name string, sum checksum.Checksum) { + r.lock.Lock() + defer r.lock.Unlock() + if sum == 0 { + delete(r.record, name) + return + } + if _, ok := r.record[name]; ok { + delete(r.record[name], sum) + } +} + +func (r *Readers) GetInput(name string) (map[checksum.Checksum]*InputReader, bool) { + r.lock.RLock() + defer r.lock.RUnlock() + m, has := r.record[name] + return m, has +} + +func (r *Readers) Iter() map[string]map[checksum.Checksum]*InputReader { + r.lock.RLock() + defer r.lock.RUnlock() + return r.record +} + func NewMetricsAgent() AgentModule { c := config.Config agent := &MetricsAgent{ InputFilters: parseFilter(c.InputFilters), - InputReaders: make(map[string]*InputReader), + InputReaders: NewReaders(), } provider, err := inputs.NewProvider(c, agent) @@ -119,7 +167,6 @@ func (ma *MetricsAgent) Start() error { log.Println("I! no inputs") return nil } - ma.InputReaders = make(map[string]*InputReader) for _, name := range names { _, inputKey := inputs.ParseInputName(name) @@ -140,8 +187,12 @@ func (ma *MetricsAgent) Start() error { func (ma *MetricsAgent) Stop() error { ma.InputProvider.StopReloader() - for name := range ma.InputReaders { - ma.InputReaders[name].Stop() + for name := range ma.InputReaders.Iter() { + inputs, _ := ma.InputReaders.GetInput(name) + for sum, r := range inputs { + r.Stop() + ma.InputReaders.Del(name, sum) + } } return nil } @@ -158,15 +209,19 @@ func (ma *MetricsAgent) RegisterInput(name string, configs []cfg.ConfigWithForma return } - // construct input instance - input := creator() - - err := cfg.LoadConfigs(configs, input) + newInputs, err := ma.InputProvider.LoadInputConfig(configs, creator()) if err != nil { log.Println("E! failed to load configuration of plugin:", name, "error:", err) return } + for sum, nInput := range newInputs { + ma.inputGo(name, sum, nInput) + } +} + +func (ma *MetricsAgent) inputGo(name string, sum checksum.Checksum, input inputs.Input) { + var err error if err = input.InitInternalConfig(); err != nil { log.Println("E! failed to init input:", name, "error:", err) return @@ -199,6 +254,7 @@ func (ma *MetricsAgent) RegisterInput(name string, configs []cfg.ConfigWithForma if empty { if config.Config.DebugMode { + _, inputKey := inputs.ParseInputName(name) log.Printf("W! no instances for input:%s", inputKey) } return @@ -207,25 +263,24 @@ func (ma *MetricsAgent) RegisterInput(name string, configs []cfg.ConfigWithForma reader := newInputReader(name, input) go reader.startInput() - ma.InputReaders[name] = reader + ma.InputReaders.Add(name, sum, reader) log.Println("I! input:", name, "started") } -func (ma *MetricsAgent) DeregisterInput(name string) { - if reader, has := ma.InputReaders[name]; has { - reader.Stop() - delete(ma.InputReaders, name) - log.Println("I! input:", name, "stopped") +func (ma *MetricsAgent) DeregisterInput(name string, sum checksum.Checksum) { + if inputs, has := ma.InputReaders.GetInput(name); has { + for isum, input := range inputs { + if sum == 0 || sum == isum { + input.Stop() + } + } + ma.InputReaders.Del(name, sum) + log.Printf("I! input: %s[%d] stopped", name, sum) } else { log.Printf("W! dereigster input name [%s] not found", name) } } -func (ma *MetricsAgent) ReregisterInput(name string, configs []cfg.ConfigWithFormat) { - ma.DeregisterInput(name) - ma.RegisterInput(name, configs) -} - func parseFilter(filterStr string) map[string]struct{} { filters := strings.Split(filterStr, ":") filtermap := make(map[string]struct{}) diff --git a/heartbeat/heartbeat.go b/heartbeat/heartbeat.go index 95463e61..20b5feea 100644 --- a/heartbeat/heartbeat.go +++ b/heartbeat/heartbeat.go @@ -173,7 +173,7 @@ func memUsage(ps *system.SystemPS) float64 { return 0 } - return 100 - 100*float64(vm.Available)/float64(vm.Total) + return vm.UsedPercent } func cpuUsage(ps *system.SystemPS) float64 { diff --git a/inputs/aliyun/cloud.go b/inputs/aliyun/cloud.go index 2334a25c..2243e31b 100644 --- a/inputs/aliyun/cloud.go +++ b/inputs/aliyun/cloud.go @@ -95,6 +95,14 @@ func init() { }) } +func (a *Aliyun) Clone() inputs.Input { + return &Aliyun{} +} + +func (a *Aliyun) Name() string { + return inputName +} + var _ inputs.SampleGatherer = new(Instance) var _ inputs.Input = new(Aliyun) var _ inputs.InstancesGetter = new(Aliyun) diff --git a/inputs/arp_packet/arp_packet.go b/inputs/arp_packet/arp_packet.go index 6f1206a1..a84077c0 100644 --- a/inputs/arp_packet/arp_packet.go +++ b/inputs/arp_packet/arp_packet.go @@ -32,6 +32,14 @@ func init() { }) } +func (r *ArpPacket) Clone() inputs.Input { + return &ArpPacket{} +} + +func (r *ArpPacket) Name() string { + return inputName +} + func (r *ArpPacket) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/cloudwatch/cloudwatch.go b/inputs/cloudwatch/cloudwatch.go index 3e76424a..e5df87a9 100644 --- a/inputs/cloudwatch/cloudwatch.go +++ b/inputs/cloudwatch/cloudwatch.go @@ -552,6 +552,14 @@ func init() { }) } +func (c *CloudWatch) Clone() inputs.Input { + return &CloudWatch{} +} + +func (c *CloudWatch) Name() string { + return inputName +} + func sanitizeMeasurement(namespace string) string { namespace = strings.ReplaceAll(namespace, "/", "_") namespace = snakeCase(namespace) diff --git a/inputs/conntrack/conntrack.go b/inputs/conntrack/conntrack.go index d800d0bb..56779a4b 100644 --- a/inputs/conntrack/conntrack.go +++ b/inputs/conntrack/conntrack.go @@ -15,6 +15,8 @@ import ( "flashcat.cloud/categraf/types" ) +const inputName = "conntrack" + type Conntrack struct { config.PluginConfig Dirs []string `toml:"dirs"` @@ -35,11 +37,19 @@ var dfltFiles = []string{ } func init() { - inputs.Add("conntrack", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &Conntrack{} }) } +func (c *Conntrack) Clone() inputs.Input { + return &Conntrack{} +} + +func (c *Conntrack) Name() string { + return inputName +} + func (c *Conntrack) setDefaults() { if len(c.Dirs) == 0 { c.Dirs = dfltDirs diff --git a/inputs/cpu/cpu.go b/inputs/cpu/cpu.go index 0edd32af..0fdd9e4a 100644 --- a/inputs/cpu/cpu.go +++ b/inputs/cpu/cpu.go @@ -11,6 +11,8 @@ import ( "flashcat.cloud/categraf/types" ) +const inputName = "cpu" + type CPUStats struct { ps system.PS lastStats map[string]cpuUtil.TimesStat @@ -20,14 +22,23 @@ type CPUStats struct { } func init() { - ps := system.NewSystemPS() - inputs.Add("cpu", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &CPUStats{ - ps: ps, + ps: system.NewSystemPS(), } }) } +func (c *CPUStats) Clone() inputs.Input { + return &CPUStats{ + ps: system.NewSystemPS(), + } +} + +func (c *CPUStats) Name() string { + return inputName +} + func (c *CPUStats) Gather(slist *types.SampleList) { times, err := c.ps.CPUTimes(c.CollectPerCPU, true) if err != nil { diff --git a/inputs/disk/disk.go b/inputs/disk/disk.go index 03d4a6cb..3463b957 100644 --- a/inputs/disk/disk.go +++ b/inputs/disk/disk.go @@ -11,6 +11,8 @@ import ( "flashcat.cloud/categraf/types" ) +const inputName = "disk" + type DiskStats struct { ps system.PS @@ -21,14 +23,23 @@ type DiskStats struct { } func init() { - ps := system.NewSystemPS() - inputs.Add("disk", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &DiskStats{ - ps: ps, + ps: system.NewSystemPS(), } }) } +func (s *DiskStats) Clone() inputs.Input { + return &DiskStats{ + ps: system.NewSystemPS(), + } +} + +func (s *DiskStats) Name() string { + return inputName +} + func (s *DiskStats) Gather(slist *types.SampleList) { disks, partitions, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS) if err != nil { diff --git a/inputs/diskio/diskio.go b/inputs/diskio/diskio.go index 9d5121f3..ba80a9e0 100644 --- a/inputs/diskio/diskio.go +++ b/inputs/diskio/diskio.go @@ -11,6 +11,8 @@ import ( "flashcat.cloud/categraf/types" ) +const inputName = "diskio" + type DiskIO struct { ps system.PS @@ -20,14 +22,23 @@ type DiskIO struct { } func init() { - ps := system.NewSystemPS() - inputs.Add("diskio", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &DiskIO{ - ps: ps, + ps: system.NewSystemPS(), } }) } +func (d *DiskIO) Clone() inputs.Input { + return &DiskIO{ + ps: system.NewSystemPS(), + } +} + +func (c *DiskIO) Name() string { + return inputName +} + func (d *DiskIO) Init() error { for _, device := range d.Devices { if filter.HasMeta(device) { diff --git a/inputs/dns_query/dns_query.go b/inputs/dns_query/dns_query.go index 21273ca0..6926c1c7 100644 --- a/inputs/dns_query/dns_query.go +++ b/inputs/dns_query/dns_query.go @@ -15,6 +15,8 @@ import ( "github.com/miekg/dns" ) +const inputName = "dns_query" + type ResultType uint64 const ( @@ -29,11 +31,19 @@ type DnsQuery struct { } func init() { - inputs.Add("dns_query", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &DnsQuery{} }) } +func (dq *DnsQuery) Clone() inputs.Input { + return &DnsQuery{} +} + +func (c *DnsQuery) Name() string { + return inputName +} + func (dq *DnsQuery) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(dq.Instances)) for i := 0; i < len(dq.Instances); i++ { diff --git a/inputs/docker/docker.go b/inputs/docker/docker.go index 7fe63c82..4945e42e 100644 --- a/inputs/docker/docker.go +++ b/inputs/docker/docker.go @@ -24,6 +24,8 @@ import ( itypes "flashcat.cloud/categraf/types" ) +const inputName = "docker" + // KB, MB, GB, TB, PB...human friendly const ( KB = 1000 @@ -45,11 +47,19 @@ type Docker struct { } func init() { - inputs.Add("docker", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &Docker{} }) } +func (d *Docker) Clone() inputs.Input { + return &Docker{} +} + +func (c Docker) Name() string { + return inputName +} + func (d *Docker) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(d.Instances)) for i := 0; i < len(d.Instances); i++ { diff --git a/inputs/elasticsearch/elasticsearch.go b/inputs/elasticsearch/elasticsearch.go index 3743cb88..d37bdd5e 100644 --- a/inputs/elasticsearch/elasticsearch.go +++ b/inputs/elasticsearch/elasticsearch.go @@ -19,6 +19,8 @@ import ( "flashcat.cloud/categraf/types" ) +const inputName = "elasticsearch" + // Nodestats are always generated, so simply define a constant for these endpoints const statsPath = "/_nodes/stats" const statsPathLocal = "/_nodes/_local/stats" @@ -89,11 +91,19 @@ type Elasticsearch struct { } func init() { - inputs.Add("elasticsearch", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &Elasticsearch{} }) } +func (r *Elasticsearch) Clone() inputs.Input { + return &Elasticsearch{} +} + +func (c *Elasticsearch) Name() string { + return inputName +} + func (r *Elasticsearch) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/exec/exec.go b/inputs/exec/exec.go index fd27a202..c01940d6 100644 --- a/inputs/exec/exec.go +++ b/inputs/exec/exec.go @@ -22,6 +22,8 @@ import ( "flashcat.cloud/categraf/types" ) +const inputName = "exec" + const MaxStderrBytes int = 512 type Instance struct { @@ -39,11 +41,19 @@ type Exec struct { } func init() { - inputs.Add("exec", func() inputs.Input { + inputs.Add(inputName, func() inputs.Input { return &Exec{} }) } +func (e *Exec) Clone() inputs.Input { + return &Exec{} +} + +func (c *Exec) Name() string { + return inputName +} + func (e *Exec) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(e.Instances)) for i := 0; i < len(e.Instances); i++ { diff --git a/inputs/greenplum/greenplum.go b/inputs/greenplum/greenplum.go index b66254a8..64c5b45b 100644 --- a/inputs/greenplum/greenplum.go +++ b/inputs/greenplum/greenplum.go @@ -2,8 +2,8 @@ package greenplum import ( "log" - "strings" "os/exec" + "strings" "flashcat.cloud/categraf/config" "flashcat.cloud/categraf/inputs" @@ -22,6 +22,14 @@ func init() { }) } +func (e *Greenplum) Clone() inputs.Input { + return &Greenplum{} +} + +func (e *Greenplum) Name() string { + return inputName +} + func (ins *Greenplum) Gather(slist *types.SampleList) { var tags = map[string]string{} bin, err := exec.LookPath("gpstate") @@ -64,4 +72,4 @@ func (ins *Greenplum) Gather(slist *types.SampleList) { slist.PushSample(inputName, "Status", status, gptags) slist.PushSample(inputName, "Data_Status", dataStatus, gptags) } -} \ No newline at end of file +} diff --git a/inputs/haproxy/haproxy.go b/inputs/haproxy/haproxy.go index e78e67da..642aaf0e 100644 --- a/inputs/haproxy/haproxy.go +++ b/inputs/haproxy/haproxy.go @@ -23,6 +23,14 @@ func init() { }) } +func (r *HAProxy) Clone() inputs.Input { + return &HAProxy{} +} + +func (r *HAProxy) Name() string { + return inputName +} + func (r *HAProxy) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/http_provider.go b/inputs/http_provider.go new file mode 100644 index 00000000..eb06f6e9 --- /dev/null +++ b/inputs/http_provider.go @@ -0,0 +1,371 @@ +package inputs + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "sync" + "time" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/pkg/cfg" + "flashcat.cloud/categraf/pkg/checksum" + "flashcat.cloud/categraf/pkg/tls" +) + +// HTTPProvider provider a mechanism to get config from remote http server at a fixed interval +// If input config is changed, the provider will reload the input without reload whole agent +type ( + HTTPProvider struct { + sync.RWMutex + + RemoteUrl string + Headers []string + AuthUsername string + AuthPassword string + + Timeout int + ReloadInterval int + + tls.ClientConfig + client *http.Client + stopCh chan struct{} + op InputOperation + + configMap map[string][]cfg.ConfigWithFormat + version string + + cache *innerCache + add *innerCache + del *innerCache + } + innerCache struct { + lock *sync.RWMutex + record map[string]map[checksum.Checksum]cfg.ConfigWithFormat + } +) + +func newInnerCache() *innerCache { + return &innerCache{ + lock: &sync.RWMutex{}, + record: make(map[string]map[checksum.Checksum]cfg.ConfigWithFormat), + } +} + +func (ic *innerCache) get(inputName string) (map[checksum.Checksum]cfg.ConfigWithFormat, bool) { + ic.lock.RLock() + defer ic.lock.RUnlock() + + m, has := ic.record[inputName] + return m, has +} + +func (ic innerCache) put(inputName string, sum checksum.Checksum, config cfg.ConfigWithFormat) { + ic.lock.Lock() + defer ic.lock.Unlock() + if ic.record[inputName] == nil { + ic.record[inputName] = make(map[checksum.Checksum]cfg.ConfigWithFormat) + } + ic.record[inputName][sum] = config +} + +func (ic *innerCache) del(inputName string, sum checksum.Checksum) { + ic.lock.Lock() + defer ic.lock.Unlock() + if sum == 0 { + delete(ic.record, inputName) + return + } + if _, ok := ic.record[inputName]; ok { + delete(ic.record[inputName], sum) + } +} + +func (ic *innerCache) iter() map[string]map[checksum.Checksum]cfg.ConfigWithFormat { + ic.lock.RLock() + defer ic.lock.RUnlock() + return ic.record +} + +func (ic *innerCache) len() int { + ic.lock.Lock() + defer ic.lock.Unlock() + return len(ic.record) +} + +type httpProviderResponse struct { + // version is signature/md5 of current Config, server side should deal with the Version calculate + Version string `json:"version"` + + // ConfigMap (InputName -> Config), if version is identical, server side can set Config to nil + Configs map[string][]cfg.ConfigWithFormat `json:"configs"` +} + +func (hrp *HTTPProvider) Name() string { + return "http" +} + +func newHTTPProvider(c *config.ConfigType, op InputOperation) (*HTTPProvider, error) { + if c.HTTPProviderConfig == nil { + return nil, fmt.Errorf("no http provider config found") + } + + provider := &HTTPProvider{ + RemoteUrl: c.HTTPProviderConfig.RemoteUrl, + Headers: c.HTTPProviderConfig.Headers, + AuthUsername: c.HTTPProviderConfig.AuthUsername, + AuthPassword: c.HTTPProviderConfig.AuthPassword, + ClientConfig: c.HTTPProviderConfig.ClientConfig, + Timeout: c.HTTPProviderConfig.Timeout, + ReloadInterval: c.HTTPProviderConfig.ReloadInterval, + stopCh: make(chan struct{}, 1), + op: op, + cache: newInnerCache(), + } + + if err := provider.check(); err != nil { + return nil, err + } + + return provider, nil +} + +func (hrp *HTTPProvider) check() error { + if hrp.Timeout <= 0 { + hrp.Timeout = 5 + } + + if hrp.ReloadInterval <= 0 { + hrp.ReloadInterval = 120 + } + + if !strings.HasPrefix(hrp.RemoteUrl, "http") { + return fmt.Errorf("http provider: bad remote url config: %s", hrp.RemoteUrl) + } + + tlsc, err := hrp.TLSConfig() + if err != nil { + return err + } + + hrp.client = &http.Client{ + Timeout: time.Duration(hrp.Timeout) * time.Second, + Transport: &http.Transport{ + TLSClientConfig: tlsc, + }, + } + + return nil +} + +func (hrp *HTTPProvider) doReq() (*httpProviderResponse, error) { + req, err := http.NewRequest("GET", hrp.RemoteUrl, nil) + if err != nil { + log.Println("E! http provider: build reload config request error:", err) + return nil, err + } + + for i := 0; i < len(hrp.Headers); i += 2 { + req.Header.Add(hrp.Headers[i], hrp.Headers[i+1]) + if hrp.Headers[i] == "Host" { + req.Host = hrp.Headers[i+1] + } + } + + if hrp.AuthUsername != "" || hrp.AuthPassword != "" { + req.SetBasicAuth(hrp.AuthUsername, hrp.AuthPassword) + } + + // build query parameters + q := req.URL.Query() + for k, v := range config.Config.Global.Labels { + q.Add(k, v) + } + q.Add("timestamp", fmt.Sprint(time.Now().Unix())) + q.Add("version", hrp.version) + q.Add("agent_hostname", config.Config.GetHostname()) + req.URL.RawQuery = q.Encode() + + resp, err := hrp.client.Do(req) + if err != nil { + log.Println("E! http provider: request reload config error:", err) + return nil, err + } + defer resp.Body.Close() + respData, err := io.ReadAll(resp.Body) + if err != nil { + log.Println("E! http provider: request reload config error:", err) + return nil, err + } + + confResp := &httpProviderResponse{} + err = json.Unmarshal(respData, confResp) + if err != nil { + log.Println("E! http provider: unmarshal result error:", err) + return nil, err + } + + // set checksum for each config + for k, v := range confResp.Configs { + for kk, vv := range v { + confResp.Configs[k][kk].SetCheckSum(checksum.New(vv.Config)) + } + } + + return confResp, nil +} + +func (hrp *HTTPProvider) LoadConfig() (bool, error) { + log.Println("I! http provider: start reload config from remote:", hrp.RemoteUrl) + + confResp, err := hrp.doReq() + if err != nil { + log.Printf("W! http provider: request remote err: [%+v]", err) + return false, err + } + + // if config version is identical, means config is not changed + if confResp.Version == hrp.version { + return false, nil + } + log.Printf("I! remote version:%s, current version:%s", confResp.Version, hrp.version) + + // delete empty entries + for k, v := range confResp.Configs { + if len(v) == 0 { + delete(confResp.Configs, k) + } + } + + hrp.caculateDiff(confResp.Configs) + changed := hrp.add.len()+hrp.del.len() > 0 + if changed { + hrp.Lock() + hrp.configMap = confResp.Configs + hrp.version = confResp.Version + hrp.Unlock() + } + + return changed, nil +} + +func (hrp *HTTPProvider) StartReloader() { + go func() { + for { + select { + case <-time.After(time.Duration(hrp.ReloadInterval) * time.Second): + changed, err := hrp.LoadConfig() + if err != nil { + continue + } + if changed { + if hrp.add.len() > 0 { + log.Println("I! http provider: new or updated inputs:", hrp.add) + for inputKey, cm := range hrp.add.iter() { + for _, conf := range cm { + hrp.op.RegisterInput(FormatInputName(hrp.Name(), inputKey), []cfg.ConfigWithFormat{conf}) + } + } + } + + if hrp.del.len() > 0 { + log.Println("I! http provider: deleted inputs:", hrp.del) + for inputKey, cm := range hrp.del.iter() { + for sum := range cm { + hrp.op.DeregisterInput(FormatInputName(hrp.Name(), inputKey), sum) + } + } + } + + } + case <-hrp.stopCh: + return + } + } + }() +} + +func (hrp *HTTPProvider) StopReloader() { + hrp.stopCh <- struct{}{} +} + +func (hrp *HTTPProvider) GetInputs() ([]string, error) { + hrp.RLock() + defer hrp.RUnlock() + + inputs := make([]string, 0, len(hrp.configMap)) + for k := range hrp.configMap { + inputs = append(inputs, k) + } + + return inputs, nil +} + +func (hrp *HTTPProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat, error) { + hrp.RLock() + defer hrp.RUnlock() + + if configs, has := hrp.configMap[inputKey]; has { + return configs, nil + } + + return nil, nil +} + +func (hrp *HTTPProvider) caculateDiff(newConfigs map[string][]cfg.ConfigWithFormat) { + hrp.add = newInnerCache() + hrp.del = newInnerCache() + cache := newInnerCache() + for inputKey, configs := range newConfigs { + for _, config := range configs { + cache.put(inputKey, config.CheckSum(), config) + } + } + + for inputKey, configMap := range cache.iter() { + if oldConfigMap, has := hrp.cache.get(inputKey); has { + new := NewSet().Load(configMap) + add, del := new.Diff(NewSet().Load(oldConfigMap)) + for sum := range add { + hrp.add.put(inputKey, sum, configMap[sum]) + } + for sum := range del { + hrp.del.put(inputKey, sum, oldConfigMap[sum]) + } + } else { + for _, config := range configMap { + hrp.add.put(inputKey, config.CheckSum(), config) + } + } + } + + for inputKey, configMap := range hrp.cache.iter() { + if _, has := cache.get(inputKey); !has { + for _, config := range configMap { + hrp.del.put(inputKey, config.CheckSum(), config) + } + } + } + if hrp.add.len()+hrp.del.len() > 0 { + hrp.Lock() + hrp.cache = cache + hrp.Unlock() + } + +} + +func (hrp *HTTPProvider) LoadInputConfig(configs []cfg.ConfigWithFormat, input Input) (map[checksum.Checksum]Input, error) { + inputs := make(map[checksum.Checksum]Input) + for _, c := range configs { + nInput := input.Clone() + err := cfg.LoadSingleConfig(c, nInput) + if err != nil { + return nil, err + } + inputs[c.CheckSum()] = nInput + } + return inputs, nil +} diff --git a/inputs/http_response/http_response.go b/inputs/http_response/http_response.go index d15b9bfa..f08c9d96 100644 --- a/inputs/http_response/http_response.go +++ b/inputs/http_response/http_response.go @@ -145,6 +145,14 @@ func init() { }) } +func (h *HTTPResponse) Clone() inputs.Input { + return &HTTPResponse{} +} + +func (h *HTTPResponse) Name() string { + return inputName +} + func (h *HTTPResponse) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(h.Instances)) for i := 0; i < len(h.Instances); i++ { diff --git a/inputs/inputs.go b/inputs/inputs.go index a8dd6164..51ed51a0 100644 --- a/inputs/inputs.go +++ b/inputs/inputs.go @@ -47,12 +47,20 @@ func MayGetInstances(t interface{}) []Instance { return nil } -type Input interface { - GetLabels() map[string]string - GetInterval() config.Duration - InitInternalConfig() error - Process(*types.SampleList) *types.SampleList -} +type ( + Cloneable interface { + Clone() Input + } + + Input interface { + Cloneable + Name() string + GetLabels() map[string]string + GetInterval() config.Duration + InitInternalConfig() error + Process(*types.SampleList) *types.SampleList + } +) type Creator func() Input diff --git a/inputs/ipvs/ipvs_linux_amd64.go b/inputs/ipvs/ipvs_linux_amd64.go index 3f57bf7e..785072af 100644 --- a/inputs/ipvs/ipvs_linux_amd64.go +++ b/inputs/ipvs/ipvs_linux_amd64.go @@ -29,6 +29,14 @@ func init() { }) } +func (i *IPVS) Clone() inputs.Input { + return &IPVS{} +} + +func (i *IPVS) Name() string { + return inputName +} + // Gather gathers the stats func (i *IPVS) Gather(slist *types.SampleList) error { if i.handle == nil { diff --git a/inputs/jenkins/jenkins.go b/inputs/jenkins/jenkins.go index ab343958..06fdf7fb 100644 --- a/inputs/jenkins/jenkins.go +++ b/inputs/jenkins/jenkins.go @@ -32,6 +32,14 @@ func init() { }) } +func (j *Jenkins) Clone() inputs.Input { + return &Jenkins{} +} + +func (j *Jenkins) Name() string { + return inputName +} + func (j *Jenkins) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(j.Instances)) for i := 0; i < len(j.Instances); i++ { @@ -94,7 +102,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { ins.gatherJobs(slist) } -///////////////////////////////////////////////////////////// +// /////////////////////////////////////////////////////////// // measurement const ( @@ -455,7 +463,7 @@ func mapResultCode(s string) int { return -1 } -////////////////////////////////////////////////////// +// //////////////////////////////////////////////////// type client struct { baseURL string diff --git a/inputs/jolokia_agent/jolokia_agent.go b/inputs/jolokia_agent/jolokia_agent.go index 8e2aea11..f54f9941 100644 --- a/inputs/jolokia_agent/jolokia_agent.go +++ b/inputs/jolokia_agent/jolokia_agent.go @@ -26,6 +26,14 @@ func init() { }) } +func (r *JolokiaAgent) Clone() inputs.Input { + return &JolokiaAgent{} +} + +func (r *JolokiaAgent) Name() string { + return inputName +} + func (r *JolokiaAgent) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/jolokia_proxy/jolokia_proxy.go b/inputs/jolokia_proxy/jolokia_proxy.go index 3f4a2c28..475730a1 100644 --- a/inputs/jolokia_proxy/jolokia_proxy.go +++ b/inputs/jolokia_proxy/jolokia_proxy.go @@ -25,6 +25,14 @@ func init() { }) } +func (r *JolokiaProxy) Clone() inputs.Input { + return &JolokiaProxy{} +} + +func (r *JolokiaProxy) Name() string { + return inputName +} + func (r *JolokiaProxy) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/kafka/kafka.go b/inputs/kafka/kafka.go index 1c4f51ee..9f341a27 100644 --- a/inputs/kafka/kafka.go +++ b/inputs/kafka/kafka.go @@ -29,6 +29,13 @@ func init() { return &Kafka{} }) } +func (r *Kafka) Clone() inputs.Input { + return &Kafka{} +} + +func (r *Kafka) Name() string { + return inputName +} func (r *Kafka) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) diff --git a/inputs/kernel/kernel.go b/inputs/kernel/kernel.go index 516351e1..fcacfa03 100644 --- a/inputs/kernel/kernel.go +++ b/inputs/kernel/kernel.go @@ -42,6 +42,16 @@ func init() { } }) } +func (s *KernelStats) Clone() inputs.Input { + return &KernelStats{ + statFile: "/proc/stat", + entropyStatFile: "/proc/sys/kernel/random/entropy_avail", + } +} + +func (s *KernelStats) Name() string { + return inputName +} func (s *KernelStats) Gather(slist *types.SampleList) { data, err := s.getProcStat() diff --git a/inputs/kernel_vmstat/kernel_vmstat.go b/inputs/kernel_vmstat/kernel_vmstat.go index 2c4aebff..85a6e968 100644 --- a/inputs/kernel_vmstat/kernel_vmstat.go +++ b/inputs/kernel_vmstat/kernel_vmstat.go @@ -32,6 +32,16 @@ func init() { }) } +func (s *KernelVmstat) Clone() inputs.Input { + return &KernelVmstat{ + statFile: "/proc/vmstat", + } +} + +func (s *KernelVmstat) Name() string { + return inputName +} + func (s *KernelVmstat) Gather(slist *types.SampleList) { data, err := s.getProcVmstat() if err != nil { diff --git a/inputs/kubernetes/kubernetes.go b/inputs/kubernetes/kubernetes.go index 185a20b7..83e3c65d 100644 --- a/inputs/kubernetes/kubernetes.go +++ b/inputs/kubernetes/kubernetes.go @@ -32,6 +32,14 @@ func init() { }) } +func (k *Kubernetes) Clone() inputs.Input { + return &Kubernetes{} +} + +func (k *Kubernetes) Name() string { + return inputName +} + func (k *Kubernetes) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(k.Instances)) for i := 0; i < len(k.Instances); i++ { diff --git a/inputs/linux_sysctl_fs/linux_sysctl_fs_linux.go b/inputs/linux_sysctl_fs/linux_sysctl_fs_linux.go index 5a95e492..6e06f356 100644 --- a/inputs/linux_sysctl_fs/linux_sysctl_fs_linux.go +++ b/inputs/linux_sysctl_fs/linux_sysctl_fs_linux.go @@ -33,6 +33,16 @@ func init() { }) } +func (s *SysctlFS) Clone() inputs.Input { + return &SysctlFS{ + path: path.Join(osx.GetHostProc(), "/sys/fs"), + } +} + +func (s *SysctlFS) Name() string { + return inputName +} + func (s *SysctlFS) Gather(slist *types.SampleList) { fields := map[string]interface{}{} diff --git a/inputs/local_provider.go b/inputs/local_provider.go new file mode 100644 index 00000000..8aff94f4 --- /dev/null +++ b/inputs/local_provider.go @@ -0,0 +1,111 @@ +package inputs + +import ( + "fmt" + "path" + "strings" + "sync" + + "github.com/toolkits/pkg/file" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/pkg/cfg" + "flashcat.cloud/categraf/pkg/checksum" + "flashcat.cloud/categraf/pkg/choice" +) + +type LocalProvider struct { + sync.RWMutex + + configDir string + inputNames []string +} + +func newLocalProvider(c *config.ConfigType) (*LocalProvider, error) { + return &LocalProvider{ + configDir: c.ConfigDir, + }, nil +} + +func (lp *LocalProvider) Name() string { + return "local" +} + +// StartReloader 内部可以检查是否有配置的变更,如果有变更,则可以手动执行reloadFunc来重启插件 +func (lp *LocalProvider) StartReloader() {} + +func (lp *LocalProvider) StopReloader() {} + +func (lp *LocalProvider) LoadConfig() (bool, error) { + dirs, err := file.DirsUnder(lp.configDir) + if err != nil { + return false, fmt.Errorf("failed to get dirs under %s : %v", config.Config.ConfigDir, err) + } + + names := make([]string, 0, len(dirs)) + for _, dir := range dirs { + if strings.HasPrefix(dir, inputFilePrefix) { + names = append(names, dir[len(inputFilePrefix):]) + } + } + + lp.Lock() + lp.inputNames = names + lp.Unlock() + + return false, nil +} + +func (lp *LocalProvider) GetInputs() ([]string, error) { + lp.RLock() + defer lp.RUnlock() + + inputs := make([]string, 0, len(lp.inputNames)) + inputs = append(inputs, lp.inputNames...) + return inputs, nil +} + +func (lp *LocalProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat, error) { + // 插件配置不在这个provider中 + lp.RLock() + if !choice.Contains(inputKey, lp.inputNames) { + lp.RUnlock() + return nil, nil + } + lp.RUnlock() + + files, err := file.FilesUnder(path.Join(lp.configDir, inputFilePrefix+inputKey)) + if err != nil { + return nil, fmt.Errorf("failed to list files under: %s : %v", lp.configDir, err) + } + + cwf := make([]cfg.ConfigWithFormat, 0, len(files)) + for _, f := range files { + if !(strings.HasSuffix(f, ".yaml") || + strings.HasSuffix(f, ".yml") || + strings.HasSuffix(f, ".json") || + strings.HasSuffix(f, ".toml")) { + continue + } + c, err := file.ReadBytes(path.Join(lp.configDir, inputFilePrefix+inputKey, f)) + if err != nil { + return nil, err + } + cwf = append(cwf, cfg.ConfigWithFormat{ + Config: string(c), + Format: cfg.GuessFormat(f), + }) + } + + return cwf, nil +} + +func (lp *LocalProvider) LoadInputConfig(configs []cfg.ConfigWithFormat, input Input) (map[checksum.Checksum]Input, error) { + err := cfg.LoadConfigs(configs, input) + if err != nil { + return nil, err + } + return map[checksum.Checksum]Input{ + checksum.Checksum(0): input, + }, nil +} diff --git a/inputs/logstash/logstash.go b/inputs/logstash/logstash.go index 89e8a82c..becb0b19 100644 --- a/inputs/logstash/logstash.go +++ b/inputs/logstash/logstash.go @@ -30,6 +30,13 @@ func init() { return &Logstash{} }) } +func (l *Logstash) Clone() inputs.Input { + return &Logstash{} +} + +func (l *Logstash) Name() string { + return inputName +} func (l *Logstash) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(l.Instances)) diff --git a/inputs/mem/mem.go b/inputs/mem/mem.go index 22976ad5..a16e27dc 100644 --- a/inputs/mem/mem.go +++ b/inputs/mem/mem.go @@ -28,6 +28,16 @@ func init() { }) } +func (s *MemStats) Clone() inputs.Input { + return &MemStats{ + ps: system.NewSystemPS(), + } +} + +func (s *MemStats) Name() string { + return inputName +} + func (s *MemStats) Gather(slist *types.SampleList) { vm, err := s.ps.VMStat() if err != nil { diff --git a/inputs/mongodb/mongodb.go b/inputs/mongodb/mongodb.go index e358d64a..dfd07813 100644 --- a/inputs/mongodb/mongodb.go +++ b/inputs/mongodb/mongodb.go @@ -25,6 +25,14 @@ func init() { }) } +func (r *MongoDB) Clone() inputs.Input { + return &MongoDB{} +} + +func (r *MongoDB) Name() string { + return inputName +} + func (r *MongoDB) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/mtail/mtail.go b/inputs/mtail/mtail.go index c2ad333b..86f90354 100644 --- a/inputs/mtail/mtail.go +++ b/inputs/mtail/mtail.go @@ -150,6 +150,14 @@ func init() { }) } +func (s *MTail) Clone() inputs.Input { + return &MTail{} +} + +func (s *MTail) Name() string { + return inputName +} + func (s *MTail) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(s.Instances)) for i := 0; i < len(s.Instances); i++ { diff --git a/inputs/mysql/mysql.go b/inputs/mysql/mysql.go index b00e11ea..163b8fed 100644 --- a/inputs/mysql/mysql.go +++ b/inputs/mysql/mysql.go @@ -70,7 +70,7 @@ func (ins *Instance) Init() error { net := "tcp" if strings.HasSuffix(ins.Address, ".sock") { net = "unix" - } + } ins.dsn = fmt.Sprintf("%s:%s@%s(%s)/?%s", ins.Username, ins.Password, net, ins.Address, ins.Parameters) conf, err := mysql.ParseDSN(ins.dsn) if err != nil { @@ -162,6 +162,14 @@ func init() { }) } +func (m *MySQL) Clone() inputs.Input { + return &MySQL{} +} + +func (m *MySQL) Name() string { + return inputName +} + func (m *MySQL) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(m.Instances)) for i := 0; i < len(m.Instances); i++ { diff --git a/inputs/net/net.go b/inputs/net/net.go index 2005daa7..4e1a658f 100644 --- a/inputs/net/net.go +++ b/inputs/net/net.go @@ -33,6 +33,16 @@ func init() { }) } +func (s *NetIOStats) Clone() inputs.Input { + return &NetIOStats{ + ps: system.NewSystemPS(), + } +} + +func (s *NetIOStats) Name() string { + return inputName +} + func (s *NetIOStats) Init() error { var err error diff --git a/inputs/net_response/net_response.go b/inputs/net_response/net_response.go index 2eb0d945..15a6cc4d 100644 --- a/inputs/net_response/net_response.go +++ b/inputs/net_response/net_response.go @@ -93,6 +93,14 @@ func init() { }) } +func (n *NetResponse) Clone() inputs.Input { + return &NetResponse{} +} + +func (n *NetResponse) Name() string { + return inputName +} + func (n *NetResponse) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(n.Instances)) for i := 0; i < len(n.Instances); i++ { diff --git a/inputs/netstat/netstat.go b/inputs/netstat/netstat.go index 7a5362e9..02c2a78a 100644 --- a/inputs/netstat/netstat.go +++ b/inputs/netstat/netstat.go @@ -42,6 +42,16 @@ func init() { }) } +func (s *NetStats) Clone() inputs.Input { + return &NetStats{ + ps: system.NewSystemPS(), + } +} + +func (s *NetStats) Name() string { + return inputName +} + func (s *NetStats) gatherSummary(slist *types.SampleList) { if runtime.GOOS != "linux" { log.Println("W! netstat_summary is only supported on linux") diff --git a/inputs/netstat_filter/netstat_filter.go b/inputs/netstat_filter/netstat_filter.go index bb04c5e6..c6262c37 100644 --- a/inputs/netstat_filter/netstat_filter.go +++ b/inputs/netstat_filter/netstat_filter.go @@ -34,6 +34,14 @@ func init() { }) } +func (l *NetStatFilter) Clone() inputs.Input { + return &NetStatFilter{} +} + +func (l *NetStatFilter) Name() string { + return inputName +} + func (l *NetStatFilter) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(l.Instances)) for i := 0; i < len(l.Instances); i++ { diff --git a/inputs/nfsclient/nfsclient.go b/inputs/nfsclient/nfsclient.go index 0d0c78b2..7c980003 100644 --- a/inputs/nfsclient/nfsclient.go +++ b/inputs/nfsclient/nfsclient.go @@ -36,6 +36,14 @@ func init() { }) } +func (s *NfsClient) Clone() inputs.Input { + return &NfsClient{} +} + +func (s *NfsClient) Name() string { + return inputName +} + func (s *NfsClient) Init() error { var nfs3Fields = []string{ "NULL", diff --git a/inputs/nginx/nginx.go b/inputs/nginx/nginx.go index e297757f..aa4eee21 100644 --- a/inputs/nginx/nginx.go +++ b/inputs/nginx/nginx.go @@ -32,6 +32,14 @@ func init() { }) } +func (ngx *Nginx) Clone() inputs.Input { + return &Nginx{} +} + +func (ngx *Nginx) Name() string { + return inputName +} + func (ngx *Nginx) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(ngx.Instances)) for i := 0; i < len(ngx.Instances); i++ { diff --git a/inputs/nginx_upstream_check/nginx_upstream_check.go b/inputs/nginx_upstream_check/nginx_upstream_check.go index e887807f..8da9613a 100644 --- a/inputs/nginx_upstream_check/nginx_upstream_check.go +++ b/inputs/nginx_upstream_check/nginx_upstream_check.go @@ -32,6 +32,14 @@ func init() { }) } +func (r *NginxUpstreamCheck) Clone() inputs.Input { + return &NginxUpstreamCheck{} +} + +func (r *NginxUpstreamCheck) Name() string { + return inputName +} + func (r *NginxUpstreamCheck) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/ntp/ntp.go b/inputs/ntp/ntp.go index dd92b75a..0aaf1af4 100644 --- a/inputs/ntp/ntp.go +++ b/inputs/ntp/ntp.go @@ -24,6 +24,14 @@ func init() { }) } +func (n *NTPStat) Clone() inputs.Input { + return &NTPStat{} +} + +func (n *NTPStat) Name() string { + return inputName +} + func (n *NTPStat) Init() error { if len(n.NTPServers) == 0 { return types.ErrInstancesEmpty diff --git a/inputs/nvidia_smi/nvidia_smi.go b/inputs/nvidia_smi/nvidia_smi.go index 21eab153..152c0ceb 100644 --- a/inputs/nvidia_smi/nvidia_smi.go +++ b/inputs/nvidia_smi/nvidia_smi.go @@ -30,6 +30,14 @@ func init() { }) } +func (s *GPUStats) Clone() inputs.Input { + return &GPUStats{} +} + +func (s *GPUStats) Name() string { + return inputName +} + func (s *GPUStats) Init() error { if s.NvidiaSmiCommand == "" { return types.ErrInstancesEmpty diff --git a/inputs/oracle/oracle_linux_amd64.go b/inputs/oracle/oracle_linux_amd64.go index 2430d137..c48636ab 100644 --- a/inputs/oracle/oracle_linux_amd64.go +++ b/inputs/oracle/oracle_linux_amd64.go @@ -59,6 +59,14 @@ func init() { }) } +func (o *Oracle) Clone() inputs.Input { + return &Oracle{} +} + +func (o *Oracle) Name() string { + return inputName +} + func (o *Oracle) Drop() { for i := 0; i < len(o.Instances); i++ { o.Instances[i].Drop() @@ -102,7 +110,7 @@ func (ins *Instance) Drop() error { if config.Config.DebugMode { log.Println("D! dropping oracle connection:", ins.Address) } - + if len(ins.Address) == 0 || ins.client == nil { if config.Config.DebugMode { log.Println("D! oracle address is empty or client is nil, so there is no need to close") diff --git a/inputs/phpfpm/phpfpm.go b/inputs/phpfpm/phpfpm.go index 80085b91..feb4ec41 100644 --- a/inputs/phpfpm/phpfpm.go +++ b/inputs/phpfpm/phpfpm.go @@ -52,6 +52,14 @@ func init() { }) } +func (pt *PhpFpm) Clone() inputs.Input { + return &PhpFpm{} +} + +func (pt *PhpFpm) Name() string { + return inputName +} + func (pt *PhpFpm) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { @@ -317,14 +325,14 @@ func globUnixSocketPath(pathPattern string) ([]string, error) { return nil, fmt.Errorf("the file is not exist") } - //Check whether the path is an absolute path. + // Check whether the path is an absolute path. if !filepath.IsAbs(pathPattern) { return nil, fmt.Errorf("the file is not absolute: %s", pathPattern) } if !strings.ContainsAny(pathPattern, "*?[") { file, err := os.Stat(pathPattern) - //无法识别文件或者该文件不是 unix socket 类型 + // 无法识别文件或者该文件不是 unix socket 类型 if err != nil || !isUnixSocketFile(file) { return nil, fmt.Errorf("the file is not of type socket:%s", pathPattern) } @@ -339,7 +347,7 @@ func globUnixSocketPath(pathPattern string) ([]string, error) { for _, path := range paths { file, err := os.Stat(path) if err != nil || !isUnixSocketFile(file) { - //无法识别文件或者该文件不是 unix socket 类型 + // 无法识别文件或者该文件不是 unix socket 类型 continue } validPaths = append(validPaths, path) diff --git a/inputs/ping/ping.go b/inputs/ping/ping.go index d895fbfb..fc934613 100644 --- a/inputs/ping/ping.go +++ b/inputs/ping/ping.go @@ -94,6 +94,14 @@ func init() { }) } +func (p *Ping) Clone() inputs.Input { + return &Ping{} +} + +func (p *Ping) Name() string { + return inputName +} + func (p *Ping) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(p.Instances)) for i := 0; i < len(p.Instances); i++ { diff --git a/inputs/postgresql/postgresql.go b/inputs/postgresql/postgresql.go index 34b1d10a..0c8372b7 100644 --- a/inputs/postgresql/postgresql.go +++ b/inputs/postgresql/postgresql.go @@ -38,6 +38,14 @@ func init() { }) } +func (pt *Postgresql) Clone() inputs.Input { + return &Postgresql{} +} + +func (pt *Postgresql) Name() string { + return inputName +} + func (pt *Postgresql) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { @@ -92,7 +100,7 @@ func (ins *Instance) Init() error { } ins.MaxIdle = 1 ins.MaxOpen = 1 - //ins.MaxLifetime = config.Duration(0) + // ins.MaxLifetime = config.Duration(0) if !ins.IsPgBouncer { ins.PreparedStatements = true ins.IsPgBouncer = false @@ -134,7 +142,7 @@ func (ins *Instance) Init() error { return nil } -// closes any necessary channels and connections +// closes any necessary channels and connections func (p *Instance) Drop() { // Ignore the returned error as we cannot do anything about it anyway //nolint:errcheck,revive @@ -382,7 +390,7 @@ func (ins *Instance) accRow(row scanner, slist *types.SampleList, columns []stri fields[col] = *val } } - //acc.AddFields("postgresql", fields, tags) + // acc.AddFields("postgresql", fields, tags) for key, val := range fields { slist.PushSample(inputName, key, val, tags) } diff --git a/inputs/processes/processes_notwindows.go b/inputs/processes/processes_notwindows.go index 1fe8b383..05e41179 100644 --- a/inputs/processes/processes_notwindows.go +++ b/inputs/processes/processes_notwindows.go @@ -34,6 +34,14 @@ func init() { }) } +func (p *Processes) Clone() inputs.Input { + return &Processes{} +} + +func (p *Processes) Name() string { + return inputName +} + func (p *Processes) Gather(slist *types.SampleList) { // Get an empty map of metric fields fields := getEmptyFields() diff --git a/inputs/procstat/procstat.go b/inputs/procstat/procstat.go index e314b8a1..2330631a 100644 --- a/inputs/procstat/procstat.go +++ b/inputs/procstat/procstat.go @@ -73,6 +73,14 @@ func init() { }) } +func (p *Procstat) Clone() inputs.Input { + return &Procstat{} +} + +func (p *Procstat) Name() string { + return inputName +} + var _ inputs.SampleGatherer = new(Instance) func (s *Procstat) GetInstances() []inputs.Instance { diff --git a/inputs/prometheus/prometheus.go b/inputs/prometheus/prometheus.go index 85c1c086..194178ac 100644 --- a/inputs/prometheus/prometheus.go +++ b/inputs/prometheus/prometheus.go @@ -135,6 +135,14 @@ func init() { }) } +func (p *Prometheus) Clone() inputs.Input { + return &Prometheus{} +} + +func (p *Prometheus) Name() string { + return inputName +} + func (p *Prometheus) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(p.Instances)) for i := 0; i < len(p.Instances); i++ { diff --git a/inputs/provider.go b/inputs/provider.go deleted file mode 100644 index c5660866..00000000 --- a/inputs/provider.go +++ /dev/null @@ -1,525 +0,0 @@ -package inputs - -import ( - "encoding/json" - "fmt" - "io" - "log" - "net/http" - "path" - "strings" - "sync" - "time" - - "github.com/toolkits/pkg/file" - - "flashcat.cloud/categraf/config" - "flashcat.cloud/categraf/pkg/cfg" - "flashcat.cloud/categraf/pkg/choice" - "flashcat.cloud/categraf/pkg/tls" -) - -const inputFilePrefix = "input." - -type InputOperation interface { - RegisterInput(string, []cfg.ConfigWithFormat) - DeregisterInput(string) - ReregisterInput(string, []cfg.ConfigWithFormat) -} - -// FormatInputName providerName + '.' + inputKey -func FormatInputName(provider, inputKey string) string { - return provider + "." + inputKey -} - -// ParseInputName parse name into providerName and inputName -func ParseInputName(name string) (string, string) { - data := strings.SplitN(name, ".", 2) - if len(data) == 0 { - return "", "" - } - if len(data) == 1 { - return "", data[0] - } - return data[0], data[1] -} - -// Provider InputProvider的抽象,可以实现此抽象来提供个性化的插件配置能力,如从远端定时读取配置等 -type Provider interface { - // Name 用于给input加前缀使用 - Name() string - - // StartReloader Provider初始化后会调用此方法 - // 可以根据需求实现定时加载配置的逻辑 - StartReloader() - - StopReloader() - - // LoadConfig 加载配置的方法,如果配置改变,返回true;提供给 StartReloader 以及 HUP信号的Reload使用 - LoadConfig() (bool, error) - - // GetInputs 获取当前Provider提供了哪些插件 - GetInputs() ([]string, error) - - // GetInputConfig 获取input的配置,注意处理时先判断配置是否在provider中,如果在provider并且读取错误再返回error - GetInputConfig(inputName string) ([]cfg.ConfigWithFormat, error) -} - -func NewProvider(c *config.ConfigType, op InputOperation) (Provider, error) { - log.Println("I! use input provider:", c.Global.Providers) - // 不添加provider配置 则默认使用local - // 兼容老版本 - if len(c.Global.Providers) == 0 { - c.Global.Providers = append(c.Global.Providers, "local") - } - providers := make([]Provider, 0, len(c.Global.Providers)) - for _, p := range c.Global.Providers { - name := strings.ToLower(p) - switch name { - case "http": - provider, err := newHTTPProvider(c, op) - if err != nil { - return nil, err - } - providers = append(providers, provider) - default: - provider, err := newLocalProvider(c) - if err != nil { - return nil, err - } - providers = append(providers, provider) - } - } - - return &ProviderManager{ - providers: providers, - }, nil -} - -// ProviderManager combines multiple Provider's config together -type ProviderManager struct { - providers []Provider -} - -func (pm *ProviderManager) Name() string { - return "pm" -} - -func (pm *ProviderManager) StartReloader() { - for _, p := range pm.providers { - p.StartReloader() - } -} - -func (pm *ProviderManager) StopReloader() { - for _, p := range pm.providers { - p.StopReloader() - } -} - -func (pm *ProviderManager) LoadConfig() (bool, error) { - for _, p := range pm.providers { - _, err := p.LoadConfig() - if err != nil { - log.Printf("E! provider manager, LoadConfig of %s err: %s", p.Name(), err) - } - } - return false, nil -} - -// GetInputs 返回带有provider前缀的inputName -func (pm *ProviderManager) GetInputs() ([]string, error) { - inputs := make([]string, 0, 40) - for _, p := range pm.providers { - pInputs, err := p.GetInputs() - if err != nil { - log.Printf("E! provider manager, GetInputs of %s error: %v, skip", p.Name(), err) - continue - } - for _, inputKey := range pInputs { - inputs = append(inputs, FormatInputName(p.Name(), inputKey)) - } - } - - return inputs, nil -} - -// GetInputConfig 寻找匹配的Provider,从中查找input -func (pm *ProviderManager) GetInputConfig(inputName string) ([]cfg.ConfigWithFormat, error) { - cwf := make([]cfg.ConfigWithFormat, 0, len(pm.providers)) - providerName, inputKey := ParseInputName(inputName) - for _, p := range pm.providers { - // 没有匹配,说明input不是该provider提供的 - if providerName != p.Name() { - continue - } - - pcwf, err := p.GetInputConfig(inputKey) - if err != nil { - log.Printf("E! provider manager, failed to get config of %s from %s, error: %s", inputName, p.Name(), err) - continue - } - - cwf = append(cwf, pcwf...) - } - - if len(cwf) == 0 { - return nil, fmt.Errorf("provider manager, failed to get config of %s", inputName) - } - - return cwf, nil -} - -type LocalProvider struct { - sync.RWMutex - - configDir string - inputNames []string -} - -func newLocalProvider(c *config.ConfigType) (*LocalProvider, error) { - return &LocalProvider{ - configDir: c.ConfigDir, - }, nil -} - -func (lp *LocalProvider) Name() string { - return "local" -} - -// StartReloader 内部可以检查是否有配置的变更,如果有变更,则可以手动执行reloadFunc来重启插件 -func (lp *LocalProvider) StartReloader() {} - -func (lp *LocalProvider) StopReloader() {} - -func (lp *LocalProvider) LoadConfig() (bool, error) { - dirs, err := file.DirsUnder(lp.configDir) - if err != nil { - return false, fmt.Errorf("failed to get dirs under %s : %v", config.Config.ConfigDir, err) - } - - names := make([]string, 0, len(dirs)) - for _, dir := range dirs { - if strings.HasPrefix(dir, inputFilePrefix) { - names = append(names, dir[len(inputFilePrefix):]) - } - } - - lp.Lock() - lp.inputNames = names - lp.Unlock() - - return false, nil -} - -func (lp *LocalProvider) GetInputs() ([]string, error) { - lp.RLock() - defer lp.RUnlock() - - inputs := make([]string, 0, len(lp.inputNames)) - inputs = append(inputs, lp.inputNames...) - return inputs, nil -} - -func (lp *LocalProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat, error) { - // 插件配置不在这个provider中 - lp.RLock() - if !choice.Contains(inputKey, lp.inputNames) { - lp.RUnlock() - return nil, nil - } - lp.RUnlock() - - files, err := file.FilesUnder(path.Join(lp.configDir, inputFilePrefix+inputKey)) - if err != nil { - return nil, fmt.Errorf("failed to list files under: %s : %v", lp.configDir, err) - } - - cwf := make([]cfg.ConfigWithFormat, 0, len(files)) - for _, f := range files { - if !(strings.HasSuffix(f, ".yaml") || - strings.HasSuffix(f, ".yml") || - strings.HasSuffix(f, ".json") || - strings.HasSuffix(f, ".toml")) { - continue - } - c, err := file.ReadBytes(path.Join(lp.configDir, inputFilePrefix+inputKey, f)) - if err != nil { - return nil, err - } - cwf = append(cwf, cfg.ConfigWithFormat{ - Config: string(c), - Format: cfg.GuessFormat(f), - }) - } - - return cwf, nil -} - -// HTTPProvider provider a mechanism to get config from remote http server at a fixed interval -// If input config is changed, the provider will reload the input without reload whole agent -type HTTPProvider struct { - sync.RWMutex - - RemoteUrl string - Headers []string - AuthUsername string - AuthPassword string - - Timeout int - ReloadInterval int - - tls.ClientConfig - client *http.Client - stopCh chan struct{} - op InputOperation - - configMap map[string]cfg.ConfigWithFormat - version string - - compareNewsCache []string - compareUpdatesCache []string - compareDeletesCache []string -} - -type httpProviderResponse struct { - // version is signature/md5 of current Config, server side should deal with the Version calculate - Version string `json:"version"` - - // ConfigMap (InputName -> Config), if version is identical, server side can set Config to nil - Configs map[string]cfg.ConfigWithFormat `json:"configs"` -} - -func (hrp *HTTPProvider) Name() string { - return "http" -} - -func newHTTPProvider(c *config.ConfigType, op InputOperation) (*HTTPProvider, error) { - if c.HTTPProviderConfig == nil { - return nil, fmt.Errorf("no http provider config found") - } - - provider := &HTTPProvider{ - RemoteUrl: c.HTTPProviderConfig.RemoteUrl, - Headers: c.HTTPProviderConfig.Headers, - AuthUsername: c.HTTPProviderConfig.AuthUsername, - AuthPassword: c.HTTPProviderConfig.AuthPassword, - ClientConfig: c.HTTPProviderConfig.ClientConfig, - Timeout: c.HTTPProviderConfig.Timeout, - ReloadInterval: c.HTTPProviderConfig.ReloadInterval, - stopCh: make(chan struct{}, 1), - op: op, - } - - if err := provider.check(); err != nil { - return nil, err - } - - return provider, nil -} - -func (hrp *HTTPProvider) check() error { - if hrp.Timeout <= 0 { - hrp.Timeout = 5 - } - - if hrp.ReloadInterval <= 0 { - hrp.ReloadInterval = 120 - } - - if !strings.HasPrefix(hrp.RemoteUrl, "http") { - return fmt.Errorf("http provider: bad remote url config: %s", hrp.RemoteUrl) - } - - tlsc, err := hrp.TLSConfig() - if err != nil { - return err - } - - hrp.client = &http.Client{ - Timeout: time.Duration(hrp.Timeout) * time.Second, - Transport: &http.Transport{ - TLSClientConfig: tlsc, - }, - } - - return nil -} - -func (hrp *HTTPProvider) doReq() (*httpProviderResponse, error) { - req, err := http.NewRequest("GET", hrp.RemoteUrl, nil) - if err != nil { - log.Println("E! http provider: build reload config request error:", err) - return nil, err - } - - for i := 0; i < len(hrp.Headers); i += 2 { - req.Header.Add(hrp.Headers[i], hrp.Headers[i+1]) - if hrp.Headers[i] == "Host" { - req.Host = hrp.Headers[i+1] - } - } - - if hrp.AuthUsername != "" || hrp.AuthPassword != "" { - req.SetBasicAuth(hrp.AuthUsername, hrp.AuthPassword) - } - - // build query parameters - q := req.URL.Query() - for k, v := range config.Config.Global.Labels { - q.Add(k, v) - } - q.Add("timestamp", fmt.Sprint(time.Now().Unix())) - q.Add("version", hrp.version) - q.Add("agent_hostname", config.Config.GetHostname()) - req.URL.RawQuery = q.Encode() - - resp, err := hrp.client.Do(req) - if err != nil { - log.Println("E! http provider: request reload config error:", err) - return nil, err - } - defer resp.Body.Close() - respData, err := io.ReadAll(resp.Body) - if err != nil { - log.Println("E! http provider: request reload config error:", err) - return nil, err - } - - confResp := &httpProviderResponse{} - err = json.Unmarshal(respData, confResp) - if err != nil { - log.Println("E! http provider: unmarshal result error:", err) - return nil, err - } - return confResp, nil -} - -func (hrp *HTTPProvider) LoadConfig() (bool, error) { - log.Println("I! http provider: start reload config from remote:", hrp.RemoteUrl) - - confResp, err := hrp.doReq() - if err != nil { - log.Printf("W! http provider: request remote err: [%+v]", err) - return false, err - } - - // if config version is identical, means config is not changed - if confResp.Version == hrp.version { - return false, nil - } - - // if config is nil, may some error occurs in server side, ignore this instead of deleting all configs - if confResp.Configs == nil { - log.Println("W! http provider: received config is empty") - return false, nil - } - - // delete empty entries - for k, v := range confResp.Configs { - if len(v.Config) == 0 { - delete(confResp.Configs, k) - } - } - - hrp.compareNewsCache, hrp.compareUpdatesCache, hrp.compareDeletesCache = compareConfig(hrp.configMap, confResp.Configs) - changed := len(hrp.compareNewsCache)+len(hrp.compareUpdatesCache)+len(hrp.compareDeletesCache) > 0 - if changed { - hrp.Lock() - defer hrp.Unlock() - hrp.configMap = confResp.Configs - hrp.version = confResp.Version - } - - return changed, nil -} - -func (hrp *HTTPProvider) StartReloader() { - go func() { - for { - select { - case <-time.After(time.Duration(hrp.ReloadInterval) * time.Second): - changed, err := hrp.LoadConfig() - if err != nil { - continue - } - if changed { - if len(hrp.compareNewsCache) > 0 { - log.Println("I! http provider: new inputs:", hrp.compareNewsCache) - for _, newInput := range hrp.compareNewsCache { - hrp.op.RegisterInput(FormatInputName(hrp.Name(), newInput), []cfg.ConfigWithFormat{hrp.configMap[newInput]}) - } - } - - if len(hrp.compareUpdatesCache) > 0 { - log.Println("I! http provider: updated inputs:", hrp.compareUpdatesCache) - for _, updatedInput := range hrp.compareUpdatesCache { - hrp.op.ReregisterInput(FormatInputName(hrp.Name(), updatedInput), []cfg.ConfigWithFormat{hrp.configMap[updatedInput]}) - } - } - - if len(hrp.compareDeletesCache) > 0 { - log.Println("I! http provider: deleted inputs:", hrp.compareDeletesCache) - for _, deletedInput := range hrp.compareDeletesCache { - hrp.op.DeregisterInput(FormatInputName(hrp.Name(), deletedInput)) - } - } - } - case <-hrp.stopCh: - return - } - } - }() -} - -func (hrp *HTTPProvider) StopReloader() { - hrp.stopCh <- struct{}{} -} - -func (hrp *HTTPProvider) GetInputs() ([]string, error) { - hrp.RLock() - defer hrp.RUnlock() - - inputs := make([]string, 0, len(hrp.configMap)) - for k := range hrp.configMap { - inputs = append(inputs, k) - } - - return inputs, nil -} - -func (hrp *HTTPProvider) GetInputConfig(inputKey string) ([]cfg.ConfigWithFormat, error) { - hrp.RLock() - defer hrp.RUnlock() - - if conf, has := hrp.configMap[inputKey]; has { - return []cfg.ConfigWithFormat{conf}, nil - } - - return nil, nil -} - -// compareConfig 比较新旧两个配置的差异 -func compareConfig(cold, cnew map[string]cfg.ConfigWithFormat) (news, updates, deletes []string) { - news = make([]string, 0, len(cnew)) - updates = make([]string, 0, len(cnew)) - deletes = make([]string, 0, len(cnew)) - - for kold, vold := range cold { - if vnew, has := cnew[kold]; has { - if vold.Config != vnew.Config || vold.Format != vnew.Format { - updates = append(updates, kold) - } - } else { - deletes = append(deletes, kold) - } - } - - for knew := range cnew { - if _, has := cold[knew]; !has { - news = append(news, knew) - } - } - - return -} diff --git a/inputs/provider_manager.go b/inputs/provider_manager.go new file mode 100644 index 00000000..a7afc0e8 --- /dev/null +++ b/inputs/provider_manager.go @@ -0,0 +1,183 @@ +package inputs + +import ( + "fmt" + "log" + "strings" + + "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/pkg/cfg" + "flashcat.cloud/categraf/pkg/checksum" +) + +const inputFilePrefix = "input." + +type InputOperation interface { + RegisterInput(string, []cfg.ConfigWithFormat) + DeregisterInput(string, checksum.Checksum) +} + +// FormatInputName providerName + '.' + inputKey +func FormatInputName(provider, inputKey string) string { + return provider + "." + inputKey +} + +// ParseInputName parse name into providerName and inputName +func ParseInputName(name string) (string, string) { + data := strings.SplitN(name, ".", 2) + if len(data) == 0 { + return "", "" + } + if len(data) == 1 { + return "", data[0] + } + return data[0], data[1] +} + +// Provider InputProvider的抽象,可以实现此抽象来提供个性化的插件配置能力,如从远端定时读取配置等 +type Provider interface { + // Name 用于给input加前缀使用 + Name() string + + // StartReloader Provider初始化后会调用此方法 + // 可以根据需求实现定时加载配置的逻辑 + StartReloader() + + StopReloader() + + // LoadConfig 加载配置的方法,如果配置改变,返回true;提供给 StartReloader 以及 HUP信号的Reload使用 + LoadConfig() (bool, error) + + // GetInputs 获取当前Provider提供了哪些插件 + GetInputs() ([]string, error) + + // GetInputConfig 获取input的配置,注意处理时先判断配置是否在provider中,如果在provider并且读取错误再返回error + GetInputConfig(inputName string) ([]cfg.ConfigWithFormat, error) + + // 加载 input 的配置 + LoadInputConfig([]cfg.ConfigWithFormat, Input) (map[checksum.Checksum]Input, error) +} + +func NewProvider(c *config.ConfigType, op InputOperation) (Provider, error) { + log.Println("I! use input provider:", c.Global.Providers) + // 不添加provider配置 则默认使用local + // 兼容老版本 + if len(c.Global.Providers) == 0 { + c.Global.Providers = append(c.Global.Providers, "local") + } + providers := make([]Provider, 0, len(c.Global.Providers)) + for _, p := range c.Global.Providers { + name := strings.ToLower(p) + switch name { + case "http": + provider, err := newHTTPProvider(c, op) + if err != nil { + return nil, err + } + providers = append(providers, provider) + default: + provider, err := newLocalProvider(c) + if err != nil { + return nil, err + } + providers = append(providers, provider) + } + } + + return &ProviderManager{ + providers: providers, + }, nil +} + +// ProviderManager combines multiple Provider's config together +type ProviderManager struct { + providers []Provider +} + +func (pm *ProviderManager) Name() string { + return "pm" +} + +func (pm *ProviderManager) StartReloader() { + for _, p := range pm.providers { + p.StartReloader() + } +} + +func (pm *ProviderManager) StopReloader() { + for _, p := range pm.providers { + p.StopReloader() + } +} + +func (pm *ProviderManager) LoadConfig() (bool, error) { + changed := false + for _, p := range pm.providers { + ok, err := p.LoadConfig() + if err != nil { + log.Printf("E! provider manager, LoadConfig of %s err: %s", p.Name(), err) + } else { + changed = changed || ok + } + } + return changed, nil +} + +// GetInputs 返回带有provider前缀的inputName +func (pm *ProviderManager) GetInputs() ([]string, error) { + inputs := make([]string, 0, 40) + for _, p := range pm.providers { + pInputs, err := p.GetInputs() + if err != nil { + log.Printf("E! provider manager, GetInputs of %s error: %v, skip", p.Name(), err) + continue + } + for _, inputKey := range pInputs { + inputs = append(inputs, FormatInputName(p.Name(), inputKey)) + } + } + + return inputs, nil +} + +// GetInputConfig 寻找匹配的Provider,从中查找input +func (pm *ProviderManager) GetInputConfig(inputName string) ([]cfg.ConfigWithFormat, error) { + cwf := make([]cfg.ConfigWithFormat, 0, len(pm.providers)) + providerName, inputKey := ParseInputName(inputName) + for _, p := range pm.providers { + // 没有匹配,说明input不是该provider提供的 + if providerName != p.Name() { + continue + } + + pcwf, err := p.GetInputConfig(inputKey) + if err != nil { + log.Printf("E! provider manager, failed to get config of %s from %s, error: %s", inputName, p.Name(), err) + continue + } + + cwf = append(cwf, pcwf...) + } + + if len(cwf) == 0 { + return nil, fmt.Errorf("provider manager, failed to get config of %s", inputName) + } + + return cwf, nil +} + +func (pm *ProviderManager) LoadInputConfig(configs []cfg.ConfigWithFormat, input Input) (map[checksum.Checksum]Input, error) { + // 从配置中获取provider + inputs := make(map[checksum.Checksum]Input) + for _, p := range pm.providers { + is, err := p.LoadInputConfig(configs, input) + if err != nil { + return nil, err + } + for s, i := range is { + inputs[s] = i + } + } + + return inputs, nil +} diff --git a/inputs/rabbitmq/rabbitmq.go b/inputs/rabbitmq/rabbitmq.go index 5f736ca6..0b0377c9 100644 --- a/inputs/rabbitmq/rabbitmq.go +++ b/inputs/rabbitmq/rabbitmq.go @@ -29,6 +29,14 @@ func init() { }) } +func (r *RabbitMQ) Clone() inputs.Input { + return &RabbitMQ{} +} + +func (r *RabbitMQ) Name() string { + return inputName +} + func (r *RabbitMQ) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/redis/redis.go b/inputs/redis/redis.go index 23b6180d..9da936c2 100644 --- a/inputs/redis/redis.go +++ b/inputs/redis/redis.go @@ -75,6 +75,14 @@ func init() { }) } +func (r *Redis) Clone() inputs.Input { + return &Redis{} +} + +func (r *Redis) Name() string { + return inputName +} + func (r *Redis) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/redis_sentinel/redis_sentinel.go b/inputs/redis_sentinel/redis_sentinel.go index b2775492..63e15f20 100644 --- a/inputs/redis_sentinel/redis_sentinel.go +++ b/inputs/redis_sentinel/redis_sentinel.go @@ -35,6 +35,14 @@ func init() { }) } +func (r *RedisSentinel) Clone() inputs.Input { + return &RedisSentinel{} +} + +func (r *RedisSentinel) Name() string { + return inputName +} + func (r *RedisSentinel) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(r.Instances)) for i := 0; i < len(r.Instances); i++ { diff --git a/inputs/rocketmq_offset/rocketmq.go b/inputs/rocketmq_offset/rocketmq.go index ae660b93..b8d18056 100644 --- a/inputs/rocketmq_offset/rocketmq.go +++ b/inputs/rocketmq_offset/rocketmq.go @@ -30,6 +30,14 @@ func init() { }) } +func (pt *RocketMQ) Clone() inputs.Input { + return &RocketMQ{} +} + +func (pt *RocketMQ) Name() string { + return inputName +} + func (pt *RocketMQ) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { @@ -52,34 +60,34 @@ func (ins *Instance) Init() error { } func (ins *Instance) Gather(slist *types.SampleList) { - //获取rocketmq集群中的topicNameList + // 获取rocketmq集群中的topicNameList topicNameArray := GetTopicNameList(ins.RocketMQConsoleIPAndPort) if topicNameArray == nil { log.Println("E! fail to get topic,please check config!") return } - //按照topic聚合msgDiff + // 按照topic聚合msgDiff var diff_Topic_Map = make(map[string]*MsgDiffTopic) - //按照consumerGroup聚合msgDiff - //var diff_ConsumerGroup_Slice []model.MsgDiff_ConsumerGroup = []model.MsgDiff_ConsumerGroup{} + // 按照consumerGroup聚合msgDiff + // var diff_ConsumerGroup_Slice []model.MsgDiff_ConsumerGroup = []model.MsgDiff_ConsumerGroup{} var diff_ConsumerGroup_Map = make(map[string]*MsgDiffConsumerGroup) - //按照topic, consumeGroup聚合msgDiff - //var diff_Topic_ConsumerGroup_Slice []model.MsgDiff_Topics_ConsumerGroup = []model.MsgDiff_Topics_ConsumerGroup{} + // 按照topic, consumeGroup聚合msgDiff + // var diff_Topic_ConsumerGroup_Slice []model.MsgDiff_Topics_ConsumerGroup = []model.MsgDiff_Topics_ConsumerGroup{} var diff_Topic_ConsumerGroup_Map = make(map[string]*MsgDiffTopicConsumerGroup) - //按照broker聚合msgDiff - //var diff_Broker_Slice []model.MsgDiff_Broker = []model.MsgDiff_Broker{} + // 按照broker聚合msgDiff + // var diff_Broker_Slice []model.MsgDiff_Broker = []model.MsgDiff_Broker{} var diff_Broker_Map = make(map[string]*MsgDiffBroker) - //按照clientInfo聚合msgDiff - //var diff_Clientinfo_Slice []model.MsgDiff_ClientInfo = []model.MsgDiff_ClientInfo{} + // 按照clientInfo聚合msgDiff + // var diff_Clientinfo_Slice []model.MsgDiff_ClientInfo = []model.MsgDiff_ClientInfo{} var diff_Clientinfo_Map = make(map[string]*MsgDiffClientInfo) - //按照queue聚合msgDiff - //var MsgDiff_Queue_Slice []model.MsgDiff_Queue = []model.MsgDiff_Queue{} + // 按照queue聚合msgDiff + // var MsgDiff_Queue_Slice []model.MsgDiff_Queue = []model.MsgDiff_Queue{} var diff_Queue_Map = make(map[string]*MsgDiffQueue) for i := range topicNameArray { @@ -106,7 +114,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { for cgName, consumerInfo := range topicConsumerGroups { topic := consumerInfo.Topic - //获取当前consumer信息及对应的rocketmq-queue的信息 + // 获取当前consumer信息及对应的rocketmq-queue的信息 queueStatInfoList := consumerInfo.QueueStatInfoList for i := range queueStatInfoList { @@ -143,9 +151,9 @@ func (ins *Instance) Gather(slist *types.SampleList) { } slist.PushSample(inputName, "diffDetail", diff, tags) - //按照topic进行msgDiff聚合 + // 按照topic进行msgDiff聚合 if _, ok := diff_Topic_Map[topic]; ok { - //如果已经存在,计算diff + // 如果已经存在,计算diff diff_Topic_Map[topic].Diff = diff_Topic_Map[topic].Diff + diff } else { var diffTopic *MsgDiffTopic = new(MsgDiffTopic) @@ -156,7 +164,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { diff_Topic_Map[topic] = diffTopic } - //按照consumerGroup进行msgDiff聚合 + // 按照consumerGroup进行msgDiff聚合 if _, ok := diff_ConsumerGroup_Map[cgName]; ok { diff_ConsumerGroup_Map[cgName].Diff = diff_ConsumerGroup_Map[cgName].Diff + diff } else { @@ -168,7 +176,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { diff_ConsumerGroup_Map[cgName] = diffConsumerGroup } - //按照topic, consumerGroup进行msgDiff聚合 + // 按照topic, consumerGroup进行msgDiff聚合 topic_cgName := topic + ":" + cgName if _, ok := diff_Topic_ConsumerGroup_Map[topic_cgName]; ok { diff_Topic_ConsumerGroup_Map[topic_cgName].Diff = diff_Topic_ConsumerGroup_Map[topic_cgName].Diff + diff @@ -184,7 +192,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { } - //按照broker进行msgDiff聚合 + // 按照broker进行msgDiff聚合 if _, ok := diff_Broker_Map[brokerName]; ok { diff_Broker_Map[brokerName].Diff = diff_Broker_Map[brokerName].Diff + diff } else { @@ -196,7 +204,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { diff_Broker_Map[brokerName] = diff_Broker } - //按照queueId进行msgDiff聚合 + // 按照queueId进行msgDiff聚合 queuestr := brokerName + ":" + string(queueId) if _, ok := diff_Queue_Map[string(queueId)]; ok { diff_Queue_Map[queuestr].Diff = diff_Queue_Map[queuestr].Diff + diff @@ -210,7 +218,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { diff_Queue_Map[queuestr] = diff_Queue } - //按照clientInfo进行msgDiff聚合 + // 按照clientInfo进行msgDiff聚合 if _, ok := diff_Clientinfo_Map[clientInfo]; ok { diff_Clientinfo_Map[clientInfo].Diff = diff_Clientinfo_Map[clientInfo].Diff + diff diff --git a/inputs/self_metrics/metrics.go b/inputs/self_metrics/metrics.go index ec5fbadd..20083bd9 100644 --- a/inputs/self_metrics/metrics.go +++ b/inputs/self_metrics/metrics.go @@ -28,6 +28,14 @@ func init() { }) } +func (pt *Categraf) Clone() inputs.Input { + return &Categraf{} +} + +func (pt *Categraf) Name() string { + return inputName +} + func (ins *Categraf) Gather(slist *types.SampleList) { mfs, err := prometheus.DefaultGatherer.Gather() if err != nil { diff --git a/inputs/set.go b/inputs/set.go new file mode 100644 index 00000000..44fffacd --- /dev/null +++ b/inputs/set.go @@ -0,0 +1,73 @@ +package inputs + +import ( + "flashcat.cloud/categraf/pkg/cfg" + "flashcat.cloud/categraf/pkg/checksum" +) + +type ( + Empty struct{} + Set map[checksum.Checksum]Empty +) + +func NewSet() Set { + return make(Set) +} + +func (s Set) Add(elem checksum.Checksum) { + s[elem] = Empty{} +} + +func (s Set) Has(elem checksum.Checksum) bool { + _, ok := s[elem] + return ok +} + +func (s Set) Load(elems map[checksum.Checksum]cfg.ConfigWithFormat) Set { + for k, _ := range elems { + s.Add(k) + } + return s +} + +func (s Set) Clear() Set { + for k := range s { + delete(s, k) + } + return s +} + +func (src Set) Diff(dst Set) (add, del Set) { + record := map[checksum.Checksum]int{} + for elem := range src { + record[elem]++ + } + for elem := range dst { + record[elem]++ + } + + intersection := NewSet() + for k, v := range record { + if v == 2 { + intersection.Add(k) + } + } + + // del := dst - interaction + del = NewSet() + for elem := range dst { + if intersection.Has(elem) { + continue + } + del.Add(elem) + } + // add := src - interaction + add = NewSet() + for elem := range src { + if intersection.Has(elem) { + continue + } + add.Add(elem) + } + return add, del +} diff --git a/inputs/snmp/snmp.go b/inputs/snmp/snmp.go index 8f66a4d0..df354389 100644 --- a/inputs/snmp/snmp.go +++ b/inputs/snmp/snmp.go @@ -387,6 +387,14 @@ func init() { }) } +func (s *Snmp) Clone() inputs.Input { + return &Snmp{} +} + +func (s *Snmp) Name() string { + return inputName +} + func (s *Snmp) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(s.Instances)) for i := 0; i < len(s.Instances); i++ { diff --git a/inputs/sockstat/sockstat.go b/inputs/sockstat/sockstat.go index cbad45cb..9ce34440 100644 --- a/inputs/sockstat/sockstat.go +++ b/inputs/sockstat/sockstat.go @@ -21,6 +21,14 @@ func init() { }) } +func (s *SockStat) Clone() inputs.Input { + return &SockStat{} +} + +func (s *SockStat) Name() string { + return inputName +} + type SockStat struct { config.PluginConfig diff --git a/inputs/sqlserver/sqlserver.go b/inputs/sqlserver/sqlserver.go index d74063e3..a9506f38 100644 --- a/inputs/sqlserver/sqlserver.go +++ b/inputs/sqlserver/sqlserver.go @@ -79,6 +79,15 @@ func init() { return &SQLServer{} }) } + +func (pt *SQLServer) Clone() inputs.Input { + return &SQLServer{} +} + +func (pt *SQLServer) Name() string { + return inputName +} + func (pt *SQLServer) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { @@ -136,7 +145,7 @@ func (s *Instance) initQueries() error { // To prevent query definition conflicts // Constant definitions for type "SQLServer" start with sqlServer - if s.DatabaseType == typeSQLServer { //These are still V2 queries and have not been refactored yet. + if s.DatabaseType == typeSQLServer { // These are still V2 queries and have not been refactored yet. queries["SQLServerPerformanceCounters"] = Query{ScriptName: "SQLServerPerformanceCounters", Script: sqlServerPerformanceCounters, ResultByRow: false} queries["SQLServerWaitStatsCategorized"] = Query{ScriptName: "SQLServerWaitStatsCategorized", Script: sqlServerWaitStatsCategorized, ResultByRow: false} queries["SQLServerDatabaseIO"] = Query{ScriptName: "SQLServerDatabaseIO", Script: sqlServerDatabaseIO, ResultByRow: false} diff --git a/inputs/switch_legacy/switch_legacy.go b/inputs/switch_legacy/switch_legacy.go index 90503842..dc821ff3 100644 --- a/inputs/switch_legacy/switch_legacy.go +++ b/inputs/switch_legacy/switch_legacy.go @@ -33,6 +33,14 @@ func init() { }) } +func (s *Switch) Clone() inputs.Input { + return &Switch{} +} + +func (s *Switch) Name() string { + return inputName +} + func (s *Switch) MappingIP(ip string) string { val, has := s.Mappings[ip] if has { diff --git a/inputs/system/system.go b/inputs/system/system.go index 89389f58..f753bbdc 100644 --- a/inputs/system/system.go +++ b/inputs/system/system.go @@ -26,6 +26,14 @@ func init() { }) } +func (s *SystemStats) Clone() inputs.Input { + return &SystemStats{} +} + +func (s *SystemStats) Name() string { + return inputName +} + func (s *SystemStats) Gather(slist *types.SampleList) { loadavg, err := load.Avg() if err != nil && !strings.Contains(err.Error(), "not implemented") { diff --git a/inputs/systemd/systemd.go b/inputs/systemd/systemd.go index 003b0f33..198b90be 100644 --- a/inputs/systemd/systemd.go +++ b/inputs/systemd/systemd.go @@ -51,3 +51,11 @@ func init() { return &Systemd{} }) } + +func (s *Systemd) Clone() inputs.Input { + return &Systemd{} +} + +func (s *Systemd) Name() string { + return inputName +} diff --git a/inputs/tomcat/tomcat.go b/inputs/tomcat/tomcat.go index b37d7b3e..81a52dc2 100644 --- a/inputs/tomcat/tomcat.go +++ b/inputs/tomcat/tomcat.go @@ -139,6 +139,14 @@ func init() { }) } +func (t *Tomcat) Clone() inputs.Input { + return &Tomcat{} +} + +func (t *Tomcat) Name() string { + return inputName +} + func (t *Tomcat) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(t.Instances)) for i := 0; i < len(t.Instances); i++ { diff --git a/inputs/tpl/tpl.go b/inputs/tpl/tpl.go index a3a86f54..9c8dbaf7 100644 --- a/inputs/tpl/tpl.go +++ b/inputs/tpl/tpl.go @@ -18,6 +18,14 @@ func init() { }) } +func (pt *PluginTpl) Clone() inputs.Input { + return &PluginTpl{} +} + +func (pt *PluginTpl) Name() string { + return inputName +} + func (pt *PluginTpl) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { diff --git a/inputs/vsphere/vsphere.go b/inputs/vsphere/vsphere.go index 2b08c997..394a4474 100644 --- a/inputs/vsphere/vsphere.go +++ b/inputs/vsphere/vsphere.go @@ -28,6 +28,14 @@ func init() { }) } +func (vs *VSphere) Clone() inputs.Input { + return &VSphere{} +} + +func (vs *VSphere) Name() string { + return inputName +} + func (pt *VSphere) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { @@ -205,7 +213,7 @@ func (ins *Instance) Gather(slist *types.SampleList) { err = nil } if err != nil { - //acc.AddError(err) + // acc.AddError(err) log.Printf("E! fail to gather\n", err) } diff --git a/inputs/xskyapi/xskyapi.go b/inputs/xskyapi/xskyapi.go index 2b5d8df0..692cc5e1 100644 --- a/inputs/xskyapi/xskyapi.go +++ b/inputs/xskyapi/xskyapi.go @@ -37,6 +37,14 @@ func init() { }) } +func (pt *XskyApi) Clone() inputs.Input { + return &XskyApi{} +} + +func (pt *XskyApi) Name() string { + return inputName +} + func (pt *XskyApi) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(pt.Instances)) for i := 0; i < len(pt.Instances); i++ { diff --git a/inputs/zookeeper/zookeeper.go b/inputs/zookeeper/zookeeper.go index 6f15db8f..51e81165 100644 --- a/inputs/zookeeper/zookeeper.go +++ b/inputs/zookeeper/zookeeper.go @@ -71,6 +71,14 @@ func init() { }) } +func (z *Zookeeper) Clone() inputs.Input { + return &Zookeeper{} +} + +func (z *Zookeeper) Name() string { + return inputName +} + func (z *Zookeeper) GetInstances() []inputs.Instance { ret := make([]inputs.Instance, len(z.Instances)) for i := 0; i < len(z.Instances); i++ { diff --git a/pkg/cfg/cfg.go b/pkg/cfg/cfg.go index 628b899d..f053acb9 100644 --- a/pkg/cfg/cfg.go +++ b/pkg/cfg/cfg.go @@ -6,6 +6,7 @@ import ( "path" "strings" + "flashcat.cloud/categraf/pkg/checksum" "github.com/koding/multiconfig" "github.com/toolkits/pkg/file" ) @@ -19,8 +20,17 @@ const ( ) type ConfigWithFormat struct { - Config string `json:"config"` - Format ConfigFormat `json:"format"` + Config string `json:"config"` + Format ConfigFormat `json:"format"` + checkSum checksum.Checksum `json:"-"` +} + +func (cwf *ConfigWithFormat) CheckSum() checksum.Checksum { + return cwf.checkSum +} + +func (cwf *ConfigWithFormat) SetCheckSum(checkSum checksum.Checksum) { + cwf.checkSum = checkSum } func GuessFormat(fpath string) ConfigFormat { @@ -111,3 +121,26 @@ func LoadConfigs(configs []ConfigWithFormat, configPtr interface{}) error { } return m.Load(configPtr) } + +func LoadSingleConfig(c ConfigWithFormat, configPtr interface{}) error { + loaders := []multiconfig.Loader{ + &multiconfig.TagLoader{}, + &multiconfig.EnvironmentLoader{}, + } + + switch c.Format { + case TomlFormat: + loaders = append(loaders, &multiconfig.TOMLLoader{Reader: bytes.NewReader([]byte(c.Config))}) + case YamlFormat: + loaders = append(loaders, &multiconfig.YAMLLoader{Reader: bytes.NewReader([]byte(c.Config))}) + case JsonFormat: + loaders = append(loaders, &multiconfig.JSONLoader{Reader: bytes.NewReader([]byte(c.Config))}) + + } + + m := multiconfig.DefaultLoader{ + Loader: multiconfig.MultiLoader(loaders...), + Validator: multiconfig.MultiValidator(&multiconfig.RequiredValidator{}), + } + return m.Load(configPtr) +} diff --git a/pkg/checksum/checksum.go b/pkg/checksum/checksum.go new file mode 100644 index 00000000..dc0c46e2 --- /dev/null +++ b/pkg/checksum/checksum.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checksum + +import ( + "errors" + "hash/fnv" + + hashutil "flashcat.cloud/categraf/pkg/hash" +) + +// Data to be stored as checkpoint +type Checksum uint64 + +// VerifyChecksum verifies that passed checksum is same as calculated checksum +func (cs Checksum) Verify(data interface{}) error { + if cs != New(data) { + return errors.New("checksum mismatch") + } + return nil +} + +func New(data interface{}) Checksum { + return Checksum(getChecksum(data)) +} + +// Get returns calculated checksum of checkpoint data +func getChecksum(data interface{}) uint64 { + hash := fnv.New32a() + hashutil.DeepHashObject(hash, data) + return uint64(hash.Sum32()) +} diff --git a/pkg/hash/hash.go b/pkg/hash/hash.go new file mode 100644 index 00000000..803f066a --- /dev/null +++ b/pkg/hash/hash.go @@ -0,0 +1,37 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hash + +import ( + "hash" + + "github.com/davecgh/go-spew/spew" +) + +// DeepHashObject writes specified object to hash using the spew library +// which follows pointers and prints actual values of the nested objects +// ensuring the hash does not change when a pointer changes. +func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) { + hasher.Reset() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hasher, "%#v", objectToWrite) +}