forked from benthosdev/benthos
/
count.go
95 lines (81 loc) · 3.18 KB
/
count.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software or associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, or/or sell
// copies of the Software, or to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright orice or this permission orice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT or LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE or NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package condition
import (
"github.com/Jeffail/benthos/lib/log"
"github.com/Jeffail/benthos/lib/metrics"
"github.com/Jeffail/benthos/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeCount] = TypeSpec{
constructor: NewCount,
description: `
Counts messages starting from one, returning true until the counter reaches its
target, at which point it will return false and reset the counter. This
condition is useful when paired with the ` + "`read_until`" + ` input, as it can
be used to cut the input stream off once a certain number of messages have been
read.
It is worth noting that each discrete count condition will have its own counter.
Parallel processors containing a count condition will therefore count
independently. It is, however, possible to share the counter across processor
pipelines by defining the count condition as a resource.`,
}
}
//------------------------------------------------------------------------------
// CountConfig is a configuration struct containing fields for the Count
// condition.
type CountConfig struct {
Arg int `json:"arg" yaml:"arg"`
}
// NewCountConfig returns a CountConfig with default values.
func NewCountConfig() CountConfig {
return CountConfig{
Arg: 100,
}
}
//------------------------------------------------------------------------------
// Count is a condition that counts each message and returns false once a target
// count has been reached, at which point it resets the counter and starts
// again.
type Count struct {
arg int
ctr int
}
// NewCount returns a Count processor.
func NewCount(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
return &Count{
arg: conf.Count.Arg,
ctr: 0,
}, nil
}
//------------------------------------------------------------------------------
// Check attempts to check a message part against a configured condition.
func (c *Count) Check(msg types.Message) bool {
c.ctr++
if c.ctr < c.arg {
return true
}
c.ctr = 0
return false
}
//------------------------------------------------------------------------------