forked from go-stomp/stomp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
memory_queue.go
70 lines (57 loc) · 1.43 KB
/
memory_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package queue
import (
"container/list"
"github.com/go-stomp/stomp/frame"
)
// MemoryQueueStorage is an in-memory queue store; NewMemoryQueueStorage
// returns it as a Storage. Frames are held only in process memory.
//
// The type contains no synchronization of its own.
// NOTE(review): presumably callers serialize access — confirm.
type MemoryQueueStorage struct {
// lists maps a queue name to the list of frames waiting on that queue;
// the front of each list is the next frame to be dequeued.
lists map[string]*list.List
}
// NewMemoryQueueStorage returns a Storage backed by an empty
// MemoryQueueStorage whose queue map is already allocated.
func NewMemoryQueueStorage() Storage {
	return &MemoryQueueStorage{
		lists: map[string]*list.List{},
	}
}
// Enqueue appends a frame to the tail of the named queue, creating the
// queue on first use. It always returns nil.
//
// The backing map is allocated on demand, so Enqueue is safe to call on a
// zero-value MemoryQueueStorage or after Stop has set the map to nil;
// writing to a nil map would otherwise panic.
func (m *MemoryQueueStorage) Enqueue(queue string, frame *frame.Frame) error {
	if m.lists == nil {
		m.lists = make(map[string]*list.List)
	}
	l, ok := m.lists[queue]
	if !ok {
		l = list.New()
		m.lists[queue] = l
	}
	l.PushBack(frame)
	return nil
}
// Requeue pushes a frame onto the head of the named queue, so that it is
// the next frame returned by Dequeue for that queue. The queue is created
// if it does not yet exist. It always returns nil.
//
// The frame is stored as-is: this implementation does not read or modify
// any of its headers, including "message-id".
//
// The backing map is allocated on demand, so Requeue is safe to call on a
// zero-value MemoryQueueStorage or after Stop has set the map to nil;
// writing to a nil map would otherwise panic.
func (m *MemoryQueueStorage) Requeue(queue string, frame *frame.Frame) error {
	if m.lists == nil {
		m.lists = make(map[string]*list.List)
	}
	l, ok := m.lists[queue]
	if !ok {
		l = list.New()
		m.lists[queue] = l
	}
	l.PushFront(frame)
	return nil
}
// Dequeue removes and returns the frame at the head of the named queue.
// It returns a nil frame when the queue is unknown or currently empty.
// The error result is always nil.
func (m *MemoryQueueStorage) Dequeue(queue string) (*frame.Frame, error) {
	if pending, found := m.lists[queue]; found {
		if head := pending.Front(); head != nil {
			// Remove hands back the stored value, which is always a *frame.Frame.
			return pending.Remove(head).(*frame.Frame), nil
		}
	}
	return nil, nil
}
// Start is called at server startup so the queue storage can initialize
// itself. It replaces the queue map with a fresh, empty one, so any frames
// held before the call are no longer reachable through this storage.
func (m *MemoryQueueStorage) Start() {
	m.lists = map[string]*list.List{}
}
// Stop is called prior to server shutdown so the queue storage can clean
// up. It sets the queue map to nil, dropping this storage's references to
// all queued frames; Dequeue then returns a nil frame for every queue
// until Start allocates a new map.
func (m *MemoryQueueStorage) Stop() {
m.lists = nil
}