Skip to content

Commit

Permalink
Add a Task to synchronize with the sendToSocket pipe
Browse files Browse the repository at this point in the history
having forwarded all data to the socket after the
Writer has been completed
  • Loading branch information
Kai Ruhnau committed Sep 16, 2019
1 parent dc9b643 commit 43b172b
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Pipelines.Sockets.Unofficial/SocketConnection.Send.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private async Task DoSendAsync()
try { _sendToSocket.Writer.Complete(error); } catch { }
try { _sendToSocket.Reader.Complete(error); } catch { }
TrySetShutdown(error, this, PipeShutdownKind.OutputReaderCompleted);
_readerCompletedTcs.TrySetResult(0);

var args = _writerArgs;
_writerArgs = null;
Expand Down
3 changes: 3 additions & 0 deletions src/Pipelines.Sockets.Unofficial/SocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public void Dispose()
/// Connection for sending data
/// </summary>
public PipeWriter Output => _output;
public Task ReaderCompleted => _readerCompletedTcs.Task;
private string Name { get; }

/// <summary>
Expand Down Expand Up @@ -320,6 +321,7 @@ internal Counters(int available, long sendLength, long receiveLength)
private readonly Pipe _sendToSocket, _receiveFromSocket;
private readonly PipeReader _input; // was _receiveFromSocket.Reader;
private readonly PipeWriter _output; // was _sendToSocket.Writer;
private readonly TaskCompletionSource<int> _readerCompletedTcs;

// TODO: flagify and fully implement
#pragma warning disable CS0414, CS0649, IDE0044, IDE0051, IDE0052
Expand Down Expand Up @@ -360,6 +362,7 @@ private SocketConnection(Socket socket, PipeOptions sendPipeOptions, PipeOptions
_receiveFromSocket = new Pipe(receivePipeOptions);
_receiveOptions = receivePipeOptions;
_sendOptions = sendPipeOptions;
_readerCompletedTcs = new TaskCompletionSource<int>();

_input = new WrappedReader(_receiveFromSocket.Reader, this);
_output = new WrappedWriter(_sendToSocket.Writer, this);
Expand Down

0 comments on commit 43b172b

Please sign in to comment.