Permalink
Browse files

Deliver messages while holding registry lock

  • Loading branch information...
Alex Suraci and Pieter Noordhuis
Alex Suraci and Pieter Noordhuis committed Mar 28, 2013
1 parent 50c0cb1 commit 8267bd7b801e7a3ff06c626cac75bf38929495fe
Showing with 11 additions and 6 deletions.
  1. +11 −6 client.go
View
@@ -115,11 +115,10 @@ func (s *Subscription) unsubscribe() {
func (s *Subscription) deliver(m *readMessage) {
s.received++
s.Inbox <- m
+}
- // Unsubscribe if the maximum number of messages has been received
- if s.maximum > 0 && s.received >= s.maximum {
- s.Unsubscribe()
- }
+func (s *Subscription) isDone() bool {
+ return s.maximum > 0 && s.received >= s.maximum
}
type subscriptionRegistry struct {
@@ -202,11 +201,17 @@ func (sr *subscriptionRegistry) Deliver(m *readMessage) {
var ok bool
sr.Lock()
- s, ok = sr.m[m.SubscriptionId]
- sr.Unlock()
+ defer sr.Unlock()
+ s, ok = sr.m[m.SubscriptionId]
if ok {
s.deliver(m)
+
+ // Unsubscribe if the maximum number of messages has been received
+ if s.isDone() {
+ delete(sr.m, s.sid)
+ s.unsubscribe()
+ }
}
}

0 comments on commit 8267bd7

Please sign in to comment.