Skip to content

Commit

Permalink
Merge pull request #14 from laat/awaittask
Browse files Browse the repository at this point in the history
AwaitTask throws AggregateException
  • Loading branch information
bjorn-einar-bjartnes-4ss committed Oct 15, 2023
2 parents dfc2d5a + 77a8fcf commit 005ecb2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 31 deletions.
32 changes: 32 additions & 0 deletions socket/Async.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace System
open System
open System.Threading.Tasks

[<AutoOpen>]
module Async =
type Async with
static member AwaitTaskCorrect(task : Task) : Async<unit> =
Async.FromContinuations(fun (sc,ec,cc) ->
task.ContinueWith(fun (task:Task) ->
if task.IsFaulted then
let e = task.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif task.IsCanceled then
ec(TaskCanceledException())
else
sc ())
|> ignore)

static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> =
Async.FromContinuations(fun (sc,ec,cc) ->
task.ContinueWith(fun (task:Task<'T>) ->
if task.IsFaulted then
let e = task.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif task.IsCanceled then
ec(TaskCanceledException())
else
sc task.Result)
|> ignore)
48 changes: 24 additions & 24 deletions socket/TcpMailbox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ open System.IO
open socket.core.TcpWrappers

let readBufferAsync (bytes: byte[]) (networkStream: Stream): Async<int option> = async {
try
try
//if (networkStream.CanRead && networkStream.DataAvailable) then
if (networkStream.CanRead) then
let token = (new CancellationTokenSource(TimeSpan.FromMilliseconds(5))).Token
let! i = networkStream.ReadAsync(bytes, 0, bytes.Length, token) |> Async.AwaitTask
let! i = networkStream.ReadAsync(bytes, 0, bytes.Length, token) |> Async.AwaitTaskCorrect
return Some i
else
else
return None
with
| :? ObjectDisposedException -> return Some 0
Expand All @@ -25,52 +25,52 @@ let readBufferAsync (bytes: byte[]) (networkStream: Stream): Async<int option> =
| :? OperationCanceledException -> return None
| ex -> printfn "Some unknown exception %A" ex
return None
}
}

// TODO Consider closing both ways to close connection, but we are a server
type ConnectionStatus = Open | Closed
type lineFeed = MesssageToSend of string | GetRecieved of AsyncReplyChannel<string*ConnectionStatus> | GetSent of AsyncReplyChannel<string*ConnectionStatus> | Close
type lineFeed = MesssageToSend of string | GetRecieved of AsyncReplyChannel<string*ConnectionStatus> | GetSent of AsyncReplyChannel<string*ConnectionStatus> | Close

let ListenMessages (client : ITcpClient) = MailboxProcessor<lineFeed>.Start( fun inbox ->
let networkStream = client.GetStream()
let bytes: byte [] = Array.zeroCreate 4096
let bytes: byte [] = Array.zeroCreate 4096
let mutable recievedString = ""
let mutable sentString = ""
let mutable streamState = Open
let mutable streamState = Open
let rec innerLoop () = async {
let! msg = inbox.TryReceive(10)
match msg with
| Some (MesssageToSend msg) ->
match msg with
| Some (MesssageToSend msg) ->
match streamState with
| Open ->
| Open ->
let msgWithNewline = msg + System.Environment.NewLine
let bytesToSend = System.Text.Encoding.Latin1.GetBytes(msgWithNewline)
// need to handle writing errors...
try
try
do! networkStream.WriteAsync(bytesToSend, 0, bytesToSend.Length) |> Async.AwaitTask
sentString <- sentString + msgWithNewline;
with
| :? ObjectDisposedException -> streamState <- Closed
| :? SocketException -> streamState <- Closed
| :? TaskCanceledException -> streamState <- Closed
| :? OperationCanceledException -> streamState <- Closed
| :? AggregateException -> streamState <- Closed
| Closed -> ()
with
| :? ObjectDisposedException -> streamState <- Closed
| :? SocketException -> streamState <- Closed
| :? TaskCanceledException -> streamState <- Closed
| :? OperationCanceledException -> streamState <- Closed
| :? AggregateException -> streamState <- Closed
| Closed -> ()
return! innerLoop()
| Some (GetSent reply) ->
reply.Reply((sentString, streamState))
reply.Reply((sentString, streamState))
return! innerLoop()
| Some (GetRecieved(reply)) ->
| Some (GetRecieved(reply)) ->
reply.Reply(recievedString, streamState)
return! innerLoop()
| Some (Close) ->
streamState <- Closed
| Some (Close) ->
streamState <- Closed
do! networkStream.FlushAsync() |> Async.AwaitTask
networkStream.Close()
client.Close();
client.Dispose()
return! innerLoop()
| None ->
| None ->
let! bytesRead = readBufferAsync bytes networkStream
match bytesRead with
| Some 0 -> streamState <- Closed
Expand All @@ -82,4 +82,4 @@ let ListenMessages (client : ITcpClient) = MailboxProcessor<lineFeed>.Start( fun
return! innerLoop()
| None -> return! innerLoop()
}
innerLoop ())
innerLoop ())
10 changes: 3 additions & 7 deletions socket/socket.fsproj
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Compile Include="Async.fs" />
<Compile Include="TcpClientWrapper.fs" />
<Compile Include="TcpMailbox.fs" />
<Compile Include="ConnectionController.fs" />
<Compile Include="Model.fs" />
<Compile Include="Messages.fs" />
<Compile Include="Server.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Control.AsyncSeq" Version="3.2.1" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="FSharp.Core" Version="7.0.400" />
</ItemGroup>

</Project>
</Project>

0 comments on commit 005ecb2

Please sign in to comment.