Skip to content

Commit

Permalink
Started adding task completion. (#1922)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed May 13, 2020
1 parent cfc1c2c commit 25100d4
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 99 deletions.
Expand Up @@ -6,7 +6,7 @@ namespace HotChocolate.Execution
{
public readonly struct ResultValue : IEquatable<ResultValue?>
{
public ResultValue(string name, object value, bool isNullable)
public ResultValue(string name, object? value, bool isNullable)
{
Name = name;
Value = value;
Expand All @@ -16,7 +16,7 @@ public ResultValue(string name, object value, bool isNullable)

public string Name { get; }

public object Value { get; }
public object? Value { get; }

public bool IsNullable { get; }

Expand Down
25 changes: 25 additions & 0 deletions src/HotChocolate/Core/src/Execution/CompletionQueue.cs
@@ -0,0 +1,25 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;

namespace HotChocolate.Execution
{
internal sealed class CompletionQueue : ICompletionQueue
{
private readonly ConcurrentQueue<ResolverTask> _queue =
new ConcurrentQueue<ResolverTask>();

public event EventHandler? TaskEnqueued;

public void Enqueue(ResolverTask task)
{
_queue.Enqueue(task);
TaskEnqueued?.Invoke(this, EventArgs.Empty);
}

public bool TryDequeue([NotNullWhen(true)]out ResolverTask? task)
{
return _queue.TryDequeue(out task);
}
}
}
14 changes: 14 additions & 0 deletions src/HotChocolate/Core/src/Execution/ICompletionQueue.cs
@@ -0,0 +1,14 @@
using System;
using System.Diagnostics.CodeAnalysis;

namespace HotChocolate.Execution
{
internal interface ICompletionQueue
{
event EventHandler? TaskEnqueued;

void Enqueue(ResolverTask task);

bool TryDequeue([NotNullWhen(true)]out ResolverTask? task);
}
}
7 changes: 7 additions & 0 deletions src/HotChocolate/Core/src/Execution/IExecutionContext.cs
Expand Up @@ -16,6 +16,11 @@ internal interface IExecutionContext
/// </summary>
ITaskQueue Tasks { get; }

/// <summary>
/// Gets the tasks that need to be completed.
/// </summary>
ICompletionQueue Completion { get; }

/// <summary>
/// Gets the batch dispatcher.
/// </summary>
Expand All @@ -29,6 +34,8 @@ internal interface IExecutionContext
/// </summary>
Task WaitForEngine(CancellationToken cancellationToken);

Task WaitForCompletion(CancellationToken cancellationToken);

/// <summary>
/// operationContext.Tasks.IsEmpty
/// && operationContext.BatchScheduler.IsEmpty
Expand Down
2 changes: 1 addition & 1 deletion src/HotChocolate/Core/src/Execution/IOperationContext.cs
Expand Up @@ -111,7 +111,7 @@ public interface IResultHelper
/// <param name="error">The error that shall be added.</param>
void AddErrors(IEnumerable<IError> errors, FieldNode? selection = null);

void AddNonNullViolation(FieldNode selection, IResultMap parent);
void AddNonNullViolation(FieldNode selection, Path path, IResultMap parent);

IReadOnlyQueryResult BuildResult();
}
Expand Down
24 changes: 10 additions & 14 deletions src/HotChocolate/Core/src/Execution/MiddlewareContext.Arguments.cs
@@ -1,3 +1,4 @@
using System.Collections.Generic;
using HotChocolate.Execution.Utilities;
using HotChocolate.Language;
using HotChocolate.Resolvers;
Expand All @@ -6,9 +7,11 @@

namespace HotChocolate.Execution
{
internal partial class MiddlewareContext
: IMiddlewareContext
internal partial class MiddlewareContext : IMiddlewareContext
{
public IReadOnlyDictionary<NameString, PreparedArgument> Arguments { get; set; } =
default!;

public T Argument<T>(NameString name)
{
if (typeof(IValueNode).IsAssignableFrom(typeof(T)))
Expand All @@ -27,7 +30,7 @@ public T Argument<T>(NameString name)

public T ArgumentValue<T>(NameString name)
{
if (!_selection.Arguments.TryGetValue(name, out PreparedArgument? argument))
if (!Arguments.TryGetValue(name, out PreparedArgument? argument))
{
throw new GraphQLException(); // throw helper
}
Expand All @@ -37,7 +40,7 @@ public T ArgumentValue<T>(NameString name)

public Optional<T> ArgumentOptional<T>(NameString name)
{
if (!_selection.Arguments.TryGetValue(name, out PreparedArgument? argument))
if (!Arguments.TryGetValue(name, out PreparedArgument? argument))
{
throw new GraphQLException(); // throw helper
}
Expand All @@ -47,18 +50,13 @@ public Optional<T> ArgumentOptional<T>(NameString name)

public T ArgumentLiteral<T>(NameString name) where T : IValueNode
{
if (!_selection.Arguments.TryGetValue(name, out PreparedArgument? argument))
if (!Arguments.TryGetValue(name, out PreparedArgument? argument))
{
throw new GraphQLException(); // throw helper
}

IValueNode literal = argument.ValueLiteral!;

if (!argument.IsFinal)
{
literal = _operationContext.ReplaceVariables(literal, argument.Type);
}

if (literal is T castedLiteral)
{
return castedLiteral;
Expand All @@ -70,7 +68,7 @@ public Optional<T> ArgumentOptional<T>(NameString name)

public ValueKind ArgumentKind(NameString name)
{
if (!_selection.Arguments.TryGetValue(name, out PreparedArgument? argument))
if (!Arguments.TryGetValue(name, out PreparedArgument? argument))
{
throw new GraphQLException(); // throw helper
}
Expand All @@ -86,9 +84,7 @@ private T CoerceArgumentValue<T>(PreparedArgument argument)

if (!argument.IsFinal)
{
IValueNode literal = argument.ValueLiteral!;
literal = _operationContext.ReplaceVariables(literal, argument.Type);
value = argument.Type.ParseLiteral(literal);
value = argument.Type.ParseLiteral(argument.ValueLiteral!);
}

if (value is null)
Expand Down
Expand Up @@ -25,6 +25,7 @@ internal partial class MiddlewareContext : IMiddlewareContext
_parent = parent;
Path = path;
ScopedContextData = scopedContextData;
Arguments = _selection.Arguments;
}

public void Clear()
Expand All @@ -43,6 +44,7 @@ public void Clear()
ResponseIndex = default;
ResultMap = default!;
HasErrors = false;
Arguments = default!;
}
}
}
46 changes: 40 additions & 6 deletions src/HotChocolate/Core/src/Execution/QueryExecutor.cs
Expand Up @@ -16,22 +16,24 @@ internal sealed class QueryExecutor : IOperationExecutor
}

private async Task ExecuteResolversAsync(
IExecutionContext executionContext,
IExecutionContext executionContext,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested &&
{
BeginCompletion(executionContext, cancellationToken);

while (!cancellationToken.IsCancellationRequested &&
!executionContext.IsCompleted)
{
while (!cancellationToken.IsCancellationRequested &&
while (!cancellationToken.IsCancellationRequested &&
executionContext.Tasks.TryDequeue(out ResolverTask task))
{
task.BeginExecute();
}

await executionContext.WaitForEngine(cancellationToken).ConfigureAwait(false);

while (!cancellationToken.IsCancellationRequested &&
executionContext.Tasks.IsEmpty &&
while (!cancellationToken.IsCancellationRequested &&
executionContext.Tasks.IsEmpty &&
executionContext.BatchDispatcher.HasTasks)
{
await executionContext.BatchDispatcher.DispatchAsync(cancellationToken)
Expand All @@ -45,5 +47,37 @@ await executionContext.WaitForEngine(cancellationToken)

// ensure non-null propagation
}

/// <summary>
/// Completes running resolver tasks and returns task to the bool.
/// </summary>
private void BeginCompletion(
IExecutionContext executionContext,
CancellationToken cancellationToken)
{
Task.Factory.StartNew(
async () =>
{
while (!cancellationToken.IsCancellationRequested &&
!executionContext.IsCompleted)
{
await executionContext.WaitForCompletion(cancellationToken)
.ConfigureAwait(false);
while (!cancellationToken.IsCancellationRequested &&
executionContext.Completion.TryDequeue(out ResolverTask? task))
{
if (!task.IsCompleted)
{
await task.EndExecuteAsync().ConfigureAwait(false);
}
// todo : return task to pool
}
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
}

0 comments on commit 25100d4

Please sign in to comment.