/
handle_idle.go
120 lines (100 loc) · 2.72 KB
/
handle_idle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package session
import (
"context"
"time"
"github.com/ProtonMail/gluon/async"
"github.com/ProtonMail/gluon/imap/command"
"github.com/ProtonMail/gluon/internal/response"
"github.com/ProtonMail/gluon/logging"
"github.com/ProtonMail/gluon/profiling"
"github.com/sirupsen/logrus"
)
// handleIdle serves an IDLE command for this session.
//
// It returns ErrNotAuthenticated when the session has no state. Otherwise the
// work runs inside s.state.Idle: a goroutine forwards the responses arriving
// on the update channel to the client (in bulks when s.idleBulkTime is
// non-zero), a continuation response and the pending responses are sent, and
// the handler then waits for one of:
//   - a result on cmdCh: a Done command is answered with OK, any other
//     command with BAD, and a result carrying an error returns that error;
//   - cmdCh being closed or the state being done: returns nil;
//   - a state update: applied, with failures only logged, then waiting resumes;
//   - ctx being cancelled: returns ctx.Err().
//
// GOMSRV-86: What does it mean to do IDLE when you're not selected?
// GOMSRV-87: Should IDLE be stopped automatically when the context is cancelled?
func (s *Session) handleIdle(ctx context.Context, tag string, _ *command.Idle, cmdCh <-chan commandResult) error {
	profiling.Start(ctx, profiling.CmdTypeIdle)
	defer profiling.Stop(ctx, profiling.CmdTypeIdle)

	if s.state == nil {
		return ErrNotAuthenticated
	}

	return s.state.Idle(ctx, func(initial []response.Response, updates chan response.Response) error {
		// forward drains the update channel until it is closed. Send failures
		// are logged rather than returned so later updates are still attempted.
		forward := func(context.Context) {
			if s.idleBulkTime == 0 {
				for update := range updates {
					if err := update.Send(s); err != nil {
						logrus.WithError(err).Error("Failed to send IDLE update")
					}
				}

				return
			}

			sendResponsesInBulks(s, updates, s.idleBulkTime)
		}

		async.GoAnnotated(ctx, s.panicHandler, forward, logging.Labels{
			"Action":    "Sending IDLE updates",
			"SessionID": s.sessionID,
		})

		// Send the continuation first, then the responses that were already
		// pending when IDLE started.
		if err := response.Continuation().Send(s); err != nil {
			return err
		}

		for _, queued := range initial {
			if err := queued.Send(s); err != nil {
				return err
			}
		}

		for {
			select {
			case result, open := <-cmdCh:
				if !open {
					return nil
				}

				if result.err != nil {
					return result.err
				}

				// Any command ends the IDLE; only DONE ends it successfully.
				if _, isDone := result.command.Payload.(*command.Done); isDone {
					return response.Ok(tag).WithMessage("IDLE").Send(s)
				}

				return response.Bad(tag).Send(s)

			case <-s.state.Done():
				return nil

			case update := <-s.state.GetStateUpdatesCh():
				// A failed update is logged and the wait goes on.
				if err := s.state.ApplyUpdate(ctx, update); err != nil {
					logrus.WithError(err).Error("Failed to apply state update during idle")
				}

			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
}
// sendMergedResponses passes buffer through response.Merge and sends every
// resulting response to the session. A failed send is logged and does not
// prevent the remaining responses from being sent.
func sendMergedResponses(s *Session, buffer []response.Response) {
	merged := response.Merge(buffer)

	for _, item := range merged {
		err := item.Send(s)
		if err == nil {
			continue
		}

		logrus.WithError(err).Error("Failed to send IDLE update")
	}
}
// sendResponsesInBulks drains resCh until it is closed. Responses are
// buffered as they arrive and, on every idleBulkTime tick, the buffer is sent
// to the session via sendMergedResponses and then reset. Whatever is still
// buffered when resCh closes is sent before returning. Nil responses are
// dropped.
//
// time.NewTicker panics when given a non-positive duration, so if
// idleBulkTime is not positive no buffering takes place: each non-nil
// response is sent as soon as it arrives, and send failures are logged.
func sendResponsesInBulks(s *Session, resCh chan response.Response, idleBulkTime time.Duration) {
	if idleBulkTime <= 0 {
		for res := range resCh {
			if res == nil {
				continue
			}

			if err := res.Send(s); err != nil {
				logrus.WithError(err).Error("Failed to send IDLE update")
			}
		}

		return
	}

	buffer := []response.Response{}

	ticker := time.NewTicker(idleBulkTime)
	defer ticker.Stop()

	for {
		select {
		case res, ok := <-resCh:
			if !ok {
				// Channel closed: send what is left and stop.
				sendMergedResponses(s, buffer)
				return
			}

			if res != nil {
				buffer = append(buffer, res)
				logrus.WithField("response", res).Trace("Buffered")
			}

		case <-ticker.C:
			// Periodic flush; start the next bulk with a fresh buffer.
			sendMergedResponses(s, buffer)

			buffer = []response.Response{}
		}
	}
}