Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unsubscribe blocked when consumer is closing or has closed #457

Merged
merged 2 commits into from Feb 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 25 additions & 6 deletions pulsar/consumer_partition.go
Expand Up @@ -47,6 +47,21 @@ const (
consumerClosed
)

func (s consumerState) String() string {
switch s {
case consumerInit:
return "Initializing"
case consumerReady:
return "Ready"
case consumerClosing:
return "Closing"
case consumerClosed:
return "Closed"
default:
return "Unknown"
}
}

type subscriptionMode int

const (
Expand Down Expand Up @@ -195,6 +210,11 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}

func (pc *partitionConsumer) Unsubscribe() error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
return nil
}

req := &unsubscribeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

Expand All @@ -206,9 +226,8 @@ func (pc *partitionConsumer) Unsubscribe() error {
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)

state := pc.getConsumerState()
if state == consumerClosed || state == consumerClosing {
pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
return
}

Expand Down Expand Up @@ -354,7 +373,7 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
func (pc *partitionConsumer) requestSeek(msgID messageID) error {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.Error("Consumer was already closed")
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
return nil
}

Expand Down Expand Up @@ -398,7 +417,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {

state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.Error("Consumer was already closed")
pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
return
}

Expand Down Expand Up @@ -798,7 +817,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

if state == consumerClosed || state == consumerClosing {
pc.log.Error("The consumer is closing or has been closed")
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
Expand Down