Skip to content

Commit

Permalink
Unified response body features (#12328)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tratcher committed Jul 25, 2019
1 parent 2884ef6 commit 0806609
Show file tree
Hide file tree
Showing 78 changed files with 1,217 additions and 1,184 deletions.
5 changes: 2 additions & 3 deletions src/Hosting/TestHost/src/HttpContextBuilder.cs
Expand Up @@ -45,15 +45,13 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou
_responseReaderStream = new ResponseBodyReaderStream(pipe, ClientInitiatedAbort, () => _responseReadCompleteCallback?.Invoke(_httpContext));
_responsePipeWriter = new ResponseBodyPipeWriter(pipe, ReturnResponseMessageAsync);
_responseFeature.Body = new ResponseBodyWriterStream(_responsePipeWriter, () => AllowSynchronousIO);
_responseFeature.BodySnapshot = _responseFeature.Body;
_responseFeature.BodyWriter = _responsePipeWriter;

_httpContext.Features.Set<IHttpBodyControlFeature>(this);
_httpContext.Features.Set<IHttpResponseFeature>(_responseFeature);
_httpContext.Features.Set<IHttpResponseStartFeature>(_responseFeature);
_httpContext.Features.Set<IHttpResponseBodyFeature>(_responseFeature);
_httpContext.Features.Set<IHttpRequestLifetimeFeature>(_requestLifetimeFeature);
_httpContext.Features.Set<IHttpResponseTrailersFeature>(_responseTrailersFeature);
_httpContext.Features.Set<IResponseBodyPipeFeature>(_responseFeature);
}

public bool AllowSynchronousIO { get; set; }
Expand Down Expand Up @@ -183,6 +181,7 @@ internal async Task ReturnResponseMessageAsync()
Body = _responseReaderStream
};
newFeatures.Set<IHttpResponseFeature>(clientResponseFeature);
newFeatures.Set<IHttpResponseBodyFeature>(new StreamResponseBodyFeature(_responseReaderStream));
_responseTcs.TrySetResult(new DefaultHttpContext(newFeatures));
}
}
Expand Down
39 changes: 17 additions & 22 deletions src/Hosting/TestHost/src/ResponseFeature.cs
Expand Up @@ -11,7 +11,7 @@

namespace Microsoft.AspNetCore.TestHost
{
internal class ResponseFeature : IHttpResponseFeature, IHttpResponseStartFeature, IResponseBodyPipeFeature
internal class ResponseFeature : IHttpResponseFeature, IHttpResponseBodyFeature
{
private readonly HeaderDictionary _headers = new HeaderDictionary();
private readonly Action<Exception> _abort;
Expand All @@ -24,7 +24,6 @@ internal class ResponseFeature : IHttpResponseFeature, IHttpResponseStartFeature
public ResponseFeature(Action<Exception> abort)
{
Headers = _headers;
Body = new MemoryStream();

// 200 is the default status code all the way down to the host, so we set it
// here to be consistent with the rest of the hosts when writing tests.
Expand Down Expand Up @@ -68,29 +67,11 @@ public string ReasonPhrase

public Stream Body { get; set; }

internal Stream BodySnapshot { get; set; }
public Stream Stream => Body;

internal PipeWriter BodyWriter { get; set; }

public PipeWriter Writer
{
get
{
if (!ReferenceEquals(BodySnapshot, Body))
{
BodySnapshot = Body;
BodyWriter = PipeWriter.Create(Body);

OnCompleted((self) =>
{
((PipeWriter)self).Complete();
return Task.CompletedTask;
}, BodyWriter);
}

return BodyWriter;
}
}
public PipeWriter Writer => BodyWriter;

public bool HasStarted { get; set; }

Expand Down Expand Up @@ -158,5 +139,19 @@ public async Task StartAsync(CancellationToken token = default)
throw;
}
}

public void DisableBuffering()
{
}

public Task SendFileAsync(string path, long offset, long? count, CancellationToken cancellation)
{
return SendFileFallback.SendFileAsync(Stream, path, offset, count, cancellation);
}

public Task CompleteAsync()
{
return Writer.CompleteAsync().AsTask();
}
}
}
4 changes: 2 additions & 2 deletions src/Hosting/TestHost/test/ClientHandlerTests.cs
Expand Up @@ -51,7 +51,7 @@ public Task ExpectedKeysAreInFeatures()
{
var handler = new ClientHandler(new PathString("/A/Path/"), new InspectingApplication(features =>
{
// TODO: Assert.True(context.RequestAborted.CanBeCanceled);
Assert.True(features.Get<IHttpRequestLifetimeFeature>().RequestAborted.CanBeCanceled);
Assert.Equal("HTTP/1.1", features.Get<IHttpRequestFeature>().Protocol);
Assert.Equal("GET", features.Get<IHttpRequestFeature>().Method);
Assert.Equal("https", features.Get<IHttpRequestFeature>().Scheme);
Expand All @@ -61,7 +61,7 @@ public Task ExpectedKeysAreInFeatures()
Assert.NotNull(features.Get<IHttpRequestFeature>().Body);
Assert.NotNull(features.Get<IHttpRequestFeature>().Headers);
Assert.NotNull(features.Get<IHttpResponseFeature>().Headers);
Assert.NotNull(features.Get<IHttpResponseFeature>().Body);
Assert.NotNull(features.Get<IHttpResponseBodyFeature>().Stream);
Assert.Equal(200, features.Get<IHttpResponseFeature>().StatusCode);
Assert.Null(features.Get<IHttpResponseFeature>().ReasonPhrase);
Assert.Equal("example.com", features.Get<IHttpRequestFeature>().Headers["host"]);
Expand Down
Expand Up @@ -282,6 +282,7 @@ public abstract partial class HttpResponse
public abstract Microsoft.AspNetCore.Http.IHeaderDictionary Headers { get; }
public abstract Microsoft.AspNetCore.Http.HttpContext HttpContext { get; }
public abstract int StatusCode { get; set; }
public virtual System.Threading.Tasks.Task CompleteAsync() { throw null; }
public abstract void OnCompleted(System.Func<object, System.Threading.Tasks.Task> callback, object state);
public virtual void OnCompleted(System.Func<System.Threading.Tasks.Task> callback) { }
public abstract void OnStarting(System.Func<object, System.Threading.Tasks.Task> callback, object state);
Expand Down
10 changes: 7 additions & 3 deletions src/Http/Http.Abstractions/src/HttpResponse.cs
Expand Up @@ -127,9 +127,13 @@ public abstract class HttpResponse
/// Starts the response by calling OnStarting() and making headers unmodifiable.
/// </summary>
/// <param name="cancellationToken"></param>
/// <remarks>
/// If the <see cref="IHttpResponseStartFeature"/> isn't set, StartAsync will default to calling HttpResponse.Body.FlushAsync().
/// </remarks>
public virtual Task StartAsync(CancellationToken cancellationToken = default) { throw new NotImplementedException(); }

/// <summary>
/// Flush any remaining response headers, data, or trailers.
/// This may throw if the response is in an invalid state such as a Content-Length mismatch.
/// </summary>
/// <returns></returns>
public virtual Task CompleteAsync() { throw new NotImplementedException(); }
}
}
Expand Up @@ -55,7 +55,6 @@ public partial class QueryBuilder : System.Collections.Generic.IEnumerable<Syste
}
public static partial class StreamCopyOperation
{
[System.Diagnostics.DebuggerStepThroughAttribute]
public static System.Threading.Tasks.Task CopyToAsync(System.IO.Stream source, System.IO.Stream destination, long? count, int bufferSize, System.Threading.CancellationToken cancel) { throw null; }
public static System.Threading.Tasks.Task CopyToAsync(System.IO.Stream source, System.IO.Stream destination, long? count, System.Threading.CancellationToken cancel) { throw null; }
}
Expand Down
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>ASP.NET Core common extension methods for HTTP abstractions, HTTP headers, HTTP request/response, and session state.</Description>
Expand All @@ -9,6 +9,10 @@
<PackageTags>aspnetcore</PackageTags>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\Shared\StreamCopyOperationInternal.cs" Link="StreamCopyOperationInternal.cs" />
</ItemGroup>

<ItemGroup>
<Reference Include="Microsoft.AspNetCore.Http.Abstractions" />
<Reference Include="Microsoft.Net.Http.Headers" />
Expand Down
37 changes: 2 additions & 35 deletions src/Http/Http.Extensions/src/SendFileResponseExtensions.cs
Expand Up @@ -128,43 +128,10 @@ private static async Task SendFileAsyncCore(HttpResponse response, IFileInfo fil

private static Task SendFileAsyncCore(HttpResponse response, string fileName, long offset, long? count, CancellationToken cancellationToken = default)
{
var sendFile = response.HttpContext.Features.Get<IHttpSendFileFeature>();
if (sendFile == null)
{
return SendFileAsyncCore(response.Body, fileName, offset, count, cancellationToken);
}

var sendFile = response.HttpContext.Features.Get<IHttpResponseBodyFeature>();
return sendFile.SendFileAsync(fileName, offset, count, cancellationToken);
}

// Not safe for overlapped writes.
private static async Task SendFileAsyncCore(Stream outputStream, string fileName, long offset, long? count, CancellationToken cancel = default)
{
cancel.ThrowIfCancellationRequested();

var fileInfo = new FileInfo(fileName);
CheckRange(offset, count, fileInfo.Length);

int bufferSize = 1024 * 16;
var fileStream = new FileStream(
fileName,
FileMode.Open,
FileAccess.Read,
FileShare.ReadWrite,
bufferSize: bufferSize,
options: FileOptions.Asynchronous | FileOptions.SequentialScan);

using (fileStream)
{
if (offset > 0)
{
fileStream.Seek(offset, SeekOrigin.Begin);
}

await StreamCopyOperation.CopyToAsync(fileStream, outputStream, count, cancel);
}
}

private static void CheckRange(long offset, long? count, long fileLength)
{
if (offset < 0 || offset > fileLength)
Expand All @@ -178,4 +145,4 @@ private static void CheckRange(long offset, long? count, long fileLength)
}
}
}
}
}
67 changes: 6 additions & 61 deletions src/Http/Http.Extensions/src/StreamCopyOperation.cs
@@ -1,9 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -13,75 +10,23 @@ namespace Microsoft.AspNetCore.Http.Extensions
// FYI: In most cases the source will be a FileStream and the destination will be to the network.
public static class StreamCopyOperation
{
private const int DefaultBufferSize = 4096;

/// <summary>Asynchronously reads the bytes from the source stream and writes them to another stream.</summary>
/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them to another stream.</summary>
/// <returns>A task that represents the asynchronous copy operation.</returns>
/// <param name="source">The stream from which the contents will be copied.</param>
/// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
/// <param name="count">The count of bytes to be copied.</param>
/// <param name="cancel">The token to monitor for cancellation requests. The default value is <see cref="P:System.Threading.CancellationToken.None" />.</param>
public static Task CopyToAsync(Stream source, Stream destination, long? count, CancellationToken cancel)
{
return CopyToAsync(source, destination, count, DefaultBufferSize, cancel);
}
=> StreamCopyOperationInternal.CopyToAsync(source, destination, count, cancel);

/// <summary>Asynchronously reads the bytes from the source stream and writes them to another stream, using a specified buffer size.</summary>
/// <summary>Asynchronously reads the given number of bytes from the source stream and writes them to another stream, using a specified buffer size.</summary>
/// <returns>A task that represents the asynchronous copy operation.</returns>
/// <param name="source">The stream from which the contents will be copied.</param>
/// <param name="destination">The stream to which the contents of the current stream will be copied.</param>
/// <param name="count">The count of bytes to be copied.</param>
/// <param name="bufferSize">The size, in bytes, of the buffer. This value must be greater than zero. The default size is 4096.</param>
/// <param name="cancel">The token to monitor for cancellation requests. The default value is <see cref="P:System.Threading.CancellationToken.None" />.</param>
public static async Task CopyToAsync(Stream source, Stream destination, long? count, int bufferSize, CancellationToken cancel)
{
long? bytesRemaining = count;

var buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
Debug.Assert(source != null);
Debug.Assert(destination != null);
Debug.Assert(!bytesRemaining.HasValue || bytesRemaining.GetValueOrDefault() >= 0);
Debug.Assert(buffer != null);

while (true)
{
// The natural end of the range.
if (bytesRemaining.HasValue && bytesRemaining.GetValueOrDefault() <= 0)
{
return;
}

cancel.ThrowIfCancellationRequested();

int readLength = buffer.Length;
if (bytesRemaining.HasValue)
{
readLength = (int)Math.Min(bytesRemaining.GetValueOrDefault(), (long)readLength);
}
int read = await source.ReadAsync(buffer, 0, readLength, cancel);

if (bytesRemaining.HasValue)
{
bytesRemaining -= read;
}

// End of the source stream.
if (read == 0)
{
return;
}

cancel.ThrowIfCancellationRequested();

await destination.WriteAsync(buffer, 0, read, cancel);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public static Task CopyToAsync(Stream source, Stream destination, long? count, int bufferSize, CancellationToken cancel)
=> StreamCopyOperationInternal.CopyToAsync(source, destination, count, bufferSize, cancel);
}
}
}
26 changes: 23 additions & 3 deletions src/Http/Http.Extensions/test/SendFileResponseExtensionsTests.cs
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved. See License.txt in the project root for license information.

using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;
Expand All @@ -22,8 +23,8 @@ public async Task SendFileWorks()
{
var context = new DefaultHttpContext();
var response = context.Response;
var fakeFeature = new FakeSendFileFeature();
context.Features.Set<IHttpSendFileFeature>(fakeFeature);
var fakeFeature = new FakeResponseBodyFeature();
context.Features.Set<IHttpResponseBodyFeature>(fakeFeature);

await response.SendFileAsync("bob", 1, 3, CancellationToken.None);

Expand All @@ -33,13 +34,27 @@ public async Task SendFileWorks()
Assert.Equal(CancellationToken.None, fakeFeature.token);
}

private class FakeSendFileFeature : IHttpSendFileFeature
private class FakeResponseBodyFeature : IHttpResponseBodyFeature
{
public string name = null;
public long offset = 0;
public long? length = null;
public CancellationToken token;

public Stream Stream => throw new System.NotImplementedException();

public PipeWriter Writer => throw new System.NotImplementedException();

public Task CompleteAsync()
{
throw new System.NotImplementedException();
}

public void DisableBuffering()
{
throw new System.NotImplementedException();
}

public Task SendFileAsync(string path, long offset, long? length, CancellationToken cancellation)
{
this.name = path;
Expand All @@ -48,6 +63,11 @@ public Task SendFileAsync(string path, long offset, long? length, CancellationTo
this.token = cancellation;
return Task.FromResult(0);
}

public Task StartAsync(CancellationToken token = default)
{
throw new System.NotImplementedException();
}
}
}
}

0 comments on commit 0806609

Please sign in to comment.