// outbox.go

package core
import (
"context"
"sync"
"time"
"github.com/hood-chat/core/entity"
"github.com/libp2p/go-libp2p/core/peer"
)
// Timeout is the age after which a queued envelope is reported as failed.
// background adds it to an envelope's CreatedAt and compares the sum with
// time.Time.Unix(), so it is expressed in the same unit as CreatedAt —
// presumably Unix seconds, i.e. 5 minutes; confirm against entity.Envelop.
const Timeout = 60 * 5
// DataItem is one queued envelope together with its timeout-report state.
type DataItem struct{
// nvp is the envelope waiting in the outbox.
nvp *entity.Envelop
// failed is consulted by background: an item whose flag is already true is
// not sent on the outbox's failed channel.
failed bool
}
// Data maps a peer ID to the items queued for that peer, in the order they
// were put.
type Data map[peer.ID][]DataItem
// outbox holds envelopes queued per peer until they are popped, and runs a
// background goroutine that reports envelopes older than Timeout.
type outbox struct {
// mux is held by put, pop, mayStop and background while they touch data,
// bctx and bcancel.
mux sync.Mutex
// data is the per-peer queue of pending items.
data Data
// failed receives envelopes that background found to be older than Timeout.
// It is created unbuffered by newOutBox, so a send blocks until it is read.
failed chan *entity.Envelop
// bctx and bcancel belong to the currently running background goroutine;
// both are nil while no such goroutine is running.
bctx context.Context
bcancel context.CancelFunc
}
// newOutBox returns an empty outbox with an initialised queue map and an
// unbuffered failed channel. The mutex and the background context/cancel pair
// are left at their zero values; the background goroutine is only started by
// the first put.
func newOutBox() *outbox {
	box := new(outbox)
	box.data = make(Data)
	box.failed = make(chan *entity.Envelop)
	return box
}
// put appends val to the queue of peer key and makes sure the background
// expiry goroutine is running. The whole operation runs under o.mux, which is
// also what mayStart relies on.
func (o *outbox) put(key peer.ID, val *entity.Envelop) {
	o.mux.Lock()
	defer o.mux.Unlock()

	item := DataItem{nvp: val, failed: false}
	o.data[key] = append(o.data[key], item)
	o.mayStart()
}
// pop removes every item queued for peer key and returns their envelopes in
// queue order. It returns nil when nothing is queued for that peer.
// Afterwards it calls mayStop, which stops the background goroutine once the
// outbox is empty.
func (o *outbox) pop(key peer.ID) []*entity.Envelop {
	var msgs []*entity.Envelop

	o.mux.Lock()
	if items, found := o.data[key]; found {
		delete(o.data, key)
		for i := range items {
			msgs = append(msgs, items[i].nvp)
		}
	}
	// Release before mayStop: it takes o.mux itself.
	o.mux.Unlock()

	o.mayStop()
	return msgs
}
// mayStart launches the background goroutine unless one is already running
// (signalled by a non-nil o.bctx). It reads and writes bctx/bcancel without
// locking, so the caller must already hold o.mux, as put does.
func (o *outbox) mayStart() {
	if o.bctx != nil {
		// A background goroutine is already running.
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	o.bctx, o.bcancel = ctx, cancel
	go o.background(ctx)
}
// mayStop cancels the background goroutine's context and clears bctx/bcancel
// when the outbox holds no data and a goroutine is running. It acquires o.mux
// itself, so it must not be called with the mutex held.
func (o *outbox) mayStop() {
	o.mux.Lock()
	defer o.mux.Unlock()

	if o.bctx == nil || len(o.data) != 0 {
		return
	}

	o.bcancel()
	o.bctx, o.bcancel = nil, nil
}
// background is the expiry loop started by mayStart. Once a minute it scans
// every queued item and sends on o.failed each envelope whose CreatedAt plus
// Timeout is not later than the tick time (Unix seconds) and that has not
// been reported before. It then calls mayStop and keeps looping until ctx is
// cancelled.
//
// Fixes over the previous version:
//   - the ticker is stopped when the goroutine returns;
//   - a reported item is flagged as failed in place, so it is sent on
//     o.failed once instead of on every tick;
//   - envelopes are collected under o.mux but sent after it is released, so
//     a slow or absent reader of o.failed no longer blocks put and pop;
//   - the cancellation log prints ctx.Err() rather than the empty struct
//     received from ctx.Done().
func (o *outbox) background(ctx context.Context) {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case t := <-ticker.C:
			now := t.UTC().Unix()

			var expired []*entity.Envelop
			o.mux.Lock()
			for _, items := range o.data {
				for i := range items {
					// Take the address so the failed flag is written to
					// the stored item, not to a copy.
					it := &items[i]
					if it.failed || it.nvp.CreatedAt+Timeout > now {
						continue
					}
					it.failed = true
					expired = append(expired, it.nvp)
				}
			}
			o.mux.Unlock()

			// o.failed is unbuffered: deliver outside the lock and give up
			// if the outbox is stopped while waiting for a reader.
			for _, nvp := range expired {
				select {
				case o.failed <- nvp:
				case <-ctx.Done():
					log.Error("context error broke sender", ctx.Err())
					return
				}
			}
			o.mayStop()
		case <-ctx.Done():
			log.Error("context error broke sender", ctx.Err())
			return
		}
	}
}