Skip to content

Commit

Permalink
Use Jimmy's implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Krzysztof-Cieslak committed Jan 7, 2019
1 parent c0f9317 commit 2e5e210
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions src/Saturn/Channels.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,50 @@ module Channels =
type SocketMiddleware(next : RequestDelegate, serializer: IJsonSerializer, path: string, channel: IChannel) =
do sockets.Add(path, ConcurrentDictionary())

/// **Description**
///
/// (16 * 1024) = 16384
/// https://referencesource.microsoft.com/#System/net/System/Net/WebSockets/WebSocketHelpers.cs,285b8b64a4da6851
///
/// **Output Type**
/// * `int`
[<Literal>]
let defaultBufferSize : int = 16384 // (16 * 1024)


let receiveMessage cancellationToken bufferSize messageType (writeableStream : IO.Stream) (socket : WebSocket) = task {
let buffer = new ArraySegment<Byte>( Array.create (bufferSize) Byte.MinValue)
let mutable moreToRead = false
let mutable res = None
while moreToRead do
let! result = socket.ReceiveAsync(buffer,cancellationToken)
res <- Some result
match result with
| result when result.MessageType = WebSocketMessageType.Close || socket.State = WebSocketState.CloseReceived ->
// printfn "Close received! %A - %A" socket.CloseStatus socket.CloseStatusDescription
do! socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Close received", cancellationToken)
| result ->
// printfn "result.MessageType -> %A" result.MessageType
if result.MessageType <> messageType then
failwithf "Invalid message type received %A, expected %A" result.MessageType messageType
do! writeableStream.WriteAsync(buffer.Array, buffer.Offset, result.Count)
if result.EndOfMessage then
moreToRead <- false
return res.Value
}


let receiveMessageAsUTF8 cancellationToken socket = task {
use stream = new IO.MemoryStream()
let! res = receiveMessage cancellationToken defaultBufferSize WebSocketMessageType.Text stream socket
stream.Seek(0L,IO.SeekOrigin.Begin) |> ignore
let cnt =
stream.ToArray()
|> Text.Encoding.UTF8.GetString
|> fun s -> s.TrimEnd(char 0)
return res,cnt
}

member __.Invoke(ctx : HttpContext) =
task {
if ctx.Request.Path = PathString(path) then
Expand All @@ -39,13 +83,12 @@ module Channels =
let guid = Guid.NewGuid().ToString()
sockets.[path].AddOrUpdate (guid, webSocket, fun _ _ -> webSocket) |> ignore

let buffer : byte [] = Array.zeroCreate 4096
let buffer : byte [] = Array.zeroCreate 4096 //It's buffer for just open message.
let! echo = webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None)

while not echo.CloseStatus.HasValue do
let! result =
webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None)
let msg = serializer.Deserialize<Message> buffer
let! (result, msg) = receiveMessageAsUTF8 CancellationToken.None webSocket
let msg = serializer.Deserialize<Message> msg
do! channel.HandleMessage(ctx, result, msg)
()

Expand Down

0 comments on commit 2e5e210

Please sign in to comment.