consumer group deadlock on close: consumer group shutdown hangs #1351

Closed
sitano opened this issue Apr 10, 2019 · 20 comments
Comments

@sitano

sitano commented Apr 10, 2019

Versions

Sarama Version: d84c59b2a2d87f185d91a1cc426a1f4d4e9365109fe0d96cbd2404c3a57c365a / release v1.22.0
Kafka Version: kafka_2.12-2.1.0.jar
Go Version: go version go1.12.1 linux/amd64

Configuration

What configuration values are you using for Sarama and Kafka?

Kafka: a single topic with only 1 partition and 2 consumers in a single consumer group

Logs
goroutine 64 [chan receive]:
github.com/Shopify/sarama.(*consumerGroup).Consume(0xc000159ea0, 0x14ed720, 0xc0000c4020, 0xc000436ae0, 0x1, 0x1, 0x14e40a0, 0xc0000ba3c0, 0x0, 0x0)
	github.com/Shopify/sarama/consumer_group.go:175 +0x38b
internal/session.(*Service).Start.func1(0xc0000cc230)
created by internal/session.(*Service).Start

goroutine 78 [semacquire]:
sync.runtime_SemacquireMutex(0xc000159ef4, 0x900000000)
	/usr/lib/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000159ef0)
	/usr/lib/go/src/sync/mutex.go:134 +0x1e2
github.com/Shopify/sarama.(*consumerGroup).Close.func1()
	github.com/Shopify/sarama/consumer_group.go:121 +0x94
sync.(*Once).Do(0xc000159f00, 0xc0000dff08)
	/usr/lib/go/src/sync/once.go:44 +0xdf
github.com/Shopify/sarama.(*consumerGroup).Close(0xc000159ea0, 0x0, 0x0)
	github.com/Shopify/sarama/consumer_group.go:118 +0x7a
internal/session.(*Service).Stop(0xc0000cc230)

goroutine 77 [select]:
github.com/Shopify/sarama.(*consumerGroupSession).heartbeatLoop(0xc000428500)
	github.com/Shopify/sarama/consumer_group.go:701 +0x7e0
created by github.com/Shopify/sarama.newConsumerGroupSession
	github.com/Shopify/sarama/consumer_group.go:505 +0x41b

goroutine 76 [select]:
github.com/Shopify/sarama.(*offsetManager).mainLoop(0xc0003fdce0)
	github.com/Shopify/sarama/offset_manager.go:226 +0x1ef
github.com/Shopify/sarama.withRecover(0xc0001da8f0)
	github.com/Shopify/sarama/utils.go:45 +0x51
created by github.com/Shopify/sarama.newOffsetManagerFromClient
	github.com/Shopify/sarama/offset_manager.go:70 +0x3ae
Problem Description

The consumer group's call to Close() hangs (deadlocks) on acquiring a mutex, because Consume() cannot finish: it is waiting for a session to complete, but no partitions were assigned to this consumer instance.
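
For illustration, the usage pattern that hits this looks roughly like the following (a minimal sketch, assuming a one-partition topic and two members of the same group so that one member gets no claims; brokers, groupID, topic, handler and ctx are placeholders):

config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0 // consumer groups need Kafka >= 0.10.2

group, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
	log.Fatal(err)
}

go func() {
	for {
		// Consume blocks for the lifetime of a session; on the member that
		// received no partitions it keeps waiting for the session to complete.
		if err := group.Consume(ctx, []string{topic}, handler); err != nil {
			return
		}
	}
}()

// ... later, during shutdown:
if err := group.Close(); err != nil { // hangs here acquiring the consumer group mutex
	log.Println("close:", err)
}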

@sitano sitano changed the title consumer group deadlock on close: close hangs consumer group deadlock on close: consumer group shutdown hangs Apr 10, 2019
@burdiyan

burdiyan commented Jun 5, 2019

Can you provide code sample to reproduce the issue?

@sitano
Author

sitano commented Jun 5, 2019

@burdiyan ok, I will try to provide something reproducible

@sitano
Author

sitano commented Jun 5, 2019

@burdiyan https://github.com/sitano/sarama_close_bug

$ ... run Kafka locally with Docker, using an image like https://hub.docker.com/r/wurstmeister/kafka/
$ ... expose port 9092 to localhost

$ env GO111MODULE=on go build -race && ./sarama_close_bug -brokers 127.0.0.1:9092

2019/06/05 15:01:47 Starting a new Sarama consumer
2019/06/05 15:01:47 client connected
2019/06/05 15:01:47 new topic with single partition =  5a5fb18d-343e-4bf8-9dea-886729a6b28e
2019/06/05 15:01:47 new group =  196cff9d-c66b-4666-a76a-0d93047500e7
2019/06/05 15:01:47 starting  1
2019/06/05 15:01:47 starting  2
2019/06/05 15:01:47 consume at  1
2019/06/05 15:01:47 consume at  2
2019/06/05 15:01:48 consumer setup 1
2019/06/05 15:01:48 consumer cleanup 1
2019/06/05 15:01:48 consume at  1
2019/06/05 15:01:48 consumer setup 2
2019/06/05 15:01:48 consumer setup 1
2019/06/05 15:01:48 Sarama consumers up and running!...
2019/06/05 15:01:48 trying to close cs... 2
^C

consumer group never finishes...

@byebyebruce

Hi @sitano, have you solved this problem?

@sitano
Author

sitano commented Jun 13, 2019

@bailu1901 yes, but with a workaround

@evelritual

@sitano Would you mind posting a synopsis of your workaround for others experiencing this issue while a permanent fix is looked into? The inability to close these consumer groups is causing some serious memory-leak headaches.

@sitano
Author

sitano commented Jun 24, 2019

@PapayaJuice yes, but the gods of concurrent programming will not praise me. The main idea is to tolerate the data race that happens in the bad case.

// consume loop: keep calling Consume() until asked to shut down, then clean up.
// shutdown, ctx, topics, handler and cleanup are placeholders.
done := make(chan struct{})
go func() {
	for !shutdown {
		cs.Consume(ctx, topics, handler)
	}
	cleanup()
	close(done)
}()

CloseConsumerGroup(cs, 10*time.Second, done)
kafkaConsumeGroupClient.Close()

func CloseConsumerGroup(cs sarama.ConsumerGroup, timeout time.Duration, wait ...<-chan struct{}) {
	done := make(chan struct{})

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// If the consumer group is asked to close first during shutdown, it may
	// hang waiting for a consumer group session to complete when no
	// partitions were assigned to this node. In that case cs.Close() would
	// hang until a cluster rebalance or a disconnect happens. If so, the
	// cs.Close() invocation becomes a data race on the closed client, but
	// at least it finishes.
	go func() {
		if err := cs.Close(); err != nil {
			log.Logger().WithError(err).Info("close consumer group")
		}

		close(done)
	}()

	select {
	case <-ctx.Done():
		// consumer did not exit
		log.Logger().WithError(context.DeadlineExceeded).Info("close consumer group")
	case <-done:
		// cs.Close() returned and the closer goroutine closed `done`
		for _, ch := range wait {
			select {
			case <-ctx.Done():
				// consumer did not exit
				log.Logger().WithError(context.DeadlineExceeded).Info("close consumer group: wait external chan")
				return
			case <-ch:
				// finished
			}
		}
	}
}

The main idea behind that code is that it spawns a goroutine which tries to close the consumer group. If that deadlocks, the client owned by the consumer group gets closed forcibly anyway by the goroutine that called CloseConsumerGroup. In the good case, closing the client has a happens-before relationship established by the waiting in CloseConsumerGroup. In the bad case, it is closed with a data race. Either way, cs.Close() does not block the calling goroutine and shutdown continues.

The wait channels in the arguments let this function also wait for the goroutine that runs the consumer group session loop to finish, since that goroutine may want to do some cleanup after cs.Consume() returns.

@Lywane

Lywane commented Aug 12, 2019

@burdiyan I have the same problem. Is there a better solution?

@d1egoaz
Contributor

d1egoaz commented Aug 22, 2019

check https://github.com/sitano/sarama_close_bug to reproduce the error

@NeoHuang

Hi, do you already have a fix for this? We also suffer from this problem: if we have more consumers than partitions, the close hangs forever.

@berupp

berupp commented Dec 4, 2019

Hi,

I ran into this exact problem as well. The issue was a bug in my implementation of

type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

Specifically

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error

Due to a race condition, my ConsumeClaim function would not finish its processing loop as required by the contract.

As a result, the close also got stuck trying to acquire a lock here: github.com/Shopify/sarama/consumer_group.go:121

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock() // <- can't acquire the lock

Once I resolved the race condition in my code that blocked ConsumeClaim from exiting, the problem went away.
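
For reference, a ConsumeClaim that honors that contract might look roughly like this (a sketch; processMessage is a hypothetical stand-in for the application logic):

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Messages() is closed by sarama when the session ends (rebalance or Close()),
	// so ranging over it guarantees the loop exits and the session can tear down.
	for msg := range claim.Messages() {
		processMessage(msg) // hypothetical application logic
		sess.MarkMessage(msg, "")
	}
	return nil
}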

Maybe that helps

@ghost

ghost commented Mar 3, 2020

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

@ghost ghost added the stale Issues and pull requests without any recent activity label Mar 3, 2020
@carldunham

I'm seeing this as well, I think. The examples that I followed implement ConsumeClaim() like:

for message := range claim.Messages() {
  ...

and there seems to be a race that the closer loses, so the channel never gets closed and everything hangs.

I'm going to try implementing a version that selects on claim.Messages() and ctx.Done() to see if that helps at all.
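
A version along those lines might look like this (a sketch; processMessage is again a hypothetical stand-in for the message handling):

func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// channel closed: the session is ending, so exit and let the group rebalance or close
				return nil
			}
			processMessage(msg) // hypothetical application logic
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// session context cancelled (Close() or rebalance): stop promptly
			return nil
		}
	}
}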

@ghost ghost removed the stale Issues and pull requests without any recent activity label Mar 4, 2020
@carldunham

Although I'm calling client.Close() after cancelling the context. Not sure if that's a supported flow.

@carldunham

Never mind. Unrelated deadlock in my message handler. Sorry for the noise.

@joshua0x

The root cause is that when the consumer gets no claims, the session can't exit and keeps holding consumerGroup.lock.
We could just panic when no claims are received.

@sayan1886

sayan1886 commented Mar 24, 2023

We are facing the same issue in sarama@1.38.1 @joshua0x @dnwe. Calling consumerGroup.Close() creates huge leaks in our apps, as we create a lot of consumer groups for consuming Kafka topics.
Goroutine dump attached from debug/pprof/goroutine?debug=1:

1 @ 0x104159d 0x104162a 0x10528dd 0x106f665 0x1081631 0x1081391 0x15d2fb5 0x15cea17 0x1081af8 0x1081985 0x15ce97c 0x1665214 0x1667105 0x16772fc 0x1073c01
#	0x106f664	sync.runtime_SemacquireMutex+0x24					/usr/local/Cellar/go/1.19.5/libexec/src/runtime/sema.go:77
#	0x1081630	sync.(*Mutex).lockSlow+0x270						/usr/local/Cellar/go/1.19.5/libexec/src/sync/mutex.go:171
#	0x1081390	sync.(*Mutex).Lock+0x50							/usr/local/Cellar/go/1.19.5/libexec/src/sync/mutex.go:90
#	0x15d2fb4	github.com/Shopify/sarama.(*consumerGroup).leave+0x54			/Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:545
#	0x15cea16	github.com/Shopify/sarama.(*consumerGroup).Close.func1+0x56		/Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:159
#	0x1081af7	sync.(*Once).doSlow+0x137						/usr/local/Cellar/go/1.19.5/libexec/src/sync/once.go:74
#	0x1081984	sync.(*Once).Do+0x44							/usr/local/Cellar/go/1.19.5/libexec/src/sync/once.go:65
#	0x15ce97b	github.com/Shopify/sarama.(*consumerGroup).Close+0x7b			/Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:155
#	0x1665213	github.ibm.com/BSS/golang-pulsar/pkg/worker.(*consumerGroup).Close+0x73	/Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:96
#	0x1667104	github.ibm.com/BSS/golang-pulsar/pkg/worker.(*PulsarWorker).Stop+0x204	/Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:347
#	0x16772fb	github.ibm.com/BSS/golang-pulsar/pkg/manager.(*Manager).Stop.func1+0x3b	/Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/manager/manager.go:462

1 @ 0x104159d 0x1051bc9 0x15d6c73 0x1073c01
#	0x15d6c72	github.com/Shopify/sarama.(*consumerGroupSession).heartbeatLoop+0x792	/Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/consumer_group.go:975

1 @ 0x104159d 0x1051bc9 0x1602a96 0x1624e9f 0x1073c01
#	0x1602a95	github.com/Shopify/sarama.(*offsetManager).mainLoop+0x1d5	/Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/offset_manager.go:242
#	0x1624e9e	github.com/Shopify/sarama.withRecover+0x3e			/Users/sayanchatterjee/go/pkg/mod/github.com/!shopify/sarama@v1.38.1/utils.go:43

1 @ 0x104159d 0x1070a05 0x1664be5 0x166d6f5 0x166b0d4 0x1073c01
#	0x1070a04	time.Sleep+0x124								/usr/local/Cellar/go/1.19.5/libexec/src/runtime/time.go:195
#	0x1664be4	github.ibm.com/BSS/exponential-goback.Wait+0xa4					/Users/sayanchatterjee/go/pkg/mod/github.ibm.com/!b!s!s/exponential-goback@v0.0.0-20220708152916-efd285ce2e13/goback.go:117
#	0x166d6f4	github.ibm.com/BSS/golang-pulsar/pkg/worker.(*PulsarWorker).send+0x25d4		/Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:891
#	0x166b0d3	github.ibm.com/BSS/golang-pulsar/pkg/worker.(*PulsarWorker).processBatch+0x213	/Users/sayanchatterjee/Documents/codebase/BSS/hyperwarp/golang-pulsar/pkg/worker/worker.go:690

@dnwe
Collaborator

dnwe commented Aug 21, 2023

@sitano I took a look at this with the latest Sarama, fixed up the example to check channel closure and session done correctly, and wasn't able to reproduce the bug.

Here's the changes I made:
sitano/sarama_close_bug#1

Can you confirm if this is still a problem for you?

@sitano
Author

sitano commented Aug 21, 2023

@dnwe Hi! I'm not working on that one at the moment and don't have time to reproduce it. If you feel it's no longer an issue, feel free to close.

@dnwe
Collaborator

dnwe commented Aug 21, 2023

@sitano ok, no worries. Thanks, I'll close this as believed to have been fixed.

@dnwe dnwe closed this as completed Aug 21, 2023