forked from benthosdev/benthos
/
count.go
116 lines (97 loc) · 2.89 KB
/
count.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package condition
import (
"sync"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
// init registers the count condition in the Constructors map under the
// TypeCount key. The registered TypeSpec carries the NewCount constructor
// together with the user-facing documentation (summary, description, field
// specs and an example footnote).
func init() {
Constructors[TypeCount] = TypeSpec{
constructor: NewCount,
Summary: `
Counts messages starting from one, returning true until the counter reaches its
target, at which point it will return false and reset the counter.`,
Description: `
Each discrete count condition will have its own counter. Parallel processors
containing a count condition will therefore count independently. It is, however,
possible to share the counter across processor pipelines by defining the count
condition as a resource.`,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("arg", "A number to count towards."),
},
// The footnote is assembled by concatenation because a raw string literal
// cannot itself contain the backticks needed for the markdown code fences.
Footnotes: `
## Examples
This condition is useful when paired with the
` + "[`read_until`](/docs/components/inputs/read_until)" + ` input, as it can be
used to cut the input stream off once a certain number of messages have been
read:
` + "```yaml" + `
# Only read 100 messages, and then exit.
input:
read_until:
input:
kafka:
addresses: [ TODO ]
topics: [ foo, bar ]
consumer_group: foogroup
condition:
not:
count:
arg: 100
` + "```" + ``,
}
}
//------------------------------------------------------------------------------
// CountConfig is a configuration struct containing fields for the Count
// condition. It is (de)serialised from JSON and YAML using the key "arg".
type CountConfig struct {
// Arg is the number to count towards; NewCount copies it into the
// condition as the target the counter is compared against.
Arg int `json:"arg" yaml:"arg"`
}
// NewCountConfig returns a CountConfig populated with its default values:
// a target (Arg) of 100.
func NewCountConfig() CountConfig {
	var conf CountConfig
	conf.Arg = 100
	return conf
}
//------------------------------------------------------------------------------
// Count is a condition that keeps a running tally of the messages it has been
// asked to check. It reports true while the tally is below the target and
// reports false on the call that reaches the target, at which point the tally
// is reset to zero and counting starts over.
type Count struct {
	// arg is the target to count towards; ctr is the running tally.
	arg, ctr int

	// mut serialises access to ctr.
	mut sync.Mutex

	// Metrics: total checks performed, and checks by outcome.
	mCount, mTrue, mFalse metrics.StatCounter
}
// NewCount builds a Count condition. The target is read from conf.Count.Arg
// and the "count", "true" and "false" counters are obtained from stats. The
// mgr and log arguments are accepted but not used. The returned error is
// always nil.
func NewCount(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
	// The running tally starts at its zero value.
	cond := &Count{arg: conf.Count.Arg}
	cond.mCount = stats.GetCounter("count")
	cond.mTrue = stats.GetCounter("true")
	cond.mFalse = stats.GetCounter("false")
	return cond, nil
}
//------------------------------------------------------------------------------
// Check advances the counter by one and reports whether it is still below the
// configured target. It returns true on every call until the counter reaches
// the target; on that call it returns false and resets the counter to zero so
// that counting starts again. The contents of msg are not inspected.
//
// Each call increments the "count" metric, plus the "true" or "false" metric
// matching the value returned. The counter is updated while holding the
// condition's mutex.
func (c *Count) Check(msg types.Message) bool {
	c.mut.Lock()
	defer c.mut.Unlock()

	c.mCount.Incr(1)
	c.ctr++
	if c.ctr < c.arg {
		// Still below the target: the condition passes.
		c.mTrue.Incr(1)
		return true
	}

	// Target reached: restart the count and fail the condition for this call.
	c.ctr = 0
	c.mFalse.Incr(1)
	return false
}
//------------------------------------------------------------------------------