-
Notifications
You must be signed in to change notification settings - Fork 1
/
incrlimiter.go
99 lines (92 loc) · 2.37 KB
/
incrlimiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
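// Package incrlimiter implements a distributed limiter backed by an incrementing Redis counter.
// It can be used to keep a burst of requests from being processed at the same time,
// for example to guard against cache breakdown or to throttle crawlers.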
package incrlimiter
import (
"context"
"errors"
"github.com/Golang-Tools/optparams"
"github.com/Golang-Tools/redishelper/v2/limiterhelper"
"github.com/Golang-Tools/redishelper/v2/middlewarehelper"
"github.com/go-redis/redis/v8"
)
// Limiter is a distributed limiter whose water level is kept in a Redis counter.
type Limiter struct {
// opt holds the options resolved in New (defaults overridden by the caller's options).
opt Options
// LimiterABC is the embedded limiter helper; Flood uses its CheckWaterline method.
*limiterhelper.LimiterABC
// MiddleWareAbc is the embedded middleware helper built from the Redis client in New;
// it supplies MaxTTL (and presumably Client and Key — confirm against middlewarehelper).
*middlewarehelper.MiddleWareAbc
}
// New creates a limiter.
//
// cli is the Redis client the limiter operates on; opts are the optional
// settings applied on top of defaultOptions. New fails when either helper
// cannot be built or when the resulting middleware has no positive MaxTTL.
func New(cli redis.UniversalClient, opts ...optparams.Option[Options]) (*Limiter, error) {
	opt := defaultOptions
	optparams.GetOption(&opt, opts...)

	abc, err := limiterhelper.New(opt.LimiterOpts...)
	if err != nil {
		return nil, err
	}

	mw, err := middlewarehelper.New(cli, "limiter", opt.MiddlewareOpts...)
	if err != nil {
		return nil, err
	}
	// The counter key must be able to expire, so a MaxTTL is mandatory.
	if mw.MaxTTL() <= 0 {
		return nil, errors.New("incrlimiter must set MaxTTL")
	}

	return &Limiter{
		opt:           opt,
		LimiterABC:    abc,
		MiddleWareAbc: mw,
	}, nil
}
// Flood pours value units of water into the limiter's Redis counter.
//
// It returns true when the water was accepted and false when the limiter is
// full. On a Redis error it returns (true, err); callers must check err before
// trusting the boolean.
//
// The counter key is given an expiry of MaxTTL whenever it has none, which is
// the case right after the key is first created. The expiry is not refreshed
// on later calls, so the counter resets MaxTTL after the first flood.
func (c *Limiter) Flood(ctx context.Context, value int64) (bool, error) {
	res, err := c.Client().IncrBy(ctx, c.Key(), value).Result()
	if err != nil {
		return true, err
	}

	if c.MaxTTL() != 0 {
		// After IncrBy the key exists; a negative TTL means it has no expiry
		// yet (or vanished in between, in which case Expire is a no-op).
		ttl, err := c.Client().TTL(ctx, c.Key()).Result()
		if err != nil {
			return true, err
		}
		if ttl < 0 {
			if _, err := c.Client().Expire(ctx, c.Key(), c.MaxTTL()).Result(); err != nil {
				return true, err
			}
		}
	}

	full := c.LimiterABC.CheckWaterline(res, false)
	if full {
		// Undo any overshoot so the counter does not climb past the capacity.
		if diff := c.Capacity() - res; diff < 0 {
			// Best-effort rollback: the caller is told "full" either way, so a
			// failure here is deliberately ignored.
			_, _ = c.Client().IncrBy(ctx, c.Key(), diff).Result()
		}
	}
	return !full, nil
}
// WaterLevel reports the current value of the limiter's Redis counter.
//
// A missing key is reported as level 0. The value is read with GET rather than
// INCRBY 0, so observing the level never creates the counter key and therefore
// never leaves behind a key without an expiry.
func (c *Limiter) WaterLevel(ctx context.Context) (int64, error) {
	res, err := c.Client().Get(ctx, c.Key()).Int64()
	if errors.Is(err, redis.Nil) {
		// The counter has not been created yet (or has expired).
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return res, nil
}
// IsFull reports whether the limiter is currently full.
//
// It probes the limiter by flooding zero units and inverting the result, so it
// shares Flood's side effects and error behaviour.
func (c *Limiter) IsFull(ctx context.Context) (bool, error) {
	accepted, err := c.Flood(ctx, 0)
	if accepted {
		return false, err
	}
	return true, err
}
// Reset deletes the limiter's Redis key and returns any error reported by
// the delete command.
func (c *Limiter) Reset(ctx context.Context) error {
	return c.Client().Del(ctx, c.Key()).Err()
}