Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AnonymousPipeStreams.Linux, Process.Unix Streams: perform async I/O by using Socket implementation #44647

Merged
merged 16 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@
<Reference Include="System.IO.FileSystem" />
<Reference Include="System.IO.FileSystem.DriveInfo" />
<Reference Include="System.Memory" />
<Reference Include="System.IO.Pipes" />
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Runtime.InteropServices" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.IO.Pipes;
using System.Security;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -445,20 +446,20 @@ private bool StartCore(ProcessStartInfo startInfo)
if (startInfo.RedirectStandardInput)
{
Debug.Assert(stdinFd >= 0);
_standardInput = new StreamWriter(OpenStream(stdinFd, FileAccess.Write),
_standardInput = new StreamWriter(OpenStream(stdinFd, PipeDirection.Out),
startInfo.StandardInputEncoding ?? Encoding.Default, StreamBufferSize)
{ AutoFlush = true };
}
if (startInfo.RedirectStandardOutput)
{
Debug.Assert(stdoutFd >= 0);
_standardOutput = new StreamReader(OpenStream(stdoutFd, FileAccess.Read),
_standardOutput = new StreamReader(OpenStream(stdoutFd, PipeDirection.In),
startInfo.StandardOutputEncoding ?? Encoding.Default, true, StreamBufferSize);
}
if (startInfo.RedirectStandardError)
{
Debug.Assert(stderrFd >= 0);
_standardError = new StreamReader(OpenStream(stderrFd, FileAccess.Read),
_standardError = new StreamReader(OpenStream(stderrFd, PipeDirection.In),
startInfo.StandardErrorEncoding ?? Encoding.Default, true, StreamBufferSize);
}

Expand Down Expand Up @@ -753,14 +754,12 @@ internal static TimeSpan TicksToTimeSpan(double ticks)

/// <summary>Opens a stream around the specified file descriptor and with the specified access.</summary>
/// <param name="fd">The file descriptor.</param>
/// <param name="access">The access mode.</param>
/// <param name="direction">The pipe direction.</param>
/// <returns>The opened stream.</returns>
private static FileStream OpenStream(int fd, FileAccess access)
private static Stream OpenStream(int fd, PipeDirection direction)
{
Debug.Assert(fd >= 0);
return new FileStream(
new SafeFileHandle((IntPtr)fd, ownsHandle: true),
access, StreamBufferSize, isAsync: false);
return new AnonymousPipeClientStream(direction, new SafePipeHandle((IntPtr)fd, ownsHandle: true));
}

/// <summary>Parses a command-line argument string into a list of arguments.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ async private Task<bool> WaitPipeSignal(PipeStream pipe, int millisecond)
}
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/44329")]
[PlatformSpecific(~TestPlatforms.Windows)] // currently on Windows these operations async-over-sync on Windows
[ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
public async Task ReadAsync_OutputStreams_Cancel_RespondsQuickly()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,136 @@
using System.Reflection;
using System.Runtime.InteropServices;
using System.Security;
using System.Threading;

namespace Microsoft.Win32.SafeHandles
{
public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private const int DefaultInvalidHandle = -1;

// For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe, and the
// _named* fields remain null. For named pipes, SafePipeHandle.handle is a copy of the file descriptor
// extracted from the Socket's SafeHandle, and the _named* fields are the socket and its safe handle.
// For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe.
// For named pipes, SafePipeHandle.handle is a copy of the file descriptor
tmds marked this conversation as resolved.
Show resolved Hide resolved
// extracted from the Socket's SafeHandle.
// This allows operations related to file descriptors to be performed directly on the SafePipeHandle,
// and operations that should go through the Socket to be done via _namedPipeSocket. We keep the
// and operations that should go through the Socket to be done via PipeSocket. We keep the
// Socket's SafeHandle alive as long as this SafeHandle is alive.

private Socket? _namedPipeSocket;
private SafeHandle? _namedPipeSocketHandle;
private Socket? _pipeSocket;
private SafeHandle? _pipeSocketHandle;
private volatile int _disposed;

internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true)
{
Debug.Assert(namedPipeSocket != null);
_namedPipeSocket = namedPipeSocket;

_namedPipeSocketHandle = namedPipeSocket.SafeHandle;

bool ignored = false;
_namedPipeSocketHandle.DangerousAddRef(ref ignored);
SetHandle(_namedPipeSocketHandle.DangerousGetHandle());
SetPipeSocketInterlocked(namedPipeSocket, ownsHandle: true);
base.SetHandle(_pipeSocketHandle!.DangerousGetHandle());
}

internal Socket? NamedPipeSocket => _namedPipeSocket;
internal SafeHandle? NamedPipeSocketHandle => _namedPipeSocketHandle;
internal Socket PipeSocket => _pipeSocket ?? CreatePipeSocket();

internal SafeHandle? PipeSocketHandle => _pipeSocketHandle;

protected override void Dispose(bool disposing)
{
base.Dispose(disposing); // must be called before trying to Dispose the socket
if (disposing && _namedPipeSocket != null)
_disposed = 1;
if (disposing && Volatile.Read(ref _pipeSocket) is Socket socket)
{
_namedPipeSocket.Dispose();
_namedPipeSocket = null;
socket.Dispose();
_pipeSocket = null;
}
}

protected override bool ReleaseHandle()
{
Debug.Assert(!IsInvalid);

// Clean up resources for named handles
if (_namedPipeSocketHandle != null)
if (_pipeSocketHandle != null)
{
SetHandle(DefaultInvalidHandle);
_namedPipeSocketHandle.DangerousRelease();
_namedPipeSocketHandle = null;
base.SetHandle((IntPtr)DefaultInvalidHandle);
_pipeSocketHandle.DangerousRelease();
_pipeSocketHandle = null;
return true;
}

// Clean up resources for anonymous handles
return (long)handle >= 0 ?
Interop.Sys.Close(handle) == 0 :
true;
else
stephentoub marked this conversation as resolved.
Show resolved Hide resolved
{
return (long)handle >= 0 ?
Interop.Sys.Close(handle) == 0 :
true;
}
}

public override bool IsInvalid
{
get { return (long)handle < 0 && _namedPipeSocket == null; }
get { return (long)handle < 0 && _pipeSocket == null; }
}

private Socket CreatePipeSocket(bool ownsHandle = true)
{
Socket? socket = null;
if (_disposed == 0)
{
bool refAdded = false;
try
{
DangerousAddRef(ref refAdded);

socket = SetPipeSocketInterlocked(new Socket(new SafeSocketHandle(handle, ownsHandle)), ownsHandle);

// Double check if we haven't Disposed in the meanwhile, and ensure
// the Socket is disposed, in case Dispose() missed the _pipeSocket assignment.
if (_disposed == 1)
{
Volatile.Write(ref _pipeSocket, null);
socket.Dispose();
socket = null;
}
tmds marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
if (refAdded)
{
DangerousRelease();
}
}
}
return socket ?? throw new ObjectDisposedException(GetType().ToString());;
}

private Socket SetPipeSocketInterlocked(Socket socket, bool ownsHandle)
{
Debug.Assert(socket != null);

// Multiple threads may try to create the PipeSocket.
Socket? current = Interlocked.CompareExchange(ref _pipeSocket, socket, null);
if (current != null)
{
socket.Dispose();
return current;
}

// If we own the handle, defer ownership to the SocketHandle.
SafeSocketHandle socketHandle = _pipeSocket.SafeHandle;
if (ownsHandle)
{
_pipeSocketHandle = socketHandle;

bool ignored = false;
socketHandle.DangerousAddRef(ref ignored);
}

return socket;
}

internal void SetHandle(IntPtr descriptor, bool ownsHandle = true)
{
base.SetHandle(descriptor);

// Avoid throwing when we own the handle by defering pipe creation.
if (!ownsHandle)
{
_pipeSocket = CreatePipeSocket(ownsHandle);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@ protected override bool ReleaseHandle()
{
return Interop.Kernel32.CloseHandle(handle);
}

internal void SetHandle(IntPtr descriptor, bool ownsHandle = true)
{
base.SetHandle(descriptor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ public SafePipeHandle()
public SafePipeHandle(IntPtr preexistingHandle, bool ownsHandle)
: base(ownsHandle)
{
SetHandle(preexistingHandle);
}

internal void SetHandle(int descriptor)
{
base.SetHandle((IntPtr)descriptor);
SetHandle(preexistingHandle, ownsHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public override int InBufferSize
{
CheckPipePropertyOperations();
if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream);
return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? 0;
return InternalHandle?.PipeSocket.ReceiveBufferSize ?? 0;
}
}

Expand All @@ -93,7 +93,7 @@ public override int OutBufferSize
{
CheckPipePropertyOperations();
if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream);
return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? 0;
return InternalHandle?.PipeSocket.SendBufferSize ?? 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public string GetImpersonationUserName()
{
CheckWriteOperations();

SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle;
SafeHandle? handle = InternalHandle?.PipeSocketHandle;
if (handle == null)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet);
Expand All @@ -155,7 +155,7 @@ public override int InBufferSize
{
CheckPipePropertyOperations();
if (!CanRead) throw new NotSupportedException(SR.NotSupported_UnreadableStream);
return InternalHandle?.NamedPipeSocket?.ReceiveBufferSize ?? _inBufferSize;
return InternalHandle?.PipeSocket.ReceiveBufferSize ?? _inBufferSize;
}
}

Expand All @@ -165,15 +165,15 @@ public override int OutBufferSize
{
CheckPipePropertyOperations();
if (!CanWrite) throw new NotSupportedException(SR.NotSupported_UnwritableStream);
return InternalHandle?.NamedPipeSocket?.SendBufferSize ?? _outBufferSize;
return InternalHandle?.PipeSocket.SendBufferSize ?? _outBufferSize;
}
}

// This method calls a delegate while impersonating the client.
public void RunAsClient(PipeStreamImpersonationWorker impersonationWorker)
{
CheckWriteOperations();
SafeHandle? handle = InternalHandle?.NamedPipeSocketHandle;
SafeHandle? handle = InternalHandle?.PipeSocketHandle;
if (handle == null)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeHandleNotSet);
Expand Down
Loading