Skip to content

Commit

Permalink
Fix for handling missing keys in KeyBasedBatcher #228
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Aug 9, 2023
1 parent 11e6aa0 commit 184d53f
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/Pulsar.Client/Internal/BatchMessageContainer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type internal MessageContainer<'T>(config: ProducerConfiguration) =
this.CurrentTxnId <- Some txn.Id
| _ ->
()

member this.HaveEnoughSpace (msgBuilder: MessageBuilder<'T>) =
let messageSize = msgBuilder.Payload.Length
((maxBytesInBatch <= 0 && (messageSize + this.CurrentBatchSizeBytes) <= this.MaxMessageSize)
Expand Down Expand Up @@ -145,18 +145,25 @@ type internal KeyBasedBatchMessageContainer<'T>(prefix: string, config: Producer
inherit MessageContainer<'T>(config)

let prefix = prefix + " KeyBasedBatcher"
let keyBatchItems = Dictionary<MessageKey option, ResizeArray<BatchItem<'T>>>()
let keyBatchItems = Dictionary<MessageKey, ResizeArray<BatchItem<'T>>>()

let getKey (msg: MessageBuilder<'T>) =
match msg.OrderingKey with
| Some orderingKey ->
| Some orderingKey ->
{
PartitionKey = %Convert.ToBase64String(orderingKey)
IsBase64Encoded = true
} |> Some
}
| None ->
msg.Key

match msg.Key with
| Some key -> key
| None ->
{
PartitionKey = %""
IsBase64Encoded = false
}


override this.Add batchItem =
this.AddStart(prefix, batchItem)
let key = batchItem.Message |> getKey
Expand Down

0 comments on commit 184d53f

Please sign in to comment.