Skip to content

Commit

Permalink
[fix][client]Fix MaxQueueSize semaphore release leak in createOpSendM…
Browse files Browse the repository at this point in the history
…sg (#16915)

(cherry picked from commit d95f6cf)
  • Loading branch information
Nicklee007 authored and BewareMyPower committed Aug 5, 2022
1 parent a501593 commit 59339c4
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Expand Up @@ -66,7 +66,7 @@ public void testProducerSemaphoreInvalidMessage() throws Exception {
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerSemaphoreAcquire")
.maxPendingMessages(pendingQueueSize)
.enableBatching(false)
.enableBatching(true)
.create();

this.stopBroker();
Expand All @@ -79,6 +79,17 @@ public void testProducerSemaphoreInvalidMessage() throws Exception {
} catch (PulsarClientException.InvalidMessageException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
}

producer.conf.setBatchingEnabled(false);
try {
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
}
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
}
}

@Test(timeOut = 30000)
Expand Down
Expand Up @@ -200,6 +200,7 @@ public boolean isMultiBatches() {
public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
messages.forEach(msg -> producer.client.getMemoryLimitController()
.releaseMemory(msg.getUncompressedSize()));
discard(new PulsarClientException.InvalidMessageException(
Expand Down

0 comments on commit 59339c4

Please sign in to comment.