[fix][client] Fix producer thread block forever on memory limit controller #21790

Merged

Conversation

wenbingshen
Member

Motivation

Without the change in this PR (i.e. if it is rolled back), the following unit test fails: the main producer thread gets stuck forever at condition.await() inside org.apache.pulsar.client.impl.MemoryLimitController#reserveMemory.

    @Test(timeOut = 10_000)
    public void testProducerBlockReserveMemory() throws Exception {
        replacePulsarClient(PulsarClient.builder()
                .serviceUrl(lookupUrl.toString())
                .memoryLimit(1, SizeUnit.KILO_BYTES));
        @Cleanup
        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic("testProducerMemoryLimit")
                .sendTimeout(5, TimeUnit.SECONDS)
                .compressionType(CompressionType.SNAPPY)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .maxPendingMessages(0)
                .blockIfQueueFull(true)
                .enableBatching(true)
                .batchingMaxMessages(100)
                .batchingMaxBytes(65536)
                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                .create();
        int msgCount = 5;
        CountDownLatch cdl = new CountDownLatch(msgCount);
        for (int i = 0; i < msgCount; i++) {
            producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8)).whenComplete(((messageId, throwable) -> {
                cdl.countDown();
            }));
        }

        cdl.await();

        producer.close();
        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
        final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
        Assert.assertEquals(memoryLimitController.currentUsage(), 0);
    }

According to my investigation, this is related to PR #17936.

  1. The producer enables the following options and sends messages asynchronously:
    .compressionType(CompressionType.SNAPPY)
    .blockIfQueueFull(true)
    .enableBatching(true)
    Setting .memoryLimit(1, SizeUnit.KILO_BYTES) small enough makes the problem easier to reproduce.

  2. When it builds the batch message metadata for the first time, the BatchMessageContainer reserves batchAllocatedSize from the MemoryLimitController. At this point, MemoryLimitController.currentUsage = (message payload size + batchedMessageMetadataAndPayload size).

  3. The main producer thread keeps sending data asynchronously, but the MemoryLimitController has already reached memoryLimit, so the thread blocks at condition.await(), waiting to be woken up.

  4. When batch message compression completes, the BatchMessageContainer calls updateAndReserveBatchAllocatedSize again. This shrinks the reservation in the MemoryLimitController down to the actual batch size, but it does not wake up the threads that are blocked waiting for memory.

  5. After the batch message is sent, org.apache.pulsar.client.impl.MemoryLimitController#releaseMemory is called to release the batch-size memory. However, because (MemoryLimitController.currentUsage + the released size) is already smaller than memoryLimit, the release does not cross the limit boundary and no threads are awakened (see the sketch after this list).

  6. The main producer thread therefore stays stuck in condition.await() forever.
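
Below is a minimal, simplified sketch of the signalling contract involved. The class and method names follow the description above, but this is not the actual Pulsar source: waiters are only signalled when a release crosses from above the limit to at or below it, so a negative "reservation" in step 4 lowers currentUsage without a signal and the release in step 5 never observes a crossing.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of the lost wake-up described in steps 2-6 above.
// Names mirror the description, but this is NOT the actual Pulsar source.
class MemoryLimitControllerSketch {
    private final long memoryLimit;
    private final AtomicLong currentUsage = new AtomicLong();
    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition condition = mutex.newCondition();

    MemoryLimitControllerSketch(long memoryLimitBytes) {
        this.memoryLimit = memoryLimitBytes;
    }

    void reserveMemory(long size) throws InterruptedException {
        if (!tryReserveMemory(size)) {
            mutex.lock();
            try {
                while (!tryReserveMemory(size)) {
                    condition.await(); // step 3: the producer thread blocks here
                }
            } finally {
                mutex.unlock();
            }
        }
    }

    boolean tryReserveMemory(long size) {
        while (true) {
            long current = currentUsage.get();
            long newUsage = current + size;
            if (current > 0 && newUsage > memoryLimit) {
                return false;
            }
            if (currentUsage.compareAndSet(current, newUsage)) {
                return true;
            }
        }
    }

    void releaseMemory(long size) {
        long newUsage = currentUsage.addAndGet(-size);
        // Waiters are signalled only when this release crosses the limit
        // boundary. A "reservation" with a negative size (step 4) lowers
        // currentUsage without reaching this path, so the release in step 5
        // sees (newUsage + size) <= memoryLimit, signals nobody, and the
        // thread blocked in reserveMemory() waits forever (step 6).
        if (newUsage + size > memoryLimit && newUsage <= memoryLimit) {
            mutex.lock();
            try {
                condition.signalAll();
            } finally {
                mutex.unlock();
            }
        }
    }
}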

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label Dec 22, 2023
@wenbingshen
Member Author

These are some logs I printed during debugging. You can clearly see that forceReserveMemory reserved 1024 bytes the first time and -969 bytes the second time.
(screenshot of the debug log output)

@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@lhotari
Member

lhotari commented Dec 22, 2023

Great catch @wenbingshen! This has been a long-outstanding issue.

Contributor

@AnonHxy AnonHxy left a comment


Nice catch! Thanks

@wenbingshen
Member Author

/pulsarbot rerun-failure-checks

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (8beac8b) 73.42% compared to head (1359217) 73.58%.
Report is 5 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #21790      +/-   ##
============================================
+ Coverage     73.42%   73.58%   +0.15%     
+ Complexity    32795    32247     -548     
============================================
  Files          1897     1858      -39     
  Lines        140656   138021    -2635     
  Branches      15491    15111     -380     
============================================
- Hits         103282   101561    -1721     
+ Misses        29297    28607     -690     
+ Partials       8077     7853     -224     
Flag Coverage Δ
inttests 24.18% <25.00%> (-0.07%) ⬇️
systests 23.74% <50.00%> (-1.11%) ⬇️
unittests 72.86% <100.00%> (+0.15%) ⬆️

Flags with carried forward coverage won't be shown.

Files Coverage Δ
.../pulsar/client/impl/BatchMessageContainerImpl.java 80.68% <100.00%> (+0.22%) ⬆️

... and 127 files with indirect coverage changes

Contributor

@poorbarcode poorbarcode left a comment


Maybe modify the method tryReserveMemory and forceReserveMemory like below is better?

public void forceReserveMemory(long size) {
    if (size < 0) {
        releaseMemory(-size);
        return;
    }
    // ... original implementation ...
}

public void tryReserveMemory(long size) {
    if (size < 0) {
        releaseMemory(-size);
        return;
    }
    // ... original implementation ...
}

@lhotari
Member

lhotari commented Dec 26, 2023

Maybe modify the method tryReserveMemory and forceReserveMemory like below is better?

I think it's not a good idea. It's better to add validation and throw IllegalArgumentException if the value is negative

If there had been validation, the API would never have been misused and the issue wouldn't have happened in the past.
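
For illustration, a minimal sketch of the fail-fast validation being suggested (hypothetical helper, not code from this PR or the Pulsar client):

// Hypothetical helper sketching the suggested fail-fast validation;
// the reserve methods would call it before doing any accounting.
static long requireNonNegativeSize(long size) {
    if (size < 0) {
        throw new IllegalArgumentException("Memory size must be non-negative, but was " + size);
    }
    return size;
}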

@poorbarcode
Contributor

I think it's not a good idea. It's better to add validation and throw IllegalArgumentException if the value is negative

Sure

@Technoboy-
Contributor

Maybe modify the method tryReserveMemory and forceReserveMemory like below is better?

I think it's not a good idea. It's better to add validation and throw IllegalArgumentException if the value is negative

If there had been validation, the API would never have been misused and the issue wouldn't have happened in the past.

yes, agree

@wenbingshen
Member Author

Maybe modify the method tryReserveMemory and forceReserveMemory like below is better?

I think it's not a good idea. It's better to add validation and throw IllegalArgumentException if the value is negative

If there had been validation, the API would never have been misused and the issue wouldn't have happened in the past.

@lhotari Good idea! I think we can add validation that throws an IllegalArgumentException if the value is negative in a separate follow-up PR.

Contributor

@poorbarcode poorbarcode left a comment


@lhotari Good idea! I think we can add validation that throws an IllegalArgumentException if the value is negative in a separate follow-up PR.

At least, we should add a WARN log when calling tryReserveMemory/forceReserveMemory with a negative value
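
For illustration, a minimal sketch of the WARN-log alternative (hypothetical class, assumes an SLF4J logger; not code from this PR):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical guard sketching the WARN-log suggestion: log negative sizes
// passed to tryReserveMemory/forceReserveMemory instead of throwing.
final class MemorySizeGuard {
    private static final Logger log = LoggerFactory.getLogger(MemorySizeGuard.class);

    static void warnIfNegative(String method, long size) {
        if (size < 0) {
            log.warn("{} called with a negative size: {}", method, size);
        }
    }
}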

@wenbingshen
Member Author

wenbingshen commented Dec 26, 2023

@lhotari Good idea! I think we can add validation that throws an IllegalArgumentException if the value is negative in a separate follow-up PR.

At least, we should add a WARN log when calling tryReserveMemory/forceReserveMemory with a negative value

@poorbarcode I will add an assert in a follow-up later.

@lhotari lhotari dismissed poorbarcode’s stale review December 26, 2023 11:10

assert in separate follow up PR

@lhotari lhotari merged commit 99d06b9 into apache:master Dec 26, 2023
48 checks passed
@lhotari lhotari modified the milestones: 3.3.0, 3.2.0 Dec 26, 2023
@wenbingshen wenbingshen deleted the wenbing/fix_producer_thread_block_forever branch December 26, 2023 11:41
@poorbarcode
Contributor

@lhotari @wenbingshen @Technoboy-

assert in separate follow up PR

I never saw the follow-up PR. To prevent new PRs from incorrectly using tryReserveMemory, reserveMemory, or forceReserveMemory and hitting the same issue, I have created a new PR to add this validation. Please take a look.

lhotari pushed a commit that referenced this pull request Dec 26, 2023
lhotari pushed a commit that referenced this pull request Dec 26, 2023
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jan 4, 2024
…oller (apache#21790)

(cherry picked from commit 99d06b9)
(cherry picked from commit 04ed338)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jan 8, 2024
…oller (apache#21790)

(cherry picked from commit 99d06b9)
(cherry picked from commit 04ed338)