forked from mymmrac/telego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
helpers_long_pulling.go
173 lines (141 loc) · 4.7 KB
/
helpers_long_pulling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package tgo
import (
"errors"
"fmt"
"sync"
"time"
)
// Defaults applied to a long pulling context when no option overrides them.
const (
	// defaultLongPullingUpdateChanBuffer is the default update chan capacity.
	// Limited by number of updates in single Bot.GetUpdates() call.
	defaultLongPullingUpdateChanBuffer = 100

	// defaultLongPullingUpdateInterval is the default pause between two successful polls (0.5s).
	defaultLongPullingUpdateInterval = 500 * time.Millisecond

	// defaultLongPullingRetryTimeout is the default pause before retrying after a failed poll (3s).
	defaultLongPullingRetryTimeout = 3 * time.Second
)
// longPullingContext holds the runtime state and the configuration of a single
// long pulling session.
type longPullingContext struct {
	// Runtime state: running is guarded by runningLock; stop is closed to
	// signal the polling goroutine to exit.
	running     bool
	runningLock sync.RWMutex
	stop        chan struct{}

	// Configuration, filled with defaults and then adjusted by LongPullingOption values.
	updateChanBuffer uint          // capacity of the update chan
	updateInterval   time.Duration // pause after a successful poll
	retryTimeout     time.Duration // pause after a failed poll
}
// LongPullingOption is a functional option that adjusts a longPullingContext.
// It returns a non-nil error when the requested setting is invalid.
type LongPullingOption func(ctx *longPullingContext) error
// WithLongPullingUpdateInterval configures the pause between two consecutive calls of Bot.GetUpdates()
// during long pulling. The pause is a lower bound: the actual time between calls may be longer.
// Default is 0.5s. A negative interval is rejected with an error when the option is applied.
// Note: Telegram has a built-in timeout mechanism; to use it properly, set GetUpdatesParams.Timeout
// to the desired timeout and the update interval to 0.
func WithLongPullingUpdateInterval(updateInterval time.Duration) LongPullingOption {
	apply := func(lpCtx *longPullingContext) error {
		if updateInterval < 0 {
			return errors.New("update interval can't be negative")
		}

		lpCtx.updateInterval = updateInterval
		return nil
	}
	return apply
}
// WithLongPullingRetryTimeout configures how long long pulling waits before calling Bot.GetUpdates()
// again after a call that returned an error. The wait is a lower bound: it may take longer.
// Default is 3s. A negative timeout is rejected with an error when the option is applied.
func WithLongPullingRetryTimeout(retryTimeout time.Duration) LongPullingOption {
	apply := func(lpCtx *longPullingContext) error {
		if retryTimeout < 0 {
			return errors.New("retry timeout can't be negative")
		}

		lpCtx.retryTimeout = retryTimeout
		return nil
	}
	return apply
}
// WithLongPullingBuffer configures the capacity of the update chan. Default is 100.
// The returned option never fails.
func WithLongPullingBuffer(chanBuffer uint) LongPullingOption {
	apply := func(lpCtx *longPullingContext) error {
		lpCtx.updateChanBuffer = chanBuffer
		return nil
	}
	return apply
}
// UpdatesViaLongPulling receives updates in a chan by repeatedly calling GetUpdates().
//
// params is passed to every GetUpdates() call; if nil, an empty GetUpdatesParams is used.
// params.Offset is advanced past every update that was delivered to the chan, so the
// caller's params value is modified while long pulling runs.
//
// It returns an error if long pulling is already running (StopLongPulling() was not called yet)
// or if any of the options is invalid.
//
// Note: After you are done with getting updates you should call StopLongPulling(), which makes
// the polling goroutine exit and close the update chan. A stop request interrupts a pending
// send into a full chan as well as the update interval / retry timeout waits; it can't
// interrupt a GetUpdates() call that is already in progress.
func (b *Bot) UpdatesViaLongPulling(params *GetUpdatesParams, options ...LongPullingOption) (<-chan Update, error) {
	if b.longPullingContext != nil {
		return nil, errors.New("tgo: long pulling context already exist")
	}

	ctx, err := b.createLongPullingContext(options)
	if err != nil {
		return nil, err
	}

	ctx.runningLock.Lock()
	defer ctx.runningLock.Unlock()

	b.longPullingContext = ctx
	ctx.stop = make(chan struct{})
	ctx.running = true

	updatesChan := make(chan Update, ctx.updateChanBuffer)

	if params == nil {
		params = &GetUpdatesParams{}
	}

	// wait pauses for d and reports whether polling should continue;
	// it returns false as soon as a stop is requested.
	wait := func(d time.Duration) bool {
		if d <= 0 {
			return true
		}

		timer := time.NewTimer(d)
		defer timer.Stop()

		select {
		case <-ctx.stop:
			return false
		case <-timer.C:
			return true
		}
	}

	go func() {
		defer close(updatesChan)

		for {
			select {
			case <-ctx.stop:
				return
			default:
				// Continue getting updates
			}

			updates, err := b.GetUpdates(params)
			if err != nil {
				b.log.Errorf("Getting updates: %v", err)
				b.log.Errorf("Retrying to get updates in %s", ctx.retryTimeout.String())

				if !wait(ctx.retryTimeout) {
					return
				}
				continue
			}

			for _, update := range updates {
				if update.UpdateID < params.Offset {
					continue
				}

				// Never block forever on a full chan: once stop is requested the
				// consumer may no longer be reading, which would leak this goroutine.
				select {
				case <-ctx.stop:
					return
				case updatesChan <- update:
					// Advance the offset only for updates that were actually delivered,
					// so an update dropped by a stop is not marked as processed.
					params.Offset = update.UpdateID + 1
				}
			}

			if !wait(ctx.updateInterval) {
				return
			}
		}
	}()

	return updatesChan, nil
}
// createLongPullingContext builds a longPullingContext populated with the default buffer size,
// update interval and retry timeout, then applies options in the order given.
// The first option that fails aborts construction; its error is returned wrapped.
func (b *Bot) createLongPullingContext(options []LongPullingOption) (*longPullingContext, error) {
	lpCtx := new(longPullingContext)
	lpCtx.updateChanBuffer = defaultLongPullingUpdateChanBuffer
	lpCtx.updateInterval = defaultLongPullingUpdateInterval
	lpCtx.retryTimeout = defaultLongPullingRetryTimeout

	for i := range options {
		err := options[i](lpCtx)
		if err != nil {
			return nil, fmt.Errorf("tgo: options: %w", err)
		}
	}

	return lpCtx, nil
}
// IsRunningLongPulling reports whether UpdatesViaLongPulling() is currently running.
// It returns false when no long pulling context exists.
func (b *Bot) IsRunningLongPulling() bool {
	lpCtx := b.longPullingContext
	if lpCtx == nil {
		return false
	}

	// Read the flag under the read lock and release it before returning.
	lpCtx.runningLock.RLock()
	isRunning := lpCtx.running
	lpCtx.runningLock.RUnlock()

	return isRunning
}
// StopLongPulling stops receiving updates from UpdatesViaLongPulling(). Stopping is non-blocking:
// it only signals the polling goroutine, which then closes the update chan, so it's the caller's
// responsibility to process all unhandled updates after calling stop.
// Calling StopLongPulling() when long pulling is not running (including repeated calls) does nothing.
func (b *Bot) StopLongPulling() {
	lpCtx := b.longPullingContext
	if lpCtx == nil {
		return
	}

	lpCtx.runningLock.Lock()
	defer lpCtx.runningLock.Unlock()

	if !lpCtx.running {
		return
	}

	lpCtx.running = false
	close(lpCtx.stop) // wakes the polling goroutine
	b.longPullingContext = nil
}