diff --git a/src/GraphQL.Client.LocalExecution/GraphQL.Client.LocalExecution.csproj b/src/GraphQL.Client.LocalExecution/GraphQL.Client.LocalExecution.csproj index d69fc21b..00ef36b3 100644 --- a/src/GraphQL.Client.LocalExecution/GraphQL.Client.LocalExecution.csproj +++ b/src/GraphQL.Client.LocalExecution/GraphQL.Client.LocalExecution.csproj @@ -8,8 +8,9 @@ - + + diff --git a/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs b/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs index f2c077f6..bd2b54fa 100644 --- a/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs +++ b/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; -using System.Text; using System.Threading; using System.Threading.Tasks; using GraphQL.Client.Abstractions; @@ -21,7 +20,7 @@ namespace GraphQL.Client.LocalExecution public static class GraphQLLocalExecutionClient { public static GraphQLLocalExecutionClient New(TSchema schema, IGraphQLJsonSerializer serializer) where TSchema : ISchema - => new GraphQLLocalExecutionClient(schema, serializer); + => new GraphQLLocalExecutionClient(schema, serializer, new SubscriptionDocumentExecuter(), new DocumentWriter()); } public class GraphQLLocalExecutionClient : IGraphQLClient where TSchema : ISchema @@ -41,18 +40,18 @@ public class GraphQLLocalExecutionClient : IGraphQLClient where TSchema public IGraphQLJsonSerializer Serializer { get; } - private readonly DocumentExecuter _documentExecuter; - private readonly DocumentWriter _documentWriter; + private readonly IDocumentExecuter _documentExecuter; + private readonly IDocumentWriter _documentWriter; - public GraphQLLocalExecutionClient(TSchema schema, IGraphQLJsonSerializer serializer) + public GraphQLLocalExecutionClient(TSchema schema, IGraphQLJsonSerializer serializer, IDocumentExecuter documentExecuter, IDocumentWriter documentWriter) { Schema = schema ?? throw new ArgumentNullException(nameof(schema), "no schema configured"); Serializer = serializer ?? throw new ArgumentNullException(nameof(serializer), "please configure the JSON serializer you want to use"); if (!Schema.Initialized) Schema.Initialize(); - _documentExecuter = new DocumentExecuter(); - _documentWriter = new DocumentWriter(); + _documentExecuter = documentExecuter; + _documentWriter = documentWriter; } public void Dispose() { } @@ -78,7 +77,7 @@ public IObservable> CreateSubscriptionStream> ExecuteQueryAsync(GraphQLRequest request, CancellationToken cancellationToken) { var executionResult = await ExecuteAsync(request, cancellationToken); - return await ExecutionResultToGraphQLResponse(executionResult, cancellationToken); + return await ExecutionResultToGraphQLResponseAsync(executionResult, cancellationToken); } private async Task>> ExecuteSubscriptionAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { @@ -87,12 +86,12 @@ private async Task>> ExecuteSubscriptionA return stream == null ? Observable.Throw>(new InvalidOperationException("the GraphQL execution did not return an observable")) - : stream.SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponse(executionResult, token))); + : stream.SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponseAsync(executionResult, token))); } private async Task ExecuteAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { - var serializedRequest = Serializer.SerializeToString(request); + string serializedRequest = Serializer.SerializeToString(request); var deserializedRequest = JsonConvert.DeserializeObject(serializedRequest); var inputs = deserializedRequest.Variables != null @@ -103,8 +102,8 @@ private async Task ExecuteAsync(GraphQLRequest request, Cancell var result = await _documentExecuter.ExecuteAsync(options => { options.Schema = Schema; - options.OperationName = request.OperationName; - options.Query = request.Query; + options.OperationName = deserializedRequest?.OperationName; + options.Query = deserializedRequest?.Query; options.Inputs = inputs; options.CancellationToken = cancellationToken; }); @@ -112,13 +111,12 @@ private async Task ExecuteAsync(GraphQLRequest request, Cancell return result; } - private async Task> ExecutionResultToGraphQLResponse(ExecutionResult executionResult, CancellationToken cancellationToken = default) + private async Task> ExecutionResultToGraphQLResponseAsync(ExecutionResult executionResult, CancellationToken cancellationToken = default) { - string json = await _documentWriter.WriteToStringAsync(executionResult); - // serialize result into utf8 byte stream - var resultStream = new MemoryStream(Encoding.UTF8.GetBytes(json)); - // deserialize using the provided serializer - return await Serializer.DeserializeFromUtf8StreamAsync(resultStream, cancellationToken); + using var stream = new MemoryStream(); + await _documentWriter.WriteAsync(stream, executionResult, cancellationToken); + stream.Seek(0, SeekOrigin.Begin); + return await Serializer.DeserializeFromUtf8StreamAsync(stream, cancellationToken); } #endregion diff --git a/src/GraphQL.Client.LocalExecution/ServiceCollectionExtensions.cs b/src/GraphQL.Client.LocalExecution/ServiceCollectionExtensions.cs new file mode 100644 index 00000000..3d961d14 --- /dev/null +++ b/src/GraphQL.Client.LocalExecution/ServiceCollectionExtensions.cs @@ -0,0 +1,18 @@ +using GraphQL.Client.Abstractions; +using GraphQL.DI; +using GraphQL.MicrosoftDI; +using GraphQL.Types; +using Microsoft.Extensions.DependencyInjection; + +namespace GraphQL.Client.LocalExecution +{ + public static class ServiceCollectionExtensions + { + public static IGraphQLBuilder AddGraphQLLocalExecutionClient(this IServiceCollection services) where TSchema : ISchema + { + services.AddSingleton>(); + services.AddSingleton>(); + return services.AddGraphQL(); + } + } +} diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index ec4ac4b7..9f12d5bf 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -182,7 +182,7 @@ public IObservable> CreateSubscriptionStream> CreateSubscriptionStream public Task> SendRequest(GraphQLRequest request, CancellationToken cancellationToken = default) => Observable.Create>(async observer => + { + var preprocessedRequest = await _client.Options.PreprocessRequest(request, _client); + var websocketRequest = new GraphQLWebSocketRequest { - await _client.Options.PreprocessRequest(request, _client); - var websocketRequest = new GraphQLWebSocketRequest + Id = Guid.NewGuid().ToString("N"), + Type = GraphQLWebSocketMessageType.GQL_START, + Payload = preprocessedRequest + }; + var observable = IncomingMessageStream + .Where(response => response != null && response.Id == websocketRequest.Id) + .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) + .Select(response => { - Id = Guid.NewGuid().ToString("N"), - Type = GraphQLWebSocketMessageType.GQL_START, - Payload = request - }; - var observable = IncomingMessageStream - .Where(response => response != null && response.Id == websocketRequest.Id) - .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) - .Select(response => - { - Debug.WriteLine($"received response for request {websocketRequest.Id}"); - var typedResponse = - _client.JsonSerializer.DeserializeToWebsocketResponse( - response.MessageBytes); - return typedResponse.Payload; - }); + Debug.WriteLine($"received response for request {websocketRequest.Id}"); + var typedResponse = + _client.JsonSerializer.DeserializeToWebsocketResponse( + response.MessageBytes); + return typedResponse.Payload; + }); - try - { - // initialize websocket (completes immediately if socket is already open) - await InitializeWebSocket(); - } - catch (Exception e) - { - // subscribe observer to failed observable - return Observable.Throw>(e).Subscribe(observer); - } + try + { + // initialize websocket (completes immediately if socket is already open) + await InitializeWebSocket(); + } + catch (Exception e) + { + // subscribe observer to failed observable + return Observable.Throw>(e).Subscribe(observer); + } - var disposable = new CompositeDisposable( - observable.Subscribe(observer) - ); + var disposable = new CompositeDisposable( + observable.Subscribe(observer) + ); - Debug.WriteLine($"submitting request {websocketRequest.Id}"); - // send request - try - { - await QueueWebSocketRequest(websocketRequest); - } - catch (Exception e) - { - Debug.WriteLine(e); - throw; - } + Debug.WriteLine($"submitting request {websocketRequest.Id}"); + // send request + try + { + await QueueWebSocketRequest(websocketRequest); + } + catch (Exception e) + { + Debug.WriteLine(e); + throw; + } - return disposable; - }) + return disposable; + }) // complete sequence on OperationCanceledException, this is triggered by the cancellation token .Catch, OperationCanceledException>(exception => Observable.Empty>()) @@ -410,7 +410,7 @@ public Task InitializeWebSocket() } catch (NotImplementedException) { - Debug.WriteLine("property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform"); + Debug.WriteLine("property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform"); } catch (PlatformNotSupportedException) { @@ -423,7 +423,7 @@ public Task InitializeWebSocket() } catch (NotImplementedException) { - Debug.WriteLine("property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform"); + Debug.WriteLine("property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform"); } catch (PlatformNotSupportedException) { @@ -479,7 +479,7 @@ private async Task ConnectAsync(CancellationToken token) Debug.WriteLine($"new incoming message stream {_incomingMessages.GetHashCode()} created"); _incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection); - + var initRequest = new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT, @@ -488,7 +488,7 @@ private async Task ConnectAsync(CancellationToken token) // setup task to await connection_ack message var ackTask = _incomingMessages - .Where(response => response != null ) + .Where(response => response != null) .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK || response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ERROR) .LastAsync() @@ -640,7 +640,7 @@ private async Task CloseAsync() } Debug.WriteLine($"send \"connection_terminate\" message"); - await SendWebSocketMessageAsync(new GraphQLWebSocketRequest{Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE}); + await SendWebSocketMessageAsync(new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE }); Debug.WriteLine($"closing websocket {_clientWebSocket.GetHashCode()}"); await _clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);