forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
resource.go
120 lines (106 loc) · 3.86 KB
/
resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package condition
import (
"fmt"
"github.com/Jeffail/benthos/lib/log"
"github.com/Jeffail/benthos/lib/metrics"
"github.com/Jeffail/benthos/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeResource] = TypeSpec{
constructor: NewResource,
description: `
Resource is a condition type that runs a condition resource by its name. This
condition allows you to run the same configured condition resource in multiple
processors, or as a branch of another condition.
For example, let's imagine we have two outputs, one of which only receives
messages that satisfy a condition and the other receives the logical NOT of that
same condition. In this example we can save ourselves the trouble of configuring
the same condition twice by referring to it as a resource, like this:
` + "``` yaml" + `
output:
type: broker
broker:
pattern: fan_out
outputs:
- type: foo
foo:
processors:
- type: filter
filter:
type: resource
resource: foobar
- type: bar
bar:
processors:
- type: filter
filter:
type: not
not:
type: resource
resource: foobar
resources:
conditions:
foobar:
type: text
text:
operator: equals_cs
part: 1
arg: filter me please
` + "```" + `
It is also worth noting that when conditions are used as resources in this way
they will only be executed once per message, regardless of how many times they
are referenced (unless the content is modified). Therefore, resource conditions
can act as a runtime optimisation as well as a config optimisation.`,
}
}
//------------------------------------------------------------------------------
// Resource is a condition that returns the result of a condition resource.
type Resource struct {
mgr types.Manager
name string
log log.Modular
}
// NewResource returns a resource condition.
func NewResource(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
if _, err := mgr.GetCondition(conf.Resource); err != nil {
return nil, fmt.Errorf("failed to obtain condition resource '%v': %v", conf.Resource, err)
}
return &Resource{
mgr: mgr,
name: conf.Resource,
log: log,
}, nil
}
//------------------------------------------------------------------------------
// Check attempts to check a message part against a configured condition.
func (c *Resource) Check(msg types.Message) bool {
cond, err := c.mgr.GetCondition(c.name)
if err != nil {
c.log.Debugf("Failed to obtain condition resource '%v': %v", c.name, err)
return false
}
return msg.LazyCondition(c.name, cond)
}
//------------------------------------------------------------------------------