Skip to content

Commit

Permalink
fix panic on already subscribed error (#390)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 19, 2024
1 parent f41c0bf commit 4023c34
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1596,11 +1596,12 @@ func (c *Client) handleRefresh(req *protocol.RefreshRequest, cmd *protocol.Comma
func (c *Client) onSubscribeError(channel string) {
c.mu.Lock()
chCtx, ok := c.channels[channel]
subscribingCh := chCtx.subscribingCh
delete(c.channels, channel)
c.mu.Unlock()
if ok {
_ = c.node.removeSubscription(channel, c)
if chCtx.subscribingCh != nil {
if subscribingCh != nil {
close(chCtx.subscribingCh)
}
}
Expand Down Expand Up @@ -2995,13 +2996,21 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep
if !serverSide {
// In case of server-side sub this will be done later by the caller.
c.mu.Lock()
if chCtx, ok := c.channels[channel]; ok {
subscribedCh := chCtx.subscribingCh
defer func() { close(subscribedCh) }()
channelContext.subscribingCh = subscribedCh
if chCtx, ok := c.channels[channel]; ok { // Move subscribingCh from existing channel context to the new one.
channelContext.subscribingCh = chCtx.subscribingCh
}
c.channels[channel] = channelContext
c.mu.Unlock()
defer func() {
c.mu.Lock()
defer c.mu.Unlock()
chCtx, ok := c.channels[channel]
if ok && chCtx.subscribingCh != nil {
close(chCtx.subscribingCh)
chCtx.subscribingCh = nil
c.channels[channel] = chCtx
}
}()
// Stop syncing recovery and PUB/SUB.
// In case of server side subscription we will do this later.
c.pubSubSync.StopBuffering(channel)
Expand Down

0 comments on commit 4023c34

Please sign in to comment.