Skip to content

Commit

Permalink
Reworked the Gateway Configuration (#5308)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Aug 17, 2022
1 parent c73b817 commit 25ae2ba
Show file tree
Hide file tree
Showing 35 changed files with 851 additions and 294 deletions.
Expand Up @@ -166,9 +166,13 @@ public OperationCompiler(InputParser parser)
// more mutations on the compiled selection variants.
// after we have executed all optimizers we will seal the selection variants.
var context = new OperationOptimizerContext(
operationId,
document,
operationDefinition,
schema,
operationType,
variants,
_includeConditions,
_contextData);

foreach (var item in _selectionVariants)
Expand Down
Expand Up @@ -11,23 +11,50 @@ namespace HotChocolate.Execution.Processing;
/// </summary>
public readonly ref struct OperationOptimizerContext
{
private readonly IReadOnlyList<SelectionVariants> _variants;
private readonly SelectionVariants[] _variants;
private readonly IncludeCondition[] _includeConditions;
private readonly ObjectType _rootType;
private readonly Dictionary<string, object?> _contextData;

/// <summary>
/// Initializes a new instance of <see cref="OperationOptimizerContext"/>
/// </summary>
internal OperationOptimizerContext(
string id,
DocumentNode document,
OperationDefinitionNode definition,
ISchema schema,
IObjectType rootType,
ObjectType rootType,
SelectionVariants[] variants,
IncludeCondition[] includeConditions,
Dictionary<string, object?> contextData)
{
_variants = variants;
Id = id;
Document = document;
Definition = definition;
Schema = schema;
RootType = rootType;
ContextData = contextData;
_rootType = rootType;
_variants = variants;
_includeConditions = includeConditions;
_contextData = contextData;
}

/// <summary>
/// Gets the internal unique identifier for this operation.
/// </summary>
public string Id { get; }

/// <summary>
/// Gets the parsed query document that contains the
/// operation-<see cref="Definition" />.
/// </summary>
public DocumentNode Document { get; }

/// <summary>
/// Gets the syntax node representing the operation definition.
/// </summary>
public OperationDefinitionNode Definition { get; }

/// <summary>
/// Gets the schema for which the query is compiled.
/// </summary>
Expand All @@ -36,7 +63,7 @@ namespace HotChocolate.Execution.Processing;
/// <summary>
/// Gets the root type on which the operation is executed.
/// </summary>
public IObjectType RootType { get; }
public IObjectType RootType => _rootType;

/// <summary>
/// Gets the prepared root selections for this operation.
Expand All @@ -52,7 +79,7 @@ namespace HotChocolate.Execution.Processing;
/// The context data dictionary can be used by middleware components and
/// resolvers to store and retrieve data during execution.
/// </summary>
public IDictionary<string, object?> ContextData { get; }
public IDictionary<string, object?> ContextData => _contextData;

/// <summary>
/// Sets the resolvers on the specified <paramref name="selection"/>.
Expand Down Expand Up @@ -82,4 +109,17 @@ namespace HotChocolate.Execution.Processing;
/// </returns>
public FieldDelegate CompileResolverPipeline(IObjectField field, FieldNode selection)
=> OperationCompiler.CreateFieldMiddleware(Schema, field, selection);

/// <summary>
/// Creates a temporary operation object for the optimizer.
/// </summary>
public IOperation CreateOperation()
=> new Operation(
Id,
Document,
Definition,
_rootType,
_variants,
_includeConditions,
_contextData);
}
Expand Up @@ -7,8 +7,7 @@

namespace HotChocolate.Subscriptions.Redis;

public class RedisEventStream<TMessage>
: ISourceStream<TMessage>
public class RedisEventStream<TMessage> : ISourceStream<TMessage>
{
private readonly ChannelMessageQueue _channel;
private readonly IMessageSerializer _messageSerializer;
Expand Down Expand Up @@ -66,8 +65,7 @@ public async ValueTask DisposeAsync()
}
}

private sealed class EnumerateMessages<T>
: IAsyncEnumerable<T>
private sealed class EnumerateMessages<T> : IAsyncEnumerable<T>
{
private readonly ChannelMessageQueue _channel;
private readonly Func<string, T> _messageSerializer;
Expand All @@ -85,8 +83,7 @@ private sealed class EnumerateMessages<T>
{
while (!cancellationToken.IsCancellationRequested)
{
ChannelMessage message = await _channel.ReadAsync(cancellationToken)
.ConfigureAwait(false);
var message = await _channel.ReadAsync(cancellationToken).ConfigureAwait(false);
string body = message.Message;

if (body.Equals(RedisPubSub.Completed, StringComparison.Ordinal))
Expand Down
14 changes: 7 additions & 7 deletions src/HotChocolate/Core/src/Subscriptions.Redis/RedisPubSub.cs
Expand Up @@ -29,17 +29,17 @@ public RedisPubSub(IConnectionMultiplexer connection, IMessageSerializer message
CancellationToken cancellationToken = default)
where TTopic : notnull
{
ISubscriber subscriber = _connection.GetSubscriber();
var serializedTopic = topic is string s ? s : _messageSerializer.Serialize(topic);
var subscriber = _connection.GetSubscriber();
var serializedTopic = topic as string ?? _messageSerializer.Serialize(topic);
var serializedMessage = _messageSerializer.Serialize(message);
await subscriber.PublishAsync(serializedTopic, serializedMessage).ConfigureAwait(false);
}

public async ValueTask CompleteAsync<TTopic>(TTopic topic)
where TTopic : notnull
{
ISubscriber subscriber = _connection.GetSubscriber();
var serializedTopic = topic is string s ? s : _messageSerializer.Serialize(topic);
var subscriber = _connection.GetSubscriber();
var serializedTopic = topic as string ?? _messageSerializer.Serialize(topic);
await subscriber.PublishAsync(serializedTopic, Completed).ConfigureAwait(false);
}

Expand All @@ -48,10 +48,10 @@ public async ValueTask CompleteAsync<TTopic>(TTopic topic)
CancellationToken cancellationToken = default)
where TTopic : notnull
{
ISubscriber subscriber = _connection.GetSubscriber();
var serializedTopic = topic is string s ? s : _messageSerializer.Serialize(topic);
var subscriber = _connection.GetSubscriber();
var serializedTopic = topic as string ?? _messageSerializer.Serialize(topic);

ChannelMessageQueue channel = await subscriber
var channel = await subscriber
.SubscribeAsync(serializedTopic)
.ConfigureAwait(false);

Expand Down
Expand Up @@ -7,6 +7,9 @@

namespace HotChocolate.Execution.Processing;

/// <summary>
/// Represents a compiled GraphQL operation.
/// </summary>
public interface IOperation : IHasReadOnlyContextData
{
/// <summary>
Expand Down
6 changes: 3 additions & 3 deletions src/HotChocolate/Fusion/src/Core/Clients/GraphQLHttpClient.cs
Expand Up @@ -14,11 +14,13 @@ public sealed class GraphQLHttpClient : IGraphQLClient
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly JsonRequestFormatter _formatter = new();
private readonly HttpClient _client;

public GraphQLHttpClient(string schemaName, IHttpClientFactory httpClientFactory)
{
_httpClientFactory = httpClientFactory;
SchemaName = schemaName;
_client =_httpClientFactory.CreateClient(SchemaName);
}

// TODO: naming? SubGraphName?
Expand All @@ -28,10 +30,8 @@ public async Task<GraphQLResponse> ExecuteAsync(GraphQLRequest request, Cancella
{
// todo : this is just a naive dummy implementation
using var writer = new ArrayWriter();
using var client = _httpClientFactory.CreateClient(SchemaName);
using var requestMessage = CreateRequestMessage(writer, request);
using var responseMessage = await client.SendAsync(requestMessage, cancellationToken);
var s = await responseMessage.Content.ReadAsStringAsync(cancellationToken);
using var responseMessage = await _client.SendAsync(requestMessage, cancellationToken);
responseMessage.EnsureSuccessStatusCode(); // TODO : remove for production

await using var contentStream = await responseMessage.Content.ReadAsStreamAsync(cancellationToken);
Expand Down
Expand Up @@ -61,10 +61,11 @@ public static class RequestExecutorBuilderExtensions
.AddDocument(schemaDoc)
.UseField(next => next)
.UseDefaultGatewayPipeline()
.AddOperationCompilerOptimizer<OperationQueryPlanCompiler>()
.ConfigureSchemaServices(
sc =>
{
foreach (var schemaName in configuration.Bindings)
foreach (var schemaName in configuration.SchemaNames)
{
sc.AddSingleton<IGraphQLClient>(
sp => new GraphQLHttpClient(
Expand All @@ -73,8 +74,8 @@ public static class RequestExecutorBuilderExtensions
}
sc.TryAddSingleton(configuration);
sc.TryAddSingleton<RequestPlaner>();
sc.TryAddSingleton<RequirementsPlaner>();
sc.TryAddSingleton<RequestPlanner>();
sc.TryAddSingleton<RequirementsPlanner>();
sc.TryAddSingleton<ExecutionPlanBuilder>();
sc.TryAddSingleton<GraphQLClientFactory>();
sc.TryAddSingleton<FederatedQueryExecutor>();
Expand All @@ -95,7 +96,6 @@ public static class RequestExecutorBuilderExtensions
.UseOperationComplexityAnalyzer()
.UseOperationResolver()
.UseOperationVariableCoercion()
.UseRequest<QueryPlanMiddleware>()
.UseRequest<OperationExecutionMiddleware>();
}
}
Expand Up @@ -6,24 +6,22 @@ namespace HotChocolate.Fusion.Execution;
internal sealed class FederatedQueryContext
{
public FederatedQueryContext(
ISchema schema,
ResultBuilder result,
IOperation operation,
OperationContext operationContext,
QueryPlan plan,
IReadOnlySet<ISelectionSet> requiresFetch)
{
Schema = schema;
Result = result;
Operation = operation;
OperationContext = operationContext;
Plan = plan;
RequiresFetch = requiresFetch;
}

public ISchema Schema { get; }
public OperationContext OperationContext { get; }

public ResultBuilder Result { get; }
public ISchema Schema => OperationContext.Schema;

public IOperation Operation { get; }
public ResultBuilder Result => OperationContext.Result;

public IOperation Operation => OperationContext.Operation;

public QueryPlan Plan { get; }

Expand Down
@@ -1,14 +1,18 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Text.Json;
using HotChocolate.Execution;
using HotChocolate.Execution.Processing;
using HotChocolate.Execution.Processing.Tasks;
using HotChocolate.Fusion.Clients;
using HotChocolate.Fusion.Metadata;
using HotChocolate.Language;
using HotChocolate.Types;
using HotChocolate.Types.Introspection;
using HotChocolate.Utilities;
using static HotChocolate.Fusion.Utilities.JsonValueToGraphQLValueConverter;
using IType = HotChocolate.Types.IType;
using ObjectType = HotChocolate.Fusion.Metadata.ObjectType;
using static HotChocolate.Fusion.Utilities.JsonValueToGraphQLValueConverter;

namespace HotChocolate.Fusion.Execution;

Expand All @@ -31,11 +35,39 @@ internal sealed class FederatedQueryExecutor
FederatedQueryContext context,
CancellationToken cancellationToken = default)
{
var scopedContext = ImmutableDictionary<string, object?>.Empty;
var rootSelectionSet = context.Operation.RootSelectionSet;
var rootResult = context.Result.RentObject(rootSelectionSet.Selections.Count);
var rootWorkItem = new WorkItem(rootSelectionSet, rootResult);
context.Fetch.Add(rootWorkItem);

// introspection
if (context.Plan.HasIntrospectionSelections)
{
var rootSelections = rootSelectionSet.Selections;
var operationContext = context.OperationContext;

for (var i = 0; i < rootSelections.Count; i++)
{
var selection = rootSelections[i];
if (selection.Field.IsIntrospectionField)
{
var resolverTask = operationContext.CreateResolverTask(
selection,
operationContext.RootValue,
rootResult,
i,
operationContext.PathFactory.Append(Path.Root, selection.ResponseName),
scopedContext);
resolverTask.BeginExecute(cancellationToken);

// todo : this is just temporary
await resolverTask.WaitForCompletionAsync(cancellationToken);
}
}
}

// federated stuff
while (context.Fetch.Count > 0)
{
await FetchAsync(context, cancellationToken).ConfigureAwait(false);
Expand All @@ -46,7 +78,8 @@ internal sealed class FederatedQueryExecutor
}
}

return QueryResultBuilder.New().SetData(rootResult).Create();
context.Result.SetData(rootResult);
return context.Result.BuildResult();
}

// note: this is inefficient and we want to group here, for now we just want to get it working.
Expand Down Expand Up @@ -80,11 +113,17 @@ private async Task FetchAsync(FederatedQueryContext context, CancellationToken c
{
var executor = _executorFactory.Create(requestNode.Handler.SchemaName);
var request = requestNode.Handler.CreateRequest(variableValues);
var result = await executor.ExecuteAsync(request, ct).ConfigureAwait(false);
var data = requestNode.Handler.UnwrapResult(result);
var response = await executor.ExecuteAsync(request, ct).ConfigureAwait(false);
var data = requestNode.Handler.UnwrapResult(response);

ExtractSelectionResults(selections, request.SchemaName, data, selectionResults);
ExtractVariables(data, exportKeys, variableValues);

context.Result.RegisterForCleanup(() =>
{
response.Dispose();
return default;
});
}

context.Compose.Enqueue(workItem);
Expand All @@ -111,6 +150,14 @@ private async Task FetchAsync(FederatedQueryContext context, CancellationToken c
for (var i = 0; i < selections.Count; i++)
{
var selection = selections[i];

if (selection.Field.IsIntrospectionField &&
(selection.Field.Name.EqualsOrdinal(IntrospectionFields.Schema) ||
selection.Field.Name.EqualsOrdinal(IntrospectionFields.Type)))
{
continue;
}

var selectionResult = selectionResults[i];
var nullable = selection.TypeKind is not TypeKind.NonNull;
var namedType = selection.Type.NamedType();
Expand Down
@@ -1,20 +1,16 @@
using static HotChocolate.Fusion.Metadata.ConfigurationDirectiveNames;

namespace HotChocolate.Fusion.Metadata;

internal static class ConfigurationDirectiveNames
{
public const string VariableDirective = "variable";
public const string FetchDirective = "fetch";
public const string BindDirective = "bind";
public const string SourceDirective = "source";
public const string HttpDirective = "httpClient";
public const string FusionDirective = "fusion";
public const string NameArg = "name";
public const string SelectArg = "select";
public const string TypeArg = "type";
public const string FromArg = "from";
public const string ToArg = "to";
public const string AsArg = "as";
public const string SchemaArg = "schema";
public const string ArgumentArg = "argument";
public const string BaseAddressArg = "baseAddress";
}

0 comments on commit 25ae2ba

Please sign in to comment.