Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for processing requests with StreamContent to AddStandardHedgingHandler() #5112

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
Expand All @@ -25,10 +26,15 @@ internal sealed class RequestMessageSnapshotStrategy : ResilienceStrategy
Throw.InvalidOperationException("The HTTP request message was not found in the resilience context.");
}

using var snapshot = RequestMessageSnapshot.Create(request);

context.Properties.Set(ResilienceKeys.RequestSnapshot, snapshot);

return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
try
{
using var snapshot = await RequestMessageSnapshot.CreateAsync(request).ConfigureAwait(context.ContinueOnCapturedContext);
context.Properties.Set(ResilienceKeys.RequestSnapshot, snapshot);
return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
catch (IOException e)
{
return Outcome.FromException<TResult>(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Http.Resilience;
Expand Down Expand Up @@ -88,26 +90,48 @@ public static IStandardHedgingHandlerBuilder AddStandardHedgingHandler(this IHtt
Throw.InvalidOperationException("Request message snapshot is not attached to the resilience context.");
}

var requestMessage = snapshot.CreateRequestMessage();

// The secondary request message should use the action resilience context
requestMessage.SetResilienceContext(args.ActionContext);

// replace the request message
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);

// if a routing strategy has been configured but it does not return the next route, then no more routes
// are availabe, stop hedging
Uri? route;
if (args.PrimaryContext.Properties.TryGetValue(ResilienceKeys.RoutingStrategy, out var routingPipeline))
{
if (!routingPipeline.TryGetNextRoute(out var route))
if (!routingPipeline.TryGetNextRoute(out route))
{
// no routes left, stop hedging
return null;
}

requestMessage.RequestUri = requestMessage.RequestUri!.ReplaceHost(route);
}
else
{
route = null;
}

return async () =>
{
Outcome<HttpResponseMessage>? actionResult = null;

try
{
var requestMessage = await snapshot.CreateRequestMessageAsync().ConfigureAwait(args.ActionContext.ContinueOnCapturedContext);

// The secondary request message should use the action resilience context
requestMessage.SetResilienceContext(args.ActionContext);

// replace the request message
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);

There is a breaking change here :/

The OnHedging callback does not have access to request message anymore. This is because OnHedging is called after the action is created and before it is invoked.

https://github.com/App-vNext/Polly/blob/f85029c6d14ad20fd36e4fcdde7a32f33409137a/src/Polly.Core/Hedging/Controller/TaskExecution.cs#L127


if (route != null)
adamhammond marked this conversation as resolved.
Show resolved Hide resolved
{
// replace the RequestUri of the request per the routing strategy
requestMessage.RequestUri = requestMessage.RequestUri!.ReplaceHost(route);
}
}
catch (IOException e)
{
actionResult = Outcome.FromException<HttpResponseMessage>(e);
}

return () => args.Callback(args.ActionContext);
return actionResult ?? await args.Callback(args.ActionContext).ConfigureAwait(args.ActionContext.ContinueOnCapturedContext);
};
};
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Shared.Diagnostics;
using Microsoft.Shared.Pools;
Expand All @@ -22,21 +24,40 @@ internal sealed class RequestMessageSnapshot : IResettable, IDisposable
private Version? _version;
private HttpContent? _content;

public static RequestMessageSnapshot Create(HttpRequestMessage request)
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
public static async Task<RequestMessageSnapshot> CreateAsync(HttpRequestMessage request)
{
_ = Throw.IfNull(request);

var snapshot = _snapshots.Get();
snapshot.Initialize(request);
await snapshot.InitializeAsync(request).ConfigureAwait(false);
return snapshot;
}

public HttpRequestMessage CreateRequestMessage()
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
public async Task<HttpRequestMessage> CreateRequestMessageAsync()
adamhammond marked this conversation as resolved.
Show resolved Hide resolved
{
if (IsReset())
{
throw new InvalidOperationException($"{nameof(CreateRequestMessageAsync)}() cannot be called on a snapshot object that has been reset and has not been initialized");
}

var clone = new HttpRequestMessage(_method!, _requestUri)
{
Content = _content,
Version = _version!
};

if (_content is StreamContent)
adamhammond marked this conversation as resolved.
Show resolved Hide resolved
{
(HttpContent? content, HttpContent? clonedContent) = await CloneContentAsync(_content).ConfigureAwait(false);
_content = content;
clone.Content = clonedContent;
}
else
{
clone.Content = _content;
}

#if NET5_0_OR_GREATER
foreach (var prop in _properties)
{
Expand All @@ -56,6 +77,7 @@ public HttpRequestMessage CreateRequestMessage()
return clone;
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Critical Bug", "S2952:Classes should \"Dispose\" of members from the classes' own \"Dispose\" methods", Justification = "Handled by ObjectPool")]
bool IResettable.TryReset()
{
_properties.Clear();
Expand All @@ -64,24 +86,76 @@ bool IResettable.TryReset()
_method = null;
_version = null;
_requestUri = null;
if (_content is StreamContent)
{
// a snapshot's StreamContent is always a unique copy (deep clone)
// therefore, it is safe to dispose when snapshot is no longer needed
_content.Dispose();
}

_content = null;

return true;
}

void IDisposable.Dispose() => _snapshots.Return(this);

private void Initialize(HttpRequestMessage request)
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
private static async Task<(HttpContent? content, HttpContent? clonedContent)> CloneContentAsync(HttpContent? content)
{
if (request.Content is StreamContent)
HttpContent? clonedContent = null;
if (content != null)
{
Throw.InvalidOperationException($"{nameof(StreamContent)} content cannot by cloned.");
HttpContent originalContent = content;
Stream originalRequestBody = await content.ReadAsStreamAsync().ConfigureAwait(false);
MemoryStream clonedRequestBody = new MemoryStream();
await originalRequestBody.CopyToAsync(clonedRequestBody).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The streamed content can point to a large file which can have > GBs in size. Since we are copying to memory we should constraint it. Let's say < 10MB

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's a good idea to impose a size limit. For example, I don't think the framework imposes any such limit when cloning request content for redirected requests. IMO, we should allow this to be limitless, and require users to enforce their own request content size limits, if they have them, via server configurations and/or their own custom handlers, filters, middleware, etc. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change can buffer potentially endless stream into memory and crash the process. I think it's something we should guard against.

This thing should be used for relatively small streamed payloads.

Btw, you can try to call LoadIntoBufferAsync on the content to see how it behaves.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do understand the concern; however, the max size that a given stream can grow before it crashes the process is bound only by the memory resources available on the given server that it's executing on. Therefore, by choosing a static max limit, we are creating a cap that might unnecessarily hinder users of the API that have both requirements and the necessary server resources available to process requests with exceptionally large stream content. Further, RequestMessageSnapshot is only cloning content from existing request objects that contain existing StreamContent. Therefore, a HttpRequestMessage object has already been created with StreamContent of the given size before any of the RequestMessageSnapshot operations are executed. If the size of the StreamContent was in fact too large, it would have already crashed the process.

I feel strongly that we should not impose a size limit when cloning the StreamContent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the size of the StreamContent was in fact too large, it would have already crashed the process.

This is not true, the StreamContent can point to file stream and it consumes minimal memory even as you read the whole file. What you are doing here is materializing the whole stream into the memory. This is what concerns me.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way what you are saying is true is if the content is a FileStream and is read into a buffer, correct? This is somewhat of a remarkable edge case since we are talking about streams in the context of HTTP request content. It would help me understand this better if you could provide an example scenario where a developer would need to support this. Also, it is universally known that a given request must be deep cloned in order to support hedging. I believe the general assumption is that deep cloning will be handled via cloning to a MemoryStream as opposed to any alternative stream, which would require the entire stream to be copied to memory. Therefore, I return to my previous stance that users of the AddStandardHedgingHandler extension method should add their own handler, middleware, etc. to impose a size limit on request content if they have one. My biggest concern is that we don't have enough information to know what the optimal max limit value should be that would support the most use cases effectively.

clonedRequestBody.Position = 0;
if (originalRequestBody.CanSeek)
{
originalRequestBody.Position = 0;
}
else
{
originalRequestBody = new MemoryStream();
await clonedRequestBody.CopyToAsync(originalRequestBody).ConfigureAwait(false);
originalRequestBody.Position = 0;
clonedRequestBody.Position = 0;
}

clonedContent = new StreamContent(clonedRequestBody);
content = new StreamContent(originalRequestBody);
foreach (KeyValuePair<string, IEnumerable<string>> header in originalContent.Headers)
{
_ = clonedContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
_ = content.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
}

return (content, clonedContent);
}

private bool IsReset()
{
return _method == null;
}
adamhammond marked this conversation as resolved.
Show resolved Hide resolved

[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
private async Task InitializeAsync(HttpRequestMessage request)
{
_method = request.Method;
_version = request.Version;
_requestUri = request.RequestUri;
_content = request.Content;
if (request.Content is StreamContent)
{
(HttpContent? requestContent, HttpContent? clonedRequestContent) = await CloneContentAsync(request.Content).ConfigureAwait(false);
_content = clonedRequestContent;
request.Content = requestContent;
}
else
{
_content = request.Content;
}

// headers
_headers.AddRange(request.Headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void Configure_ValidConfigurationSection_ShouldInitialize()
}

[Fact]
public void ActionGenerator_Ok()
public async Task ActionGenerator_Ok()
{
var options = Builder.Services.BuildServiceProvider().GetRequiredService<IOptionsMonitor<HttpStandardHedgingResilienceOptions>>().Get(Builder.Name);
var generator = options.Hedging.ActionGenerator;
Expand All @@ -115,7 +115,7 @@ public void ActionGenerator_Ok()
generator.Invoking(g => g(args)).Should().Throw<InvalidOperationException>().WithMessage("Request message snapshot is not attached to the resilience context.");

using var request = new HttpRequestMessage();
using var snapshot = RequestMessageSnapshot.Create(request);
using var snapshot = await RequestMessageSnapshot.CreateAsync(request).ConfigureAwait(false);
primary.Properties.Set(ResilienceKeys.RequestSnapshot, snapshot);
generator.Invoking(g => g(args)).Should().NotThrow();
}
Expand Down