From 2e5e210210ce362023052a6b57b025fd024c91e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Cie=C5=9Blak?= Date: Mon, 7 Jan 2019 16:16:58 +0100 Subject: [PATCH] Use Jimmy's implementation --- src/Saturn/Channels.fs | 51 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/src/Saturn/Channels.fs b/src/Saturn/Channels.fs index 23ed7fb8..6c7f4c8c 100644 --- a/src/Saturn/Channels.fs +++ b/src/Saturn/Channels.fs @@ -27,6 +27,50 @@ module Channels = type SocketMiddleware(next : RequestDelegate, serializer: IJsonSerializer, path: string, channel: IChannel) = do sockets.Add(path, ConcurrentDictionary()) + /// **Description** + /// + /// (16 * 1024) = 16384 + /// https://referencesource.microsoft.com/#System/net/System/Net/WebSockets/WebSocketHelpers.cs,285b8b64a4da6851 + /// + /// **Output Type** + /// * `int` + [] + let defaultBufferSize : int = 16384 // (16 * 1024) + + + let receiveMessage cancellationToken bufferSize messageType (writeableStream : IO.Stream) (socket : WebSocket) = task { + let buffer = new ArraySegment( Array.create (bufferSize) Byte.MinValue) + let mutable moreToRead = false + let mutable res = None + while moreToRead do + let! result = socket.ReceiveAsync(buffer,cancellationToken) + res <- Some result + match result with + | result when result.MessageType = WebSocketMessageType.Close || socket.State = WebSocketState.CloseReceived -> + // printfn "Close received! %A - %A" socket.CloseStatus socket.CloseStatusDescription + do! socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Close received", cancellationToken) + | result -> + // printfn "result.MessageType -> %A" result.MessageType + if result.MessageType <> messageType then + failwithf "Invalid message type received %A, expected %A" result.MessageType messageType + do! writeableStream.WriteAsync(buffer.Array, buffer.Offset, result.Count) + if result.EndOfMessage then + moreToRead <- false + return res.Value + } + + + let receiveMessageAsUTF8 cancellationToken socket = task { + use stream = new IO.MemoryStream() + let! res = receiveMessage cancellationToken defaultBufferSize WebSocketMessageType.Text stream socket + stream.Seek(0L,IO.SeekOrigin.Begin) |> ignore + let cnt = + stream.ToArray() + |> Text.Encoding.UTF8.GetString + |> fun s -> s.TrimEnd(char 0) + return res,cnt + } + member __.Invoke(ctx : HttpContext) = task { if ctx.Request.Path = PathString(path) then @@ -39,13 +83,12 @@ module Channels = let guid = Guid.NewGuid().ToString() sockets.[path].AddOrUpdate (guid, webSocket, fun _ _ -> webSocket) |> ignore - let buffer : byte [] = Array.zeroCreate 4096 + let buffer : byte [] = Array.zeroCreate 4096 //It's buffer for just open message. let! echo = webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None) while not echo.CloseStatus.HasValue do - let! result = - webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None) - let msg = serializer.Deserialize buffer + let! (result, msg) = receiveMessageAsUTF8 CancellationToken.None webSocket + let msg = serializer.Deserialize msg do! channel.HandleMessage(ctx, result, msg) ()