Skip to content

Commit

Permalink
Protect subscription completed flag with a mutex to avoid closing mul…
Browse files Browse the repository at this point in the history
…tiple times the channel or writing to a closed channel.
  • Loading branch information
laurentluce committed Sep 7, 2016
1 parent e301e7f commit 7d3e955
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
12 changes: 7 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net"
"strconv"
"sync"
"time"

"github.com/go-stomp/stomp/frame"
Expand Down Expand Up @@ -508,11 +509,12 @@ func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Fr
}

sub := &Subscription{
id: id,
destination: destination,
conn: c,
ackMode: ack,
C: make(chan *Message, 16),
id: id,
destination: destination,
conn: c,
ackMode: ack,
C: make(chan *Message, 16),
completedMutex: &sync.Mutex{},
}
go sub.readLoop(ch)

Expand Down
30 changes: 20 additions & 10 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stomp
import (
"fmt"
"log"
"sync"

"github.com/go-stomp/stomp/frame"
)
Expand All @@ -12,12 +13,13 @@ import (
//
// Once a client has subscribed, it can receive messages from the C channel.
type Subscription struct {
C chan *Message
id string
destination string
conn *Conn
ackMode AckMode
completed bool
C chan *Message
id string
destination string
conn *Conn
ackMode AckMode
completed bool
completedMutex *sync.Mutex
}

// BUG(jpj): If the client does not read messages from the Subscription.C
Expand Down Expand Up @@ -65,8 +67,12 @@ func (s *Subscription) Unsubscribe(opts ...func(*frame.Frame) error) error {
}

s.conn.sendFrame(f)
s.completed = true
close(s.C)
s.completedMutex.Lock()
if !s.completed {
s.completed = true
close(s.C)
}
s.completedMutex.Unlock()
return nil
}

Expand Down Expand Up @@ -109,9 +115,11 @@ func (s *Subscription) readLoop(ch chan *frame.Frame) {
Header: f.Header,
Body: f.Body,
}
s.completedMutex.Lock()
if !s.completed {
s.C <- msg
}
s.completedMutex.Unlock()
} else if f.Command == frame.ERROR {
message, _ := f.Header.Contains(frame.Message)
text := fmt.Sprintf("Subscription %s: %s: ERROR message:%s",
Expand All @@ -131,11 +139,13 @@ func (s *Subscription) readLoop(ch chan *frame.Frame) {
Header: f.Header,
Body: f.Body,
}
s.completedMutex.Lock()
if !s.completed {
s.completed = true
s.C <- msg
close(s.C)
}
s.completed = true
close(s.C)
s.completedMutex.Unlock()
return
}
}
Expand Down

0 comments on commit 7d3e955

Please sign in to comment.