Skip to content

Commit

Permalink
Small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Sep 26, 2023
1 parent c7857a1 commit 4c0b9dc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
13 changes: 10 additions & 3 deletions src/Pulsar.Client/Common/Commands.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

open System.Collections
open System.Collections.Generic
open System.Threading.Tasks
open Microsoft.IO
open Pulsar.Client.Transaction
open pulsar.proto
Expand Down Expand Up @@ -46,8 +47,8 @@ let private serializeSimpleCommand (command : BaseCommand) =

let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMetadata) (payload: MemoryStream) =
(fun output ->
use temp = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
use binaryWriter = new BinaryWriter(temp)
let temp = MemoryStreamManager.GetStream() :?> RecyclableMemoryStream
let binaryWriter = new BinaryWriter(temp)

// write fake totalLength
for i in 1..4 do
Expand Down Expand Up @@ -93,7 +94,13 @@ let private serializePayloadCommand (command : BaseCommand) (metadata: MessageMe
binaryWriter.Write(int32ToBigEndian totalSize)

temp.Seek(0L, SeekOrigin.Begin) |> ignore
temp.CopyToAsync(output)
backgroundTask {
try
return! temp.CopyToAsync(output)
finally
temp.Dispose()
binaryWriter.Dispose()
} :> Task
), command.``type``

let newPartitionMetadataRequest (topicName : CompleteTopicName) (requestId : RequestId) =
Expand Down
7 changes: 6 additions & 1 deletion src/Pulsar.Client/Common/Tools.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ open System.IO
open System.Net
open System.Runtime.InteropServices
open System.Threading.Tasks
open Microsoft.FSharp.NativeInterop
open Microsoft.IO
open System.Runtime.ExceptionServices
open System.Collections.Generic
Expand Down Expand Up @@ -169,4 +170,8 @@ let post (channel: Channel<'T>) =
channel.Writer.TryWrite >> ignore

let getSpan (stream: MemoryStream) =
stream.GetBuffer().AsSpan(0, int stream.Length)
stream.GetBuffer().AsSpan(0, int stream.Length)

let inline stackallocspan<'a when 'a: unmanaged> size =
let p = NativePtr.stackalloc<'a> size |> NativePtr.toVoidPtr
Span<'a>(p, size)

0 comments on commit 4c0b9dc

Please sign in to comment.