From 51f8c0296a222b6ac90c7d49758e01ec6431c04b Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Wed, 5 Feb 2020 13:47:36 +0100 Subject: [PATCH 1/5] send connection init (chelliwell) --- .../Websocket/GraphQLHttpWebsocketHelpers.cs | 58 ++++++++++++------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs index 2227b777..d43d0026 100644 --- a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs +++ b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs @@ -27,34 +27,38 @@ internal static IObservable> CreateSubscriptionStream Id = startRequest.Id, Type = GraphQLWebSocketMessageType.GQL_STOP }; + var initRequest = new GraphQLWebSocketRequest { + Id = startRequest.Id, + Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT, + }; var observable = Observable.Create>(o => graphQlHttpWebSocket.ResponseStream // ignore null values and messages for other requests .Where(response => response != null && response.Id == startRequest.Id) .Subscribe(response => { - // terminate the sequence when a 'complete' message is received - if (response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) { - Debug.WriteLine($"received 'complete' message on subscription {startRequest.Id}"); - o.OnCompleted(); - return; - } - - // post the GraphQLResponse to the stream (even if a GraphQL error occurred) - Debug.WriteLine($"received payload on subscription {startRequest.Id}"); - var typedResponse = - JsonSerializer.Deserialize>(response.MessageBytes, - options.JsonSerializerOptions); - o.OnNext(typedResponse.Payload); - - // in case of a GraphQL error, terminate the sequence after the response has been posted - if (response.Type == GraphQLWebSocketMessageType.GQL_ERROR) { - Debug.WriteLine($"terminating subscription {startRequest.Id} because of a GraphQL error"); - o.OnCompleted(); - } - }, - o.OnError, - o.OnCompleted) + // terminate the sequence when a 'complete' message is received + if (response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) { + Debug.WriteLine($"received 'complete' message on subscription {startRequest.Id}"); + o.OnCompleted(); + return; + } + + // post the GraphQLResponse to the stream (even if a GraphQL error occurred) + Debug.WriteLine($"received payload on subscription {startRequest.Id}"); + var typedResponse = + JsonSerializer.Deserialize>(response.MessageBytes, + options.JsonSerializerOptions); + o.OnNext(typedResponse.Payload); + + // in case of a GraphQL error, terminate the sequence after the response has been posted + if (response.Type == GraphQLWebSocketMessageType.GQL_ERROR) { + Debug.WriteLine($"terminating subscription {startRequest.Id} because of a GraphQL error"); + o.OnCompleted(); + } + }, + o.OnError, + o.OnCompleted) ); try { @@ -81,6 +85,16 @@ internal static IObservable> CreateSubscriptionStream }) ); + // send connection init + Debug.WriteLine($"sending connection init on subscription {startRequest.Id}"); + try { + await graphQlHttpWebSocket.SendWebSocketRequest(initRequest).ConfigureAwait(false); + } + catch (Exception e) { + Console.WriteLine(e); + throw; + } + Debug.WriteLine($"sending initial message on subscription {startRequest.Id}"); // send subscription request try { From c2e0e2b0e538a7625fc598d4dcca1a3441cb091b Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Wed, 5 Feb 2020 13:51:56 +0100 Subject: [PATCH 2/5] use tls credentials on websocket connection (chelliwell) --- src/GraphQL.Client.Http/GraphQLHttpClient.cs | 4 ++-- src/GraphQL.Client.Http/Websocket/GraphQLHttpWebSocket.cs | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/GraphQL.Client.Http/GraphQLHttpClient.cs b/src/GraphQL.Client.Http/GraphQLHttpClient.cs index 8d2c5db5..f6cb447d 100644 --- a/src/GraphQL.Client.Http/GraphQLHttpClient.cs +++ b/src/GraphQL.Client.Http/GraphQLHttpClient.cs @@ -30,13 +30,13 @@ public GraphQLHttpClient(Uri endPoint) : this(o => o.EndPoint = endPoint) { } public GraphQLHttpClient(Action configure) { Options = new GraphQLHttpClientOptions(); configure(Options); - this.httpClient = new HttpClient(); + this.httpClient = new HttpClient(Options.HttpMessageHandler); this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); } public GraphQLHttpClient(GraphQLHttpClientOptions options) { Options = options; - this.httpClient = new HttpClient(); + this.httpClient = new HttpClient(Options.HttpMessageHandler); this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); } diff --git a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebSocket.cs index 0c516c14..7098a8e5 100644 --- a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebSocket.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.IO; +using System.Net.Http; using System.Net.WebSockets; using System.Reactive.Disposables; using System.Reactive.Linq; @@ -108,9 +109,13 @@ public Task InitializeWebSocket() { switch (clientWebSocket) { case ClientWebSocket nativeWebSocket: nativeWebSocket.Options.AddSubProtocol("graphql-ws"); + nativeWebSocket.Options.ClientCertificates = ((HttpClientHandler)_options.HttpMessageHandler).ClientCertificates; + nativeWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)_options.HttpMessageHandler).UseDefaultCredentials; break; case System.Net.WebSockets.Managed.ClientWebSocket managedWebSocket: managedWebSocket.Options.AddSubProtocol("graphql-ws"); + managedWebSocket.Options.ClientCertificates = ((HttpClientHandler)_options.HttpMessageHandler).ClientCertificates; + managedWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)_options.HttpMessageHandler).UseDefaultCredentials; break; default: throw new NotSupportedException($"unknown websocket type {clientWebSocket.GetType().Name}"); From 11811529cd78fdc280f8695a189c1a6f196e7903 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Wed, 5 Feb 2020 14:01:52 +0100 Subject: [PATCH 3/5] expose the HttpClient instance from GraphQLHttpClient --- src/GraphQL.Client.Http/GraphQLHttpClient.cs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/GraphQL.Client.Http/GraphQLHttpClient.cs b/src/GraphQL.Client.Http/GraphQLHttpClient.cs index f6cb447d..f8db3ecc 100644 --- a/src/GraphQL.Client.Http/GraphQLHttpClient.cs +++ b/src/GraphQL.Client.Http/GraphQLHttpClient.cs @@ -12,9 +12,13 @@ public class GraphQLHttpClient : IGraphQLClient { private readonly GraphQLHttpWebSocket graphQlHttpWebSocket; private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - private readonly HttpClient httpClient; private readonly ConcurrentDictionary, object> subscriptionStreams = new ConcurrentDictionary, object>(); + /// + /// the instance of which is used internally + /// + public HttpClient HttpClient { get; } + /// /// The Options to be used /// @@ -30,19 +34,19 @@ public GraphQLHttpClient(Uri endPoint) : this(o => o.EndPoint = endPoint) { } public GraphQLHttpClient(Action configure) { Options = new GraphQLHttpClientOptions(); configure(Options); - this.httpClient = new HttpClient(Options.HttpMessageHandler); + this.HttpClient = new HttpClient(Options.HttpMessageHandler); this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); } public GraphQLHttpClient(GraphQLHttpClientOptions options) { Options = options; - this.httpClient = new HttpClient(Options.HttpMessageHandler); + this.HttpClient = new HttpClient(Options.HttpMessageHandler); this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); } public GraphQLHttpClient(GraphQLHttpClientOptions options, HttpClient httpClient) { Options = options; - this.httpClient = httpClient; + this.HttpClient = httpClient; this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); } @@ -99,7 +103,7 @@ public IObservable> CreateSubscriptionStream> SendHttpPostRequestAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { using var httpRequestMessage = this.GenerateHttpRequestMessage(request.SerializeToJson(Options)); - using var httpResponseMessage = await this.httpClient.SendAsync(httpRequestMessage, cancellationToken); + using var httpResponseMessage = await this.HttpClient.SendAsync(httpRequestMessage, cancellationToken); if (!httpResponseMessage.IsSuccessStatusCode) { throw new GraphQLHttpException(httpResponseMessage); } @@ -140,7 +144,7 @@ public void Dispose() { private void _dispose() { disposed = true; - this.httpClient.Dispose(); + this.HttpClient.Dispose(); this.graphQlHttpWebSocket.Dispose(); cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); From f69d3ccaff79b2375f5373c421d962c1bfb70e15 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Wed, 5 Feb 2020 14:19:38 +0100 Subject: [PATCH 4/5] request preprocessing #104 --- src/GraphQL.Client.Http/GraphQLHttpClient.cs | 19 ++-- .../GraphQLHttpClientOptions.cs | 7 +- .../Websocket/GraphQLHttpWebsocketHelpers.cs | 90 ++++++++++--------- 3 files changed, 62 insertions(+), 54 deletions(-) diff --git a/src/GraphQL.Client.Http/GraphQLHttpClient.cs b/src/GraphQL.Client.Http/GraphQLHttpClient.cs index f8db3ecc..32ad6ea7 100644 --- a/src/GraphQL.Client.Http/GraphQLHttpClient.cs +++ b/src/GraphQL.Client.Http/GraphQLHttpClient.cs @@ -50,17 +50,17 @@ public GraphQLHttpClient(GraphQLHttpClientOptions options, HttpClient httpClient this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); } + /// public Task> SendQueryAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { return Options.UseWebSocketForQueriesAndMutations - ? this.graphQlHttpWebSocket.Request(request, Options, cancellationToken) + ? this.graphQlHttpWebSocket.Request(request, this, cancellationToken) : this.SendHttpPostRequestAsync(request, cancellationToken); } - public Task> SendMutationAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { - return Options.UseWebSocketForQueriesAndMutations - ? this.graphQlHttpWebSocket.Request(request, Options, cancellationToken) - : this.SendHttpPostRequestAsync(request, cancellationToken); - } + /// + public Task> SendMutationAsync(GraphQLRequest request, + CancellationToken cancellationToken = default) + => SendQueryAsync(request, cancellationToken); /// public IObservable> CreateSubscriptionStream(GraphQLRequest request) { @@ -72,7 +72,7 @@ public IObservable> CreateSubscriptionStream>)subscriptionStreams[key]; - var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, Options, cancellationToken: cancellationTokenSource.Token); + var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, this, cancellationToken: cancellationTokenSource.Token); subscriptionStreams.TryAdd(key, observable); return observable; @@ -88,7 +88,7 @@ public IObservable> CreateSubscriptionStream>)subscriptionStreams[key]; - var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, Options, exceptionHandler, cancellationTokenSource.Token); + var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, this, exceptionHandler, cancellationTokenSource.Token); subscriptionStreams.TryAdd(key, observable); return observable; } @@ -102,7 +102,8 @@ public IObservable> CreateSubscriptionStream> SendHttpPostRequestAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { - using var httpRequestMessage = this.GenerateHttpRequestMessage(request.SerializeToJson(Options)); + var preprocessedRequest = await Options.PreprocessRequest(request, this); + using var httpRequestMessage = this.GenerateHttpRequestMessage(preprocessedRequest.SerializeToJson(Options)); using var httpResponseMessage = await this.HttpClient.SendAsync(httpRequestMessage, cancellationToken); if (!httpResponseMessage.IsSuccessStatusCode) { throw new GraphQLHttpException(httpResponseMessage); diff --git a/src/GraphQL.Client.Http/GraphQLHttpClientOptions.cs b/src/GraphQL.Client.Http/GraphQLHttpClientOptions.cs index ac5a8314..146ff3f4 100644 --- a/src/GraphQL.Client.Http/GraphQLHttpClientOptions.cs +++ b/src/GraphQL.Client.Http/GraphQLHttpClientOptions.cs @@ -2,6 +2,7 @@ using System.Net.Http; using System.Net.Http.Headers; using System.Text.Json; +using System.Threading.Tasks; using Dahomey.Json; namespace GraphQL.Client.Http { @@ -46,6 +47,10 @@ public class GraphQLHttpClientOptions { /// If , the websocket connection is also used for regular queries and mutations /// public bool UseWebSocketForQueriesAndMutations { get; set; } = false; - } + /// + /// Request preprocessing function. Can be used i.e. to inject authorization info into a GraphQL request payload. + /// + public Func> PreprocessRequest { get; set; } = (request, client) => Task.FromResult(request); + } } diff --git a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs index d43d0026..5a2a348d 100644 --- a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs +++ b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs @@ -13,11 +13,12 @@ public static class GraphQLHttpWebsocketHelpers { internal static IObservable> CreateSubscriptionStream( this GraphQLHttpWebSocket graphQlHttpWebSocket, GraphQLRequest request, - GraphQLHttpClientOptions options, + GraphQLHttpClient client, Action exceptionHandler = null, CancellationToken cancellationToken = default) { return Observable.Defer(() => Observable.Create>(async observer => { + await client.Options.PreprocessRequest(request, client); var startRequest = new GraphQLWebSocketRequest { Id = Guid.NewGuid().ToString("N"), Type = GraphQLWebSocketMessageType.GQL_START, @@ -48,7 +49,7 @@ internal static IObservable> CreateSubscriptionStream Debug.WriteLine($"received payload on subscription {startRequest.Id}"); var typedResponse = JsonSerializer.Deserialize>(response.MessageBytes, - options.JsonSerializerOptions); + client.Options.JsonSerializerOptions); o.OnNext(typedResponse.Payload); // in case of a GraphQL error, terminate the sequence after the response has been posted @@ -154,50 +155,51 @@ internal static IObservable> CreateSubscriptionStream internal static Task> Request( this GraphQLHttpWebSocket graphQlHttpWebSocket, GraphQLRequest request, - GraphQLHttpClientOptions options, + GraphQLHttpClient client, CancellationToken cancellationToken = default) { return Observable.Create>(async observer => { - var websocketRequest = new GraphQLWebSocketRequest { - Id = Guid.NewGuid().ToString("N"), - Type = GraphQLWebSocketMessageType.GQL_START, - Payload = request - }; - var observable = graphQlHttpWebSocket.ResponseStream - .Where(response => response != null && response.Id == websocketRequest.Id) - .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) - .Select(response => { - Debug.WriteLine($"received response for request {websocketRequest.Id}"); - var typedResponse = - JsonSerializer.Deserialize>(response.MessageBytes, - options.JsonSerializerOptions); - return typedResponse.Payload; - }); - - try { - // intialize websocket (completes immediately if socket is already open) - await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false); - } - catch (Exception e) { - // subscribe observer to failed observable - return Observable.Throw>(e).Subscribe(observer); - } - - var disposable = new CompositeDisposable( - observable.Subscribe(observer) - ); - - Debug.WriteLine($"submitting request {websocketRequest.Id}"); - // send request - try { - await graphQlHttpWebSocket.SendWebSocketRequest(websocketRequest).ConfigureAwait(false); - } - catch (Exception e) { - Console.WriteLine(e); - throw; - } - - return disposable; - }) + await client.Options.PreprocessRequest(request, client); + var websocketRequest = new GraphQLWebSocketRequest { + Id = Guid.NewGuid().ToString("N"), + Type = GraphQLWebSocketMessageType.GQL_START, + Payload = request + }; + var observable = graphQlHttpWebSocket.ResponseStream + .Where(response => response != null && response.Id == websocketRequest.Id) + .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) + .Select(response => { + Debug.WriteLine($"received response for request {websocketRequest.Id}"); + var typedResponse = + JsonSerializer.Deserialize>(response.MessageBytes, + client.Options.JsonSerializerOptions); + return typedResponse.Payload; + }); + + try { + // intialize websocket (completes immediately if socket is already open) + await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false); + } + catch (Exception e) { + // subscribe observer to failed observable + return Observable.Throw>(e).Subscribe(observer); + } + + var disposable = new CompositeDisposable( + observable.Subscribe(observer) + ); + + Debug.WriteLine($"submitting request {websocketRequest.Id}"); + // send request + try { + await graphQlHttpWebSocket.SendWebSocketRequest(websocketRequest).ConfigureAwait(false); + } + catch (Exception e) { + Console.WriteLine(e); + throw; + } + + return disposable; + }) // complete sequence on OperationCanceledException, this is triggered by the cancellation token .Catch, OperationCanceledException>(exception => Observable.Empty>()) From fe6d8992905c2e1a8784d2cf4dd7631083029462 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Wed, 5 Feb 2020 14:48:11 +0100 Subject: [PATCH 5/5] minor refactoring --- src/GraphQL.Client.Http/GraphQLHttpClient.cs | 2 +- .../Websocket/GraphQLHttpWebsocketHelpers.cs | 2 +- .../{SubscriptionsTest.cs => WebsocketTest.cs} | 6 ++---- 3 files changed, 4 insertions(+), 6 deletions(-) rename tests/GraphQL.Integration.Tests/{SubscriptionsTest.cs => WebsocketTest.cs} (98%) diff --git a/src/GraphQL.Client.Http/GraphQLHttpClient.cs b/src/GraphQL.Client.Http/GraphQLHttpClient.cs index 32ad6ea7..43705941 100644 --- a/src/GraphQL.Client.Http/GraphQLHttpClient.cs +++ b/src/GraphQL.Client.Http/GraphQLHttpClient.cs @@ -53,7 +53,7 @@ public GraphQLHttpClient(GraphQLHttpClientOptions options, HttpClient httpClient /// public Task> SendQueryAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { return Options.UseWebSocketForQueriesAndMutations - ? this.graphQlHttpWebSocket.Request(request, this, cancellationToken) + ? this.graphQlHttpWebSocket.SendRequest(request, this, cancellationToken) : this.SendHttpPostRequestAsync(request, cancellationToken); } diff --git a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs index 5a2a348d..4d929a83 100644 --- a/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs +++ b/src/GraphQL.Client.Http/Websocket/GraphQLHttpWebsocketHelpers.cs @@ -152,7 +152,7 @@ internal static IObservable> CreateSubscriptionStream .Publish().RefCount(); } - internal static Task> Request( + internal static Task> SendRequest( this GraphQLHttpWebSocket graphQlHttpWebSocket, GraphQLRequest request, GraphQLHttpClient client, diff --git a/tests/GraphQL.Integration.Tests/SubscriptionsTest.cs b/tests/GraphQL.Integration.Tests/WebsocketTest.cs similarity index 98% rename from tests/GraphQL.Integration.Tests/SubscriptionsTest.cs rename to tests/GraphQL.Integration.Tests/WebsocketTest.cs index a70a000c..3e5eafb1 100644 --- a/tests/GraphQL.Integration.Tests/SubscriptionsTest.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTest.cs @@ -12,14 +12,12 @@ using Xunit.Abstractions; namespace GraphQL.Integration.Tests { - public class SubscriptionsTest { + public class WebsocketTest { private readonly ITestOutputHelper output; private static IWebHost CreateServer(int port) => WebHostHelpers.CreateServer(port); - private static TimeSpan WaitForConnectionDelay = TimeSpan.FromMilliseconds(200); - - public SubscriptionsTest(ITestOutputHelper output) { + public WebsocketTest(ITestOutputHelper output) { this.output = output; }