Skip to content

Commit

Permalink
AnonymousPipeStreams.Linux, Process.Unix Streams: perform async I/O b…
Browse files Browse the repository at this point in the history
…y using Socket implementation (#44647)

* experiment: use read/write for socket operations instead of recvmsg/sendmsg

* AnonymousPipeStreams: perform async I/O by using Socket implementation

* PipeStream.Unix: use sync/async functions on Socket

* Refactor Socket creation

* Cleanup

* Fix broken comment

* Allow passing pipe fd to Socket/SafeSocketHandle public ctor

* macOS: Handle EPIPE on registration

* Refactor IsPipe to IsSocket

* Move registration error handling in to single method

* Process: use AnonymousPipeClientStream for redirected streams.

* Remove unneeded BufferedStream

* PR feedback

* PipeStream.Unix: enable cancelation tests

* Add back Ctor_SafeHandle_Invalid_ThrowsException test

* SysRead/SysWrite: Assert not used for socket-type handle
  • Loading branch information
tmds committed Mar 17, 2021
1 parent ffd5c13 commit 2606d82
Show file tree
Hide file tree
Showing 21 changed files with 794 additions and 451 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@
<Reference Include="System.IO.FileSystem" />
<Reference Include="System.IO.FileSystem.DriveInfo" />
<Reference Include="System.Memory" />
<Reference Include="System.IO.Pipes" />
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.IO.Pipes;
using System.Security;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -439,20 +440,20 @@ private bool StartCore(ProcessStartInfo startInfo)
if (startInfo.RedirectStandardInput)
{
Debug.Assert(stdinFd >= 0);
_standardInput = new StreamWriter(OpenStream(stdinFd, FileAccess.Write),
_standardInput = new StreamWriter(OpenStream(stdinFd, PipeDirection.Out),
startInfo.StandardInputEncoding ?? Encoding.Default, StreamBufferSize)
{ AutoFlush = true };
}
if (startInfo.RedirectStandardOutput)
{
Debug.Assert(stdoutFd >= 0);
_standardOutput = new StreamReader(OpenStream(stdoutFd, FileAccess.Read),
_standardOutput = new StreamReader(OpenStream(stdoutFd, PipeDirection.In),
startInfo.StandardOutputEncoding ?? Encoding.Default, true, StreamBufferSize);
}
if (startInfo.RedirectStandardError)
{
Debug.Assert(stderrFd >= 0);
_standardError = new StreamReader(OpenStream(stderrFd, FileAccess.Read),
_standardError = new StreamReader(OpenStream(stderrFd, PipeDirection.In),
startInfo.StandardErrorEncoding ?? Encoding.Default, true, StreamBufferSize);
}

Expand Down Expand Up @@ -747,14 +748,12 @@ internal static TimeSpan TicksToTimeSpan(double ticks)

/// <summary>Opens a stream around the specified file descriptor and with the specified access.</summary>
/// <param name="fd">The file descriptor.</param>
/// <param name="access">The access mode.</param>
/// <param name="direction">The pipe direction.</param>
/// <returns>The opened stream.</returns>
private static FileStream OpenStream(int fd, FileAccess access)
private static Stream OpenStream(int fd, PipeDirection direction)
{
Debug.Assert(fd >= 0);
return new FileStream(
new SafeFileHandle((IntPtr)fd, ownsHandle: true),
access, StreamBufferSize, isAsync: false);
return new AnonymousPipeClientStream(direction, new SafePipeHandle((IntPtr)fd, ownsHandle: true));
}

/// <summary>Parses a command-line argument string into a list of arguments.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ async private Task<bool> WaitPipeSignal(PipeStream pipe, int millisecond)
}
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/44329")]
[PlatformSpecific(~TestPlatforms.Windows)] // currently on Windows these operations async-over-sync on Windows
[ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
public async Task ReadAsync_OutputStreams_Cancel_RespondsQuickly()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,136 @@
using System.Reflection;
using System.Runtime.InteropServices;
using System.Security;
using System.Threading;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private const int DefaultInvalidHandle = -1;

// For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe, and the
// _named* fields remain null. For named pipes, SafePipeHandle.handle is a copy of the file descriptor
// extracted from the Socket's SafeHandle, and the _named* fields are the socket and its safe handle.
// For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe.
// For named pipes, SafePipeHandle.handle is a copy of the file descriptor
// extracted from the Socket's SafeHandle.
// This allows operations related to file descriptors to be performed directly on the SafePipeHandle,
// and operations that should go through the Socket to be done via _namedPipeSocket. We keep the
// and operations that should go through the Socket to be done via PipeSocket. We keep the
// Socket's SafeHandle alive as long as this SafeHandle is alive.

private Socket? _namedPipeSocket;
private SafeHandle? _namedPipeSocketHandle;
private Socket? _pipeSocket;
private SafeHandle? _pipeSocketHandle;
private volatile int _disposed;

internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true)
{
Debug.Assert(namedPipeSocket != null);
_namedPipeSocket = namedPipeSocket;

_namedPipeSocketHandle = namedPipeSocket.SafeHandle;

bool ignored = false;
_namedPipeSocketHandle.DangerousAddRef(ref ignored);
SetHandle(_namedPipeSocketHandle.DangerousGetHandle());
SetPipeSocketInterlocked(namedPipeSocket, ownsHandle: true);
base.SetHandle(_pipeSocketHandle!.DangerousGetHandle());
}

internal Socket? NamedPipeSocket => _namedPipeSocket;
internal SafeHandle? NamedPipeSocketHandle => _namedPipeSocketHandle;
internal Socket PipeSocket => _pipeSocket ?? CreatePipeSocket();

internal SafeHandle? PipeSocketHandle => _pipeSocketHandle;

protected override void Dispose(bool disposing)
{
base.Dispose(disposing); // must be called before trying to Dispose the socket
if (disposing && _namedPipeSocket != null)
_disposed = 1;
if (disposing && Volatile.Read(ref _pipeSocket) is Socket socket)
{
_namedPipeSocket.Dispose();
_namedPipeSocket = null;
socket.Dispose();
_pipeSocket = null;
}
}

protected override bool ReleaseHandle()
{
Debug.Assert(!IsInvalid);

// Clean up resources for named handles
if (_namedPipeSocketHandle != null)
if (_pipeSocketHandle != null)
{
SetHandle(DefaultInvalidHandle);
_namedPipeSocketHandle.DangerousRelease();
_namedPipeSocketHandle = null;
base.SetHandle((IntPtr)DefaultInvalidHandle);
_pipeSocketHandle.DangerousRelease();
_pipeSocketHandle = null;
return true;
}

// Clean up resources for anonymous handles
return (long)handle >= 0 ?
Interop.Sys.Close(handle) == 0 :
true;
else
{
return (long)handle >= 0 ?
Interop.Sys.Close(handle) == 0 :
true;
}
}

public override bool IsInvalid
{
get { return (long)handle < 0 && _namedPipeSocket == null; }
get { return (long)handle < 0 && _pipeSocket == null; }
}

private Socket CreatePipeSocket(bool ownsHandle = true)
{
Socket? socket = null;
if (_disposed == 0)
{
bool refAdded = false;
try
{
DangerousAddRef(ref refAdded);

socket = SetPipeSocketInterlocked(new Socket(new SafeSocketHandle(handle, ownsHandle)), ownsHandle);

// Double check if we haven't Disposed in the meanwhile, and ensure
// the Socket is disposed, in case Dispose() missed the _pipeSocket assignment.
if (_disposed == 1)
{
Volatile.Write(ref _pipeSocket, null);
socket.Dispose();
socket = null;
}
}
finally
{
if (refAdded)
{
DangerousRelease();
}
}
}
return socket ?? throw new ObjectDisposedException(GetType().ToString());;
}

private Socket SetPipeSocketInterlocked(Socket socket, bool ownsHandle)
{
Debug.Assert(socket != null);

// Multiple threads may try to create the PipeSocket.
Socket? current = Interlocked.CompareExchange(ref _pipeSocket, socket, null);
if (current != null)
{
socket.Dispose();
return current;
}

// If we own the handle, defer ownership to the SocketHandle.
SafeSocketHandle socketHandle = _pipeSocket.SafeHandle;
if (ownsHandle)
{
_pipeSocketHandle = socketHandle;

bool ignored = false;
socketHandle.DangerousAddRef(ref ignored);
}

return socket;
}

internal void SetHandle(IntPtr descriptor, bool ownsHandle = true)
{
base.SetHandle(descriptor);

// Avoid throwing when we own the handle by defering pipe creation.
if (!ownsHandle)
{
_pipeSocket = CreatePipeSocket(ownsHandle);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@ protected override bool ReleaseHandle()
{
return Interop.Kernel32.CloseHandle(handle);
}

internal void SetHandle(IntPtr descriptor, bool ownsHandle = true)
{
base.SetHandle(descriptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ public SafePipeHandle()
public SafePipeHandle(IntPtr preexistingHandle, bool ownsHandle)
: base(ownsHandle)
{
SetHandle(preexistingHandle);
}

internal void SetHandle(int descriptor)
{
base.SetHandle((IntPtr)descriptor);
SetHandle(preexistingHandle, ownsHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public override int InBufferSize
{
CheckPipePropertyOperations();
if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream);
return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? 0;
return InternalHandle?.PipeSocket.ReceiveBufferSize ?? 0;
}
}

Expand All @@ -93,7 +93,7 @@ public override int OutBufferSize
{
CheckPipePropertyOperations();
if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream);
return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? 0;
return InternalHandle?.PipeSocket.SendBufferSize ?? 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public string GetImpersonationUserName()
{
CheckWriteOperations();

SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle;
SafeHandle? handle = InternalHandle?.PipeSocketHandle;
if (handle == null)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet);
Expand All @@ -155,7 +155,7 @@ public override int InBufferSize
{
CheckPipePropertyOperations();
if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream);
return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? _inBufferSize;
return InternalHandle?.PipeSocket.ReceiveBufferSize ?? _inBufferSize;
}
}

Expand All @@ -165,15 +165,15 @@ public override int OutBufferSize
{
CheckPipePropertyOperations();
if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream);
return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? _outBufferSize;
return InternalHandle?.PipeSocket.SendBufferSize ?? _outBufferSize;
}
}

// This method calls a delegate while impersonating the client.
public void RunAsClient(PipeStreamImpersonationWorker impersonationWorker)
{
CheckWriteOperations();
SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle;
SafeHandle? handle = InternalHandle?.PipeSocketHandle;
if (handle == null)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet);
Expand Down
Loading

0 comments on commit 2606d82

Please sign in to comment.