Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

[2.1.1] Gracefully handle disposing while writing (#2180) #2355

Merged
merged 2 commits into from May 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/Common/PipeWriterStream.cs
Expand Up @@ -83,7 +83,16 @@ private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> source, CancellationToken

return default;

async ValueTask WriteSlowAsync(ValueTask<FlushResult> flushTask) => await flushTask;
async ValueTask WriteSlowAsync(ValueTask<FlushResult> flushTask)
{
var flushResult = await flushTask;

// Cancellation can be triggered by PipeWriter.CancelPendingFlush
if (flushResult.IsCanceled)
{
throw new OperationCanceledException();
}
}
}

public void Reset()
Expand Down
Expand Up @@ -178,7 +178,7 @@ public void TickHeartbeat()

public async Task DisposeAsync(bool closeGracefully = false)
{
var disposeTask = Task.CompletedTask;
Task disposeTask;

await StateLock.WaitAsync();
try
Expand Down Expand Up @@ -267,6 +267,9 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
{
Log.ShuttingDownTransportAndApplication(_logger, TransportType);

// Cancel any pending flushes from back pressure
Application?.Output.CancelPendingFlush();

// Shutdown both sides and wait for nothing
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
Application?.Output.Complete(transportTask.Exception?.InnerException);
Expand Down
Expand Up @@ -11,10 +11,10 @@ public partial class HttpConnectionDispatcher
private static class Log
{
private static readonly Action<ILogger, string, Exception> _connectionDisposed =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(1, "ConnectionDisposed"), "Connection Id {TransportConnectionId} was disposed.");
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(1, "ConnectionDisposed"), "Connection {TransportConnectionId} was disposed.");

private static readonly Action<ILogger, string, string, Exception> _connectionAlreadyActive =
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(2, "ConnectionAlreadyActive"), "Connection Id {TransportConnectionId} is already active via {RequestId}.");
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(2, "ConnectionAlreadyActive"), "Connection {TransportConnectionId} is already active via {RequestId}.");

private static readonly Action<ILogger, string, string, Exception> _pollCanceled =
LoggerMessage.Define<string, string>(LogLevel.Trace, new EventId(3, "PollCanceled"), "Previous poll canceled for {TransportConnectionId} on {RequestId}.");
Expand Down Expand Up @@ -46,6 +46,9 @@ private static class Log
private static readonly Action<ILogger, Exception> _terminatingConnection =
LoggerMessage.Define(LogLevel.Trace, new EventId(12, "TerminatingConection"), "Terminating Long Polling connection due to a DELETE request.");

private static readonly Action<ILogger, string, Exception> _connectionDisposedWhileWriteInProgress =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(13, "ConnectionDisposedWhileWriteInProgress"), "Connection {TransportConnectionId} was disposed while a write was in progress.");

public static void ConnectionDisposed(ILogger logger, string connectionId)
{
_connectionDisposed(logger, connectionId, null);
Expand Down Expand Up @@ -105,6 +108,11 @@ public static void TerminatingConection(ILogger logger)
{
_terminatingConnection(logger, null);
}

public static void ConnectionDisposedWhileWriteInProgress(ILogger logger, string connectionId, Exception ex)
{
_connectionDisposedWhileWriteInProgress(logger, connectionId, ex);
}
}
}
}
Expand Up @@ -479,12 +479,40 @@ private async Task ProcessSend(HttpContext context, HttpConnectionDispatcherOpti
return;
}

await context.Request.Body.CopyToAsync(connection.ApplicationStream, bufferSize);
try
{
try
{
await context.Request.Body.CopyToAsync(connection.ApplicationStream, bufferSize);
}
catch (InvalidOperationException ex)
{
// PipeWriter will throw an error if it is written to while dispose is in progress and the writer has been completed
// Dispose isn't taking WriteLock because it could be held because of backpressure, and calling CancelPendingFlush
// then taking the lock introduces a race condition that could lead to a deadlock
Log.ConnectionDisposedWhileWriteInProgress(_logger, connection.ConnectionId, ex);

Log.ReceivedBytes(_logger, connection.ApplicationStream.Length);
context.Response.StatusCode = StatusCodes.Status404NotFound;
context.Response.ContentType = "text/plain";
return;
}
catch (OperationCanceledException)
{
// CancelPendingFlush has canceled pending writes caused by backpresure
Log.ConnectionDisposed(_logger, connection.ConnectionId);

context.Response.StatusCode = StatusCodes.Status404NotFound;
context.Response.ContentType = "text/plain";
return;
}

// Clear the amount of read bytes so logging is accurate
connection.ApplicationStream.Reset();
Log.ReceivedBytes(_logger, connection.ApplicationStream.Length);
}
finally
{
// Clear the amount of read bytes so logging is accurate
connection.ApplicationStream.Reset();
}
}
finally
{
Expand Down