Fix negative WaitGroup counter issue #712

Merged: 3 commits, Jan 20, 2022

Conversation

wolfstudy
Member

Signed-off-by: xiaolongran <xiaolongran@tencent.com>

Fixes #711

Motivation

As described in #711, Send() uses a WaitGroup to block until the messageID can be returned synchronously. However, the callback passed to internalSendAsync() may be triggered more than once, so wg.Done() runs more than once and the counter goes negative, which causes the panic reported in #711.
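
The failure mode can be reproduced in isolation. The following is a minimal sketch, not the client's code: `doneTwice` and `callback` are hypothetical names, and the callback simply stands in for a send callback that fires twice for one `wg.Add(1)`.

```go
package main

import (
	"fmt"
	"sync"
)

// doneTwice calls wg.Done() twice for a single wg.Add(1) and returns the
// recovered panic message, or "" if no panic occurred.
func doneTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	wg.Add(1)

	// Hypothetical stand-in for the send callback.
	callback := func() { wg.Done() }

	callback() // first invocation: counter goes from 1 to 0
	callback() // second invocation: counter would go to -1, so Done panics
	return ""
}

func main() {
	fmt.Println(doneTwice()) // prints "sync: negative WaitGroup counter"
}
```

In the real producer the second invocation happens on the connection goroutine, where nothing recovers the panic, so the whole process crashes.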

Modifications

  • Replace the WaitGroup with a doneCh channel, consistent with the consumer-side implementation, to make future maintenance easier
  • Introduce an atomic variable isDone to ensure that close(doneCh) is only triggered once
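
The two modifications above can be sketched roughly as follows. This is an illustration under stated assumptions rather than the code in this PR: `sendAsync` is a hypothetical stand-in for internalSendAsync() that deliberately fires the callback twice, message IDs are plain strings, and the standard library's `atomic.Bool` (Go 1.19+) is used in place of the `uAtomic` package.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// sendAsync is a hypothetical stand-in for internalSendAsync: it invokes the
// callback twice, first with a receipt and then with a timeout error.
func sendAsync(callback func(id string, err error)) {
	go func() {
		callback("msg-1", nil)
		callback("", errors.New("message send timeout"))
	}()
}

// send blocks until the first callback invocation and returns its result.
func send() (string, error) {
	var (
		msgID  string
		err    error
		isDone atomic.Bool
	)
	doneCh := make(chan struct{})

	sendAsync(func(id string, e error) {
		// Only the invocation that flips isDone from false to true records
		// the result and closes the channel; later invocations are no-ops.
		if isDone.CompareAndSwap(false, true) {
			msgID, err = id, e
			close(doneCh)
		}
	})

	<-doneCh // wait for the send request to finish
	return msgID, err
}

func main() {
	id, err := send()
	fmt.Println(id, err) // prints "msg-1 <nil>"
}
```

Without the guard, the second invocation would call close() on an already closed channel, which panics just as the extra wg.Done() did.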

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@wolfstudy wolfstudy self-assigned this Jan 18, 2022
@wolfstudy wolfstudy added this to the v0.8.0 milestone Jan 18, 2022
var err error
var msgID MessageID

// use atomic bool to avoid race
isDone := uAtomic.NewBool(false)
Contributor

Why is isDone needed? Each call to this function gets its own channel instance.

Member Author

@wolfstudy wolfstudy Jan 19, 2022

Yes. As described in #711, the send callback is triggered multiple times, which produces the following error:

panic: sync: negative WaitGroup counter
goroutine 2204 [running]:
sync.(*WaitGroup).Add(0x18dbfc0, 0xc0020d8d40)
sync/waitgroup.go:74 +0x105
sync.(*WaitGroup).Done(...)
sync/waitgroup.go:99
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).Send.func1({0x1965558, 0xc01a1ab1d0}, 0xc01a841160, {0x0, 0x0})
github.com/apache/pulsar-client-go@v0.7.0/pulsar/producer_partition.go:722 +0x7e
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc001017b00, 0xc017280480)
github.com/apache/pulsar-client-go@v0.7.0/pulsar/producer_partition.go:828 +0x7ef
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc001188420, 0xc017280480)
github.com/apache/pulsar-client-go@v0.7.0/pulsar/internal/connection.go:673 +0xe8
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc001188420, 0xc006b31680, {0x0, 0x0})
github.com/apache/pulsar-client-go@v0.7.0/pulsar/internal/connection.go:558 +0x14a
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc001188420)
github.com/apache/pulsar-client-go@v0.7.0/pulsar/internal/connection.go:415 +0x3ba
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1()
github.com/apache/pulsar-client-go@v0.7.0/pulsar/internal/connection.go:227 +0x65
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
github.com/apache/pulsar-client-go@v0.7.0/pulsar/internal/connection.go:223 +0x75

So in the send callback, we introduce the atomic variable isDone to ensure that, for any given send request, its callback is only called once.

Contributor

The callback is still invoked more than once; isDone only checks whether the callback has already been called.

Member Author

Yes, we use CAS (compare-and-swap) to avoid this.
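
As a rough illustration of why CAS helps here: a separate load followed by a store would let two concurrent callers both observe `false`, whereas a single compare-and-swap lets exactly one caller win. The sketch below assumes Go 1.19+ for `atomic.Bool` and `atomic.Int32`; `countWinners` is a hypothetical helper, not part of the client.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWinners starts n goroutines that race to flip isDone from false to
// true and returns how many of them succeeded.
func countWinners(n int) int32 {
	var (
		isDone  atomic.Bool
		winners atomic.Int32
		wg      sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// CompareAndSwap checks and sets in one atomic step, so only
			// one goroutine can ever see the swap succeed.
			if isDone.CompareAndSwap(false, true) {
				winners.Add(1)
			}
		}()
	}
	wg.Wait()
	return winners.Load()
}

func main() {
	fmt.Println(countWinners(100)) // prints 1
}
```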

Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@billowqiu
Contributor

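	var err error
	var msgID MessageID
	// Debug instrumentation: count callback invocations and record each error.
	var callbackCalledCount int
	var arr [5]error
	p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
		err = e
		msgID = ID
		// Guard the index so an unexpected sixth invocation cannot go out of range.
		if callbackCalledCount < len(arr) {
			arr[callbackCalledCount] = e
		}
		callbackCalledCount++
		if callbackCalledCount > 1 {
			p.log.Infof("callback more than once for msgid %v, err: %v, count %d", ID, arr, callbackCalledCount)
		}
		// A second invocation reaches this line again and drives the WaitGroup counter negative.
		wg.Done()
	}, true)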

time="2022-01-19T21:24:14+08:00" level=info msg="callback more than once for msgid , err: [ message send timeout: TimeoutError ], count 2" producerID=237 producer_name=xxxxx"
time="2022-01-19T21:23:09+08:00" level=info msg="callback more than once for msgid 684097:3722889:0, err: [message send timeout: TimeoutError ], count 2" producerID=1 producer_name=cmq_11.149.254.205_64ee3ab1-792a-11ec-b623-5254001f63ee topic="xxxx"

@wolfstudy wolfstudy changed the title Fix negative WaitGroup counter issue [WIP]Fix negative WaitGroup counter issue Jan 19, 2022
@wolfstudy wolfstudy changed the title [WIP]Fix negative WaitGroup counter issue Fix negative WaitGroup counter issue Jan 20, 2022
Signed-off-by: xiaolongran <xiaolongran@tencent.com>
@wolfstudy wolfstudy merged commit 90305e8 into apache:master Jan 20, 2022
Development

Successfully merging this pull request may close these issues.

Send func panic
4 participants