Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove FileStream long pinning, and simplify synchronization #51462

Merged
merged 2 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,49 @@

using System.Buffers;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks.Sources;
using TaskSourceCodes = System.IO.Strategies.FileStreamHelpers.TaskSourceCodes;

namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
{
/// <summary>
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
/// </summary>
/// <summary>Reusable IValueTaskSource for FileStream ValueTask-returning async operations.</summary>
private sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
private readonly AsyncWindowsFileStreamStrategy _strategy;
private MemoryHandle _handle;
internal MemoryHandle _memoryHandle;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
private long _result; // Using long since this needs to be used in Interlocked APIs
#if DEBUG
private bool _cancellationHasBeenRegistered;
#endif
/// <summary>
/// 0 when the operation hasn't been scheduled, non-zero when either the operation has completed,
/// in which case its value is a packed combination of the error code and number of bytes, or when
/// the read/write call has finished scheduling the async operation.
/// </summary>
internal ulong _result;

internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy)
stephentoub marked this conversation as resolved.
Show resolved Hide resolved
{
_strategy = strategy;
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);

_source.RunContinuationsAsynchronously = true;
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);
}

internal NativeOverlapped* Configure(ReadOnlyMemory<byte> memory)
internal void Dispose()
{
_result = TaskSourceCodes.NoResult;
ReleaseResources();
_preallocatedOverlapped.Dispose();
}

_handle = memory.Pin();
internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory)
{
_result = 0;
_memoryHandle = memory.Pin();
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);

return _overlapped;
}

Expand All @@ -69,140 +71,110 @@ private int GetResultAndRelease(short token)

internal void RegisterForCancellation(CancellationToken cancellationToken)
{
#if DEBUG
Debug.Assert(cancellationToken.CanBeCanceled);
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
_cancellationHasBeenRegistered = true;
#endif

// Quick check to make sure the IO hasn't completed
if (_overlapped != null)
Debug.Assert(_overlapped != null);
if (cancellationToken.CanBeCanceled)
{
// Register the cancellation only if the IO hasn't completed
long packedResult = Interlocked.CompareExchange(ref _result, TaskSourceCodes.RegisteringCancellation, TaskSourceCodes.NoResult);
if (packedResult == TaskSourceCodes.NoResult)
try
{
_cancellationRegistration = cancellationToken.UnsafeRegister((s, token) => Cancel(token), this);

// Switch the result, just in case IO completed while we were setting the registration
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
}
else if (packedResult != TaskSourceCodes.CompletedCallback)
{
// Failed to set the result, IO is in the process of completing
// Attempt to take the packed result
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
_cancellationRegistration = cancellationToken.UnsafeRegister(static (s, token) =>
{
ValueTaskSource vts = (ValueTaskSource)s!;
if (!vts._strategy._fileHandle.IsInvalid)
{
try
{
Interop.Kernel32.CancelIoEx(vts._strategy._fileHandle, vts._overlapped);
// Ignore all failures: no matter whether it succeeds or fails, completion is handled via the IOCallback.
}
catch (ObjectDisposedException) { } // in case the SafeHandle is (erroneously) closed concurrently
}
}, this);
}

// If we have a callback that needs to be completed
if ((packedResult != TaskSourceCodes.NoResult) && (packedResult != TaskSourceCodes.CompletedCallback) && (packedResult != TaskSourceCodes.RegisteringCancellation))
catch (OutOfMemoryException)
{
CompleteCallback((ulong)packedResult);
// Just in case trying to register OOMs, we ignore it in order to
// protect the higher-level calling code that would proceed to unpin
// memory that might be actively used by an in-flight async operation.
}
}
}

internal void ReleaseNativeResource()
internal void ReleaseResources()
{
_handle.Dispose();
// Unpin any pinned buffer.
_memoryHandle.Dispose();

// Ensure that cancellation has been completed and cleaned up.
// Ensure that any cancellation callback has either completed or will never run,
// so that we don't try to access an overlapped for this operation after it's already
// been freed.
_cancellationRegistration.Dispose();

// Free the overlapped.
// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
// (this is why we disposed the registration above).
if (_overlapped != null)
{
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
_overlapped = null;
}
}

private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");

// Handle reading from & writing to closed pipes. While I'm not sure
// this is entirely necessary anymore, maybe it's possible for
// an async read on a pipe to be issued and then the pipe is closed,
// returning this error. This may very well be necessary.
ulong packedResult;
if (errorCode != 0 && errorCode != Interop.Errors.ERROR_BROKEN_PIPE && errorCode != Interop.Errors.ERROR_NO_DATA)
{
packedResult = ((ulong)TaskSourceCodes.ResultError | errorCode);
}
else
{
packedResult = ((ulong)TaskSourceCodes.ResultSuccess | numBytes);
}
// After calling Read/WriteFile to start the asynchronous operation, the caller may configure cancellation,
// and only after that should we allow for completing the operation, as completion needs to factor in work
// done by that cancellation registration, e.g. unregistering. As such, we use _result to both track who's
// responsible for calling Complete and for passing the necessary data between parties.

// Stow the result so that other threads can observe it
// And, if no other thread is registering cancellation, continue
if (Interlocked.Exchange(ref valueTaskSource._result, (long)packedResult) == TaskSourceCodes.NoResult)
/// <summary>Invoked when AsyncWindowsFileStreamStrategy has finished scheduling the async operation.</summary>
internal void FinishedScheduling()
{
// Set the value to 1. If it was already non-0, then the asynchronous operation already completed but
// didn't call Complete, so we call Complete here. The read result value is the data (packed) necessary
// to make the call.
ulong result = Interlocked.Exchange(ref _result, 1);
if (result != 0)
{
// Successfully set the state, attempt to take back the callback
if (Interlocked.Exchange(ref valueTaskSource._result, TaskSourceCodes.CompletedCallback) != TaskSourceCodes.NoResult)
{
// Successfully got the callback, finish the callback
valueTaskSource.CompleteCallback(packedResult);
}
// else: Some other thread stole the result, so now it is responsible to finish the callback
Complete(errorCode: (uint)result, numBytes: (uint)(result >> 32) & 0x7FFFFFFF);
}
// else: Some other thread is registering a cancellation, so it *must* finish the callback
}

private void CompleteCallback(ulong packedResult)
/// <summary>Invoked when the asynchronous operation has completed asynchronously.</summary>
private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
CancellationToken cancellationToken = _cancellationRegistration.Token;

ReleaseNativeResource();

// Unpack the result and send it to the user
long result = (long)(packedResult & TaskSourceCodes.ResultMask);
if (result == TaskSourceCodes.ResultError)
ValueTaskSource? vts = (ValueTaskSource?)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(vts is not null);
Debug.Assert(vts._overlapped == pOverlapped, "Overlaps don't match");

// Set the value to a packed combination of the error code and number of bytes (plus a high-bit 1
// to ensure the value we're setting is non-zero). If it was already non-0 (the common case), then
// the call site already finished scheduling the async operation, in which case we're ready to complete.
Debug.Assert(numBytes < int.MaxValue);
if (Interlocked.Exchange(ref vts._result, (1ul << 63) | ((ulong)numBytes << 32) | errorCode) != 0)
{
int errorCode = unchecked((int)(packedResult & uint.MaxValue));
Exception e;
if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
{
CancellationToken ct = cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(canceled: true);
e = new OperationCanceledException(ct);
}
else
{
e = Win32Marshal.GetExceptionForWin32Error(errorCode);
}
e.SetCurrentStackTrace();
_source.SetException(e);
}
else
{
Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result");
_source.SetResult((int)(packedResult & uint.MaxValue));
vts.Complete(errorCode, numBytes);
}
}

private void Cancel(CancellationToken token)
internal void Complete(uint errorCode, uint numBytes)
{
// WARNING: This may potentially be called under a lock (during cancellation registration)
Debug.Assert(_overlapped != null && GetStatus(Version) != ValueTaskSourceStatus.Succeeded, "IO should not have completed yet");
ReleaseResources();

// If the handle is still valid, attempt to cancel the IO
if (!_strategy._fileHandle.IsInvalid &&
!Interop.Kernel32.CancelIoEx(_strategy._fileHandle, _overlapped))
switch (errorCode)
{
int errorCode = Marshal.GetLastWin32Error();

// ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
// This probably means that the IO operation has completed.
if (errorCode != Interop.Errors.ERROR_NOT_FOUND)
{
Exception e = new OperationCanceledException(SR.OperationCanceled, Win32Marshal.GetExceptionForWin32Error(errorCode), token);
e.SetCurrentStackTrace();
_source.SetException(e);
}
case 0:
case Interop.Errors.ERROR_BROKEN_PIPE:
case Interop.Errors.ERROR_NO_DATA:
// Success
_source.SetResult((int)numBytes);
break;

case Interop.Errors.ERROR_OPERATION_ABORTED:
// Cancellation
CancellationToken ct = _cancellationRegistration.Token;
_source.SetException(ct.IsCancellationRequested ? new OperationCanceledException(ct) : new OperationCanceledException());
break;

default:
// Failure
_source.SetException(Win32Marshal.GetExceptionForWin32Error((int)errorCode));
break;
}
}
}
Expand Down
Loading