-
Notifications
You must be signed in to change notification settings - Fork 0
/
message_test.go
144 lines (118 loc) · 3.57 KB
/
message_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package gopushserver
import (
"bytes"
"fmt"
"testing"
"time"
)
// TEST_DATA_PATTERN is the fmt format string used to build test payloads;
// its single %d verb is filled in with the payload's sequence number.
const TEST_DATA_PATTERN = "_DaTa_%d"
// newDataN builds the test payload for sequence number n by formatting n
// into TEST_DATA_PATTERN and returning the result as a byte slice.
func newDataN(n int) []byte {
	payload := fmt.Sprintf(TEST_DATA_PATTERN, n)
	return []byte(payload)
}
// isDataN reports whether src is byte-for-byte identical to the payload
// that newDataN produces for sequence number n.
func isDataN(src []byte, n int) bool {
	// bytes.Equal expresses the equality check directly; it is equivalent to
	// bytes.Compare(...) == 0 (a nil slice and an empty slice compare equal).
	return bytes.Equal(src, newDataN(n))
}
// TestMessageQueueBasic enqueues a few payloads into a queue that is larger
// than the number of payloads, then verifies that Dequeue returns them in
// insertion order and returns nil once the queue has been drained.
func TestMessageQueueBasic(t *testing.T) {
	// Fewer items than the queue size passed to NewQueue, so nothing should
	// be dropped due to overflow. The two durations are long enough that no
	// message should expire while the test runs.
	const itemCount = 4

	q := NewQueue(7, time.Minute, time.Minute)
	for i := 0; i < itemCount; i++ {
		q.Enqueue(newDataN(i))
	}

	// Check every dequeued payload. Previously the result of isDataN was
	// discarded, so the test could not detect wrong or misordered data.
	for i := 0; i < itemCount; i++ {
		data := q.Dequeue()
		if !isDataN(data, i) {
			t.Fatalf("unexpected data %q, expected payload #%d", string(data), i)
		}
	}

	// The queue is now empty; one more Dequeue must yield nil.
	if q.Dequeue() != nil {
		t.Fatalf("Expect nil.")
	}
}
// func TestMessageQueue(t *testing.T) {
// var (
// QUEUE_SIZE int = 7
// TIME_DELTA time.Duration = time.Millisecond * 100
// MESSAGE_EXPIRES time.Duration = time.Second
// FISRT_ENQUEUE_AMOUNT int = 3
// FISRT_SLEEP_TIME time.Duration = time.Second / 2
// // SECOND_ENQUEUE_AMOUNT int = QUEUE_SIZE - FISRT_ENQUEUE_AMOUNT
// THIRD_ENQUEUE_AMOUNT int = 5
// RECYCLE_TIME time.Duration = time.Second * 5
// )
// q := NewQueue(QUEUE_SIZE, MESSAGE_EXPIRES, RECYCLE_TIME)
// for i := 0; i < FISRT_ENQUEUE_AMOUNT; i++ {
// q.Enqueue(newDataN(i))
// }
// time.Sleep(FISRT_SLEEP_TIME)
// for i := FISRT_ENQUEUE_AMOUNT; i < QUEUE_SIZE; i++ {
// q.Enqueue(newDataN(i))
// }
// // enqueue when overflow, the first item is removed.
// q.Enqueue(newDataN(QUEUE_SIZE))
// // all data are stored as expected
// for i, m := range q.Messages {
// // the first item is newly enqueued
// if i == 0 {
// i = QUEUE_SIZE
// }
// if !isDataN(m.Body, i) {
// t.Fatalf("unexpected data %s, %d\n", string(m.Body), i)
// }
// }
// // dequeue some data from the first part.
// for i := 0; i < FISRT_ENQUEUE_AMOUNT-1; i++ {
// data := q.Dequeue()
// if !isDataN(data, i+1) {
// t.Fatalf("unexpected data %s, %d\n", string(data), i)
// }
// }
// // wait for the messages of the first part to expire
// time.Sleep(MESSAGE_EXPIRES - TIME_DELTA)
// // now all messages in the first part are expired,
// // dequeue some message from second part to verify it.
// for i := FISRT_ENQUEUE_AMOUNT; i < QUEUE_SIZE-1; i++ {
// data := q.Dequeue()
// if !isDataN(data, i) {
// t.Fatalf("unexpected data %s, %d\n", string(data), i)
// }
// }
// // verify current pointer and queue length
// if q.Current != QUEUE_SIZE-1 {
// t.Fatalf("unexpected current pointer %d", q.Current)
// }
// // second part left 1, and the overflow 1
// if q.Length != 2 {
// t.Fatalf("unexpected queue length %d", q.Length)
// }
// // enqueue third round of data
// for i := QUEUE_SIZE; i < QUEUE_SIZE+THIRD_ENQUEUE_AMOUNT; i++ {
// q.Enqueue(newDataN(i + 1))
// }
// // verify current pointer and queue length
// if q.Current != QUEUE_SIZE-1 {
// t.Fatalf("unexpected current pointer %d", q.Current)
// }
// if q.Length != 2+THIRD_ENQUEUE_AMOUNT {
// t.Fatalf("unexpected queue length %d", q.Length)
// }
// // dequeue and leave only 2 items
// for i := q.Current; q.Length > 2; i++ {
// data := q.Dequeue()
// if !isDataN(data, i) {
// t.Fatalf("unexpected data %s, %d\n", string(data), i)
// }
// }
// if q.Length != 2 {
// t.Fatalf("unexpected queue length %d", q.Length)
// }
// // wait for all items to expire and be recycled
// time.Sleep(RECYCLE_TIME)
// // verify current pointer and queue length
// if q.Current != 0 {
// t.Fatalf("unexpected current pointer %d", q.Current)
// }
// if q.Length != 0 {
// t.Fatalf("unexpected queue length %d", q.Length)
// }
// }