Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AwaitTask throws AggregateException #14

Merged
merged 1 commit into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions socket/Async.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace System
open System
open System.Threading.Tasks

[<AutoOpen>]
module Async =
type Async with
static member AwaitTaskCorrect(task : Task) : Async<unit> =
Async.FromContinuations(fun (sc,ec,cc) ->
task.ContinueWith(fun (task:Task) ->
if task.IsFaulted then
let e = task.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif task.IsCanceled then
ec(TaskCanceledException())
else
sc ())
|> ignore)

static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> =
Async.FromContinuations(fun (sc,ec,cc) ->
task.ContinueWith(fun (task:Task<'T>) ->
if task.IsFaulted then
let e = task.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif task.IsCanceled then
ec(TaskCanceledException())
else
sc task.Result)
|> ignore)
48 changes: 24 additions & 24 deletions socket/TcpMailbox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ open System.IO
open socket.core.TcpWrappers

let readBufferAsync (bytes: byte[]) (networkStream: Stream): Async<int option> = async {
try
try
//if (networkStream.CanRead && networkStream.DataAvailable) then
if (networkStream.CanRead) then
let token = (new CancellationTokenSource(TimeSpan.FromMilliseconds(5))).Token
let! i = networkStream.ReadAsync(bytes, 0, bytes.Length, token) |> Async.AwaitTask
let! i = networkStream.ReadAsync(bytes, 0, bytes.Length, token) |> Async.AwaitTaskCorrect
return Some i
else
else
return None
with
| :? ObjectDisposedException -> return Some 0
Expand All @@ -25,52 +25,52 @@ let readBufferAsync (bytes: byte[]) (networkStream: Stream): Async<int option> =
| :? OperationCanceledException -> return None
| ex -> printfn "Some unknown exception %A" ex
return None
}
}

// TODO Consider closing both ways to close connection, but we are a server
type ConnectionStatus = Open | Closed
type lineFeed = MesssageToSend of string | GetRecieved of AsyncReplyChannel<string*ConnectionStatus> | GetSent of AsyncReplyChannel<string*ConnectionStatus> | Close
type lineFeed = MesssageToSend of string | GetRecieved of AsyncReplyChannel<string*ConnectionStatus> | GetSent of AsyncReplyChannel<string*ConnectionStatus> | Close

let ListenMessages (client : ITcpClient) = MailboxProcessor<lineFeed>.Start( fun inbox ->
let networkStream = client.GetStream()
let bytes: byte [] = Array.zeroCreate 4096
let bytes: byte [] = Array.zeroCreate 4096
let mutable recievedString = ""
let mutable sentString = ""
let mutable streamState = Open
let mutable streamState = Open
let rec innerLoop () = async {
let! msg = inbox.TryReceive(10)
match msg with
| Some (MesssageToSend msg) ->
match msg with
| Some (MesssageToSend msg) ->
match streamState with
| Open ->
| Open ->
let msgWithNewline = msg + System.Environment.NewLine
let bytesToSend = System.Text.Encoding.Latin1.GetBytes(msgWithNewline)
// need to handle writing errors...
try
try
do! networkStream.WriteAsync(bytesToSend, 0, bytesToSend.Length) |> Async.AwaitTask
sentString <- sentString + msgWithNewline;
with
| :? ObjectDisposedException -> streamState <- Closed
| :? SocketException -> streamState <- Closed
| :? TaskCanceledException -> streamState <- Closed
| :? OperationCanceledException -> streamState <- Closed
| :? AggregateException -> streamState <- Closed
| Closed -> ()
with
| :? ObjectDisposedException -> streamState <- Closed
| :? SocketException -> streamState <- Closed
| :? TaskCanceledException -> streamState <- Closed
| :? OperationCanceledException -> streamState <- Closed
| :? AggregateException -> streamState <- Closed
| Closed -> ()
return! innerLoop()
| Some (GetSent reply) ->
reply.Reply((sentString, streamState))
reply.Reply((sentString, streamState))
return! innerLoop()
| Some (GetRecieved(reply)) ->
| Some (GetRecieved(reply)) ->
reply.Reply(recievedString, streamState)
return! innerLoop()
| Some (Close) ->
streamState <- Closed
| Some (Close) ->
streamState <- Closed
do! networkStream.FlushAsync() |> Async.AwaitTask
networkStream.Close()
client.Close();
client.Dispose()
return! innerLoop()
| None ->
| None ->
let! bytesRead = readBufferAsync bytes networkStream
match bytesRead with
| Some 0 -> streamState <- Closed
Expand All @@ -82,4 +82,4 @@ let ListenMessages (client : ITcpClient) = MailboxProcessor<lineFeed>.Start( fun
return! innerLoop()
| None -> return! innerLoop()
}
innerLoop ())
innerLoop ())
10 changes: 3 additions & 7 deletions socket/socket.fsproj
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Compile Include="Async.fs" />
<Compile Include="TcpClientWrapper.fs" />
<Compile Include="TcpMailbox.fs" />
<Compile Include="ConnectionController.fs" />
<Compile Include="Model.fs" />
<Compile Include="Messages.fs" />
<Compile Include="Server.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Control.AsyncSeq" Version="3.2.1" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="FSharp.Core" Version="7.0.400" />
</ItemGroup>

</Project>
</Project>