-
Notifications
You must be signed in to change notification settings - Fork 782
/
check_field.go
186 lines (157 loc) · 4.86 KB
/
check_field.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package condition
import (
"encoding/json"
"errors"
"strings"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/Jeffail/gabs/v2"
)
//------------------------------------------------------------------------------
// init registers the CheckField constructor, together with its summary and
// field documentation, under TypeCheckField in the package Constructors table.
func init() {
	const summary = `
Extracts the value of a field identified via [dot path](/docs/configuration/field_paths)
within messages (currently only JSON format is supported) and then tests the
extracted value against a child condition.`

	const partsDescription = `An optional array of message indexes of a batch that the condition should apply to.
If left empty all messages are processed. This field is only applicable when
batching messages [at the input level](/docs/configuration/batching).
Indexes can be negative, and if so the part will be selected from the end
counting backwards starting from -1.`

	Constructors[TypeCheckField] = TypeSpec{
		constructor: NewCheckField,
		Summary:     summary,
		FieldSpecs: docs.FieldSpecs{
			docs.FieldCommon("path", "A [field path](/docs/configuration/field_paths) to check against the child condition."),
			docs.FieldCommon("condition", "A child condition to test the field contents against."),
			docs.FieldAdvanced("parts", partsDescription),
		},
	}
}
//------------------------------------------------------------------------------
// CheckFieldConfig contains configuration fields for the CheckField condition.
//
// The type has custom MarshalJSON and MarshalYAML methods that serialise a nil
// Condition as an empty object.
type CheckFieldConfig struct {
// Parts lists the message indexes of a batch that the condition applies to.
// When it is empty every message of the batch is processed.
Parts []int `json:"parts" yaml:"parts"`
// Path is the field path to extract; NewCheckField splits it on ".".
Path string `json:"path" yaml:"path"`
// Condition is the child condition the extracted value is tested against.
// NewCheckField returns an error when it is nil.
Condition *Config `json:"condition" yaml:"condition"`
}
// NewCheckFieldConfig returns a CheckFieldConfig populated with defaults: an
// empty (non-nil) Parts list, an empty Path and no child Condition.
func NewCheckFieldConfig() CheckFieldConfig {
	var conf CheckFieldConfig
	// Parts is deliberately a non-nil empty slice; Path and Condition keep
	// their zero values ("" and nil).
	conf.Parts = make([]int, 0)
	return conf
}
//------------------------------------------------------------------------------
// dummyCheckFieldConfig has the same fields and tags as CheckFieldConfig but
// holds Condition as an interface{}, which lets the marshal methods of
// CheckFieldConfig substitute an empty struct for a nil child condition.
type dummyCheckFieldConfig struct {
Parts []int `json:"parts" yaml:"parts"`
Path string `json:"path" yaml:"path"`
Condition interface{} `json:"condition" yaml:"condition"`
}
// MarshalJSON encodes the config as JSON, emitting an empty object for the
// child condition when none is set instead of nil.
func (c CheckFieldConfig) MarshalJSON() ([]byte, error) {
	// Default to an empty struct and only use the real child when one exists.
	var child interface{} = struct{}{}
	if c.Condition != nil {
		child = c.Condition
	}

	return json.Marshal(dummyCheckFieldConfig{
		Parts:     c.Parts,
		Path:      c.Path,
		Condition: child,
	})
}
// MarshalYAML returns the value to encode as YAML in place of the config,
// using an empty object for the child condition when none is set instead of
// nil.
func (c CheckFieldConfig) MarshalYAML() (interface{}, error) {
	out := dummyCheckFieldConfig{Parts: c.Parts, Path: c.Path}

	if c.Condition != nil {
		out.Condition = c.Condition
	} else {
		out.Condition = struct{}{}
	}

	return out, nil
}
//------------------------------------------------------------------------------
// CheckField is a condition that extracts a field and checks the contents
// against a child condition.
type CheckField struct {
// conf is the CheckField section of the Config given to NewCheckField; Check
// reads its Parts list to select which message parts to process.
conf CheckFieldConfig
log log.Modular
stats metrics.Type
// child is the condition that evaluates the extracted field values.
child Type
// path is the configured field path split on "." into its segments.
path []string
// mCount is incremented once per Check call and mTrue/mFalse once per result.
// mErrJSON and mErr are both incremented for each part that fails to parse as
// JSON.
mCount metrics.StatCounter
mTrue metrics.StatCounter
mFalse metrics.StatCounter
mErrJSON metrics.StatCounter
mErr metrics.StatCounter
}
// NewCheckField returns a CheckField condition built from conf.CheckField.
//
// The child condition is constructed with the same manager, logger and stats
// as the parent. An error is returned when no child condition is configured
// or when constructing the child fails.
func NewCheckField(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
	childConf := conf.CheckField.Condition
	if childConf == nil {
		return nil, errors.New("cannot create check_field condition without a child")
	}

	child, err := New(*childConf, mgr, log, stats)
	if err != nil {
		return nil, err
	}

	cf := &CheckField{
		conf:  conf.CheckField,
		log:   log,
		stats: stats,
		child: child,
		// The path is split once here so Check can hand the segments straight
		// to the JSON lookup.
		path: strings.Split(conf.CheckField.Path, "."),
	}

	cf.mCount = stats.GetCounter("count")
	cf.mTrue = stats.GetCounter("true")
	cf.mFalse = stats.GetCounter("false")
	cf.mErrJSON = stats.GetCounter("error_json_parse")
	cf.mErr = stats.GetCounter("error")

	return cf, nil
}
//------------------------------------------------------------------------------
// Check extracts the configured field from the selected parts of msg and
// returns the verdict of the child condition on the extracted values.
//
// The child is evaluated against a copy of msg in which each selected part is
// replaced by its field value: the raw bytes when the value is a string, the
// JSON value otherwise. A part that fails to parse as JSON is left empty in
// the copy and counted as an error. The selected parts are conf.Parts, or
// every part of the message when that list is empty.
func (c *CheckField) Check(msg types.Message) bool {
	c.mCount.Incr(1)

	extracted := msg.Copy()

	// extract overwrites part idx of the copy with the value found at c.path
	// in the matching part of the original message.
	extract := func(idx int) {
		// Blank the part first so that a parse failure leaves it empty rather
		// than holding the original document.
		extracted.Get(idx).Set([]byte(""))

		doc, err := msg.Get(idx).JSON()
		if err != nil {
			c.log.Debugf("Failed to parse message as JSON: %v\n", err)
			c.mErrJSON.Incr(1)
			c.mErr.Incr(1)
			return
		}

		value := gabs.Wrap(doc).S(c.path...).Data()
		if str, isString := value.(string); isString {
			// Strings are written as raw bytes, not as a quoted JSON value.
			extracted.Get(idx).Set([]byte(str))
			return
		}
		extracted.Get(idx).SetJSON(value)
	}

	if targets := c.conf.Parts; len(targets) > 0 {
		for _, idx := range targets {
			extract(idx)
		}
	} else {
		for i := 0; i < extracted.Len(); i++ {
			extract(i)
		}
	}

	if !c.child.Check(extracted) {
		c.mFalse.Incr(1)
		return false
	}
	c.mTrue.Incr(1)
	return true
}
//------------------------------------------------------------------------------