diff --git a/socket/Async.fs b/socket/Async.fs new file mode 100644 index 0000000..e379ed8 --- /dev/null +++ b/socket/Async.fs @@ -0,0 +1,32 @@ +namespace System +open System +open System.Threading.Tasks + +[] +module Async = + type Async with + static member AwaitTaskCorrect(task : Task) : Async = + Async.FromContinuations(fun (sc,ec,cc) -> + task.ContinueWith(fun (task:Task) -> + if task.IsFaulted then + let e = task.Exception + if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0] + else ec e + elif task.IsCanceled then + ec(TaskCanceledException()) + else + sc ()) + |> ignore) + + static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> = + Async.FromContinuations(fun (sc,ec,cc) -> + task.ContinueWith(fun (task:Task<'T>) -> + if task.IsFaulted then + let e = task.Exception + if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0] + else ec e + elif task.IsCanceled then + ec(TaskCanceledException()) + else + sc task.Result) + |> ignore) diff --git a/socket/TcpMailbox.fs b/socket/TcpMailbox.fs index 8a2633a..1d666bf 100644 --- a/socket/TcpMailbox.fs +++ b/socket/TcpMailbox.fs @@ -10,13 +10,13 @@ open System.IO open socket.core.TcpWrappers let readBufferAsync (bytes: byte[]) (networkStream: Stream): Async = async { - try + try //if (networkStream.CanRead && networkStream.DataAvailable) then if (networkStream.CanRead) then let token = (new CancellationTokenSource(TimeSpan.FromMilliseconds(5))).Token - let! i = networkStream.ReadAsync(bytes, 0, bytes.Length, token) |> Async.AwaitTask + let! i = networkStream.ReadAsync(bytes, 0, bytes.Length, token) |> Async.AwaitTaskCorrect return Some i - else + else return None with | :? ObjectDisposedException -> return Some 0 @@ -25,52 +25,52 @@ let readBufferAsync (bytes: byte[]) (networkStream: Stream): Async = | :? OperationCanceledException -> return None | ex -> printfn "Some unknown exception %A" ex return None -} +} // TODO Consider closing both ways to close connection, but we are a server type ConnectionStatus = Open | Closed -type lineFeed = MesssageToSend of string | GetRecieved of AsyncReplyChannel | GetSent of AsyncReplyChannel | Close +type lineFeed = MesssageToSend of string | GetRecieved of AsyncReplyChannel | GetSent of AsyncReplyChannel | Close let ListenMessages (client : ITcpClient) = MailboxProcessor.Start( fun inbox -> let networkStream = client.GetStream() - let bytes: byte [] = Array.zeroCreate 4096 + let bytes: byte [] = Array.zeroCreate 4096 let mutable recievedString = "" let mutable sentString = "" - let mutable streamState = Open + let mutable streamState = Open let rec innerLoop () = async { let! msg = inbox.TryReceive(10) - match msg with - | Some (MesssageToSend msg) -> + match msg with + | Some (MesssageToSend msg) -> match streamState with - | Open -> + | Open -> let msgWithNewline = msg + System.Environment.NewLine let bytesToSend = System.Text.Encoding.Latin1.GetBytes(msgWithNewline) // need to handle writing errors... - try + try do! networkStream.WriteAsync(bytesToSend, 0, bytesToSend.Length) |> Async.AwaitTask sentString <- sentString + msgWithNewline; - with - | :? ObjectDisposedException -> streamState <- Closed - | :? SocketException -> streamState <- Closed - | :? TaskCanceledException -> streamState <- Closed - | :? OperationCanceledException -> streamState <- Closed - | :? AggregateException -> streamState <- Closed - | Closed -> () + with + | :? ObjectDisposedException -> streamState <- Closed + | :? SocketException -> streamState <- Closed + | :? TaskCanceledException -> streamState <- Closed + | :? OperationCanceledException -> streamState <- Closed + | :? AggregateException -> streamState <- Closed + | Closed -> () return! innerLoop() | Some (GetSent reply) -> - reply.Reply((sentString, streamState)) + reply.Reply((sentString, streamState)) return! innerLoop() - | Some (GetRecieved(reply)) -> + | Some (GetRecieved(reply)) -> reply.Reply(recievedString, streamState) return! innerLoop() - | Some (Close) -> - streamState <- Closed + | Some (Close) -> + streamState <- Closed do! networkStream.FlushAsync() |> Async.AwaitTask networkStream.Close() client.Close(); client.Dispose() return! innerLoop() - | None -> + | None -> let! bytesRead = readBufferAsync bytes networkStream match bytesRead with | Some 0 -> streamState <- Closed @@ -82,4 +82,4 @@ let ListenMessages (client : ITcpClient) = MailboxProcessor.Start( fun return! innerLoop() | None -> return! innerLoop() } - innerLoop ()) \ No newline at end of file + innerLoop ()) diff --git a/socket/socket.fsproj b/socket/socket.fsproj index 3aa50c6..173f03d 100644 --- a/socket/socket.fsproj +++ b/socket/socket.fsproj @@ -1,11 +1,10 @@ - - + Library net6.0 - + @@ -13,14 +12,11 @@ - - - - + \ No newline at end of file