Skip to content

Commit

Permalink
Reworked Batching Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Jun 8, 2022
1 parent c06f37a commit 054b87a
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 89 deletions.
12 changes: 9 additions & 3 deletions src/HotChocolate/AspNetCore/src/AspNetCore/MiddlewareBase.cs
Expand Up @@ -14,6 +14,7 @@ public class MiddlewareBase : IDisposable
{
private readonly RequestDelegate _next;
private readonly IHttpResultSerializer _resultSerializer;
private bool? _batching = null;
private bool _disposed;

protected MiddlewareBase(
Expand Down Expand Up @@ -141,13 +142,17 @@ protected async ValueTask<ISchema> GetSchemaAsync(CancellationToken cancellation
requestBuilder.SetOperation(operationNames[i]);

await requestInterceptor.OnCreateAsync(
context, requestExecutor, requestBuilder, context.RequestAborted);
context,
requestExecutor,
requestBuilder,
context.RequestAborted);

requestBatch[i] = requestBuilder.Create();
}

return await requestExecutor.ExecuteBatchAsync(
requestBatch, cancellationToken: context.RequestAborted);
requestBatch,
cancellationToken: context.RequestAborted);
}

protected static async Task<IResponseStream> ExecuteBatchAsync(
Expand All @@ -172,7 +177,8 @@ protected async ValueTask<ISchema> GetSchemaAsync(CancellationToken cancellation
}

return await requestExecutor.ExecuteBatchAsync(
requestBatch, cancellationToken: context.RequestAborted);
requestBatch,
cancellationToken: context.RequestAborted);
}

protected static AllowedContentType ParseContentType(HttpContext context)
Expand Down
Expand Up @@ -2,9 +2,4 @@
Content-Type: application/json; charset=utf-8

{"data":{"hero":{"id":"1000"}}}
---
Content-Type: application/json; charset=utf-8

{"data":{"human":{"name":"Luke Skywalker"}}}
---
-----
Expand Up @@ -24,7 +24,7 @@ public class AutoUpdateRequestExecutorProxy : IRequestExecutor
_executorProxy = requestExecutorProxy;
_executor = initialExecutor;

_executorProxy.ExecutorEvicted += (sender, args) => BeginUpdateExecutor();
_executorProxy.ExecutorEvicted += (_, _) => BeginUpdateExecutor();

BeginUpdateExecutor();
}
Expand Down Expand Up @@ -146,10 +146,9 @@ public class AutoUpdateRequestExecutorProxy : IRequestExecutor
/// Returns a stream of query results.
/// </returns>
public Task<IResponseStream> ExecuteBatchAsync(
IEnumerable<IQueryRequest> requestBatch,
bool allowParallelExecution = false,
IReadOnlyList<IQueryRequest> requestBatch,
CancellationToken cancellationToken = default) =>
_executor.ExecuteBatchAsync(requestBatch, allowParallelExecution, cancellationToken);
_executor.ExecuteBatchAsync(requestBatch, cancellationToken);

private void BeginUpdateExecutor() =>
Task.Run(UpdateExecutorAsync);
Expand Down
Expand Up @@ -17,7 +17,7 @@ internal partial class BatchExecutor
{
private sealed class BatchExecutorEnumerable : IAsyncEnumerable<IQueryResult>
{
private readonly IEnumerable<IQueryRequest> _requestBatch;
private readonly IReadOnlyList<IQueryRequest> _requestBatch;
private readonly IRequestExecutor _requestExecutor;
private readonly IErrorHandler _errorHandler;
private readonly ITypeConverter _typeConverter;
Expand All @@ -29,7 +29,7 @@ private sealed class BatchExecutorEnumerable : IAsyncEnumerable<IQueryResult>
private Dictionary<string, FragmentDefinitionNode>? _fragments;

public BatchExecutorEnumerable(
IEnumerable<IQueryRequest> requestBatch,
IReadOnlyList<IQueryRequest> requestBatch,
IRequestExecutor requestExecutor,
IErrorHandler errorHandler,
ITypeConverter typeConverter,
Expand All @@ -51,11 +51,12 @@ private sealed class BatchExecutorEnumerable : IAsyncEnumerable<IQueryResult>
public async IAsyncEnumerator<IQueryResult> GetAsyncEnumerator(
CancellationToken cancellationToken = default)
{
foreach (IQueryRequest queryRequest in _requestBatch)
for (var i = 0; i < _requestBatch.Count; i++)
{
IQueryRequest queryRequest = _requestBatch[i];
var request = (IReadOnlyQueryRequest)queryRequest;
IQueryResult result =
await ExecuteNextAsync(request, cancellationToken).ConfigureAwait(false);
IQueryResult result = await ExecuteNextAsync(
request, cancellationToken).ConfigureAwait(false);
yield return result;

if (result.Data is null)
Expand Down Expand Up @@ -155,8 +156,7 @@ private sealed class BatchExecutorEnumerable : IAsyncEnumerable<IQueryResult>

if (!exported[variableName].Any())
{
if (variables != null
&& variables.TryGetValue(variableName, out var value))
if (variables != null && variables.TryGetValue(variableName, out var value))
{
merged[variableName] = value;
}
Expand All @@ -165,8 +165,7 @@ private sealed class BatchExecutorEnumerable : IAsyncEnumerable<IQueryResult>
{
var list = new List<object?>();

if (variables != null
&& variables.TryGetValue(variableName, out var value))
if (variables != null && variables.TryGetValue(variableName, out var value))
{
if (value is IReadOnlyCollection<object?> l)
{
Expand All @@ -178,21 +177,16 @@ private sealed class BatchExecutorEnumerable : IAsyncEnumerable<IQueryResult>
}
}

foreach (ExportedVariable variable in
exported[variableName])
foreach (ExportedVariable variable in exported[variableName])
{
SerializeListValue(
variable,
variableDefinition.Type,
list);
SerializeListValue(variable, variableDefinition.Type, list);
}

merged[variableName] = list;
}
else
{
if (variables != null
&& variables.TryGetValue(variableName, out var value))
if (variables != null && variables.TryGetValue(variableName, out var value))
{
merged[variableName] = value;
}
Expand Down
6 changes: 2 additions & 4 deletions src/HotChocolate/Core/src/Execution/Batching/BatchExecutor.cs
Expand Up @@ -27,13 +27,11 @@ internal partial class BatchExecutor

public IAsyncEnumerable<IQueryResult> ExecuteAsync(
IRequestExecutor requestExecutor,
IEnumerable<IQueryRequest> requestBatch)
{
return new BatchExecutorEnumerable(
IReadOnlyList<IQueryRequest> requestBatch)
=> new BatchExecutorEnumerable(
requestBatch,
requestExecutor,
_errorHandler,
_typeConverter,
_inputFormatter);
}
}
Expand Up @@ -2,11 +2,9 @@

namespace HotChocolate.Execution.Batching;

public sealed class ExportDirectiveType
: DirectiveType<ExportDirective>
public sealed class ExportDirectiveType : DirectiveType<ExportDirective>
{
protected override void Configure(
IDirectiveTypeDescriptor<ExportDirective> descriptor)
protected override void Configure(IDirectiveTypeDescriptor<ExportDirective> descriptor)
{
descriptor.Name(ExportDirectiveHelper.Name);
descriptor.Location(DirectiveLocation.Field);
Expand Down
Expand Up @@ -269,20 +269,23 @@ await GetRequestExecutorAsync(services, schemaName, cancellationToken)
/// The cancellation token.
/// </param>
/// <returns>
/// Returns the execution result of the given GraphQL <paramref name="query" />.
///
/// <para>Returns the execution result of the given GraphQL <paramref name="query" />.</para>
/// <para>
/// If the request operation is a simple query or mutation the result is a
/// <see cref="IQueryResult" />.
///
/// </para>
/// <para>
/// If the request operation is a query or mutation where data is deferred, streamed or
/// includes live data the result is a <see cref="IResponseStream" /> where each result
/// that the <see cref="IResponseStream" /> yields is a
/// <see cref="IQueryResult" />.
///
/// </para>
/// <para>
/// If the request operation is a subscription the result is a
/// <see cref="IResponseStream" /> where each result that the
/// <see cref="IResponseStream" /> yields is a
/// <see cref="IQueryResult" />.
/// </para>
/// </returns>
public static async Task<IExecutionResult> ExecuteRequestAsync(
this IRequestExecutorBuilder builder,
Expand Down Expand Up @@ -322,8 +325,7 @@ await BuildRequestExecutorAsync(builder, schemaName, cancellationToken)
/// </returns>
public static async Task<IResponseStream> ExecuteBatchRequestAsync(
this IServiceProvider services,
IEnumerable<IQueryRequest> requestBatch,
bool allowParallelExecution = false,
IReadOnlyList<IQueryRequest> requestBatch,
NameString schemaName = default,
CancellationToken cancellationToken = default)
{
Expand All @@ -332,7 +334,7 @@ await GetRequestExecutorAsync(services, schemaName, cancellationToken)
.ConfigureAwait(false);

return await executor
.ExecuteBatchAsync(requestBatch, allowParallelExecution, cancellationToken)
.ExecuteBatchAsync(requestBatch, cancellationToken)
.ConfigureAwait(false);
}
}
3 changes: 1 addition & 2 deletions src/HotChocolate/Core/src/Execution/IRequestExecutor.cs
Expand Up @@ -70,7 +70,6 @@ public interface IRequestExecutor
/// Returns a stream of query results.
/// </returns>
Task<IResponseStream> ExecuteBatchAsync(
IEnumerable<IQueryRequest> requestBatch,
bool allowParallelExecution = false,
IReadOnlyList<IQueryRequest> requestBatch,
CancellationToken cancellationToken = default);
}
4 changes: 2 additions & 2 deletions src/HotChocolate/Core/src/Execution/RequestExecutor.cs
Expand Up @@ -15,6 +15,7 @@ internal sealed class RequestExecutor : IRequestExecutor
private readonly RequestDelegate _requestDelegate;
private readonly BatchExecutor _batchExecutor;
private readonly ObjectPool<RequestContext> _contextPool;
private readonly bool _parallelBatching;

public RequestExecutor(
ISchema schema,
Expand Down Expand Up @@ -103,8 +104,7 @@ internal sealed class RequestExecutor : IRequestExecutor
}

public Task<IResponseStream> ExecuteBatchAsync(
IEnumerable<IQueryRequest> requestBatch,
bool allowParallelExecution = false,
IReadOnlyList<IQueryRequest> requestBatch,
CancellationToken cancellationToken = default)
{
if (requestBatch is null)
Expand Down
20 changes: 5 additions & 15 deletions src/HotChocolate/Core/src/Execution/RequestExecutorProxy.cs
Expand Up @@ -9,9 +9,9 @@ namespace HotChocolate.Execution;
/// The <see cref="RequestExecutorProxy"/> is a helper class that represents a executor for
/// one specific schema and handles the resolving and hot-swapping the specific executor.
/// </summary>
public class RequestExecutorProxy : IDisposable
public sealed class RequestExecutorProxy : IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _semaphore = new(1, 1);
private readonly IRequestExecutorResolver _executorResolver;
private readonly NameString _schemaName;
private IRequestExecutor? _executor;
Expand Down Expand Up @@ -82,18 +82,14 @@ await executor
/// <param name="requestBatch">
/// The GraphQL request batch.
/// </param>
/// <param name="allowParallelExecution">
/// Defines if the executor is allowed to execute the batch in parallel.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token.
/// </param>
/// <returns>
/// Returns a stream of query results.
/// </returns>
public async Task<IResponseStream> ExecuteBatchAsync(
IEnumerable<IQueryRequest> requestBatch,
bool allowParallelExecution = false,
IReadOnlyList<IQueryRequest> requestBatch,
CancellationToken cancellationToken = default)
{
if (requestBatch == null)
Expand All @@ -107,7 +103,7 @@ await GetRequestExecutorAsync(cancellationToken)

IResponseStream result =
await executor
.ExecuteBatchAsync(requestBatch, allowParallelExecution, cancellationToken)
.ExecuteBatchAsync(requestBatch, cancellationToken)
.ConfigureAwait(false);

return result;
Expand Down Expand Up @@ -196,13 +192,7 @@ private void EvictRequestExecutor(object? sender, RequestExecutorEvictedEventArg

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed && disposing)
if (!_disposed)
{
_executor = null;
_semaphore.Dispose();
Expand Down
Expand Up @@ -81,7 +81,7 @@ public sealed partial class MultiPartResponseStreamFormatter : IResponseStreamFo
{
await WriteResultAsync(result, outputStream, ct).ConfigureAwait(false);

if (result.HasNext ?? true)
if (result.HasNext ?? false)
{
await WriteNextAsync(outputStream, ct).ConfigureAwait(false);
await outputStream.FlushAsync(ct).ConfigureAwait(false);
Expand Down Expand Up @@ -119,7 +119,7 @@ public sealed partial class MultiPartResponseStreamFormatter : IResponseStreamFo
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}

private async Task WriteResultHeaderAsync(
private static async Task WriteResultHeaderAsync(
Stream outputStream,
CancellationToken ct)
{
Expand All @@ -134,7 +134,7 @@ public sealed partial class MultiPartResponseStreamFormatter : IResponseStreamFo
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}

private async Task WriteNextAsync(
private static async Task WriteNextAsync(
Stream outputStream,
CancellationToken ct)
{
Expand All @@ -143,7 +143,7 @@ public sealed partial class MultiPartResponseStreamFormatter : IResponseStreamFo
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}

private async Task WriteEndAsync(
private static async Task WriteEndAsync(
Stream outputStream,
CancellationToken ct)
{
Expand Down

0 comments on commit 054b87a

Please sign in to comment.