Skip to content

Commit

Permalink
Prevent dequeue empty blocked requests queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Jul 23, 2023
1 parent 479a3e4 commit 11e6aa0
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 7 deletions.
4 changes: 2 additions & 2 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ NUGET
System.Collections.Immutable (>= 7.0)
protobuf-net.Reflection (3.2.12)
protobuf-net.Core (>= 3.2.12)
Pulsar.Client (2.12.4)
Pulsar.Client (2.12.6)
Apache.Avro (>= 1.11.2)
AvroSchemaGenerator (>= 2.9.2)
FSharp.Core (>= 7.0.300)
Expand Down Expand Up @@ -369,7 +369,7 @@ NUGET
System.Memory (>= 4.5.5)
protobuf-net.Reflection (3.2.12)
protobuf-net.Core (>= 3.2.12)
Pulsar.Client (2.12.4)
Pulsar.Client (2.12.6)
Apache.Avro (>= 1.11.2)
AvroSchemaGenerator (>= 2.9.2)
FSharp.Core (>= 7.0.300)
Expand Down
4 changes: 3 additions & 1 deletion src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,12 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
match pendingMessage.Callback with
| SingleCallback _ -> 1
| BatchCallbacks callbacks -> callbacks.Length
for _ in 1..messagesToRelease do
let mutable i = messagesToRelease
while i > 0 && blockedRequests.Count > 0 do
blockedRequests.Dequeue()
|> BeginSendMessage
|> post this.Mb
i <- i - 1
pendingMessages.Dequeue() |> ignore

let resendMessages (clientCnx: ClientCnx) =
Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Pulsar.Client.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
<Title>Pulsar.Client</Title>
<RootNamespace>Pulsar.Client</RootNamespace>
<AssemblyName>Pulsar.Client</AssemblyName>
<Version>2.12.4</Version>
<Version>2.12.6</Version>
<Company>F# community</Company>
<Description>.NET client library for Apache Pulsar</Description>
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
<PackageReleaseNotes>Packages update</PackageReleaseNotes>
<PackageReleaseNotes>Fix for unblocking correct amount of messages after receving acknowledge for a batch</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>pulsar</PackageTags>
<Authors>F# community</Authors>
<PackageVersion>2.12.4</PackageVersion>
<PackageVersion>2.12.6</PackageVersion>
<DebugType>portable</DebugType>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down
2 changes: 1 addition & 1 deletion tests/IntegrationTests/Batching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ let tests =

let client = getClient()
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
let messagesNumber = 4
let messagesNumber = 5

let! (consumer: IConsumer<byte[]>) =
client.NewConsumer()
Expand Down

0 comments on commit 11e6aa0

Please sign in to comment.