-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
120 lines (99 loc) · 1.88 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package chatmq
import (
"sync"
"time"
)
// queue is a first-in, first-out list of encoded payloads stored in mq under
// a single key.
type queue struct {
	// lock guards data and time. A holder sends a value into the channel
	// before touching those fields and receives it back when done; newQueue
	// creates the channel with capacity 1.
	lock chan bool

	// data holds the pending payloads, oldest first.
	data [][]byte

	// time is set when the queue is created and refreshed by get;
	// queueOverdue compares it against the overdue cutoff.
	time time.Time
}
// Package-level state shared by all queue operations.
var (
	// mq maps a queue key (the [16]byte values passed to put and get) to
	// its *queue.
	mq sync.Map

	// queueOverdueDuration is how far back queueOverdue looks when deciding
	// whether a queue's timestamp is too old. It starts at one minute and
	// can be changed with Overdue.
	queueOverdueDuration = time.Minute
)
// Put encodes e, appends the encoded payload to the local queue for key, and
// then sends the same payload to every *node held in nodes.
//
// If encoding fails the failure is reported through warn and nothing is
// queued or sent.
func Put(key string, e interface{}) {
	payload, encErr := encode(e)
	if encErr != nil {
		warn("encode %v fail %v", e, encErr)
		return
	}

	id := skey(key)

	// Store locally first, then fan out to the other nodes.
	put(id, payload)
	nodes.Range(func(_, peer interface{}) bool {
		n, isNode := peer.(*node)
		if isNode {
			n.send(methodPut, id, payload)
		}
		return true // always continue to the next entry
	})
}
// put appends data to the queue stored in mq under key, creating the queue
// if there is none yet.
func put(key [16]byte, data []byte) {
	debug("put %v %d", key, len(data))

	// Plain Load first so the common case does not allocate a new queue.
	entry, found := mq.Load(key)
	if !found {
		fresh := newQueue()
		entry, found = mq.LoadOrStore(key, fresh)
		if !found {
			// Our queue was the one stored; append to it directly.
			putq(fresh, data)
			return
		}
		// Another goroutine stored an entry in between; fall through and
		// use that one instead of ours.
	}
	putv(key, entry, data)
}
// putv appends data to the queue held in v. When v is not a *queue, a new
// queue is stored in mq under k (replacing v) and data is appended to that
// new queue instead.
func putv(k, v interface{}, data []byte) {
	target, isQueue := v.(*queue)
	if !isQueue {
		target = newQueue()
		mq.Store(k, target)
	}
	putq(target, data)
}
// newQueue returns an empty queue whose lock channel has capacity 1 and whose
// timestamp is the current time.
func newQueue() *queue {
	q := new(queue)
	q.lock = make(chan bool, 1)
	q.data = make([][]byte, 0)
	q.time = time.Now()
	return q
}
// putq appends data to the tail of q while holding q.lock.
func putq(q *queue, data []byte) {
	q.lock <- true              // acquire: blocks while another holder's value is in the channel
	defer func() { <-q.lock }() // release on return

	q.data = append(q.data, data)
}
// Get pops the oldest payload queued under key and decodes it into e.
//
// It returns true only when a payload was available and Decode succeeded.
// A payload that fails to decode has already been removed from the queue;
// the failure is reported through warn and Get returns false.
func Get(key string, e interface{}) bool {
	payload, found := get(skey(key))
	if !found {
		return false
	}
	if decErr := Decode(payload, e); decErr != nil {
		warn("decode fail %v", decErr)
		return false
	}
	return true
}
// get removes and returns the oldest payload queued under key.
//
// It returns (payload, true) when a queue exists for key and holds at least
// one payload, and (nil, false) otherwise. Whenever a queue is found its
// timestamp is refreshed, even if the queue is empty. If the value stored in
// mq under key is not a *queue, that entry is deleted.
func get(key [16]byte) (data []byte, ok bool) {
	v, found := mq.Load(key)
	if !found {
		return nil, false
	}
	q, isQueue := v.(*queue)
	if !isQueue {
		// Unexpected value type: remove it rather than leave it in the map.
		mq.Delete(key)
		return nil, false
	}

	q.lock <- true              // acquire
	defer func() { <-q.lock }() // release on return

	if len(q.data) > 0 {
		data, ok = q.data[0], true
		// Clear the vacated slot before advancing the slice. Otherwise the
		// backing array keeps a reference to the popped payload, which then
		// stays reachable until the array itself is reallocated.
		q.data[0] = nil
		q.data = q.data[1:]
	}
	q.time = time.Now()
	return data, ok
}
// Overdue sets the duration queueOverdue uses as its cutoff: queues whose
// timestamp is older than this duration are removed on the next sweep.
//
// NOTE(review): the assignment is not synchronized with the read in
// queueOverdue; presumably this is meant to be called during setup, before
// sweeps run concurrently — confirm against callers.
func Overdue(duration time.Duration) {
	queueOverdueDuration = duration
}
// queueOverdue deletes from mq every queue whose timestamp is older than
// queueOverdueDuration before now. Entries that are not a *queue are left
// untouched.
func queueOverdue() {
	cutoff := time.Now().Add(-queueOverdueDuration)
	mq.Range(func(key, value interface{}) bool {
		q, isQueue := value.(*queue)
		if !isQueue {
			return true
		}

		// get updates q.time while holding q.lock, so take the same lock to
		// read it; an unguarded read here would race with that write.
		q.lock <- true
		last := q.time
		<-q.lock

		if last.Before(cutoff) {
			mq.Delete(key)
		}
		return true // keep scanning the remaining entries
	})
}