/
bot.go
256 lines (239 loc) · 9.25 KB
/
bot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package telegrambot
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/HouzuoGuo/laitos/inet"
"github.com/HouzuoGuo/laitos/lalog"
"github.com/HouzuoGuo/laitos/misc"
"github.com/HouzuoGuo/laitos/testingstub"
"github.com/HouzuoGuo/laitos/toolbox"
)
// Constants that govern chat filtering, call timeouts, and the message polling schedule of the telegram bot daemon.
const (
ChatTypePrivate = "private" // Name of the private chat type; ProcessMessages ignores updates from any other chat type
APICallTimeoutSec = 30 // Outgoing API calls are constrained by this timeout
CommandTimeoutSec = 30 // Command execution is constrained by this timeout
/*
PollIntervalSecMin and PollIntervalSecMax together determine the range of random number of seconds to wait between
each message polling attempt. The randomness helps multiple laitos instances to poll messages simultaneously
without starving any specific instance.

PollIntervalSecMax is additionally passed as the first argument of lalog.NewRateLimit in Initialise (presumably
the length of the rate limit interval in seconds - TODO confirm against lalog).
*/
PollIntervalSecMin = 2
PollIntervalSecMax = 5
)
// APIUser is the Telegram API entity that describes a user.
// The daemon reads UserName of a message sender as the key for rate limiting and logging.
type APIUser struct {
ID int64 `json:"id"`
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
UserName string `json:"username"`
}
// APIChat is the Telegram API entity that describes a chat.
// The daemon sends replies to ID, compares Type against ChatTypePrivate, and uses UserName as the command client tag
// and as the fallback rate limit key when the message sender has no user name.
type APIChat struct {
ID int64 `json:"id"`
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
UserName string `json:"username"`
Type string `json:"type"`
}
// APIMessage is the Telegram API entity that describes a single chat message.
type APIMessage struct {
ID int64 `json:"message_id"`
From APIUser `json:"from"`
Chat APIChat `json:"chat"`
Timestamp int64 `json:"date"` // Compared against misc.StartupTime.Unix() to skip messages that predate server startup
Text string `json:"text"` // Passed to the command processor as the command content
}
// APIUpdate is the Telegram API entity that describes one bot update.
type APIUpdate struct {
ID int64 `json:"update_id"` // Used by ProcessMessages to advance the polling offset past this update
Message APIMessage `json:"message"`
}
// APIUpdates is the Telegram API entity that describes the response of a getUpdates call.
type APIUpdates struct {
OK bool `json:"ok"` // When false, StartAndBlock logs the response body and discards the response
Updates []APIUpdate `json:"result"`
}
// Daemon processes feature commands from incoming telegram messages and replies to the chats with command results.
// Call Initialise before StartAndBlock; Initialise validates the configuration and prepares the logger and rate limit.
type Daemon struct {
AuthorizationToken string `json:"AuthorizationToken"` // Telegram bot API auth token
PerUserLimit int `json:"PerUserLimit"` // PerUserLimit determines how many messages may be processed per chat at regular interval; values below 1 are replaced by 2 in Initialise
Processor *toolbox.CommandProcessor `json:"-"` // Feature command processor
messageOffset int64 // Process chat messages arrived after this point
userRateLimit *lalog.RateLimit // Prevent user from flooding bot with new messages
cancelFunc context.CancelFunc // Cancels the polling context created by StartAndBlock; nil until StartAndBlock is called
logger *lalog.Logger // Assigned by Initialise
}
// Initialise validates the daemon configuration and prepares its internal state (logger and per-user rate limit).
// A PerUserLimit below 1 is replaced by the default of 2. An error is returned when the command processor is missing,
// empty, or not sane for facing the Internet, or when the authorization token is empty.
func (bot *Daemon) Initialise() error {
	if bot.PerUserLimit < 1 {
		// Reasonable for personal use
		bot.PerUserLimit = 2
	}
	idFields := []lalog.LoggerIDField{{Key: "PerUserLimit", Value: bot.PerUserLimit}}
	bot.logger = &lalog.Logger{ComponentName: "telegrambot", ComponentID: idFields}

	// The command processor must be present before the logger can be handed over to it
	processor := bot.Processor
	if processor == nil || processor.IsEmpty() {
		return fmt.Errorf("telegrambot.Initialise: command processor and its filters must be configured")
	}
	processor.SetLogger(bot.logger)
	sanityErrs := processor.IsSaneForInternet()
	if len(sanityErrs) != 0 {
		return fmt.Errorf("telegrambot.Initialise: %+v", sanityErrs)
	}

	if len(bot.AuthorizationToken) == 0 {
		return errors.New("telegrambot.Initialise: AuthorizationToken must not be empty")
	}

	bot.userRateLimit = lalog.NewRateLimit(PollIntervalSecMax, bot.PerUserLimit, bot.logger)
	return nil
}
// ReplyTo sends a text reply to the telegram chat identified by chatID.
// The chat ID and text are submitted as a URL-encoded form in an HTTP POST to the sendMessage API endpoint.
// An error is returned when the HTTP call itself fails, or when the API responds with a status outside of 2xx.
func (bot *Daemon) ReplyTo(chatID int64, text string) error {
	form := url.Values{
		"chat_id": []string{strconv.FormatInt(chatID, 10)},
		"text":    []string{text},
	}
	resp, err := inet.DoHTTP(context.Background(), inet.HTTPRequest{
		Method:     http.MethodPost,
		TimeoutSec: APICallTimeoutSec,
		Body:       strings.NewReader(form.Encode()),
	}, "https://api.telegram.org/bot%s/sendMessage", bot.AuthorizationToken)
	if err != nil {
		// The call did not complete, there is no meaningful status code or body to report
		return fmt.Errorf("telegrambot.ReplyTo: failed to reply to %d - %v", chatID, err)
	}
	// Only 2xx is a success. An integer division by 200 would mistakenly accept 3xx responses as well.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("telegrambot.ReplyTo: failed to reply to %d - HTTP %d - %s", chatID, resp.StatusCode, string(resp.Body))
	}
	return nil
}
// ProcessMessages walks through the incoming updates, and for each update that passes the filters it runs the
// message text as a command in a background goroutine and replies to the chat with the command result.
//
// For every update, regardless of the filters, the polling offset is advanced past the update ID. An update is
// then skipped when its originator exceeds the rate limit (a "rate limited" reply is sent), when its message
// arrived before server startup, when the chat is not private, or when the message text is "/start".
func (bot *Daemon) ProcessMessages(ctx context.Context, updates APIUpdates) {
	for _, update := range updates.Updates {
		// The start time is taken up front so that the statistics cover the entire processing duration, API time included
		startedAtNano := time.Now().UnixNano()
		if update.ID >= bot.messageOffset {
			bot.messageOffset = update.ID + 1
		}
		msg := update.Message
		chat := msg.Chat

		// The rate limit is keyed by the sender's user name, falling back to the user name of the chat
		origin := msg.From.UserName
		if origin == "" {
			origin = chat.UserName
		}
		if allowed := bot.userRateLimit.Add(origin, true); !allowed {
			if err := bot.ReplyTo(chat.ID, "rate limited"); err != nil {
				bot.logger.Warning(origin, err, "failed to reply rate limited response")
			}
			continue
		}

		// Filters are evaluated top to bottom, the first match causes the update to be skipped
		switch {
		case msg.Timestamp < misc.StartupTime.Unix():
			// The message arrived prior to server startup
			bot.logger.Warning(origin, nil, "ignore message from \"%s\" that arrived before server started up", chat.UserName)
			continue
		case chat.Type != ChatTypePrivate:
			bot.logger.Warning(origin, nil, "ignore non-private chat %d", chat.ID)
			continue
		case msg.Text == "/start":
			// /start is not a command
			bot.logger.Info(origin, nil, "chat %d is started by %s", chat.ID, chat.UserName)
			continue
		}

		// Run the command in background and reply with its result
		go func(update APIUpdate, startedAtNano int64) {
			chat := update.Message.Chat
			cmd := toolbox.Command{
				DaemonName: "telegrambot",
				ClientTag:  chat.UserName,
				TimeoutSec: CommandTimeoutSec,
				Content:    update.Message.Text,
			}
			result := bot.Processor.Process(ctx, cmd, true)
			if err := bot.ReplyTo(chat.ID, result.CombinedOutput); err != nil {
				bot.logger.Warning(chat.UserName, err, "failed to send message reply")
			}
			elapsedNano := time.Now().UnixNano() - startedAtNano
			misc.TelegramBotStats.Trigger(float64(elapsedNano))
		}(update, startedAtNano)
	}
}
// StartAndBlock verifies the authorization token with a test API call, then immediately begins polling for and
// processing incoming chat messages. The polling interval is chosen randomly once, when this function is called.
// The caller is blocked until the polling loop ends.
//
// An error is returned when the test API call is answered with HTTP 404 (the indication of an incorrect
// authorization token), when the periodic polling fails to start, or when the periodic polling ends with an error -
// the polling function returns the context's error once the context is cancelled by Stop.
func (bot *Daemon) StartAndBlock() error {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	bot.cancelFunc = cancelFunc
	/*
		Make a test API call to verify the correctness of authorization token. This test call must not return in case of
		IO error or unexpected HTTP response status. As of 2017-11-26, status 404 is the only indication of incorrect
		authorization token for now.
	*/
	testResp, testErr := inet.DoHTTP(ctx, inet.HTTPRequest{TimeoutSec: APICallTimeoutSec},
		"https://api.telegram.org/bot%s/getMe", bot.AuthorizationToken)
	if testErr == nil && testResp.StatusCode == http.StatusNotFound {
		return errors.New("telegrambot.StartAndBlock: test call failed due to HTTP 404, is the AuthorizationToken correct?")
	}
	bot.logger.Info("", nil, "going to poll for messages")
	periodicFunc := func(ctx context.Context, _, _ int) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		// Poll for new messages. The call uses the cancellable context, so that Stop interrupts an in-flight poll
		// instead of waiting for up to APICallTimeoutSec.
		// NOTE(review): messageOffset is an int64 substituted via %s - this relies on inet.DoHTTP converting URL values
		// to strings before substitution; confirm against inet.
		updatesResp, updatesErr := inet.DoHTTP(ctx, inet.HTTPRequest{TimeoutSec: APICallTimeoutSec},
			"https://api.telegram.org/bot%s/getUpdates?offset=%s", bot.AuthorizationToken, bot.messageOffset)
		if updatesErr == nil {
			updatesErr = updatesResp.Non2xxToError()
		}
		if updatesErr != nil {
			// A poll that failed because the daemon is stopping is not worth a warning, end the loop right away
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			/*
				Before February 2018, if two or more program instances poll for messages at the same time, Telegram API
				would not complain, but lately it begins to complain:
				"terminated by other getUpdates request; make sure that only one bot instance is running"
				despite that all program instances continue to function well and successfully poll all messages in its
				next attempt. Therefore, suppress the log message in this case, and randomise the number of seconds to
				wait between polling attempts.
			*/
			if updatesResp.StatusCode != http.StatusConflict {
				bot.logger.Warning("", updatesErr, "failed to poll due to HTTP error")
			}
			return nil
		}
		// Deserialise new messages
		var newMessages APIUpdates
		if err := json.Unmarshal(updatesResp.Body, &newMessages); err != nil {
			bot.logger.Warning("", err, "failed to decode response JSON")
			return nil
		}
		if !newMessages.OK {
			bot.logger.Warning("", nil, "API response is not OK - %s", string(updatesResp.Body))
			return nil
		}
		// Process new messages
		if len(newMessages.Updates) > 0 {
			bot.ProcessMessages(ctx, newMessages)
		}
		return nil
	}
	periodic := &misc.Periodic{
		LogActorName: bot.logger.ComponentName,
		Interval:     time.Duration(PollIntervalSecMin+rand.Intn(PollIntervalSecMax-PollIntervalSecMin)) * time.Second,
		MaxInt:       1,
		Func:         periodicFunc,
	}
	if err := periodic.Start(ctx); err != nil {
		return err
	}
	return periodic.WaitForErr()
}
// Stop cancels the context of the message handling loop previously started by StartAndBlock.
// It is safe to call Stop repeatedly, and calling it on a daemon that has never been started does nothing.
func (bot *Daemon) Stop() {
	// cancelFunc stays nil until StartAndBlock is called, invoking a nil function would panic
	if bot.cancelFunc != nil {
		bot.cancelFunc()
	}
}
// TestTelegramBot runs unit tests on the telegram bot. See TestSMTPD_StartAndBlock for bot setup.
// The chat routine is really difficult to test, hence the test only starts the daemon using an invalid
// configuration, expecting the start to fail with an error that mentions AuthorizationToken.
func TestTelegramBot(bot *Daemon, t testingstub.T) {
	startErr := bot.StartAndBlock()
	tokenRejected := startErr != nil && strings.Contains(startErr.Error(), "AuthorizationToken")
	if !tokenRejected {
		t.Fatal(startErr)
	}
	// Repeatedly stopping the daemon should have no negative consequence
	for attempt := 0; attempt < 2; attempt++ {
		bot.Stop()
	}
}