From bfa906ba25f4505200d6564ac44f6091c327561e Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Mon, 17 Feb 2020 10:14:49 +0100 Subject: [PATCH 01/10] fix dispose on websocket --- src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 579be9e7..a79eb597 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -21,7 +21,7 @@ internal class GraphQLHttpWebSocket : IDisposable { private Subject _responseSubject; private readonly Subject _requestSubject = new Subject(); private readonly Subject _exceptionSubject = new Subject(); - private IDisposable _requestSubscription; + private readonly IDisposable _requestSubscription; public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; @@ -272,6 +272,7 @@ private async Task DisposeAsync() { if (!_cancellationTokenSource.IsCancellationRequested) _cancellationTokenSource.Cancel(); await _closeAsync().ConfigureAwait(false); + _requestSubscription?.Dispose(); clientWebSocket?.Dispose(); _cancellationTokenSource.Dispose(); Debug.WriteLine($"websocket {clientWebSocket.GetHashCode()} disposed"); From d3fa68675400501c10b6fe1299a26984cf9221d3 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Mon, 17 Feb 2020 10:35:01 +0100 Subject: [PATCH 02/10] refactor disposing to better match steven clearys example --- .../Websocket/GraphQLHttpWebSocket.cs | 28 ++++++++++++------- .../GraphQL.Client.Serializer.Tests.csproj | 3 +- .../GraphQL.Client.Tests.Common.csproj | 1 + 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index a79eb597..2b2f1db4 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -90,7 +90,7 @@ private Task _backOff() { public Task InitializeWebSocket() { // do not attempt to initialize if cancellation is requested - if (_disposed != null) + if (Completion != null) throw new OperationCanceledException(); lock (_initializeLock) { @@ -254,20 +254,28 @@ private async Task _closeAsync(CancellationToken cancellationToken = default) { await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false); } -#endregion + #endregion -#region IDisposable + #region IDisposable + public void Dispose() => Complete(); - private Task _disposed; - private object _disposedLocker = new object(); - public void Dispose() { - // Async disposal as recommended by Stephen Cleary (https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html) - lock (_disposedLocker) { - if (_disposed == null) _disposed = DisposeAsync(); + /// + /// Cancels the current operation, closes the websocket connection and disposes of internal resources. + /// + public void Complete() { + lock (completedLocker) { + if (Completion == null) Completion = CompleteAsync(); } } - private async Task DisposeAsync() { + /// + /// Task to await the completion (a.k.a. disposal) of this websocket. + /// + /// Async disposal as recommended by Stephen Cleary (https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html) + public Task Completion { get; private set; } + + private readonly object completedLocker = new object(); + private async Task CompleteAsync() { Debug.WriteLine($"disposing websocket {clientWebSocket.GetHashCode()}..."); if (!_cancellationTokenSource.IsCancellationRequested) _cancellationTokenSource.Cancel(); diff --git a/tests/GraphQL.Client.Serializer.Tests/GraphQL.Client.Serializer.Tests.csproj b/tests/GraphQL.Client.Serializer.Tests/GraphQL.Client.Serializer.Tests.csproj index 583a8e82..b61c1466 100644 --- a/tests/GraphQL.Client.Serializer.Tests/GraphQL.Client.Serializer.Tests.csproj +++ b/tests/GraphQL.Client.Serializer.Tests/GraphQL.Client.Serializer.Tests.csproj @@ -2,8 +2,7 @@ netcoreapp3.1 - - false + false diff --git a/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj b/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj index 8526c03a..7b8d4982 100644 --- a/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj +++ b/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj @@ -2,6 +2,7 @@ netstandard2.0 + false From 72500b51c4dbc127586aaf95a7d12239d14e7089 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Mon, 17 Feb 2020 10:48:20 +0100 Subject: [PATCH 03/10] cleanup websocket code --- .../Websocket/GraphQLHttpWebSocket.cs | 133 +++++++++--------- 1 file changed, 65 insertions(+), 68 deletions(-) diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 2b2f1db4..2789a305 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -14,57 +14,57 @@ namespace GraphQL.Client.Http.Websocket { internal class GraphQLHttpWebSocket : IDisposable { private readonly Uri webSocketUri; - private readonly GraphQLHttpClientOptions _options; + private readonly GraphQLHttpClientOptions options; private readonly ArraySegment buffer; - private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - private Subject _responseSubject; - private readonly Subject _requestSubject = new Subject(); - private readonly Subject _exceptionSubject = new Subject(); - private readonly IDisposable _requestSubscription; + private Subject responseSubject; + private readonly Subject requestSubject = new Subject(); + private readonly Subject exceptionSubject = new Subject(); + private readonly IDisposable requestSubscription; public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; + public IObservable ReceiveErrors => exceptionSubject.AsObservable(); + public IObservable ResponseStream { get; } #if NETFRAMEWORK private WebSocket clientWebSocket = null; #else private ClientWebSocket clientWebSocket = null; #endif - private int _connectionAttempt = 0; + private int connectionAttempt = 0; public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClientOptions options) { this.webSocketUri = webSocketUri; - _options = options; + this.options = options; buffer = new ArraySegment(new byte[8192]); - _responseStream = _createResponseStream(); + ResponseStream = _createResponseStream(); - _requestSubscription = _requestSubject.Select(request => Observable.FromAsync(() => _sendWebSocketRequest(request))).Concat().Subscribe(); + requestSubscription = requestSubject.Select(request => Observable.FromAsync(() => _sendWebSocketRequest(request))).Concat().Subscribe(); } - public IObservable ReceiveErrors => _exceptionSubject.AsObservable(); - public IObservable ResponseStream => _responseStream; - public readonly IObservable _responseStream; + #region Send requests public Task SendWebSocketRequest(GraphQLWebSocketRequest request) { - _requestSubject.OnNext(request); + requestSubject.OnNext(request); return request.SendTask(); } private async Task _sendWebSocketRequest(GraphQLWebSocketRequest request) { try { - if (_cancellationTokenSource.Token.IsCancellationRequested) { + if (cancellationTokenSource.Token.IsCancellationRequested) { request.SendCanceled(); return; } await InitializeWebSocket().ConfigureAwait(false); - var requestBytes = _options.JsonSerializer.SerializeToBytes(request); + var requestBytes = options.JsonSerializer.SerializeToBytes(request); await this.clientWebSocket.SendAsync( new ArraySegment(requestBytes), WebSocketMessageType.Text, true, - _cancellationTokenSource.Token).ConfigureAwait(false); + cancellationTokenSource.Token).ConfigureAwait(false); request.SendCompleted(); } catch (Exception e) { @@ -72,28 +72,18 @@ await this.clientWebSocket.SendAsync( } } - public Task InitializeWebSocketTask { get; private set; } = Task.CompletedTask; - - private readonly object _initializeLock = new object(); - -#region Private Methods - - private Task _backOff() { - _connectionAttempt++; - - if (_connectionAttempt == 1) return Task.CompletedTask; + #endregion - var delay = _options.BackOffStrategy(_connectionAttempt - 1); - Debug.WriteLine($"connection attempt #{_connectionAttempt}, backing off for {delay.TotalSeconds} s"); - return Task.Delay(delay); - } + public Task InitializeWebSocketTask { get; private set; } = Task.CompletedTask; + private readonly object initializeLock = new object(); + public Task InitializeWebSocket() { // do not attempt to initialize if cancellation is requested if (Completion != null) throw new OperationCanceledException(); - lock (_initializeLock) { + lock (initializeLock) { // if an initialization task is already running, return that if (InitializeWebSocketTask != null && !InitializeWebSocketTask.IsFaulted && @@ -115,13 +105,13 @@ public Task InitializeWebSocket() { switch (clientWebSocket) { case ClientWebSocket nativeWebSocket: nativeWebSocket.Options.AddSubProtocol("graphql-ws"); - nativeWebSocket.Options.ClientCertificates = ((HttpClientHandler)_options.HttpMessageHandler).ClientCertificates; - nativeWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)_options.HttpMessageHandler).UseDefaultCredentials; + nativeWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; + nativeWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; break; case System.Net.WebSockets.Managed.ClientWebSocket managedWebSocket: managedWebSocket.Options.AddSubProtocol("graphql-ws"); - managedWebSocket.Options.ClientCertificates = ((HttpClientHandler)_options.HttpMessageHandler).ClientCertificates; - managedWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)_options.HttpMessageHandler).UseDefaultCredentials; + managedWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; + managedWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; break; default: throw new NotSupportedException($"unknown websocket type {clientWebSocket.GetType().Name}"); @@ -129,10 +119,10 @@ public Task InitializeWebSocket() { #else clientWebSocket = new ClientWebSocket(); clientWebSocket.Options.AddSubProtocol("graphql-ws"); - clientWebSocket.Options.ClientCertificates = ((HttpClientHandler)_options.HttpMessageHandler).ClientCertificates; - clientWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)_options.HttpMessageHandler).UseDefaultCredentials; + clientWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; + clientWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; #endif - return InitializeWebSocketTask = _connectAsync(_cancellationTokenSource.Token); + return InitializeWebSocketTask = _connectAsync(cancellationTokenSource.Token); } } @@ -144,25 +134,25 @@ private IObservable _createResponseStream() { } private async Task _createResultStream(IObserver observer, CancellationToken token) { - if (_responseSubject == null || _responseSubject.IsDisposed) { - _responseSubject = new Subject(); + if (responseSubject == null || responseSubject.IsDisposed) { + responseSubject = new Subject(); var observable = await _getReceiveResultStream().ConfigureAwait(false); - observable.Subscribe(_responseSubject); + observable.Subscribe(responseSubject); - _responseSubject.Subscribe(_ => { }, ex => { - _exceptionSubject.OnNext(ex); - _responseSubject?.Dispose(); - _responseSubject = null; + responseSubject.Subscribe(_ => { }, ex => { + exceptionSubject.OnNext(ex); + responseSubject?.Dispose(); + responseSubject = null; }, () => { - _responseSubject?.Dispose(); - _responseSubject = null; + responseSubject?.Dispose(); + responseSubject = null; }); } return new CompositeDisposable ( - _responseSubject.Subscribe(observer), + responseSubject.Subscribe(observer), Disposable.Create(() => { Debug.WriteLine("response stream disposed"); }) @@ -180,30 +170,39 @@ private async Task _connectAsync(CancellationToken token) { Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}"); - _connectionAttempt = 1; + connectionAttempt = 1; } catch (Exception e) { - _exceptionSubject.OnNext(e); + exceptionSubject.OnNext(e); throw; } } + + private Task _backOff() { + connectionAttempt++; + + if (connectionAttempt == 1) return Task.CompletedTask; + var delay = options.BackOffStrategy?.Invoke(connectionAttempt - 1) ?? TimeSpan.FromSeconds(5); + Debug.WriteLine($"connection attempt #{connectionAttempt}, backing off for {delay.TotalSeconds} s"); + return Task.Delay(delay); + } - private Task _receiveAsyncTask = null; - private readonly object _receiveTaskLocker = new object(); + private Task receiveAsyncTask = null; + private readonly object receiveTaskLocker = new object(); /// /// wrapper method to pick up the existing request task if already running /// /// private Task _getReceiveTask() { - lock (_receiveTaskLocker) { - if (_receiveAsyncTask == null || - _receiveAsyncTask.IsFaulted || - _receiveAsyncTask.IsCompleted) - _receiveAsyncTask = _receiveResultAsync(); + lock (receiveTaskLocker) { + if (receiveAsyncTask == null || + receiveAsyncTask.IsFaulted || + receiveAsyncTask.IsCompleted) + receiveAsyncTask = _receiveResultAsync(); } - return _receiveAsyncTask; + return receiveAsyncTask; } private async Task _receiveResultAsync() { @@ -213,17 +212,17 @@ private async Task _receiveResultAsync() { using (var ms = new MemoryStream()) { WebSocketReceiveResult webSocketReceiveResult = null; do { - _cancellationTokenSource.Token.ThrowIfCancellationRequested(); + cancellationTokenSource.Token.ThrowIfCancellationRequested(); webSocketReceiveResult = await clientWebSocket.ReceiveAsync(buffer, CancellationToken.None); ms.Write(buffer.Array, buffer.Offset, webSocketReceiveResult.Count); } while (!webSocketReceiveResult.EndOfMessage); - _cancellationTokenSource.Token.ThrowIfCancellationRequested(); + cancellationTokenSource.Token.ThrowIfCancellationRequested(); ms.Seek(0, SeekOrigin.Begin); if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text) { - var response = await _options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms); + var response = await options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms); response.MessageBytes = ms.ToArray(); return response; } @@ -254,8 +253,6 @@ private async Task _closeAsync(CancellationToken cancellationToken = default) { await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false); } - #endregion - #region IDisposable public void Dispose() => Complete(); @@ -277,12 +274,12 @@ public void Complete() { private readonly object completedLocker = new object(); private async Task CompleteAsync() { Debug.WriteLine($"disposing websocket {clientWebSocket.GetHashCode()}..."); - if (!_cancellationTokenSource.IsCancellationRequested) - _cancellationTokenSource.Cancel(); + if (!cancellationTokenSource.IsCancellationRequested) + cancellationTokenSource.Cancel(); await _closeAsync().ConfigureAwait(false); - _requestSubscription?.Dispose(); + requestSubscription?.Dispose(); clientWebSocket?.Dispose(); - _cancellationTokenSource.Dispose(); + cancellationTokenSource.Dispose(); Debug.WriteLine($"websocket {clientWebSocket.GetHashCode()} disposed"); } #endregion From 56bde94867dcaaea8210ddc404e581be637f2ef6 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Mon, 17 Feb 2020 11:00:46 +0100 Subject: [PATCH 04/10] add some comments --- .../Websocket/GraphQLHttpWebSocket.cs | 98 ++++++++++--------- 1 file changed, 54 insertions(+), 44 deletions(-) diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 2789a305..4efa0923 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -17,22 +17,23 @@ internal class GraphQLHttpWebSocket : IDisposable { private readonly GraphQLHttpClientOptions options; private readonly ArraySegment buffer; private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - - private Subject responseSubject; private readonly Subject requestSubject = new Subject(); private readonly Subject exceptionSubject = new Subject(); private readonly IDisposable requestSubscription; - public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; - public IObservable ReceiveErrors => exceptionSubject.AsObservable(); - public IObservable ResponseStream { get; } + private int connectionAttempt = 0; + private Subject responseSubject; #if NETFRAMEWORK private WebSocket clientWebSocket = null; #else private ClientWebSocket clientWebSocket = null; #endif - private int connectionAttempt = 0; + + + public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; + public IObservable ReceiveErrors => exceptionSubject.AsObservable(); + public IObservable ResponseStream { get; } public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClientOptions options) { this.webSocketUri = webSocketUri; @@ -74,8 +75,7 @@ await this.clientWebSocket.SendAsync( #endregion - public Task InitializeWebSocketTask { get; private set; } = Task.CompletedTask; - + private Task initializeWebSocketTask = Task.CompletedTask; private readonly object initializeLock = new object(); public Task InitializeWebSocket() { @@ -85,10 +85,10 @@ public Task InitializeWebSocket() { lock (initializeLock) { // if an initialization task is already running, return that - if (InitializeWebSocketTask != null && - !InitializeWebSocketTask.IsFaulted && - !InitializeWebSocketTask.IsCompleted) - return InitializeWebSocketTask; + if (initializeWebSocketTask != null && + !initializeWebSocketTask.IsFaulted && + !initializeWebSocketTask.IsCompleted) + return initializeWebSocketTask; // if the websocket is open, return a completed task if (clientWebSocket != null && clientWebSocket.State == WebSocketState.Open) @@ -122,10 +122,39 @@ public Task InitializeWebSocket() { clientWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; clientWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; #endif - return InitializeWebSocketTask = _connectAsync(cancellationTokenSource.Token); + return initializeWebSocketTask = _connectAsync(cancellationTokenSource.Token); } } + private async Task _connectAsync(CancellationToken token) { + try { + await _backOff().ConfigureAwait(false); + Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); + await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); + Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}"); + connectionAttempt = 1; + } + catch (Exception e) { + exceptionSubject.OnNext(e); + throw; + } + } + + /// + /// delay the next connection attempt using + /// + /// + private Task _backOff() { + connectionAttempt++; + + if (connectionAttempt == 1) return Task.CompletedTask; + + var delay = options.BackOffStrategy?.Invoke(connectionAttempt - 1) ?? TimeSpan.FromSeconds(5); + Debug.WriteLine($"connection attempt #{connectionAttempt}, backing off for {delay.TotalSeconds} s"); + return Task.Delay(delay); + } + + private IObservable _createResponseStream() { return Observable.Create(_createResultStream) // complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal @@ -135,10 +164,16 @@ private IObservable _createResponseStream() { private async Task _createResultStream(IObserver observer, CancellationToken token) { if (responseSubject == null || responseSubject.IsDisposed) { + // create new response subject responseSubject = new Subject(); - var observable = await _getReceiveResultStream().ConfigureAwait(false); - observable.Subscribe(responseSubject); + // initialize and connect websocket + await InitializeWebSocket().ConfigureAwait(false); + + // loop the receive task and subscribe the created subject to the results + Observable.Defer(() => _getReceiveTask().ToObservable()).Repeat().Subscribe(responseSubject); + + // dispose the subject on any error or completion (will be recreated) responseSubject.Subscribe(_ => { }, ex => { exceptionSubject.OnNext(ex); responseSubject?.Dispose(); @@ -159,35 +194,6 @@ private async Task _createResultStream(IObserver> _getReceiveResultStream() { - await InitializeWebSocket().ConfigureAwait(false); - return Observable.Defer(() => _getReceiveTask().ToObservable()).Repeat(); - } - - private async Task _connectAsync(CancellationToken token) { - try { - await _backOff().ConfigureAwait(false); - Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); - await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); - Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}"); - connectionAttempt = 1; - } - catch (Exception e) { - exceptionSubject.OnNext(e); - throw; - } - } - - private Task _backOff() { - connectionAttempt++; - - if (connectionAttempt == 1) return Task.CompletedTask; - - var delay = options.BackOffStrategy?.Invoke(connectionAttempt - 1) ?? TimeSpan.FromSeconds(5); - Debug.WriteLine($"connection attempt #{connectionAttempt}, backing off for {delay.TotalSeconds} s"); - return Task.Delay(delay); - } - private Task receiveAsyncTask = null; private readonly object receiveTaskLocker = new object(); /// @@ -205,6 +211,10 @@ private Task _getReceiveTask() { return receiveAsyncTask; } + /// + /// read a single message from the websocket + /// + /// private async Task _receiveResultAsync() { try { Debug.WriteLine($"receiving data on websocket {clientWebSocket.GetHashCode()} ..."); From 34010007db664be07833b0bcd26a427b4a91eb47 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Mon, 17 Feb 2020 11:14:38 +0100 Subject: [PATCH 05/10] create observable for the websockets connection state --- .../GraphQLWebsocketConnectionState.cs | 7 +++++++ src/GraphQL.Client/GraphQLHttpClient.cs | 6 +++++- src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs | 11 ++++++++++- 3 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 src/GraphQL.Client.Abstractions.Websocket/GraphQLWebsocketConnectionState.cs diff --git a/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebsocketConnectionState.cs b/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebsocketConnectionState.cs new file mode 100644 index 00000000..3ab5a0e2 --- /dev/null +++ b/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebsocketConnectionState.cs @@ -0,0 +1,7 @@ +namespace GraphQL.Client.Abstractions.Websocket { + public enum GraphQLWebsocketConnectionState { + Disconnected, + Connecting, + Connected + } +} diff --git a/src/GraphQL.Client/GraphQLHttpClient.cs b/src/GraphQL.Client/GraphQLHttpClient.cs index 465a0286..a387aa66 100644 --- a/src/GraphQL.Client/GraphQLHttpClient.cs +++ b/src/GraphQL.Client/GraphQLHttpClient.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Linq; using System.Net.Http; using System.Text; using System.Threading; @@ -33,6 +32,11 @@ public class GraphQLHttpClient : IGraphQLClient { /// public IObservable WebSocketReceiveErrors => graphQlHttpWebSocket.ReceiveErrors; + /// + /// the websocket connection state + /// + public IObservable WebsocketConnectionState => + graphQlHttpWebSocket.ConnectionState; #region Constructors diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 4efa0923..0e44377f 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -19,6 +19,8 @@ internal class GraphQLHttpWebSocket : IDisposable { private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); private readonly Subject requestSubject = new Subject(); private readonly Subject exceptionSubject = new Subject(); + private readonly BehaviorSubject stateSubject = + new BehaviorSubject(GraphQLWebsocketConnectionState.Disconnected); private readonly IDisposable requestSubscription; private int connectionAttempt = 0; @@ -33,6 +35,8 @@ internal class GraphQLHttpWebSocket : IDisposable { public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; public IObservable ReceiveErrors => exceptionSubject.AsObservable(); + public IObservable ConnectionState => stateSubject.DistinctUntilChanged(); + public IObservable ResponseStream { get; } public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClientOptions options) { @@ -95,8 +99,8 @@ public Task InitializeWebSocket() { return Task.CompletedTask; // else (re-)create websocket and connect - //_responseStreamConnection?.Dispose(); clientWebSocket?.Dispose(); + stateSubject.OnNext(GraphQLWebsocketConnectionState.Connecting); #if NETFRAMEWORK // fix websocket not supported on win 7 using @@ -131,10 +135,12 @@ private async Task _connectAsync(CancellationToken token) { await _backOff().ConfigureAwait(false); Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); + stateSubject.OnNext(GraphQLWebsocketConnectionState.Connected); Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}"); connectionAttempt = 1; } catch (Exception e) { + stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); exceptionSubject.OnNext(e); throw; } @@ -178,10 +184,12 @@ private async Task _createResultStream(IObserver { responseSubject?.Dispose(); responseSubject = null; + stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); }); } @@ -261,6 +269,7 @@ private async Task _closeAsync(CancellationToken cancellationToken = default) { Debug.WriteLine($"closing websocket {clientWebSocket.GetHashCode()}"); await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false); + stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); } #region IDisposable From c546271770a54758b03dd56f254d5b5ffae34e16 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Mon, 17 Feb 2020 11:29:36 +0100 Subject: [PATCH 06/10] add OnWebsocketConnected callback --- src/GraphQL.Client/GraphQLHttpClient.cs | 4 +-- .../GraphQLHttpClientOptions.cs | 5 ++++ .../Websocket/GraphQLHttpWebSocket.cs | 29 ++++++++++--------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/GraphQL.Client/GraphQLHttpClient.cs b/src/GraphQL.Client/GraphQLHttpClient.cs index a387aa66..dbf4bcb7 100644 --- a/src/GraphQL.Client/GraphQLHttpClient.cs +++ b/src/GraphQL.Client/GraphQLHttpClient.cs @@ -51,7 +51,7 @@ public GraphQLHttpClient(Action configure) : this(conf public GraphQLHttpClient(GraphQLHttpClientOptions options, HttpClient httpClient) { Options = options; this.HttpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); - this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); + this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), this); Options.JsonSerializer = JsonSerializer.EnsureAssigned(); } @@ -59,7 +59,7 @@ public GraphQLHttpClient(GraphQLHttpClientOptions options, HttpClient httpClient Options = options ?? throw new ArgumentNullException(nameof(options)); Options.JsonSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); this.HttpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); - this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), Options); + this.graphQlHttpWebSocket = new GraphQLHttpWebSocket(GetWebSocketUri(), this); } #endregion diff --git a/src/GraphQL.Client/GraphQLHttpClientOptions.cs b/src/GraphQL.Client/GraphQLHttpClientOptions.cs index 8a9afae8..f6b2b6eb 100644 --- a/src/GraphQL.Client/GraphQLHttpClientOptions.cs +++ b/src/GraphQL.Client/GraphQLHttpClientOptions.cs @@ -49,5 +49,10 @@ public class GraphQLHttpClientOptions { /// Request preprocessing function. Can be used i.e. to inject authorization info into a GraphQL request payload. /// public Func> PreprocessRequest { get; set; } = (request, client) => Task.FromResult(request); + + /// + /// This function is called after successfully establishing a websocket connection but before any regular request is made. + /// + public Func OnWebsocketConnected { get; set; } = client => Task.CompletedTask; } } diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 0e44377f..b82ac252 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -14,7 +14,7 @@ namespace GraphQL.Client.Http.Websocket { internal class GraphQLHttpWebSocket : IDisposable { private readonly Uri webSocketUri; - private readonly GraphQLHttpClientOptions options; + private readonly GraphQLHttpClient client; private readonly ArraySegment buffer; private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); private readonly Subject requestSubject = new Subject(); @@ -25,6 +25,7 @@ internal class GraphQLHttpWebSocket : IDisposable { private int connectionAttempt = 0; private Subject responseSubject; + private GraphQLHttpClientOptions Options => client.Options; #if NETFRAMEWORK private WebSocket clientWebSocket = null; @@ -39,9 +40,9 @@ internal class GraphQLHttpWebSocket : IDisposable { public IObservable ResponseStream { get; } - public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClientOptions options) { + public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClient client) { this.webSocketUri = webSocketUri; - this.options = options; + this.client = client; buffer = new ArraySegment(new byte[8192]); ResponseStream = _createResponseStream(); @@ -64,7 +65,7 @@ private async Task _sendWebSocketRequest(GraphQLWebSocketRequest request) { } await InitializeWebSocket().ConfigureAwait(false); - var requestBytes = options.JsonSerializer.SerializeToBytes(request); + var requestBytes = Options.JsonSerializer.SerializeToBytes(request); await this.clientWebSocket.SendAsync( new ArraySegment(requestBytes), WebSocketMessageType.Text, @@ -109,13 +110,13 @@ public Task InitializeWebSocket() { switch (clientWebSocket) { case ClientWebSocket nativeWebSocket: nativeWebSocket.Options.AddSubProtocol("graphql-ws"); - nativeWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; - nativeWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; + nativeWebSocket.Options.ClientCertificates = ((HttpClientHandler)Options.HttpMessageHandler).ClientCertificates; + nativeWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)Options.HttpMessageHandler).UseDefaultCredentials; break; case System.Net.WebSockets.Managed.ClientWebSocket managedWebSocket: managedWebSocket.Options.AddSubProtocol("graphql-ws"); - managedWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; - managedWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; + managedWebSocket.Options.ClientCertificates = ((HttpClientHandler)Options.HttpMessageHandler).ClientCertificates; + managedWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)Options.HttpMessageHandler).UseDefaultCredentials; break; default: throw new NotSupportedException($"unknown websocket type {clientWebSocket.GetType().Name}"); @@ -123,8 +124,8 @@ public Task InitializeWebSocket() { #else clientWebSocket = new ClientWebSocket(); clientWebSocket.Options.AddSubProtocol("graphql-ws"); - clientWebSocket.Options.ClientCertificates = ((HttpClientHandler)options.HttpMessageHandler).ClientCertificates; - clientWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)options.HttpMessageHandler).UseDefaultCredentials; + clientWebSocket.Options.ClientCertificates = ((HttpClientHandler)Options.HttpMessageHandler).ClientCertificates; + clientWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)Options.HttpMessageHandler).UseDefaultCredentials; #endif return initializeWebSocketTask = _connectAsync(cancellationTokenSource.Token); } @@ -136,7 +137,9 @@ private async Task _connectAsync(CancellationToken token) { Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); stateSubject.OnNext(GraphQLWebsocketConnectionState.Connected); - Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}"); + Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}, invoking Options.OnWebsocketConnected()"); + await (Options.OnWebsocketConnected?.Invoke(client) ?? Task.CompletedTask); + Debug.WriteLine($"invoking Options.OnWebsocketConnected() on websocket {clientWebSocket.GetHashCode()}"); connectionAttempt = 1; } catch (Exception e) { @@ -155,7 +158,7 @@ private Task _backOff() { if (connectionAttempt == 1) return Task.CompletedTask; - var delay = options.BackOffStrategy?.Invoke(connectionAttempt - 1) ?? TimeSpan.FromSeconds(5); + var delay = Options.BackOffStrategy?.Invoke(connectionAttempt - 1) ?? TimeSpan.FromSeconds(5); Debug.WriteLine($"connection attempt #{connectionAttempt}, backing off for {delay.TotalSeconds} s"); return Task.Delay(delay); } @@ -240,7 +243,7 @@ private async Task _receiveResultAsync() { ms.Seek(0, SeekOrigin.Begin); if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text) { - var response = await options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms); + var response = await Options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms); response.MessageBytes = ms.ToArray(); return response; } From 1080536408d34ccae4a99ad4a2235b324dbcaa49 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Tue, 18 Feb 2020 15:05:55 +0100 Subject: [PATCH 07/10] add tests for callback and WebsocketConnectionState --- .../GraphQL.Client.Tests.Common.csproj | 1 + .../Helpers/CallbackMonitor.cs | 115 ++++++++++++++++++ .../Helpers/CallbackTester.cs | 67 ---------- .../Helpers/MiscellaneousExtensions.cs | 14 +++ .../QueryAndMutationTests/Base.cs | 4 +- .../WebsocketTests/Base.cs | 94 ++++++++------ 6 files changed, 186 insertions(+), 109 deletions(-) create mode 100644 tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs delete mode 100644 tests/GraphQL.Client.Tests.Common/Helpers/CallbackTester.cs diff --git a/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj b/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj index 7b8d4982..dfbb9146 100644 --- a/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj +++ b/tests/GraphQL.Client.Tests.Common/GraphQL.Client.Tests.Common.csproj @@ -17,6 +17,7 @@ + diff --git a/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs b/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs new file mode 100644 index 00000000..4b623ac6 --- /dev/null +++ b/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs @@ -0,0 +1,115 @@ +using System; +using System.Threading; +using FluentAssertions; +using FluentAssertions.Execution; +using FluentAssertions.Primitives; + +namespace GraphQL.Client.Tests.Common.Helpers { + public class CallbackMonitor { + private readonly ManualResetEventSlim callbackInvoked = new ManualResetEventSlim(); + + /// + /// The timeout for . Defaults to 1 s + /// + public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(1); + + /// + /// Indicates that an update has been received since the last + /// + public bool CallbackInvoked => callbackInvoked.IsSet; + /// + /// The last payload which was received. + /// + public T LastPayload { get; private set; } + + public void Invoke(T param) { + LastPayload = param; + callbackInvoked.Set(); + } + + /// + /// Asserts that a new update has been pushed to the within the configured since the last . + /// If supplied, the action is executed on the submitted payload. + /// + /// action to assert the contents of the payload + public void CallbackShouldHaveBeenInvoked(Action assertPayload = null, TimeSpan? timeout = null) { + try { + callbackInvoked.Wait(timeout ?? Timeout).Should().BeTrue("because the callback method should have been invoked (timeout: {0} s)", + (timeout ?? Timeout).TotalSeconds); + + assertPayload?.Invoke(LastPayload); + } + finally { + Reset(); + } + } + + /// + /// Asserts that no new update has been pushed within the given since the last + /// + /// the time in ms in which no new update must be pushed to the . defaults to 100 + public void CallbackShouldNotHaveBeenInvoked(TimeSpan? timeout = null) { + if (!timeout.HasValue) timeout = TimeSpan.FromMilliseconds(100); + try { + callbackInvoked.Wait(timeout.Value).Should().BeFalse("because the callback method should not have been invoked"); + } + finally { + Reset(); + } + } + + /// + /// Resets the tester class. Should be called before triggering the potential update + /// + public void Reset() { + LastPayload = default(T); + callbackInvoked.Reset(); + } + + + public CallbackAssertions Should() { + return new CallbackAssertions(this); + } + + public class CallbackAssertions : ReferenceTypeAssertions, CallbackAssertions> { + public CallbackAssertions(CallbackMonitor tester) { + Subject = tester; + } + + protected override string Identifier => "callback"; + + public AndWhichConstraint, TPayload> HaveBeenInvokedWithPayload(TimeSpan timeout, + string because = "", params object[] becauseArgs) { + Execute.Assertion + .BecauseOf(because, becauseArgs) + .Given(() => Subject.callbackInvoked.Wait(timeout)) + .ForCondition(isSet => isSet) + .FailWith("Expected {context:callback} to be invoked{reason}, but did not receive a call within {0}", timeout); + + Subject.callbackInvoked.Reset(); + return new AndWhichConstraint, TPayload>(this, Subject.LastPayload); + } + public AndWhichConstraint, TPayload> HaveBeenInvokedWithPayload(string because = "", params object[] becauseArgs) + => HaveBeenInvokedWithPayload(Subject.Timeout, because, becauseArgs); + + public AndConstraint> HaveBeenInvoked(TimeSpan timeout, string because = "", params object[] becauseArgs) + => HaveBeenInvokedWithPayload(timeout, because, becauseArgs); + public AndConstraint> HaveBeenInvoked(string because = "", params object[] becauseArgs) + => HaveBeenInvokedWithPayload(Subject.Timeout, because, becauseArgs); + + public AndConstraint> NotHaveBeenInvoked(TimeSpan timeout, + string because = "", params object[] becauseArgs) { + Execute.Assertion + .BecauseOf(because, becauseArgs) + .Given(() => Subject.callbackInvoked.Wait(timeout)) + .ForCondition(isSet => !isSet) + .FailWith("Expected {context:callback} to not be invoked{reason}, but did receive a call: {0}", Subject.LastPayload); + + Subject.callbackInvoked.Reset(); + return new AndConstraint>(this); + } + public AndConstraint> NotHaveBeenInvoked(string because = "", params object[] becauseArgs) + => NotHaveBeenInvoked(TimeSpan.FromMilliseconds(100), because, becauseArgs); + } + } +} diff --git a/tests/GraphQL.Client.Tests.Common/Helpers/CallbackTester.cs b/tests/GraphQL.Client.Tests.Common/Helpers/CallbackTester.cs deleted file mode 100644 index c8ca29c5..00000000 --- a/tests/GraphQL.Client.Tests.Common/Helpers/CallbackTester.cs +++ /dev/null @@ -1,67 +0,0 @@ -using System; -using System.Threading; -using FluentAssertions; - -namespace GraphQL.Client.Tests.Common.Helpers { - public class CallbackTester { - private ManualResetEventSlim _callbackInvoked { get; } = new ManualResetEventSlim(); - - /// - /// The timeout for . Defaults to 1 s - /// - public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(1); - - /// - /// Indicates that an update has been received since the last - /// - public bool CallbackInvoked => _callbackInvoked.IsSet; - /// - /// The last payload which was received. - /// - public T LastPayload { get; private set; } - - public void Callback(T param) { - LastPayload = param; - _callbackInvoked.Set(); - } - - /// - /// Asserts that a new update has been pushed to the within the configured since the last . - /// If supplied, the action is executed on the submitted payload. - /// - /// action to assert the contents of the payload - public void CallbackShouldHaveBeenInvoked(Action assertPayload = null, TimeSpan? timeout = null) { - try { - _callbackInvoked.Wait(timeout ?? Timeout).Should().BeTrue("because the callback method should have been invoked (timeout: {0} s)", - (timeout ?? Timeout).TotalSeconds); - - assertPayload?.Invoke(LastPayload); - } - finally { - Reset(); - } - } - - /// - /// Asserts that no new update has been pushed within the given since the last - /// - /// the time in ms in which no new update must be pushed to the . defaults to 100 - public void CallbackShouldNotHaveBeenInvoked(TimeSpan? timeout = null) { - if (!timeout.HasValue) timeout = TimeSpan.FromMilliseconds(100); - try { - _callbackInvoked.Wait(timeout.Value).Should().BeFalse("because the callback method should not have been invoked"); - } - finally { - Reset(); - } - } - - /// - /// Resets the tester class. Should be called before triggering the potential update - /// - public void Reset() { - LastPayload = default(T); - _callbackInvoked.Reset(); - } - } -} diff --git a/tests/GraphQL.Client.Tests.Common/Helpers/MiscellaneousExtensions.cs b/tests/GraphQL.Client.Tests.Common/Helpers/MiscellaneousExtensions.cs index 2da34009..0e254672 100644 --- a/tests/GraphQL.Client.Tests.Common/Helpers/MiscellaneousExtensions.cs +++ b/tests/GraphQL.Client.Tests.Common/Helpers/MiscellaneousExtensions.cs @@ -1,4 +1,6 @@ using System.Linq; +using System.Threading.Tasks; +using GraphQL.Client.Http; namespace GraphQL.Client.Tests.Common.Helpers { public static class MiscellaneousExtensions { @@ -7,5 +9,17 @@ public static string RemoveWhitespace(this string input) { .Where(c => !char.IsWhiteSpace(c)) .ToArray()); } + + public static CallbackMonitor ConfigureMonitorForOnWebsocketConnected( + this GraphQLHttpClient client) { + var tester = new CallbackMonitor(); + client.Options.OnWebsocketConnected = c => { + tester.Invoke(c); + return Task.CompletedTask; + }; + return tester; + } + + } } diff --git a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs index be153ba2..16e73c92 100644 --- a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs +++ b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs @@ -139,9 +139,9 @@ query Human($id: String!){ [Fact] public async void PreprocessHttpRequestMessageIsCalled() { - var callbackTester = new CallbackTester(); + var callbackTester = new CallbackMonitor(); var graphQLRequest = new GraphQLHttpRequest($"{{ human(id: \"1\") {{ name }} }}") { - PreprocessHttpRequestMessage = callbackTester.Callback + PreprocessHttpRequestMessage = callbackTester.Invoke }; using (var setup = SetupTest()) { diff --git a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs index ff1cabe1..fd2d6ffb 100644 --- a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs @@ -16,24 +16,20 @@ namespace GraphQL.Integration.Tests.WebsocketTests { public abstract class Base { - protected readonly ITestOutputHelper output; - protected readonly IGraphQLWebsocketJsonSerializer serializer; + protected readonly ITestOutputHelper Output; + protected readonly IGraphQLWebsocketJsonSerializer Serializer; protected IWebHost CreateServer(int port) => WebHostHelpers.CreateServer(port); - public Base(ITestOutputHelper output, IGraphQLWebsocketJsonSerializer serializer) { - this.output = output; - this.serializer = serializer; - } - - public Base(ITestOutputHelper output) { - this.output = output; + protected Base(ITestOutputHelper output, IGraphQLWebsocketJsonSerializer serializer) { + this.Output = output; + this.Serializer = serializer; } [Fact] public async void AssertTestingHarness() { var port = NetworkHelpers.GetFreeTcpPortNumber(); using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); const string message = "some random testing message"; var response = await client.AddMessageAsync(message).ConfigureAwait(false); @@ -47,7 +43,7 @@ public async void AssertTestingHarness() { public async void CanSendRequestViaWebsocket() { var port = NetworkHelpers.GetFreeTcpPortNumber(); using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, true, serializer); + var client = WebHostHelpers.GetGraphQLClient(port, true, Serializer); const string message = "some random testing message"; var response = await client.AddMessageAsync(message).ConfigureAwait(false); @@ -59,7 +55,7 @@ public async void CanSendRequestViaWebsocket() { public async void CanHandleRequestErrorViaWebsocket() { var port = NetworkHelpers.GetFreeTcpPortNumber(); using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, true, serializer); + var client = WebHostHelpers.GetGraphQLClient(port, true, Serializer); var response = await client.SendQueryAsync("this query is formatted quite badly").ConfigureAwait(false); Assert.Single(response.Errors); @@ -79,9 +75,11 @@ public async void CanHandleRequestErrorViaWebsocket() { [Fact] public async void CanCreateObservableSubscription() { var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); + using (CreateServer(port)){ + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); + var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); await client.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); Debug.WriteLine("creating subscription stream"); IObservable> observable = client.CreateSubscriptionStream(SubscriptionRequest); @@ -121,14 +119,15 @@ public class MessageAddedContent { public async void CanReconnectWithSameObservable() { var port = NetworkHelpers.GetFreeTcpPortNumber(); using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); - await client.InitializeWebsocketConnection(); + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); + var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); Debug.WriteLine("creating subscription stream"); - IObservable> observable = client.CreateSubscriptionStream(SubscriptionRequest); + var observable = client.CreateSubscriptionStream(SubscriptionRequest); Debug.WriteLine("subscribing..."); var tester = observable.Monitor(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); const string message1 = "Hello World"; var response = await client.AddMessageAsync(message1).ConfigureAwait(false); @@ -143,9 +142,7 @@ public async void CanReconnectWithSameObservable() { .Which.Data.MessageAdded.Content.Should().Be(message2); Debug.WriteLine("disposing subscription..."); - tester.Dispose(); - await Task.Delay(500); - await client.InitializeWebsocketConnection(); + tester.Dispose(); // does not close the websocket connection Debug.WriteLine("creating new subscription..."); tester = observable.Monitor(); @@ -188,17 +185,19 @@ public class UserJoinedContent { [Fact] public async void CanConnectTwoSubscriptionsSimultaneously() { var port = NetworkHelpers.GetFreeTcpPortNumber(); - var callbackTester = new CallbackTester(); - var callbackTester2 = new CallbackTester(); + var callbackTester = new CallbackMonitor(); + var callbackTester2 = new CallbackMonitor(); using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); + var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); await client.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); Debug.WriteLine("creating subscription stream"); IObservable> observable1 = - client.CreateSubscriptionStream(SubscriptionRequest, callbackTester.Callback); + client.CreateSubscriptionStream(SubscriptionRequest, callbackTester.Invoke); IObservable> observable2 = - client.CreateSubscriptionStream(SubscriptionRequest2, callbackTester2.Callback); + client.CreateSubscriptionStream(SubscriptionRequest2, callbackTester2.Invoke); Debug.WriteLine("subscribing..."); var tester = observable1.Monitor(); @@ -237,15 +236,24 @@ public async void CanConnectTwoSubscriptionsSimultaneously() { public async void CanHandleConnectionTimeout() { var port = NetworkHelpers.GetFreeTcpPortNumber(); var server = CreateServer(port); - var callbackTester = new CallbackTester(); + var errorMonitor = new CallbackMonitor(); + + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); + var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); + var statusMonitor = client.WebsocketConnectionState.Monitor(); + statusMonitor.Should().HaveReceivedPayload().Which.Should() + .Be(GraphQLWebsocketConnectionState.Disconnected); - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); - await client.InitializeWebsocketConnection(); Debug.WriteLine("creating subscription stream"); - IObservable> observable = client.CreateSubscriptionStream(SubscriptionRequest, callbackTester.Callback); + IObservable> observable = client.CreateSubscriptionStream(SubscriptionRequest, errorMonitor.Invoke); Debug.WriteLine("subscribing..."); var tester = observable.Monitor(); + statusMonitor.Should().HaveReceivedPayload().Which.Should() + .Be(GraphQLWebsocketConnectionState.Connecting); + statusMonitor.Should().HaveReceivedPayload().Which.Should() + .Be(GraphQLWebsocketConnectionState.Connected); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); const string message1 = "Hello World"; var response = await client.AddMessageAsync(message1).ConfigureAwait(false); @@ -255,18 +263,20 @@ public async void CanHandleConnectionTimeout() { Debug.WriteLine("stopping web host..."); await server.StopAsync(CancellationToken.None).ConfigureAwait(false); + server.Dispose(); Debug.WriteLine("web host stopped..."); - callbackTester.CallbackShouldHaveBeenInvoked(exception => { - Assert.IsType(exception); - }, TimeSpan.FromSeconds(10)); + errorMonitor.Should().HaveBeenInvokedWithPayload(TimeSpan.FromSeconds(10)) + .Which.Should().BeOfType(); + statusMonitor.Should().HaveReceivedPayload().Which.Should() + .Be(GraphQLWebsocketConnectionState.Disconnected); - try { - server.Start(); - } - catch (Exception e) { - output.WriteLine($"failed to restart server: {e}"); - } + server = CreateServer(port); + statusMonitor.Should().HaveReceivedPayload(TimeSpan.FromSeconds(10)).Which.Should() + .Be(GraphQLWebsocketConnectionState.Connecting); + statusMonitor.Should().HaveReceivedPayload(TimeSpan.FromSeconds(10)).Which.Should() + .Be(GraphQLWebsocketConnectionState.Connected); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); // disposing the client should complete the subscription client.Dispose(); @@ -279,8 +289,10 @@ public async void CanHandleConnectionTimeout() { public async void CanHandleSubscriptionError() { var port = NetworkHelpers.GetFreeTcpPortNumber(); using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); + var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); await client.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); Debug.WriteLine("creating subscription stream"); IObservable> observable = client.CreateSubscriptionStream( new GraphQLRequest(@" @@ -309,8 +321,10 @@ public async void CanHandleQueryErrorInSubscription() { var test = new GraphQLRequest("tset", new { test = "blaa" }); - var client = WebHostHelpers.GetGraphQLClient(port, serializer: serializer); + var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); + var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); await client.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); Debug.WriteLine("creating subscription stream"); IObservable> observable = client.CreateSubscriptionStream( new GraphQLRequest(@" From e12ea5bb6e48e65feddd61031c1af1226a6908c4 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Tue, 18 Feb 2020 16:22:23 +0100 Subject: [PATCH 08/10] fix "Connecting" status --- src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs | 2 +- tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index b82ac252..832fb4cf 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -101,7 +101,6 @@ public Task InitializeWebSocket() { // else (re-)create websocket and connect clientWebSocket?.Dispose(); - stateSubject.OnNext(GraphQLWebsocketConnectionState.Connecting); #if NETFRAMEWORK // fix websocket not supported on win 7 using @@ -134,6 +133,7 @@ public Task InitializeWebSocket() { private async Task _connectAsync(CancellationToken token) { try { await _backOff().ConfigureAwait(false); + stateSubject.OnNext(GraphQLWebsocketConnectionState.Connecting); Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); stateSubject.OnNext(GraphQLWebsocketConnectionState.Connected); diff --git a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs index fd2d6ffb..7e0e06a2 100644 --- a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs @@ -237,9 +237,16 @@ public async void CanHandleConnectionTimeout() { var port = NetworkHelpers.GetFreeTcpPortNumber(); var server = CreateServer(port); var errorMonitor = new CallbackMonitor(); + var reconnectBlocker = new ManualResetEventSlim(false); var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); + // configure back-off strategy to allow it to be controlled from within the unit test + client.Options.BackOffStrategy = i => { + reconnectBlocker.Wait(); + return TimeSpan.Zero; + }; + var statusMonitor = client.WebsocketConnectionState.Monitor(); statusMonitor.Should().HaveReceivedPayload().Which.Should() .Be(GraphQLWebsocketConnectionState.Disconnected); @@ -272,6 +279,7 @@ public async void CanHandleConnectionTimeout() { .Be(GraphQLWebsocketConnectionState.Disconnected); server = CreateServer(port); + reconnectBlocker.Set(); statusMonitor.Should().HaveReceivedPayload(TimeSpan.FromSeconds(10)).Which.Should() .Be(GraphQLWebsocketConnectionState.Connecting); statusMonitor.Should().HaveReceivedPayload(TimeSpan.FromSeconds(10)).Which.Should() From 40c910806f8f8a8d15fbb0bf3ad9f4fcd0cfd18a Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Tue, 18 Feb 2020 16:24:00 +0100 Subject: [PATCH 09/10] fix doc comment in src/GraphQL.Client/GraphQLHttpClientOptions.cs Co-Authored-By: Ivan Maximov --- src/GraphQL.Client/GraphQLHttpClientOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/GraphQL.Client/GraphQLHttpClientOptions.cs b/src/GraphQL.Client/GraphQLHttpClientOptions.cs index f6b2b6eb..a52c772c 100644 --- a/src/GraphQL.Client/GraphQLHttpClientOptions.cs +++ b/src/GraphQL.Client/GraphQLHttpClientOptions.cs @@ -51,7 +51,7 @@ public class GraphQLHttpClientOptions { public Func> PreprocessRequest { get; set; } = (request, client) => Task.FromResult(request); /// - /// This function is called after successfully establishing a websocket connection but before any regular request is made. + /// This callback is called after successfully establishing a websocket connection but before any regular request is made. /// public Func OnWebsocketConnected { get; set; } = client => Task.CompletedTask; } From d276793d1280324b1bd278d89ff13b267f075980 Mon Sep 17 00:00:00 2001 From: Alexander Rose Date: Tue, 18 Feb 2020 16:48:17 +0100 Subject: [PATCH 10/10] extend timeout on CanHandleSubscriptionError test --- tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs index 7e0e06a2..b17ba3c0 100644 --- a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs @@ -313,7 +313,7 @@ public async void CanHandleSubscriptionError() { Debug.WriteLine("subscribing..."); using (var tester = observable.Monitor()) { - tester.Should().HaveReceivedPayload() + tester.Should().HaveReceivedPayload(TimeSpan.FromSeconds(3)) .Which.Errors.Should().ContainSingle(); tester.Should().HaveCompleted(); client.Dispose();