Skip to content

Commit

Permalink
#2039 Buffer request body and copy the body to downstreams during mul…
Browse files Browse the repository at this point in the history
…tiplexing (#2050)

* feat: buffer the request body during multiplexing multiple routes

* style: rename clone request body method to be more explicit

* Code review by @raman-m

* feat: refactor clone request method, add acceptance test for form-based requests

* fix: add content-length log, refactor tests from @raman-m commit

* Update requestaggregation.rst

* style: reverse return condition

* Register `Stream` objects for disposing by downstream `HttpResponse`

---------

Co-authored-by: Paul Roy <paul.roy@astriis.com>
Co-authored-by: Raman Maksimchuk <dotnet044@gmail.com>
  • Loading branch information
3 people committed Apr 25, 2024
1 parent ab9fb65 commit 233f87a
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 110 deletions.
7 changes: 5 additions & 2 deletions docs/features/requestaggregation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,12 @@ Below is an example of an aggregator that you could implement for your solution:
Gotchas
-------

You cannot use Routes with specific **RequestIdKeys** as this would be crazy complicated to track.
* You cannot use Routes with specific **RequestIdKeys** as this would be crazy complicated to track.
* Aggregation only supports the ``GET`` HTTP verb.
* Aggregation allows for the forwarding of ``HttpRequest.Body`` to downstream services by duplicating the body data.
Form data and attached files should also be forwarded.
It is essential to always specify the ``Content-Length`` header in requests to upstream; otherwise, Ocelot will log warnings like *"Aggregation does not support body copy without Content-Length header!"*.

Aggregation only supports the ``GET`` HTTP verb.

""""

Expand Down
45 changes: 35 additions & 10 deletions src/Ocelot/Multiplexer/MultiplexingMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ public class MultiplexingMiddleware : OcelotMiddleware

public MultiplexingMiddleware(RequestDelegate next,
IOcelotLoggerFactory loggerFactory,
IResponseAggregatorFactory factory
)
IResponseAggregatorFactory factory)
: base(loggerFactory.CreateLogger<MultiplexingMiddleware>())
{
_factory = factory;
Expand Down Expand Up @@ -184,7 +183,7 @@ private Task MapResponsesAsync(HttpContext context, Route route, HttpContext mai
/// <returns>The cloned Http context.</returns>
private async Task<HttpContext> ProcessRouteAsync(HttpContext sourceContext, DownstreamRoute route, List<PlaceholderNameAndValue> placeholders = null)
{
var newHttpContext = CreateThreadContext(sourceContext);
var newHttpContext = await CreateThreadContextAsync(sourceContext);
CopyItemsToNewContext(newHttpContext, sourceContext, placeholders);
newHttpContext.Items.UpsertDownstreamRoute(route);

Expand All @@ -208,14 +207,15 @@ private static void CopyItemsToNewContext(HttpContext target, HttpContext source
/// </summary>
/// <param name="source">The base http context.</param>
/// <returns>The cloned context.</returns>
private static HttpContext CreateThreadContext(HttpContext source)
protected virtual async Task<HttpContext> CreateThreadContextAsync(HttpContext source)
{
var from = source.Request;
var from = source.Request;
var bodyStream = await CloneRequestBodyAsync(from, source.RequestAborted);
var target = new DefaultHttpContext
{
Request =
{
Body = from.Body, // TODO Consider stream cloning for multiple reads
Body = bodyStream,
ContentLength = from.ContentLength,
ContentType = from.ContentType,
Host = from.Host,
Expand All @@ -237,12 +237,13 @@ private static HttpContext CreateThreadContext(HttpContext source)
RequestAborted = source.RequestAborted,
User = source.User,
};

foreach (var header in from.Headers)
{
target.Request.Headers[header.Key] = header.Value.ToArray();
}

}

// Once the downstream request is completed and the downstream response has been read, the downstream response object can dispose of the body's Stream object
target.Response.RegisterForDisposeAsync(bodyStream); // manage Stream lifetime by HttpResponse object
return target;
}

Expand All @@ -255,5 +256,29 @@ protected virtual Task MapAsync(HttpContext httpContext, Route route, List<HttpC

var aggregator = _factory.Get(route);
return aggregator.Aggregate(route, httpContext, contexts);
}
}

protected virtual async Task<Stream> CloneRequestBodyAsync(HttpRequest request, CancellationToken aborted)
{
request.EnableBuffering();
if (request.Body.Position != 0)
{
Logger.LogWarning("Ocelot does not support body copy without stream in initial position 0");
return request.Body;
}

var targetBuffer = new MemoryStream();
if (request.ContentLength is not null)
{
await request.Body.CopyToAsync(targetBuffer, (int)request.ContentLength, aborted);
targetBuffer.Position = 0;
request.Body.Position = 0;
}
else
{
Logger.LogWarning("Aggregation does not support body copy without Content-Length header!");
}

return targetBuffer;
}
}
Loading

0 comments on commit 233f87a

Please sign in to comment.