forked from go-stomp/stomp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription_list.go
107 lines (97 loc) · 2.96 KB
/
subscription_list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package client
import (
"container/list"
)
// Maintains a list of subscriptions. Not thread-safe.
type SubscriptionList struct {
// TODO: implement linked list locally, adding next and prev
// pointers to the Subscription struct itself.
subs *list.List
}
func NewSubscriptionList() *SubscriptionList {
return &SubscriptionList{list.New()}
}
// Add a subscription to the back of the list. Will panic if
// the subscription destination does not match the subscription
// list destination. Will also panic if the subscription has already
// been added to a subscription list.
func (sl *SubscriptionList) Add(sub *Subscription) {
if sub.subList != nil {
panic("subscription is already in a subscription list")
}
sl.subs.PushBack(sub)
sub.subList = sl
}
// Gets the first subscription in the list, or nil if there
// are no subscriptions available. The subscription is removed
// from the list.
func (sl *SubscriptionList) Get() *Subscription {
if sl.subs.Len() == 0 {
return nil
}
front := sl.subs.Front()
sub := front.Value.(*Subscription)
sl.subs.Remove(front)
sub.subList = nil
return sub
}
// Removes the subscription from the list.
func (sl *SubscriptionList) Remove(s *Subscription) {
for e := sl.subs.Front(); e != nil; e = e.Next() {
if e.Value.(*Subscription) == s {
sl.subs.Remove(e)
s.subList = nil
return
}
}
}
// Search for a subscription with the specified id and remove it.
// Returns a pointer to the subscription if found, nil otherwise.
func (sl *SubscriptionList) FindByIdAndRemove(id string) *Subscription {
for e := sl.subs.Front(); e != nil; e = e.Next() {
sub := e.Value.(*Subscription)
if sub.id == id {
sl.subs.Remove(e)
return sub
}
}
return nil
}
// Finds all subscriptions in the subscription list that are acked by the
// specified message-id (or ack) header. The subscription is removed from
// the list and the callback function called for that subscription.
func (sl *SubscriptionList) Ack(msgId uint64, callback func(s *Subscription)) {
for e := sl.subs.Front(); e != nil; {
next := e.Next()
sub := e.Value.(*Subscription)
if sub.IsAckedBy(msgId) {
sl.subs.Remove(e)
callback(sub)
}
e = next
}
}
// Finds all subscriptions in the subscription list that are *nacked* by the
// specified message-id (or ack) header. The subscription is removed from
// the list and the callback function called for that subscription. Current
// understanding that all NACKs are individual, but not sure
func (sl *SubscriptionList) Nack(msgId uint64, callback func(s *Subscription)) {
for e := sl.subs.Front(); e != nil; {
next := e.Next()
sub := e.Value.(*Subscription)
if sub.IsNackedBy(msgId) {
sl.subs.Remove(e)
callback(sub)
}
e = next
}
}
// Invoke a callback function for every subscription in the list.
func (sl *SubscriptionList) ForEach(callback func(s *Subscription, isLast bool)) {
for e := sl.subs.Front(); e != nil; {
next := e.Next()
sub := e.Value.(*Subscription)
callback(sub, next == nil)
e = next
}
}