Skip to content

Commit

Permalink
Add by key batching to fusion. (#5934)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Mar 7, 2023
1 parent ea34608 commit f42fe7c
Show file tree
Hide file tree
Showing 92 changed files with 2,075 additions and 896 deletions.

Large diffs are not rendered by default.

Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Language\src\Language.SyntaxTree\HotChocolate.Language.SyntaxTree.csproj" />
<ProjectReference Include="..\Transport.Sockets\HotChocolate.Transport.Sockets.csproj" />
</ItemGroup>

Expand Down
@@ -1,11 +1,29 @@
using System;
using System.Collections.Generic;
using HotChocolate.Language;
using static HotChocolate.Transport.Sockets.Client.Properties.SocketClientResources;

namespace HotChocolate.Transport.Sockets.Client;

public readonly struct OperationRequest : IEquatable<OperationRequest>
{
public OperationRequest(
string? query,
string? id,
ObjectValueNode? variables,
ObjectValueNode? extensions)
{
if (query is null && id is null && extensions is null)
{
throw new ArgumentException(OperationRequest_QueryOrPersistedQueryId, nameof(query));
}

Query = query;
Id = id;
VariablesNode = variables;
ExtensionsNode = extensions;
}

public OperationRequest(
string? query = null,
string? id = null,
Expand All @@ -29,8 +47,12 @@ namespace HotChocolate.Transport.Sockets.Client;

public IReadOnlyDictionary<string, object?>? Variables { get; }

public ObjectValueNode? VariablesNode { get; }

public IReadOnlyDictionary<string, object?>? Extensions { get; }

public ObjectValueNode? ExtensionsNode { get; }

public bool Equals(OperationRequest other)
=> Id == other.Id &&
Query == other.Query &&
Expand Down
Expand Up @@ -33,7 +33,7 @@ public DataMessageObserver(string id)
throw _error;
}

_messages.TryDequeue(out IDataMessage? message);
_messages.TryDequeue(out var message);
return message;
}

Expand Down
Expand Up @@ -19,7 +19,7 @@ internal sealed class GraphQLOverWebSocketProtocolHandler : IProtocolHandler
CancellationToken cancellationToken = default)
{
var observer = new ConnectionMessageObserver<ConnectionAcceptMessage>(cancellationToken);
using IDisposable subscription = context.Messages.Subscribe(observer);
using var subscription = context.Messages.Subscribe(observer);
await context.Socket.SendConnectionInitMessage(payload, cancellationToken);
await observer.Accepted;
}
Expand All @@ -32,7 +32,7 @@ internal sealed class GraphQLOverWebSocketProtocolHandler : IProtocolHandler
var id = Guid.NewGuid().ToString("N");
var observer = new DataMessageObserver(id);
var completion = new DataCompletion(context.Socket, id);
IDisposable subscription = context.Messages.Subscribe(observer);
var subscription = context.Messages.Subscribe(observer);

await context.Socket.SendSubscribeMessageAsync(id, request, cancellationToken);

Expand Down Expand Up @@ -62,9 +62,9 @@ internal sealed class GraphQLOverWebSocketProtocolHandler : IProtocolHandler
try
{
document = JsonDocument.Parse(message);
JsonElement root = document.RootElement;
var root = document.RootElement;

if (root.TryGetProperty(TypeProp, out JsonElement typeProp))
if (root.TryGetProperty(TypeProp, out var typeProp))
{
if (typeProp.ValueEquals(Utf8Messages.Ping))
{
Expand Down
Expand Up @@ -25,7 +25,7 @@ internal static class MessageHelper
jsonWriter.WritePropertyName(PayloadProp);
JsonSerializer.Serialize(jsonWriter, payload, JsonDefaults.SerializerOptions);
}

jsonWriter.WriteEndObject();
await jsonWriter.FlushAsync(ct).ConfigureAwait(false);

Expand All @@ -44,6 +44,7 @@ internal static class MessageHelper
{
using var arrayWriter = new ArrayWriter();
await using var jsonWriter = new Utf8JsonWriter(arrayWriter, JsonDefaults.WriterOptions);

jsonWriter.WriteStartObject();
jsonWriter.WriteString(IdProp, operationSessionId);
jsonWriter.WriteString(TypeProp, Utf8Messages.Subscribe);
Expand Down
Expand Up @@ -15,7 +15,7 @@ private CompleteMessage(string id)

public static CompleteMessage From(JsonDocument document)
{
JsonElement root = document.RootElement;
var root = document.RootElement;
var id = root.GetProperty(Utf8MessageProperties.IdProp).GetString()!;
return new CompleteMessage(id);
}
Expand Down
Expand Up @@ -18,10 +18,10 @@ private ErrorMessage(string id, OperationResult payload)

public static ErrorMessage From(JsonDocument document)
{
JsonElement root = document.RootElement;
var root = document.RootElement;
var id = root.GetProperty(Utf8MessageProperties.IdProp).GetString()!;

JsonElement payload = root.GetProperty(Utf8MessageProperties.PayloadProp);
var payload = root.GetProperty(Utf8MessageProperties.PayloadProp);
var result = new OperationResult(document, errors: payload);

return new ErrorMessage(id, result);
Expand Down
Expand Up @@ -20,10 +20,10 @@ private NextMessage(string id, OperationResult payload)

public static NextMessage From(JsonDocument document)
{
JsonElement root = document.RootElement;
var root = document.RootElement;
var id = root.GetProperty(IdProp).GetString()!;

JsonElement payload = root.GetProperty(PayloadProp);
var payload = root.GetProperty(PayloadProp);
var result = new OperationResult(
document,
TryGetProperty(payload, DataProp),
Expand All @@ -34,7 +34,7 @@ public static NextMessage From(JsonDocument document)
}

private static JsonElement? TryGetProperty(JsonElement element, ReadOnlySpan<byte> name)
=> element.TryGetProperty(name, out JsonElement property)
=> element.TryGetProperty(name, out var property)
? property
: null;
}
Expand Up @@ -33,7 +33,7 @@ private void Unsubscribe(Subscription subscription)

private void OnNext(IOperationMessage value, ImmutableList<Subscription> subscriptions)
{
foreach (Subscription subscription in subscriptions)
foreach (var subscription in subscriptions)
{
subscription.Observer.OnNext(value);
}
Expand All @@ -43,7 +43,7 @@ private void OnNext(IOperationMessage value, ImmutableList<Subscription> subscri

private void OnError(Exception error, ImmutableList<Subscription> subscriptions)
{
foreach (Subscription subscription in subscriptions)
foreach (var subscription in subscriptions)
{
subscription.Observer.OnError(error);
}
Expand All @@ -53,7 +53,7 @@ private void OnError(Exception error, ImmutableList<Subscription> subscriptions)

private void OnCompleted(ImmutableList<Subscription> subscriptions)
{
foreach (Subscription subscription in subscriptions)
foreach (var subscription in subscriptions)
{
subscription.Observer.OnCompleted();
}
Expand Down
Expand Up @@ -11,7 +11,7 @@

namespace HotChocolate.Transport.Sockets.Client;

public class SocketClient : ISocket
public sealed class SocketClient : ISocket
{
private static readonly IProtocolHandler[] _protocolHandlers =
{
Expand Down Expand Up @@ -156,7 +156,7 @@ private ValueTask InitializeAsync<T>(T payload, CancellationToken cancellationTo
socketResult = await _socket.ReceiveAsync(arraySegment, cancellationToken);

// copy message segment to writer.
Memory<byte> memory = writer.GetMemory(socketResult.Count);
var memory = writer.GetMemory(socketResult.Count);
buffer.AsSpan().Slice(0, socketResult.Count).CopyTo(memory.Span);
writer.Advance(socketResult.Count);
read += socketResult.Count;
Expand Down
Expand Up @@ -208,12 +208,28 @@ public OperationCompiler(InputParser parser)
variants[item.Key] = item.Value;
}

// we will complete the selection variants, sets and selections
// without sealing them so that analyzers in this step can fully
// inspect them.
var variantsSpan = variants.AsSpan();
ref var variantsStart = ref GetReference(variantsSpan);
ref var variantsEnd = ref Unsafe.Add(ref variantsStart, variantsSpan.Length);

while (Unsafe.IsAddressLessThan(ref variantsStart, ref variantsEnd))
{
variantsStart.Complete();
variantsStart = ref Unsafe.Add(ref variantsStart, 1);
}

#if NET5_0_OR_GREATER
ref var optSpace = ref GetReference(AsSpan(_operationOptimizers));
var optSpan = AsSpan(_operationOptimizers);
ref var optStart = ref GetReference(optSpan);
ref var optEnd = ref Unsafe.Add(ref optStart, optSpan.Length);

for (var i = 0; i < _operationOptimizers.Count; i++)
while (Unsafe.IsAddressLessThan(ref optStart, ref optEnd))
{
Unsafe.Add(ref optSpace, i).OptimizeOperation(context);
optStart.OptimizeOperation(context);
optStart = ref Unsafe.Add(ref optStart, 1);
}
#else
for (var i = 0; i < _operationOptimizers.Count; i++)
Expand All @@ -224,11 +240,14 @@ public OperationCompiler(InputParser parser)

CompleteResolvers(schema);

ref var varSpace = ref GetReference(variants.AsSpan());
variantsSpan = variants.AsSpan();
variantsStart = ref GetReference(variantsSpan);
variantsEnd = ref Unsafe.Add(ref variantsStart, variantsSpan.Length);

for (var i = 0; i < _operationOptimizers.Count; i++)
while (Unsafe.IsAddressLessThan(ref variantsStart, ref variantsEnd))
{
Unsafe.Add(ref varSpace, i).Seal();
variantsStart.Seal();
variantsStart = ref Unsafe.Add(ref variantsStart, 1);
}
}

Expand Down
19 changes: 17 additions & 2 deletions src/HotChocolate/Core/src/Execution/Processing/Selection.cs
@@ -1,12 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using HotChocolate.Execution.Properties;
using HotChocolate.Language;
using HotChocolate.Resolvers;
using HotChocolate.Types;
using Microsoft.Extensions.ObjectPool;

namespace HotChocolate.Execution.Processing;

Expand Down Expand Up @@ -313,6 +311,23 @@ internal void MarkAsStream(long ifCondition)
_flags |= Flags.Stream;
}

/// <summary>
/// Completes the selection without sealing it.
/// </summary>
internal void Complete(ISelectionSet declaringSelectionSet)
{
Debug.Assert(declaringSelectionSet is not null);

if ((_flags & Flags.Sealed) != Flags.Sealed)
{
DeclaringSelectionSet = declaringSelectionSet;
}

Debug.Assert(
ReferenceEquals(declaringSelectionSet, DeclaringSelectionSet),
"Selections can only belong to a single selectionSet.");
}

internal void Seal(ISelectionSet declaringSelectionSet)
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
Expand Down
15 changes: 15 additions & 0 deletions src/HotChocolate/Core/src/Execution/Processing/SelectionSet.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using HotChocolate.Language;

namespace HotChocolate.Execution.Processing;

Expand Down Expand Up @@ -56,6 +57,20 @@ internal sealed class SelectionSet : ISelectionSet
/// <inheritdoc />
public IReadOnlyList<IFragment> Fragments => _fragments;

/// <summary>
/// Completes the selection set without sealing it.
/// </summary>
internal void Complete()
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
{
for (var i = 0; i < _selections.Length; i++)
{
_selections[i].Complete(this);
}
}
}

internal void Seal()
{
if ((_flags & Flags.Sealed) != Flags.Sealed)
Expand Down

0 comments on commit f42fe7c

Please sign in to comment.