-
Notifications
You must be signed in to change notification settings - Fork 13
/
blaze_handler.go
99 lines (80 loc) · 1.89 KB
/
blaze_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package mixin
import (
"context"
"time"
"github.com/gorilla/websocket"
"golang.org/x/sync/errgroup"
)
// blazeHandler embeds *Client (promoting its methods) and pairs it with a
// websocket connection plus the read deadline most recently applied to it.
type blazeHandler struct {
	*Client

	// conn is the websocket connection used by this handler.
	conn *websocket.Conn
	// readDeadline holds the last deadline that SetReadDeadline applied
	// successfully; it stays at its previous value when applying fails.
	readDeadline time.Time
}
// SetReadDeadline forwards t to the websocket connection's SetReadDeadline.
// When the connection accepts it, t is also stored in b.readDeadline; when the
// connection returns an error, that error is returned unchanged and
// b.readDeadline keeps its previous value.
func (b *blazeHandler) SetReadDeadline(t time.Time) error {
	err := b.conn.SetReadDeadline(t)
	if err == nil {
		// Only remember deadlines the connection actually accepted.
		b.readDeadline = t
	}
	return err
}
// ack collects message IDs from ackBuffer and acknowledges them in batches
// with status MessageStatusRead.
//
// Pending acknowledgements are sent in two situations:
//   - as soon as maxBatch of them have accumulated, and
//   - whenever the one-second timer fires and at least one is pending.
//
// Each send covers at most maxBatch requests. A failed send leaves the
// requests queued, so they are retried on the next trigger; the error itself
// is not reported. ctx is only forwarded to the send calls: the loop returns
// solely when ackBuffer is closed, at which point any still-pending requests
// are discarded without being sent.
func (b *blazeHandler) ack(ctx context.Context, ackBuffer <-chan string) {
	const (
		dur      = time.Second
		maxBatch = 8 * ackBatch // 640
	)

	t := time.NewTimer(dur)
	// Release the timer when the loop exits instead of leaving it armed.
	defer t.Stop()

	requests := make([]*AcknowledgementRequest, 0, ackBatch)

	// flush sends up to maxBatch pending requests. On success the sent prefix
	// is removed by shifting the remainder to the front of the same backing
	// array; on failure the queue is left untouched. It reports whether the
	// send succeeded.
	flush := func() bool {
		count := len(requests)
		if count > maxBatch {
			count = maxBatch
		}
		if err := b.sendAcknowledgements(ctx, requests[:count]); err != nil {
			return false
		}
		remain := requests[count:]
		copy(requests, remain)
		requests = requests[:len(remain)]
		return true
	}

	for {
		select {
		case id, ok := <-ackBuffer:
			if !ok {
				return
			}
			requests = append(requests, &AcknowledgementRequest{
				MessageID: id,
				Status:    MessageStatusRead,
			})
			if len(requests) < maxBatch {
				continue
			}
			if flush() && len(requests) == 0 {
				// Nothing is pending any more, so restart the interval.
				// Stop-and-drain first so that a tick which fired in the
				// meantime is not left in the channel after the Reset.
				if !t.Stop() {
					<-t.C
				}
				t.Reset(dur)
			}
		case <-t.C:
			if len(requests) > 0 {
				// Best effort: a failure keeps the requests for the next tick.
				flush()
			}
			t.Reset(dur)
		}
	}
}
// sendAcknowledgements delivers requests through SendAcknowledgements.
//
// Inputs of at most ackBatch entries go out in a single call. Larger inputs
// are cut into consecutive chunks of ackBatch entries (the final chunk may be
// shorter) and each chunk is sent from its own goroutine in an errgroup. The
// result is whatever the group's Wait returns once every chunk has finished.
func (b *blazeHandler) sendAcknowledgements(ctx context.Context, requests []*AcknowledgementRequest) error {
	if len(requests) <= ackBatch {
		return b.SendAcknowledgements(ctx, requests)
	}

	var g errgroup.Group
	rest := requests
	for len(rest) > 0 {
		// Take the next chunk off the front of what is still unsent.
		size := len(rest)
		if size > ackBatch {
			size = ackBatch
		}
		chunk := rest[:size]
		rest = rest[size:]

		g.Go(func() error {
			return b.SendAcknowledgements(ctx, chunk)
		})
	}
	return g.Wait()
}