Skip to content

Commit

Permalink
Integrated stream into operation compiler. (#5232)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Jul 12, 2022
1 parent 45ca379 commit b01e494
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 93 deletions.
14 changes: 13 additions & 1 deletion src/HotChocolate/Core/src/Abstractions/Location.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace HotChocolate;

public readonly struct Location
public readonly struct Location : IComparable<Location>
{
public Location(int line, int column)
{
Expand All @@ -30,4 +30,16 @@ public Location(int line, int column)
public int Line { get; }

public int Column { get; }

public int CompareTo(Location other)
{
var lineComparison = Line.CompareTo(other.Line);

if (lineComparison != 0)
{
return lineComparison;
}

return Column.CompareTo(other.Column);
}
}
19 changes: 8 additions & 11 deletions src/HotChocolate/Core/src/Execution/Processing/DeferredStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ internal sealed class DeferredStream : DeferredExecutionTask

try
{
_task ??= new StreamExecutionTask(operationContext, this);
_task.Reset(resultId);
_task ??= new StreamExecutionTask(this);
_task.Reset(operationContext, resultId);

operationContext.Scheduler.Register(_task);
await operationContext.Scheduler.ExecuteAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -120,21 +120,17 @@ internal sealed class DeferredStream : DeferredExecutionTask

private sealed class StreamExecutionTask : ExecutionTask
{
private readonly OperationContext _operationContext;
private readonly DeferredStream _deferredStream;
private OperationContext _operationContext = default!;
private IImmutableDictionary<string, object?> _scopedContextData;

public StreamExecutionTask(
OperationContext operationContext,
DeferredStream deferredStream)
public StreamExecutionTask(DeferredStream deferredStream)
{
_operationContext = operationContext;
_deferredStream = deferredStream;
_scopedContextData = _deferredStream.ScopedContextData;
Context = operationContext;
}

protected override IExecutionTaskContext Context { get; }
protected override IExecutionTaskContext Context => _operationContext;

public ResolverTask? ChildTask { get; private set; }

Expand All @@ -157,9 +153,10 @@ protected override async ValueTask ExecuteAsync(CancellationToken cancellationTo
}
}

public void Reset(uint taskId)
public void Reset(OperationContext operationContext, uint taskId)
{
_scopedContextData =_scopedContextData.SetItem(DeferredResultId, taskId);
_operationContext = operationContext;
_scopedContextData = _scopedContextData.SetItem(DeferredResultId, taskId);
base.Reset();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private class DeferResultStream : IAsyncEnumerable<IQueryResult>
CancellationToken cancellationToken = default)
{
var span = _diagnosticEvents.ExecuteStream(_operation);
var hasNext = true;

try
{
Expand All @@ -106,11 +107,15 @@ private class DeferResultStream : IAsyncEnumerable<IQueryResult>
var result = await _stateOwner.State.TryDequeueResultAsync(cancellationToken);
if (result is not null)
{
hasNext = result.HasNext ?? false;
yield return result;
}
else
{

if (hasNext)
{
yield return new QueryResult(null, hasNext: false);
}
yield break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ VariableNode variable
return null;
}

internal static StreamDirective? GetStreamDirective(
this ISelection selection,
IVariableValueCollection variables) =>
selection.SyntaxNode.Directives.GetStreamDirective(variables);

internal static StreamDirective? GetStreamDirective(
this IReadOnlyList<DirectiveNode> directives,
IVariableValueCollection variables)
Expand Down Expand Up @@ -181,6 +186,10 @@ VariableNode variable
this IReadOnlyList<DirectiveNode> directives) =>
GetDirective(directives, WellKnownDirectives.Defer);

internal static DirectiveNode? GetStreamDirective(
this FieldNode selection) =>
GetDirective(selection.Directives, WellKnownDirectives.Stream);

internal static DirectiveNode? GetStreamDirective(
this IReadOnlyList<DirectiveNode> directives) =>
GetDirective(directives, WellKnownDirectives.Stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,21 @@ private void CompleteSelectionSet(CompilerContext context)
// before we change the overall stream handling.
//
// For now we only allow streams on lists of composite types.
//if (selection.SyntaxNode.IsStreamable())
// {
// var streamDirective = selection.SyntaxNode.GetStreamDirective();
// }
if (selection.SyntaxNode.IsStreamable())
{
var streamDirective = selection.SyntaxNode.GetStreamDirective();
var nullValue = NullValueNode.Default;
var ifValue = streamDirective?.GetIfArgumentValueOrDefault() ?? nullValue;
long ifConditionFlags = 0;

if (ifValue.Kind is not SyntaxKind.NullValue)
{
var ifCondition = new IncludeCondition(ifValue, nullValue);
ifConditionFlags = GetSelectionIncludeCondition(ifCondition, 0);
}

selection.MarkAsStream(ifConditionFlags);
}
}

selection.SetSelectionSetId(selectionSetId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Net;
using HotChocolate.Execution.Properties;

namespace HotChocolate.Execution.Processing;
Expand Down Expand Up @@ -87,6 +88,11 @@ public IQueryResult BuildResult()
Resources.ResultHelper_BuildResult_InvalidResult);
}

if (_errors.Count > 0)
{
_errors.Sort(ErrorComparer.Default);
}

var result = new QueryResult
(
_data,
Expand Down Expand Up @@ -125,8 +131,9 @@ public IQueryResultBuilder BuildResultBuilder()

builder.SetData(_data);

if (_errors is { Count: > 0 })
if (_errors.Count > 0)
{
_errors.Sort(ErrorComparer.Default);
builder.AddErrors(_errors);
}

Expand Down Expand Up @@ -157,6 +164,44 @@ public IQueryResultBuilder BuildResultBuilder()
public void DiscardResult()
=> _resultOwner.Dispose();

private sealed class ErrorComparer : IComparer<IError>
{
public int Compare(IError? x, IError? y)
{
if (ReferenceEquals(x, y))
{
return 0;
}

if (ReferenceEquals(null, y))
{
return 1;
}

if (ReferenceEquals(null, x))
{
return -1;
}

if (y.Locations?.Count > 0)
{
if (x.Locations?.Count > 0)
{
return x.Locations[0].CompareTo(y.Locations[0]);
}
return 1;
}

if (x.Locations?.Count > 0)
{
return -1;
}

return 0;
}

public static readonly ErrorComparer Default = new();
}
}


16 changes: 16 additions & 0 deletions src/HotChocolate/Core/src/Execution/Processing/Selection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class Selection : ISelection
new(new Dictionary<string, ArgumentValue>());

private long[] _includeConditions;
private long _streamIfCondition;
private Flags _flags;

public Selection(
Expand Down Expand Up @@ -122,6 +123,10 @@ protected Selection(Selection selection)
/// <inheritdoc />
public IArgumentMap Arguments { get; }

public bool IsStream(long includeFlags)
=> (_flags & Flags.Stream) == Flags.Stream &&
(_streamIfCondition is 0 || (includeFlags & _streamIfCondition) != _streamIfCondition);

public bool IsReadOnly => (_flags & Flags.Sealed) == Flags.Sealed;

/// <inheritdoc />
Expand Down Expand Up @@ -275,6 +280,17 @@ internal void SetSelectionSetId(int selectionSetId)
SelectionSetId = selectionSetId;
}

internal void MarkAsStream(long ifCondition)
{
if ((_flags & Flags.Sealed) == Flags.Sealed)
{
throw new NotSupportedException(Resources.PreparedSelection_ReadOnly);
}

_streamIfCondition = ifCondition;
_flags |= Flags.Stream;
}

internal void Seal()
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
Expand Down

0 comments on commit b01e494

Please sign in to comment.