-
Notifications
You must be signed in to change notification settings - Fork 373
/
processors.go
70 lines (64 loc) · 2.05 KB
/
processors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Copyright (c) 2021 Terminus, Inc.
//
// This program is free software: you can use, redistribute, and/or modify
// it under the terms of the GNU Affero General Public License, version 3
// or later ("AGPL"), as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package analysis
import (
"github.com/recallsong/go-utils/encoding"
"github.com/recallsong/go-utils/reflectx"
"github.com/erda-project/erda/modules/extensions/loghub/metrics/analysis/processors"
_ "github.com/erda-project/erda/modules/extensions/loghub/metrics/analysis/processors/regex" //
)
// processorConfig is one element of the JSON array decoded from a log metric
// configuration's Processors field.
type processorConfig struct {
	// Type is the processor type handed to the processor set's Add method.
	Type string `json:"type"`
	// Config holds the processor's own configuration as raw bytes; it is
	// passed to Add as-is rather than being decoded here.
	Config encoding.RawBytes `json:"config"`
}
// tag is one key/value pair of the JSON array decoded from a log metric
// configuration's Filters field.
type tag struct {
	// Key is the tag name.
	Key string `json:"key"`
	// Value is the value stored under Key.
	Value string `json:"value"`
}
// loadProcessors builds a fresh processor set from the enabled log metric
// configurations and stores it in p.processors.
//
// Configurations are queried by the scope and scope ID in p.C.Processors.
// Loading is best-effort per configuration:
//   - a configuration with empty Filters is skipped silently;
//   - a configuration whose Filters or Processors are not valid JSON is
//     logged at debug level and skipped;
//   - a processor rejected by Add is logged at debug level and skipped;
//   - JSON null elements inside either array are ignored.
//
// An error is returned only when the query itself fails; in that case the
// function returns before storing anything.
func (p *provider) loadProcessors() error {
	list, err := p.db.LogMetricConfig.QueryEnabledByScope(p.C.Processors.Scope, p.C.Processors.ScopeID)
	if err != nil {
		return err
	}
	ps := processors.New()
	for _, item := range list {
		if len(item.Filters) == 0 {
			continue
		}
		var taglist []*tag
		if err := json.Unmarshal(reflectx.StringToBytes(item.Filters), &taglist); err != nil {
			p.L.Debugf("fail to parse log filters: %s", err)
			continue
		}
		// NOTE(review): the extra capacity of 4 presumably leaves room for
		// tags added later by the processor set — confirm.
		tags := make(map[string]string, len(taglist)+4)
		for _, kv := range taglist {
			if kv == nil {
				// A JSON null element decodes to a nil *tag; dereferencing
				// it would panic, so it is ignored.
				continue
			}
			tags[kv.Key] = kv.Value
		}
		var configs []*processorConfig
		if err := json.Unmarshal(reflectx.StringToBytes(item.Processors), &configs); err != nil {
			p.L.Debugf("fail to parse log processors: %s", err)
			continue
		}
		for _, cfg := range configs {
			if cfg == nil {
				// Same as above: a null array element yields a nil pointer.
				continue
			}
			if err := ps.Add(item.ScopeID, tags, item.Metric, cfg.Type, cfg.Config); err != nil {
				p.L.Debugf("fail to add log processor: %s", err)
				continue
			}
		}
	}
	p.processors.Store(ps)
	return nil
}