Skip to content

Commit

Permalink
fix: potential deadlock in Subscriber (#584)
Browse files Browse the repository at this point in the history
* fix: potential deadlock in Subscriber

* another try

* new try

* handle more cases
  • Loading branch information
dunglas authored Nov 14, 2021
1 parent 6c1907a commit df815c9
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 16 deletions.
2 changes: 1 addition & 1 deletion bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestBoltTransportClosed(t *testing.T) {

assert.Equal(t, transport.Dispatch(&Update{Topics: s.Topics}), ErrClosedTransport)

_, ok := <-s.disconnected
_, ok := <-s.out
assert.False(t, ok)
}

Expand Down
2 changes: 0 additions & 2 deletions local_transport_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func subBenchLocalTransport(b *testing.B, topics, concurrency, matchPct int, tes
}
}
out := make(chan *Update, 50000)
once := &sync.Once{}
for i := 0; i < concurrency; i++ {
s := NewSubscriber("", zap.NewNop())
if i%100 < matchPct {
Expand All @@ -48,7 +47,6 @@ func subBenchLocalTransport(b *testing.B, topics, concurrency, matchPct int, tes
s.SetTopics(tsNoMatch, nil)
}
s.out = out
s.disconnectedOnce = once
tr.AddSubscriber(s)
}
ctx, done := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestUnsubscribe(t *testing.T) {
hub.SubscribeHandler(httptest.NewRecorder(), req)
assert.Equal(t, 0, s.subscribers.Len())
s.subscribers.Walk(0, func(s *Subscriber) bool {
_, ok := <-s.disconnected
_, ok := <-s.out
assert.False(t, ok)

return true
Expand Down
32 changes: 21 additions & 11 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type Subscriber struct {
PrivateRegexps []*regexp.Regexp
Debug bool

disconnectedOnce *sync.Once
disconnected int32
out chan *Update
disconnected chan struct{}
outMutex sync.RWMutex
responseLastEventID chan string
logger Logger
ready int32
Expand All @@ -45,20 +45,16 @@ func NewSubscriber(lastEventID string, logger Logger) *Subscriber {
RequestLastEventID: lastEventID,
responseLastEventID: make(chan string, 1),
out: make(chan *Update, 1000),
disconnected: make(chan struct{}),
logger: logger,
disconnectedOnce: &sync.Once{},
}

return s
}

// Dispatch an update to the subscriber.
func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool {
select {
case <-s.disconnected:
if atomic.LoadInt32(&s.disconnected) > 0 {
return false
default:
}

if !fromHistory && atomic.LoadInt32(&s.ready) < 1 {
Expand All @@ -71,6 +67,13 @@ func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool {
}
s.liveMutex.Unlock()
}

s.outMutex.Lock()
defer s.outMutex.Unlock()
if atomic.LoadInt32(&s.disconnected) > 0 {
return false
}

s.out <- u

return true
Expand All @@ -80,6 +83,8 @@ func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool {
func (s *Subscriber) Ready() int {
s.liveMutex.Lock()
defer s.liveMutex.Unlock()
s.outMutex.Lock()
defer s.outMutex.Unlock()

n := len(s.liveQueue)
for _, u := range s.liveQueue {
Expand All @@ -102,10 +107,15 @@ func (s *Subscriber) HistoryDispatched(responseLastEventID string) {

// Disconnect disconnects the subscriber.
func (s *Subscriber) Disconnect() {
s.disconnectedOnce.Do(func() {
close(s.disconnected)
close(s.out)
})
if atomic.LoadInt32(&s.disconnected) > 0 {
return
}

s.outMutex.Lock()
defer s.outMutex.Unlock()

atomic.StoreInt32(&s.disconnected, 1)
close(s.out)
}

// SetTopics compiles topic selector regexps.
Expand Down
2 changes: 1 addition & 1 deletion transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestLocalTransportClosed(t *testing.T) {
assert.Equal(t, transport.AddSubscriber(NewSubscriber("", zap.NewNop())), ErrClosedTransport)
assert.Equal(t, transport.Dispatch(&Update{}), ErrClosedTransport)

_, ok := <-s.disconnected
_, ok := <-s.out
assert.False(t, ok)
}

Expand Down

0 comments on commit df815c9

Please sign in to comment.