[Fix] Message is blocked on the AckGroupingTracker.isDuplicate method. #986

Merged
merged 1 commit into from
Mar 8, 2023

shibd
Member

@shibd shibd commented Mar 8, 2023

Master Issue: #979

Motivation

The root cause is that internalReceivedCommand gets blocked inside timedAckGroupingTracker.isDuplicate. It is called from the connection's run loop:

case cmd := <-c.incomingCmdCh:
	c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)

1 @ 0x104ff6204 0x104fc232c 0x104fc1ed8 0x1054894dc 0x105497934 0x10541fa60 0x10541e430 0x10541d3bc 0x10541c114 0x105027874
# 0x1054894db github.com/apache/pulsar-client-go/pulsar.(*timedAckGroupingTracker).isDuplicate+0x3b  /Users/shibaodi/GolandProjects/pulsar-client-go/pulsar/ack_grouping_tracker.go:281
# 0x105497933 github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).MessageReceived+0xd63  /Users/shibaodi/GolandProjects/pulsar-client-go/pulsar/consumer_partition.go:1007
# 0x10541fa5f github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage+0xdf  /Users/shibaodi/GolandProjects/pulsar-client-go/pulsar/internal/connection.go:757
# 0x10541e42f github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand+0x28f /Users/shibaodi/GolandProjects/pulsar-client-go/pulsar/internal/connection.go:588
# 0x10541d3bb github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x30b   /Users/shibaodi/GolandProjects/pulsar-client-go/pulsar/internal/connection.go:423
# 0x10541c113 github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x53  /Users/shibaodi/GolandProjects/pulsar-client-go/pulsar/internal/connection.go:233

Once the run loop is stuck, subsequent commands on this connection cannot be processed, and the pending requests time out.

Failed logs

time="2023-03-02T06:34:19Z" level=info msg="Closing consumer=2" consumerID=2 name=rtvmw subscription=sub-2 topic="persistent://public/default/my-topic-70957754"
time="2023-03-02T06:34:49Z" level=warning msg="Failed to close consumer" consumerID=2 error="request timed out" name=rtvmw subscription=sub-2 topic="persistent://public/default/my-topic-70957754"
time="2023-03-02T06:34:49Z" level=info msg="close consumer, exit reconnect" consumerID=2 name=rtvmw subscription=sub-2 topic="persistent://public/default/my-topic-70957754"
    consumer_test.go:1026: 
        	Error Trace:	/pulsar/pulsar-client-go/pulsar/consumer_test.go:1026
        	Error:      	Expected nil, but got: &errors.errorString{s:"request timed out"}
        	Test:       	TestConsumerBatchCumulativeAck

#957 introduced the AckGroupingTracker. When a consumer calls Close, it also closes the AckGroupingTracker, which exits the tracker's loop:

if ackFlushType == flushAndClose {
	return
}

As a result, nothing is ever sent on duplicateResultCh again, and isDuplicate blocks forever here:

return <-t.duplicateResultCh

In this test, a message command may still arrive while c1.Close() or c2.Close() is being called, and handling it eventually blocks in the isDuplicate method.

for i := 0; i < 2*N; i++ {
	msg, err := c1.Receive(ctx)
	assert.Nil(t, err)
	assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
	if i == N-1 {
		// cumulative ack the first half of messages
		c1.AckCumulative(msg)
	} else if i == N {
		// the N+1 msg is in the second batch
		// cumulative ack it to test if the first batch can be acked
		c2.AckCumulative(msg)
	}
}
c1.Close()
c2.Close()

Modifications

  • Refactor AckGroupingTracker to use sync.Mutex instead of a select loop over channels.

Verifying this change

  • Add TestDuplicateAfterClose to cover this case.

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@shibd shibd added the type/bug label Mar 8, 2023
@shibd shibd added this to the v0.10.0 milestone Mar 8, 2023
@shibd shibd marked this pull request as draft March 8, 2023 06:00
@shibd shibd marked this pull request as ready for review March 8, 2023 06:14
@shibd shibd marked this pull request as draft March 8, 2023 07:01
@shibd shibd marked this pull request as ready for review March 8, 2023 13:38
@BewareMyPower
Contributor

I'm merging this patch first; I have some more improvements based on it.

To prove the lock is not the critical path, here is the flamegraph:

image

image

The cost of the lock is close to the cost of the map insertion in isDuplicate, which is only 1.78% of the connection goroutine.

image

As for add, the lock accounts for 10% of the time.

@BewareMyPower BewareMyPower merged commit 5277f3f into apache:master Mar 8, 2023
@shibd shibd changed the title Fix ack grouping tracker is duplicate method will be blocked. [Fix] Message is blocked on the AckGroupingTracker.isDuplicate method. Apr 3, 2023