-
Notifications
You must be signed in to change notification settings - Fork 3
/
mock.go
104 lines (90 loc) · 2.15 KB
/
mock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package implqueue
import (
"context"
"io"
"strconv"
"sync"
"github.com/arquivei/foundationkit/errors"
"github.com/arquivei/goduck"
)
// MockQueue is an in-memory queue of pre-built messages that records, for
// each message, whether it has been marked as consumed.
type MockQueue struct {
// items holds the messages handed out by Next, in insertion order.
items []goduck.RawMessage
// consumedItems[i] becomes true once Done is called for items[i].
consumedItems []bool
// currentIdx is the position Next inspects first on its following call.
currentIdx int
// mtx is held by Next, Done and Failed while they touch the fields above.
// It is a pointer, so copies of a MockQueue share the same lock.
mtx *sync.Mutex
}
// NewMock builds a MockQueue whose messages wrap the given payloads, in order.
// Each message records its position in items, and every message starts out as
// not consumed. The payload byte slices are stored as-is, without copying.
func NewMock(items [][]byte) *MockQueue {
	messages := make([]goduck.RawMessage, len(items))
	for i, data := range items {
		messages[i] = &mockRawMessage{
			data: data,
			idx:  i,
		}
	}
	return &MockQueue{
		items: messages,
		// make zero-fills the slice, so every entry already starts as false.
		consumedItems: make([]bool, len(items)),
		currentIdx:    0,
		mtx:           &sync.Mutex{},
	}
}
// NewDefaultQueue creates a MockQueue holding nElems messages whose payloads
// are the decimal strings "0", "1", ... up to nElems-1, in that order.
func NewDefaultQueue(nElems int) *MockQueue {
	payloads := make([][]byte, nElems)
	for n := range payloads {
		payloads[n] = []byte(strconv.Itoa(n))
	}
	return NewMock(payloads)
}
// Next returns the next message that has not been marked as consumed,
// scanning the items circularly from the internal cursor. The cursor always
// moves past the inspected slot, so a message that was returned but never
// marked as consumed is handed out again once the scan wraps around.
//
// It returns io.EOF when every item is consumed or the queue has no items.
// ctx is not used.
func (m *MockQueue) Next(ctx context.Context) (goduck.RawMessage, error) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	nElems := len(m.consumedItems)
	for tries := 0; tries < nElems; tries++ {
		idx := m.currentIdx
		// Advance first so the following call starts at the next slot,
		// whether or not this one is returned.
		m.currentIdx = (idx + 1) % nElems
		if !m.consumedItems[idx] {
			return m.items[idx], nil
		}
	}
	return nil, io.EOF
}
// Done flags msg as consumed. It returns a "type cast error" when msg is not
// a *mockRawMessage. ctx is not used.
//
// The receiver is a value, but the copy shares the mutex pointer and the
// backing array of consumedItems, so the update is visible on the original.
func (m MockQueue) Done(ctx context.Context, msg goduck.RawMessage) error {
	const op = errors.Op("MockQueue.Done")

	m.mtx.Lock()
	defer m.mtx.Unlock()

	if mocked, isMock := msg.(*mockRawMessage); isMock {
		m.consumedItems[mocked.idx] = true
		return nil
	}
	return errors.E(op, "type cast error")
}
// Failed reports a failed message. It changes no queue state: it only
// validates msg, returning an error when msg is not a *mockRawMessage or when
// the message was already marked as consumed. ctx is not used.
func (m MockQueue) Failed(ctx context.Context, msg goduck.RawMessage) error {
	const op = errors.Op("MockQueue.Failed")

	m.mtx.Lock()
	defer m.mtx.Unlock()

	mocked, isMock := msg.(*mockRawMessage)
	switch {
	case !isMock:
		return errors.E(op, "type cast error")
	case m.consumedItems[mocked.idx]:
		// A message cannot fail after it was already acknowledged.
		return errors.E(op, "message status was 'Done'")
	default:
		return nil
	}
}
// Close does nothing and always returns nil.
func (m MockQueue) Close() error {
return nil
}
// IsEmpty reports whether every element in the queue has been marked as
// consumed. A queue with no elements is considered empty.
//
// The scan runs under the queue mutex because Done writes consumedItems
// while holding that same lock; reading without it would be a data race.
func (m MockQueue) IsEmpty() bool {
	// A MockQueue that was not built by NewMock may have no mutex; keep
	// supporting it instead of dereferencing a nil pointer.
	if m.mtx != nil {
		m.mtx.Lock()
		defer m.mtx.Unlock()
	}

	for _, consumed := range m.consumedItems {
		if !consumed {
			return false
		}
	}
	return true
}