-
-
Notifications
You must be signed in to change notification settings - Fork 128
/
flood_wait.go
132 lines (113 loc) · 2.78 KB
/
flood_wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package invokers
import (
"context"
"time"
"golang.org/x/xerrors"
"github.com/gotd/td/bin"
"github.com/gotd/td/clock"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
)
// Waiter is an invoker middleware to handle FLOOD_WAIT errors from Telegram.
//
// Requests submitted through InvokeRaw are queued in the scheduler and are
// actually sent from the loop in Run.
type Waiter struct {
	// prev is the wrapped invoker every request is forwarded to.
	prev tg.Invoker // immutable
	// clock provides the ticker that drives Run.
	clock clock.Clock
	// sch queues requests and hands them back to Run when gathered.
	sch *scheduler
	// tick is the period of the ticker created in Run.
	tick time.Duration
	// waitLimit is the FLOOD_WAIT argument (in seconds) at or above which
	// the request fails instead of being rescheduled.
	waitLimit int
	// retryLimit is the retry count at or above which a request that hit
	// FLOOD_WAIT again fails instead of being rescheduled.
	retryLimit int
}
// NewWaiter creates a new Waiter invoker middleware around prev.
//
// Defaults: system clock, a 1ms gather tick, a wait limit of 60 and
// a retry limit of 5. Use the With* methods to override them.
func NewWaiter(prev tg.Invoker) *Waiter {
	w := new(Waiter)
	w.prev = prev
	w.clock = clock.System
	w.sch = newScheduler(clock.System, time.Second)
	w.tick = time.Millisecond
	w.waitLimit = 60
	w.retryLimit = 5
	return w
}
// WithClock sets the clock used by the Waiter and by its scheduler.
//
// A nil clock is ignored, in the same way the other With* setters ignore
// invalid values. The scheduler is recreated on the given clock, so call
// WithClock while configuring the Waiter, before any request is submitted:
// requests already queued in the previous scheduler are not carried over.
//
// It returns the receiver to allow chaining.
func (w *Waiter) WithClock(c clock.Clock) *Waiter {
	if c == nil {
		return w
	}
	w.clock = c
	// NewWaiter builds the scheduler on clock.System; without rebuilding it
	// here the scheduler would keep using the system clock while Run's
	// ticker uses c.
	w.sch = newScheduler(c, time.Second)
	return w
}
// WithWaitLimit sets the wait limit, in seconds.
//
// A FLOOD_WAIT whose argument is greater than or equal to the limit is
// returned to the caller as an error instead of being waited out.
// Negative values are ignored. It returns the receiver to allow chaining.
func (w *Waiter) WithWaitLimit(waitLimit int) *Waiter {
	if waitLimit < 0 {
		return w
	}
	w.waitLimit = waitLimit
	return w
}
// WithRetryLimit sets the retry limit.
//
// A request that has already been retried this many times fails with an
// error on its next FLOOD_WAIT instead of being rescheduled.
// Negative values are ignored. It returns the receiver to allow chaining.
func (w *Waiter) WithRetryLimit(retryLimit int) *Waiter {
	if retryLimit < 0 {
		return w
	}
	w.retryLimit = retryLimit
	return w
}
// WithTick sets the gather tick for Waiter, i.e. the period of the ticker
// that Run uses to poll the scheduler.
//
// Zero and negative durations are ignored. It returns the receiver to
// allow chaining.
func (w *Waiter) WithTick(tick time.Duration) *Waiter {
	if tick <= 0 {
		return w
	}
	w.tick = tick
	return w
}
// Run runs the send loop until ctx is done, then returns ctx.Err().
//
// On every tick it gathers the requests the scheduler hands out, sends each
// of them and, for requests that are finished (successfully or not),
// delivers the outcome to the request's result channel.
func (w *Waiter) Run(ctx context.Context) error {
	t := w.clock.Ticker(w.tick)
	defer t.Stop()

	// The slice is reused between ticks to avoid reallocating it.
	var batch []scheduled
	for {
		// Block until the next tick or cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C():
		}

		batch = w.sch.gather(batch[:0])
		for _, item := range batch {
			finished, err := w.send(item)
			if !finished {
				// Request was rescheduled; nothing to report yet.
				continue
			}

			// Non-blocking delivery: never stall the loop on a result
			// channel that cannot accept the value right now.
			select {
			case item.request.result <- err:
			default:
			}
		}
	}
}
// send forwards one scheduled request to the wrapped invoker.
//
// The boolean result reports whether the request is finished and the
// returned error (possibly nil) should be delivered to the caller. It is
// false when the request hit FLOOD_WAIT and was handed back to the
// scheduler to be retried after the requested delay.
func (w *Waiter) send(s scheduled) (bool, error) {
	req := s.request
	invokeErr := w.prev.InvokeRaw(req.ctx, req.input, req.output)

	flood, isFlood := tgerr.AsType(invokeErr, ErrFloodWait)
	if !isFlood {
		// Success or an unrelated error: tell the scheduler about the
		// non-flood outcome and pass the result through untouched.
		w.sch.nice(req.key)
		return true, invokeErr
	}
	if flood.Argument >= w.waitLimit {
		return true, xerrors.Errorf("FLOOD_WAIT argument is too big (%d >= %d)", flood.Argument, w.waitLimit)
	}
	if req.retry >= w.retryLimit {
		return true, xerrors.Errorf("Retry limit exceeded (%d >= %d)", req.retry, w.retryLimit)
	}

	// Within limits: count the retry and reschedule after the delay
	// Telegram asked for (the FLOOD_WAIT argument, in seconds).
	req.retry++
	delay := time.Duration(flood.Argument) * time.Second
	w.sch.flood(req, delay)
	return false, nil
}
// Object is an abstraction for a Telegram API object with a TypeID.
type Object interface {
	// TypeID returns the object's type identifier.
	TypeID() uint32
}
// ErrFloodWait is the error type of the "FLOOD_WAIT" error. Errors returned
// by the wrapped invoker are matched against it to detect flood waits.
const ErrFloodWait = "FLOOD_WAIT"
// InvokeRaw implements tg.Invoker.
//
// The request is registered with the scheduler and the call blocks until a
// result arrives or ctx is done, whichever happens first. On cancellation it
// returns ctx.Err(). Requests are sent from the loop in Run, so Run
// presumably has to be running for a result to arrive.
func (w *Waiter) InvokeRaw(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
	result := w.sch.new(ctx, input, output)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-result:
		return err
	}
}