forked from benthosdev/benthos
/
throttle.go
135 lines (107 loc) · 3.57 KB
/
throttle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package processor
import (
"fmt"
"sync"
"time"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/tracing"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeThrottle] = TypeSpec{
constructor: NewThrottle,
Status: docs.StatusDeprecated,
Categories: []Category{
CategoryUtility,
},
Summary: `
Throttles the throughput of a pipeline to a maximum of one message batch per
period. This throttle is per processing pipeline, and therefore four threads
each with a throttle would result in four times the rate specified.`,
Description: `
The period should be specified as a time duration string. For example, '1s'
would be 1 second, '10ms' would be 10 milliseconds, etc.
### Alternatives
It's recommended that you use the ` + "[`rate_limit` processor](/docs/components/processors/rate_limit)" + ` instead.`,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("period", "The period to throttle to."),
},
}
}
//------------------------------------------------------------------------------
// ThrottleConfig contains configuration fields for the Throttle processor.
type ThrottleConfig struct {
Period string `json:"period" yaml:"period"`
}
// NewThrottleConfig returns a ThrottleConfig with default values.
func NewThrottleConfig() ThrottleConfig {
return ThrottleConfig{
Period: "100us",
}
}
//------------------------------------------------------------------------------
// Throttle is a processor that limits the stream of a pipeline to one message
// batch per period specified.
type Throttle struct {
conf Config
log log.Modular
stats metrics.Type
duration time.Duration
lastBatch time.Time
mut sync.Mutex
mCount metrics.StatCounter
mSent metrics.StatCounter
mBatchSent metrics.StatCounter
}
// NewThrottle returns a Throttle processor.
func NewThrottle(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
t := &Throttle{
conf: conf,
log: log,
stats: stats,
mCount: stats.GetCounter("count"),
mSent: stats.GetCounter("sent"),
mBatchSent: stats.GetCounter("batch.sent"),
}
var err error
if t.duration, err = time.ParseDuration(conf.Throttle.Period); err != nil {
return nil, fmt.Errorf("failed to parse period: %v", err)
}
return t, nil
}
//------------------------------------------------------------------------------
// ProcessMessage applies the processor to a message, either creating >0
// resulting messages or a response to be sent back to the message source.
func (m *Throttle) ProcessMessage(msg types.Message) ([]types.Message, types.Response) {
m.mCount.Incr(1)
m.mut.Lock()
defer m.mut.Unlock()
spans := tracing.CreateChildSpans(TypeThrottle, msg)
var throttleFor time.Duration
if since := time.Since(m.lastBatch); m.duration > since {
throttleFor = m.duration - since
time.Sleep(throttleFor)
}
for _, s := range spans {
s.SetTag("throttled_for", throttleFor.String())
s.Finish()
}
m.lastBatch = time.Now()
m.mBatchSent.Incr(1)
m.mSent.Incr(int64(msg.Len()))
msgs := [1]types.Message{msg}
return msgs[:], nil
}
// CloseAsync shuts down the processor and stops processing requests.
func (m *Throttle) CloseAsync() {
}
// WaitForClose blocks until the processor has closed down.
func (m *Throttle) WaitForClose(timeout time.Duration) error {
return nil
}
//------------------------------------------------------------------------------