Skip to content

Commit

Permalink
Changed byte arrays to memory stream for producer
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Sep 26, 2023
1 parent bf4218d commit c7857a1
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 225 deletions.
3 changes: 1 addition & 2 deletions src/Pulsar.Client/Api/IMessageCrypto.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ type IMessageEncryptor =
abstract member UpdateEncryptionKeys: unit -> unit

type IMessageDecryptor =

abstract member Decrypt: encryptedPayload: EncryptedMessage -> byte []

186 changes: 88 additions & 98 deletions src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,99 +21,88 @@ type CommandType = BaseCommand.Type
let DEFAULT_MAX_MESSAGE_SIZE = 5_242_880 //5 * 1024 * 1024


let private processSimpleCommand (command : BaseCommand) (stream: Stream) (binaryWriter: BinaryWriter) (output: Stream) =
// write fake totalLength
for i in 1..4 do
stream.WriteByte(0uy)

// write commandPayload
Serializer.SerializeWithLengthPrefix(stream, command, PrefixStyle.Fixed32BigEndian)
let frameSize = int stream.Length

let totalSize = frameSize - 4

//write total size and command size
stream.Seek(0L,SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian totalSize)
stream.Seek(0L, SeekOrigin.Begin) |> ignore

stream.CopyToAsync(output)

let private processComplexCommand (command : BaseCommand) (metadata: MessageMetadata) (payload: byte[])
(stream: RecyclableMemoryStream) (binaryWriter: BinaryWriter) (output: Stream) =
// write fake totalLength
for i in 1..4 do
stream.WriteByte(0uy)

// write commandPayload
Serializer.SerializeWithLengthPrefix(stream, command, PrefixStyle.Fixed32BigEndian)

let stream1Size = int stream.Length

// write magic number 0x0e01
stream.WriteByte(14uy)
stream.WriteByte(1uy)

// write fake CRC sum
for i in 1..4 do
stream.WriteByte(0uy)

// write metadata
Serializer.SerializeWithLengthPrefix(stream, metadata, PrefixStyle.Fixed32BigEndian)
let stream2Size = int stream.Length
let totalMetadataSize = stream2Size - stream1Size - 6

// write payload
stream.Write(payload, 0, payload.Length)

let frameSize = int stream.Length
let totalSize = frameSize - 4
let payloadSize = frameSize - stream2Size

let crcStart = stream1Size + 2
let crcPayloadStart = crcStart + 4

//write CRC
stream.Seek(int64 crcPayloadStart, SeekOrigin.Begin) |> ignore
let crc = int32 <| CRC32C.GetForRMS(stream, totalMetadataSize + payloadSize)
stream.Seek(int64 crcStart, SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian crc)

//write total size and command size
stream.Seek(0L, SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian totalSize)

stream.Seek(0L, SeekOrigin.Begin) |> ignore
stream.CopyToAsync(output)

let serializeSimpleCommand(command : BaseCommand) =
let f =
fun (output: Stream) ->
backgroundTask {
use stream = MemoryStreamManager.GetStream()
use binaryWriter = new BinaryWriter(stream)
return! processSimpleCommand command stream binaryWriter output
}
(f, command.``type``)


let serializePayloadCommand (command : BaseCommand) (metadata: MessageMetadata) (payload: byte[]) =
let f =
fun (output: Stream) ->
backgroundTask {
use stream = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
use binaryWriter = new BinaryWriter(stream)
return! processComplexCommand command metadata payload stream binaryWriter output
}
(f, command.``type``)

let newPartitionMetadataRequest(topicName : CompleteTopicName) (requestId : RequestId) : Payload =
let private serializeSimpleCommand (command : BaseCommand) =
(fun output ->
use temp = MemoryStreamManager.GetStream()
use binaryWriter = new BinaryWriter(temp)

// write fake totalLength
for i in 1..4 do
temp.WriteByte(0uy)

// write commandPayload
Serializer.SerializeWithLengthPrefix(temp, command, PrefixStyle.Fixed32BigEndian)
let frameSize = int temp.Length

let totalSize = frameSize - 4

//write total size and command size
temp.Seek(0L,SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian totalSize)
temp.Seek(0L, SeekOrigin.Begin) |> ignore
temp.CopyToAsync(output)
), command.``type``


let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMetadata) (payload: MemoryStream) =
(fun output ->
use temp = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
use binaryWriter = new BinaryWriter(temp)

// write fake totalLength
for i in 1..4 do
temp.WriteByte(0uy)

// write commandPayload
Serializer.SerializeWithLengthPrefix(temp, command, PrefixStyle.Fixed32BigEndian)

let stream1Size = int temp.Length

// write magic number 0x0e01
temp.WriteByte(14uy)
temp.WriteByte(1uy)

// write fake CRC sum
for i in 1..4 do
temp.WriteByte(0uy)

// write metadata
Serializer.SerializeWithLengthPrefix(temp, metadata, PrefixStyle.Fixed32BigEndian)
let stream2Size = int temp.Length
let totalMetadataSize = stream2Size - stream1Size - 6

// write payload
payload.Seek(0L, SeekOrigin.Begin) |> ignore
payload.CopyTo(temp)

let frameSize = int temp.Length
let totalSize = frameSize - 4
let payloadSize = frameSize - stream2Size

let crcStart = stream1Size + 2
let crcPayloadStart = crcStart + 4

//write CRC
temp.Seek(int64 crcPayloadStart, SeekOrigin.Begin) |> ignore
let crc = int32 <| CRC32C.GetForRMS(temp, totalMetadataSize + payloadSize)
temp.Seek(int64 crcStart, SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian crc)

//write total size and command size
temp.Seek(0L, SeekOrigin.Begin) |> ignore
binaryWriter.Write(int32ToBigEndian totalSize)

temp.Seek(0L, SeekOrigin.Begin) |> ignore
temp.CopyToAsync(output)
), command.``type``

let newPartitionMetadataRequest (topicName : CompleteTopicName) (requestId : RequestId) =
let request = CommandPartitionedTopicMetadata(Topic = %topicName, RequestId = %requestId)
let command = BaseCommand(``type`` = CommandType.PartitionedMetadata, partitionMetadata = request)
serializeSimpleCommand command

let newSend (producerId : ProducerId) (sequenceId : SequenceId) (highestSequenceId: SequenceId option)
(numMessages : int) (msgMetadata : MessageMetadata) (payload: byte[]) : Payload =
(numMessages : int) (msgMetadata : MessageMetadata) (payload: MemoryStream) =
let request = CommandSend(ProducerId = %producerId, SequenceId = uint64 %sequenceId)
if numMessages > 1 then
request.NumMessages <- numMessages
Expand All @@ -130,7 +119,7 @@ let newSend (producerId : ProducerId) (sequenceId : SequenceId) (highestSequence

let newAck (consumerId : ConsumerId) (ledgerId: LedgerId) (entryId: EntryId) (ackType : AckType)
(properties: IReadOnlyDictionary<string, int64>) (ackSet: AckSet) (validationError: CommandAck.ValidationError option)
(txnId: TxnId option) (requestId: RequestId option) (batchSize: int option) : Payload =
(txnId: TxnId option) (requestId: RequestId option) (batchSize: int option) =
let request = CommandAck(ConsumerId = %consumerId, ack_type = ackType.ToCommandAckType())
let messageIdData = MessageIdData(ledgerId = uint64 %ledgerId,
entryId = uint64 %entryId,
Expand All @@ -156,17 +145,17 @@ let newAck (consumerId : ConsumerId) (ledgerId: LedgerId) (entryId: EntryId) (ac
let command = BaseCommand(``type`` = CommandType.Ack, Ack = request)
serializeSimpleCommand command

let newMultiMessageAck (consumerId : ConsumerId) (messages: seq<LedgerId*EntryId*AckSet>) : Payload =
let newMultiMessageAck (consumerId : ConsumerId) (messages: seq<LedgerId*EntryId*AckSet>) =
let request = CommandAck(ConsumerId = %consumerId, ack_type = CommandAck.AckType.Individual)
messages
|> Seq.map (fun (ledgerId, entryId, ackSet) ->
MessageIdData(ledgerId = uint64 %ledgerId, entryId = uint64 %entryId, AckSets = ackSet)
)
|> request.MessageIds.AddRange
let command = BaseCommand(``type`` = CommandType.Ack, Ack = request)
serializeSimpleCommand command
command |> serializeSimpleCommand

let newConnect (authMethodName: string) (authData: AuthData) (clientVersion: string) (protocolVersion: ProtocolVersion) (proxyToBroker: Option<DnsEndPoint>) : Payload =
let newConnect (authMethodName: string) (authData: AuthData) (clientVersion: string) (protocolVersion: ProtocolVersion) (proxyToBroker: Option<DnsEndPoint>) =
let request = CommandConnect(ClientVersion = clientVersion, ProtocolVersion = int protocolVersion, AuthMethodName = authMethodName)
if authMethodName = "ycav1" then
request.AuthMethod <- AuthMethod.AuthMethodYcaV1
Expand All @@ -178,17 +167,18 @@ let newConnect (authMethodName: string) (authData: AuthData) (clientVersion: str
let command = BaseCommand(``type`` = CommandType.Connect, Connect = request)
command |> serializeSimpleCommand

let newPing () : Payload =
let newPing () =
let request = CommandPing()
let command = BaseCommand(``type`` = CommandType.Ping, Ping = request)
command |> serializeSimpleCommand
serializeSimpleCommand command

let newPong () : Payload =
let newPong () =
let request = CommandPong()
let command = BaseCommand(``type`` = CommandType.Pong, Pong = request)
command |> serializeSimpleCommand
serializeSimpleCommand command

let newLookup (topicName : CompleteTopicName) (requestId : RequestId) (authoritative : bool) (listenerName: string) =
let newLookup (topicName : CompleteTopicName) (requestId : RequestId) (authoritative : bool)
(listenerName: string) =
let request = CommandLookupTopic(Topic = %topicName, Authoritative = authoritative, RequestId = uint64(%requestId))
if listenerName |> String.IsNullOrEmpty |> not then
request.AdvertisedListenerName <- listenerName
Expand Down Expand Up @@ -349,7 +339,7 @@ let newAddSubscriptionToTxn (txn: TxnId) (requestId: RequestId) (topic: Complete
let command = BaseCommand(``type`` = CommandType.AddSubscriptionToTxn, addSubscriptionToTxn = request)
command |> serializeSimpleCommand

let newEndTxn (txn: TxnId) (requestId: RequestId) (action: TxnAction) =
let newEndTxn (txn: TxnId) (requestId: RequestId) (action: TxnAction) =
let request = CommandEndTxn(TxnidLeastBits = txn.LeastSigBits, TxnidMostBits = txn.MostSigBits, RequestId = %requestId,
TxnAction = action)
let command = BaseCommand(``type`` = CommandType.EndTxn, endTxn = request)
Expand Down

0 comments on commit c7857a1

Please sign in to comment.