/
asker.go
84 lines (72 loc) · 1.24 KB
/
asker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package mbapp
import (
"context"
"sync"
)
type askID struct {
GroupID
Addr string
}
type ask struct {
once sync.Once
done chan struct{}
respBuf []byte
n int
errCode uint8
}
func (a *ask) await(ctx context.Context) error {
select {
case <-ctx.Done():
a.abort()
return ctx.Err()
case <-a.done:
return nil
}
}
func (a *ask) complete(resp []byte, errCode uint8) {
a.once.Do(func() {
a.errCode = errCode
a.n = copy(a.respBuf, resp)
close(a.done)
})
}
func (a *ask) abort() {
a.once.Do(func() {
close(a.done)
})
}
type asker struct {
mu sync.RWMutex
inFlight map[askID]*ask
}
func newAsker() *asker {
return &asker{
inFlight: make(map[askID]*ask),
}
}
func (a *asker) createAsk(id askID, respBuf []byte) *ask {
ask := &ask{
done: make(chan struct{}),
respBuf: respBuf,
}
a.mu.Lock()
defer a.mu.Unlock()
a.inFlight[id] = ask
return ask
}
func (a *asker) removeAsk(id askID) {
a.mu.Lock()
defer a.mu.Unlock()
delete(a.inFlight, id)
}
func (a *asker) getAndRemoveAsk(id askID) *ask {
a.mu.Lock()
defer a.mu.Unlock()
ask := a.inFlight[id]
delete(a.inFlight, id)
return ask
}
func retry(ctx context.Context, fn func() error) error {
// TODO: retry until context is cancelled.
return fn()
}