Skip to content

Commit

Permalink
Fix for unblocking correct amount of messages after receving acknowle…
Browse files Browse the repository at this point in the history
…dge for a batch
  • Loading branch information
Lanayx committed Jul 23, 2023
1 parent 4b6cfe7 commit 479a3e4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
23 changes: 14 additions & 9 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,17 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
| _ ->
Log.Logger.LogWarning("{0} not connected, skipping send", prefix)

let dequeuePendingMessage () =
if (blockedRequests.Count > 0) then
blockedRequests.Dequeue()
|> BeginSendMessage
|> post this.Mb
pendingMessages.Dequeue()
let dequeuePendingMessage (pendingMessage: PendingMessage<'T>) =
if blockedRequests.Count > 0 then
let messagesToRelease =
match pendingMessage.Callback with
| SingleCallback _ -> 1
| BatchCallbacks callbacks -> callbacks.Length
for _ in 1..messagesToRelease do
blockedRequests.Dequeue()
|> BeginSendMessage
|> post this.Mb
pendingMessages.Dequeue() |> ignore

let resendMessages (clientCnx: ClientCnx) =
if pendingMessages.Count > 0 then
Expand Down Expand Up @@ -289,7 +294,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c

let processOpSendMsg { OpSendMsg = opSendMsg; LowestSequenceId = lowestSequenceId; HighestSequenceId = highestSequenceId;
PartitionKey = partitionKey; OrderingKey = orderingKey; TxnId = txnId; ReplicationClusters = replicationClusters } =
let batchPayload, batchCallbacks = opSendMsg;
let batchPayload, batchCallbacks = opSendMsg
let batchSize = batchCallbacks.Length
let metadata = createMessageMetadata lowestSequenceId txnId (Some batchSize)
batchPayload partitionKey EmptyProps None orderingKey None replicationClusters
Expand Down Expand Up @@ -589,7 +594,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
// Add check `sequenceId >= highestSequenceId` for backward compatibility.
if sequenceId >= highestSequenceId || highestSequenceId = exptectedHighestSequenceId then
Log.Logger.LogDebug("{0} Received ack for message {1}", prefix, receipt)
dequeuePendingMessage() |> ignore
dequeuePendingMessage pendingMessage
lastSequenceIdPublished <- Math.Max(lastSequenceIdPublished, %(getHighestSequenceId pendingMessage))
match pendingMessage.Callback with
| SingleCallback (chunkDetailsOption, msg, tcs) ->
Expand Down Expand Up @@ -652,7 +657,7 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
let! corrupted = verifyIfLocalBufferIsCorrupted pendingMessage
if corrupted then
// remove message from pendingMessages queue and fail callback
dequeuePendingMessage |> ignore
dequeuePendingMessage pendingMessage
failPendingMessage pendingMessage (ChecksumException "Checksum failed on corrupt message")
else
Log.Logger.LogDebug("{0} Message is not corrupted, retry send-message with sequenceId {1}", prefix, sequenceId)
Expand Down
2 changes: 1 addition & 1 deletion tests/IntegrationTests/Batching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ let tests =
Log.Debug("Finished Batch recieve works with regular consumer")
}

ftestTask "Second batch is formed well after the first one" {
testTask "Second batch is formed well after the first one" {

Log.Debug("Started 'Second batch is formed well after the first one'")

Expand Down

0 comments on commit 479a3e4

Please sign in to comment.