Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="GraphQL" Version="4.6.0" />
<PackageReference Include="GraphQL.MicrosoftDI" Version="4.6.1" />
<PackageReference Include="GraphQL.NewtonsoftJson" Version="4.6.0" />
<PackageReference Include="GraphQL.SystemReactive" Version="4.1.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>

Expand Down
34 changes: 16 additions & 18 deletions src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Client.Abstractions;
Expand All @@ -21,7 +20,7 @@ namespace GraphQL.Client.LocalExecution
public static class GraphQLLocalExecutionClient
{
public static GraphQLLocalExecutionClient<TSchema> New<TSchema>(TSchema schema, IGraphQLJsonSerializer serializer) where TSchema : ISchema
=> new GraphQLLocalExecutionClient<TSchema>(schema, serializer);
=> new GraphQLLocalExecutionClient<TSchema>(schema, serializer, new SubscriptionDocumentExecuter(), new DocumentWriter());
}

public class GraphQLLocalExecutionClient<TSchema> : IGraphQLClient where TSchema : ISchema
Expand All @@ -41,18 +40,18 @@ public class GraphQLLocalExecutionClient<TSchema> : IGraphQLClient where TSchema

public IGraphQLJsonSerializer Serializer { get; }

private readonly DocumentExecuter _documentExecuter;
private readonly DocumentWriter _documentWriter;
private readonly IDocumentExecuter _documentExecuter;
private readonly IDocumentWriter _documentWriter;

public GraphQLLocalExecutionClient(TSchema schema, IGraphQLJsonSerializer serializer)
public GraphQLLocalExecutionClient(TSchema schema, IGraphQLJsonSerializer serializer, IDocumentExecuter documentExecuter, IDocumentWriter documentWriter)
{
Schema = schema ?? throw new ArgumentNullException(nameof(schema), "no schema configured");
Serializer = serializer ?? throw new ArgumentNullException(nameof(serializer), "please configure the JSON serializer you want to use");

if (!Schema.Initialized)
Schema.Initialize();
_documentExecuter = new DocumentExecuter();
_documentWriter = new DocumentWriter();
_documentExecuter = documentExecuter;
_documentWriter = documentWriter;
}

public void Dispose() { }
Expand All @@ -78,7 +77,7 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
private async Task<GraphQLResponse<TResponse>> ExecuteQueryAsync<TResponse>(GraphQLRequest request, CancellationToken cancellationToken)
{
var executionResult = await ExecuteAsync(request, cancellationToken);
return await ExecutionResultToGraphQLResponse<TResponse>(executionResult, cancellationToken);
return await ExecutionResultToGraphQLResponseAsync<TResponse>(executionResult, cancellationToken);
}
private async Task<IObservable<GraphQLResponse<TResponse>>> ExecuteSubscriptionAsync<TResponse>(GraphQLRequest request, CancellationToken cancellationToken = default)
{
Expand All @@ -87,12 +86,12 @@ private async Task<IObservable<GraphQLResponse<TResponse>>> ExecuteSubscriptionA

return stream == null
? Observable.Throw<GraphQLResponse<TResponse>>(new InvalidOperationException("the GraphQL execution did not return an observable"))
: stream.SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponse<TResponse>(executionResult, token)));
: stream.SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponseAsync<TResponse>(executionResult, token)));
}

private async Task<ExecutionResult> ExecuteAsync(GraphQLRequest request, CancellationToken cancellationToken = default)
{
var serializedRequest = Serializer.SerializeToString(request);
string serializedRequest = Serializer.SerializeToString(request);

var deserializedRequest = JsonConvert.DeserializeObject<GraphQLRequest>(serializedRequest);
var inputs = deserializedRequest.Variables != null
Expand All @@ -103,22 +102,21 @@ private async Task<ExecutionResult> ExecuteAsync(GraphQLRequest request, Cancell
var result = await _documentExecuter.ExecuteAsync(options =>
{
options.Schema = Schema;
options.OperationName = request.OperationName;
options.Query = request.Query;
options.OperationName = deserializedRequest?.OperationName;
options.Query = deserializedRequest?.Query;
options.Inputs = inputs;
options.CancellationToken = cancellationToken;
});

return result;
}

private async Task<GraphQLResponse<TResponse>> ExecutionResultToGraphQLResponse<TResponse>(ExecutionResult executionResult, CancellationToken cancellationToken = default)
private async Task<GraphQLResponse<TResponse>> ExecutionResultToGraphQLResponseAsync<TResponse>(ExecutionResult executionResult, CancellationToken cancellationToken = default)
{
string json = await _documentWriter.WriteToStringAsync(executionResult);
// serialize result into utf8 byte stream
var resultStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
// deserialize using the provided serializer
return await Serializer.DeserializeFromUtf8StreamAsync<TResponse>(resultStream, cancellationToken);
using var stream = new MemoryStream();
await _documentWriter.WriteAsync(stream, executionResult, cancellationToken);
stream.Seek(0, SeekOrigin.Begin);
return await Serializer.DeserializeFromUtf8StreamAsync<TResponse>(stream, cancellationToken);
}

#endregion
Expand Down
18 changes: 18 additions & 0 deletions src/GraphQL.Client.LocalExecution/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using GraphQL.Client.Abstractions;
using GraphQL.DI;
using GraphQL.MicrosoftDI;
using GraphQL.Types;
using Microsoft.Extensions.DependencyInjection;

namespace GraphQL.Client.LocalExecution
{
public static class ServiceCollectionExtensions
{
public static IGraphQLBuilder AddGraphQLLocalExecutionClient<TSchema>(this IServiceCollection services) where TSchema : ISchema
{
services.AddSingleton<GraphQLLocalExecutionClient<TSchema>>();
services.AddSingleton<IGraphQLClient, GraphQLLocalExecutionClient<TSchema>>();
return services.AddGraphQL();
}
}
}
98 changes: 49 additions & 49 deletions src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
catch (OperationCanceledException) { }
})
);

Debug.WriteLine($"sending start message on subscription {startRequest.Id}");
// send subscription request
try
Expand Down Expand Up @@ -265,55 +265,55 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
/// <returns></returns>
public Task<GraphQLResponse<TResponse>> SendRequest<TResponse>(GraphQLRequest request, CancellationToken cancellationToken = default) =>
Observable.Create<GraphQLResponse<TResponse>>(async observer =>
{
var preprocessedRequest = await _client.Options.PreprocessRequest(request, _client);
var websocketRequest = new GraphQLWebSocketRequest
{
await _client.Options.PreprocessRequest(request, _client);
var websocketRequest = new GraphQLWebSocketRequest
Id = Guid.NewGuid().ToString("N"),
Type = GraphQLWebSocketMessageType.GQL_START,
Payload = preprocessedRequest
};
var observable = IncomingMessageStream
.Where(response => response != null && response.Id == websocketRequest.Id)
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE)
.Select(response =>
{
Id = Guid.NewGuid().ToString("N"),
Type = GraphQLWebSocketMessageType.GQL_START,
Payload = request
};
var observable = IncomingMessageStream
.Where(response => response != null && response.Id == websocketRequest.Id)
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE)
.Select(response =>
{
Debug.WriteLine($"received response for request {websocketRequest.Id}");
var typedResponse =
_client.JsonSerializer.DeserializeToWebsocketResponse<TResponse>(
response.MessageBytes);
return typedResponse.Payload;
});
Debug.WriteLine($"received response for request {websocketRequest.Id}");
var typedResponse =
_client.JsonSerializer.DeserializeToWebsocketResponse<TResponse>(
response.MessageBytes);
return typedResponse.Payload;
});

try
{
// initialize websocket (completes immediately if socket is already open)
await InitializeWebSocket();
}
catch (Exception e)
{
// subscribe observer to failed observable
return Observable.Throw<GraphQLResponse<TResponse>>(e).Subscribe(observer);
}
try
{
// initialize websocket (completes immediately if socket is already open)
await InitializeWebSocket();
}
catch (Exception e)
{
// subscribe observer to failed observable
return Observable.Throw<GraphQLResponse<TResponse>>(e).Subscribe(observer);
}

var disposable = new CompositeDisposable(
observable.Subscribe(observer)
);
var disposable = new CompositeDisposable(
observable.Subscribe(observer)
);

Debug.WriteLine($"submitting request {websocketRequest.Id}");
// send request
try
{
await QueueWebSocketRequest(websocketRequest);
}
catch (Exception e)
{
Debug.WriteLine(e);
throw;
}
Debug.WriteLine($"submitting request {websocketRequest.Id}");
// send request
try
{
await QueueWebSocketRequest(websocketRequest);
}
catch (Exception e)
{
Debug.WriteLine(e);
throw;
}

return disposable;
})
return disposable;
})
// complete sequence on OperationCanceledException, this is triggered by the cancellation token
.Catch<GraphQLResponse<TResponse>, OperationCanceledException>(exception =>
Observable.Empty<GraphQLResponse<TResponse>>())
Expand Down Expand Up @@ -410,7 +410,7 @@ public Task InitializeWebSocket()
}
catch (NotImplementedException)
{
Debug.WriteLine("property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform");
Debug.WriteLine("property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform");
}
catch (PlatformNotSupportedException)
{
Expand All @@ -423,7 +423,7 @@ public Task InitializeWebSocket()
}
catch (NotImplementedException)
{
Debug.WriteLine("property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform");
Debug.WriteLine("property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform");
}
catch (PlatformNotSupportedException)
{
Expand Down Expand Up @@ -479,7 +479,7 @@ private async Task ConnectAsync(CancellationToken token)
Debug.WriteLine($"new incoming message stream {_incomingMessages.GetHashCode()} created");

_incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection);

var initRequest = new GraphQLWebSocketRequest
{
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
Expand All @@ -488,7 +488,7 @@ private async Task ConnectAsync(CancellationToken token)

// setup task to await connection_ack message
var ackTask = _incomingMessages
.Where(response => response != null )
.Where(response => response != null)
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK ||
response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ERROR)
.LastAsync()
Expand Down Expand Up @@ -640,7 +640,7 @@ private async Task CloseAsync()
}

Debug.WriteLine($"send \"connection_terminate\" message");
await SendWebSocketMessageAsync(new GraphQLWebSocketRequest{Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE});
await SendWebSocketMessageAsync(new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE });

Debug.WriteLine($"closing websocket {_clientWebSocket.GetHashCode()}");
await _clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
Expand Down