/
local.go
130 lines (110 loc) · 3.44 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package ratelimit
import (
"errors"
"fmt"
"sync"
"time"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeLocal] = TypeSpec{
constructor: NewLocal,
Summary: `
The local rate limit is a simple X every Y type rate limit that can be shared
across any number of components within the pipeline but does not support
distributed rate limits across multiple running instances of Benthos.`,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("count", "The maximum number of requests to allow for a given period of time."),
docs.FieldCommon("interval", "The time window to limit requests by."),
},
}
}
//------------------------------------------------------------------------------
// LocalConfig is a config struct containing rate limit fields for a local rate
// limit.
type LocalConfig struct {
	// Count is the maximum number of requests permitted within each interval.
	Count int `json:"count" yaml:"count"`
	// Interval is the rate limit window as a duration string (e.g. "1s"),
	// parsed with time.ParseDuration in NewLocal.
	Interval string `json:"interval" yaml:"interval"`
}
// NewLocalConfig returns a local rate limit configuration struct with default
// values (1000 requests per second).
func NewLocalConfig() LocalConfig {
	var conf LocalConfig
	conf.Count = 1000
	conf.Interval = "1s"
	return conf
}
//------------------------------------------------------------------------------
// Local is a structure that tracks a rate limit, it can be shared across
// parallel processes in order to maintain a maximum rate of a protected
// resource.
type Local struct {
	// mut guards the token-bucket state below (bucket, lastRefresh).
	mut sync.Mutex
	// bucket is the number of requests remaining in the current window.
	bucket int
	// lastRefresh marks when the current window began.
	lastRefresh time.Time

	// size and period are the configured limits and are not mutated after
	// construction.
	size   int
	period time.Duration

	// Metrics counters: checks performed, checks that were limited, and
	// errors. Note mErr is registered but not currently incremented anywhere
	// in this file.
	mChecked metrics.StatCounter
	mLimited metrics.StatCounter
	mErr     metrics.StatCounter
}
// NewLocal creates a local rate limit from a configuration struct. This type is
// safe to share and call from parallel goroutines.
//
// An error is returned if the configured count is not positive, or if the
// interval fails to parse or is not a positive duration.
func NewLocal(
	conf Config,
	mgr types.Manager,
	logger log.Modular,
	stats metrics.Type,
) (types.RateLimit, error) {
	if conf.Local.Count <= 0 {
		return nil, errors.New("count must be larger than zero")
	}
	period, err := time.ParseDuration(conf.Local.Interval)
	if err != nil {
		return nil, fmt.Errorf("failed to parse interval: %v", err)
	}
	// A zero or negative interval parses successfully (e.g. "0s") but would
	// silently disable rate limiting, as the refresh branch in Access would
	// always trigger. Reject it explicitly.
	if period <= 0 {
		return nil, fmt.Errorf("interval must be a positive duration, got: %v", period)
	}
	return &Local{
		bucket:      conf.Local.Count,
		lastRefresh: time.Now(),
		size:        conf.Local.Count,
		period:      period,
		mChecked:    stats.GetCounter("checked"),
		mLimited:    stats.GetCounter("limited"),
		mErr:        stats.GetCounter("error"),
	}, nil
}
//------------------------------------------------------------------------------
// Access the rate limited resource. Returns a duration or an error if the rate
// limit check fails. The returned duration is either zero (meaning the resource
// can be accessed) or a reasonable length of time to wait before requesting
// again.
func (r *Local) Access() (time.Duration, error) {
	r.mChecked.Incr(1)

	r.mut.Lock()
	r.bucket--
	// Fast path: tokens remain in the current window.
	if r.bucket >= 0 {
		r.mut.Unlock()
		return 0, nil
	}

	// Bucket exhausted; clamp it and check whether the window has elapsed.
	r.bucket = 0
	if remaining := r.period - time.Since(r.lastRefresh); remaining > 0 {
		// Still inside the window: release the lock before touching metrics,
		// then report how long the caller should wait.
		r.mut.Unlock()
		r.mLimited.Incr(1)
		return remaining, nil
	}

	// Window elapsed: start a fresh one, consuming a token for this call.
	r.bucket = r.size - 1
	r.lastRefresh = time.Now()
	r.mut.Unlock()
	return 0, nil
}
// CloseAsync shuts down the rate limit. The local rate limit holds no
// background resources, so this is a no-op.
func (r *Local) CloseAsync() {
}
// WaitForClose blocks until the rate limit has closed down. Since CloseAsync
// is a no-op there is nothing to wait for, and nil is returned immediately
// regardless of the timeout.
func (r *Local) WaitForClose(timeout time.Duration) error {
	return nil
}
//------------------------------------------------------------------------------