diff --git a/SubscriptionIntegrationTest.ConsoleClient/SubscriptionIntegrationTest.ConsoleClient.csproj b/SubscriptionIntegrationTest.ConsoleClient/SubscriptionIntegrationTest.ConsoleClient.csproj index 83d4884b..bd35758f 100644 --- a/SubscriptionIntegrationTest.ConsoleClient/SubscriptionIntegrationTest.ConsoleClient.csproj +++ b/SubscriptionIntegrationTest.ConsoleClient/SubscriptionIntegrationTest.ConsoleClient.csproj @@ -7,7 +7,7 @@ - + diff --git a/benchmarks/GraphQL.Common.Benchmark/GraphQL.Common.Benchmark.csproj b/benchmarks/GraphQL.Common.Benchmark/GraphQL.Common.Benchmark.csproj index d5ccaa33..657b4f77 100644 --- a/benchmarks/GraphQL.Common.Benchmark/GraphQL.Common.Benchmark.csproj +++ b/benchmarks/GraphQL.Common.Benchmark/GraphQL.Common.Benchmark.csproj @@ -1,12 +1,12 @@ - + Exe - netcoreapp2.2 + netcoreapp3.0 - + diff --git a/samples/GraphQL.Client.Sample/Program.cs b/samples/GraphQL.Client.Sample/Program.cs index b3c9e601..db19a4e3 100644 --- a/samples/GraphQL.Client.Sample/Program.cs +++ b/samples/GraphQL.Client.Sample/Program.cs @@ -1,5 +1,6 @@ using System; using GraphQL.Client.Http; +using GraphQL.Common.Request; namespace GraphQL.Client.Sample { @@ -7,9 +8,8 @@ public class Program { public static void Main(string[] args) { using (var graphQLHttpClient = new GraphQLHttpClient("http://localhost:60341/graphql")) { - var subscriptionResult = graphQLHttpClient.SendSubscribeAsync(@"subscription { messageAdded{content}}").Result; - subscriptionResult.OnReceive += (res) => { Console.WriteLine(res.Data.messageAdded.content); }; - Console.ReadKey(); + var subscription = graphQLHttpClient.CreateSubscriptionStream(new GraphQLRequest(@"subscription { messageAdded{content}}")); + subscription.Subscribe(res => Console.WriteLine(res.Data.messageAdded.content)); } } diff --git a/src/GraphQL.Client/GraphQL.Client.csproj b/src/GraphQL.Client/GraphQL.Client.csproj index 8043bf35..514ee0cd 100644 --- a/src/GraphQL.Client/GraphQL.Client.csproj +++ b/src/GraphQL.Client/GraphQL.Client.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/GraphQL.Client/Http/GraphQLHttpClient.cs b/src/GraphQL.Client/Http/GraphQLHttpClient.cs index d242569b..f0c03452 100644 --- a/src/GraphQL.Client/Http/GraphQLHttpClient.cs +++ b/src/GraphQL.Client/Http/GraphQLHttpClient.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Net.Http; using System.Net.Http.Headers; using System.Net.WebSockets; @@ -41,7 +40,6 @@ public GraphQLHttpClientOptions Options { } /// - [Obsolete("EXPERIMENTAL")] public IObservable WebSocketReceiveErrors => graphQlHttpWebSocket.ReceiveErrors; #endregion @@ -49,6 +47,7 @@ public GraphQLHttpClientOptions Options { internal readonly GraphQLHttpHandler graphQLHttpHandler; internal readonly GraphQLHttpWebSocket graphQlHttpWebSocket; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly ConcurrentDictionary> _subscriptionStreams = new ConcurrentDictionary>(); /// /// Initializes a new instance @@ -119,7 +118,7 @@ public Task SendQueryAsync(string query, CancellationToken canc public Task SendQueryAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { return Options.UseWebSocketForQueriesAndMutations - ? this.graphQlHttpWebSocket.Request(request, cancellationToken) + ? this.graphQlHttpWebSocket.SendRequest(request, cancellationToken) : this.graphQLHttpHandler.PostAsync(request, cancellationToken); } @@ -129,31 +128,10 @@ public Task SendMutationAsync(string query, CancellationToken c public Task SendMutationAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { return Options.UseWebSocketForQueriesAndMutations - ? this.graphQlHttpWebSocket.Request(request, cancellationToken) + ? this.graphQlHttpWebSocket.SendRequest(request, cancellationToken) : this.graphQLHttpHandler.PostAsync(request, cancellationToken); } - [Obsolete("EXPERIMENTAL API")] - public Task SendSubscribeAsync(string query, CancellationToken cancellationToken = default) => - this.SendSubscribeAsync(new GraphQLRequest(query), cancellationToken); - - [Obsolete("EXPERIMENTAL API")] - public Task SendSubscribeAsync(GraphQLRequest request, CancellationToken cancellationToken = default) - { - GraphQLHttpSubscriptionResult graphQLSubscriptionResult = _createSubscription(request, cancellationToken); - return Task.FromResult(graphQLSubscriptionResult); - } - - private GraphQLHttpSubscriptionResult _createSubscription(GraphQLRequest request, CancellationToken cancellationToken) - { - if (request == null) { throw new ArgumentNullException(nameof(request)); } - if (request.Query == null) { throw new ArgumentNullException(nameof(request.Query)); } - - var graphQLSubscriptionResult = new GraphQLHttpSubscriptionResult(_getWebSocketUri(), request); - graphQLSubscriptionResult.StartAsync(cancellationToken); - return graphQLSubscriptionResult; - } - private Uri _getWebSocketUri() { var webSocketSchema = this.EndPoint.Scheme == "https" ? "wss" : "ws"; @@ -161,23 +139,21 @@ private Uri _getWebSocketUri() } /// - [Obsolete("EXPERIMENTAL API")] public IObservable CreateSubscriptionStream(GraphQLRequest request) { if (_disposed) throw new ObjectDisposedException(nameof(GraphQLHttpClient)); - if (subscriptionStreams.ContainsKey(request)) - return subscriptionStreams[request]; + if (_subscriptionStreams.ContainsKey(request)) + return _subscriptionStreams[request]; - var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, Options, cancellationToken: _cancellationTokenSource.Token); + var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, cancellationToken: _cancellationTokenSource.Token); - subscriptionStreams.TryAdd(request, observable); + _subscriptionStreams.TryAdd(request, observable); return observable; } /// - [Obsolete("EXPERIMENTAL API")] public IObservable CreateSubscriptionStream(GraphQLRequest request, Action webSocketExceptionHandler) { if (_disposed) @@ -193,22 +169,19 @@ public IObservable CreateSubscriptionStream(GraphQLRequest requ } /// - [Obsolete("EXPERIMENTAL API")] public IObservable CreateSubscriptionStream(GraphQLRequest request, Action exceptionHandler) { if (_disposed) throw new ObjectDisposedException(nameof(GraphQLHttpClient)); - if(subscriptionStreams.ContainsKey(request)) - return subscriptionStreams[request]; + if(_subscriptionStreams.ContainsKey(request)) + return _subscriptionStreams[request]; - var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, Options, exceptionHandler, _cancellationTokenSource.Token); - subscriptionStreams.TryAdd(request, observable); + var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, exceptionHandler, _cancellationTokenSource.Token); + _subscriptionStreams.TryAdd(request, observable); return observable; } - - private ConcurrentDictionary> subscriptionStreams = new ConcurrentDictionary>(); - + /// /// Releases unmanaged resources /// @@ -224,7 +197,7 @@ public void Dispose() } private bool _disposed = false; - private object _disposeLocker = new object(); + private readonly object _disposeLocker = new object(); private void _dispose() { diff --git a/src/GraphQL.Client/Http/GraphQLHttpSubscriptionResult.cs b/src/GraphQL.Client/Http/GraphQLHttpSubscriptionResult.cs deleted file mode 100644 index 34ab6f0a..00000000 --- a/src/GraphQL.Client/Http/GraphQLHttpSubscriptionResult.cs +++ /dev/null @@ -1,90 +0,0 @@ -using System; -using System.Net.WebSockets; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using GraphQL.Common; -using GraphQL.Common.Request; -using GraphQL.Common.Response; -using Newtonsoft.Json; - -namespace GraphQL.Client.Http { - - /// - /// Represents the result of a subscription query - /// - [Obsolete("EXPERIMENTAL API")] - public class GraphQLHttpSubscriptionResult : IGraphQLSubscriptionResult { - - public event Action OnReceive; - - public GraphQLResponse LastResponse { get; private set; } - - private readonly ClientWebSocket clientWebSocket = new ClientWebSocket(); - private readonly Uri webSocketUri; - private readonly GraphQLRequest graphQLRequest; - private readonly byte[] buffer = new byte[1024 * 1024]; - - internal GraphQLHttpSubscriptionResult(Uri webSocketUri, GraphQLRequest graphQLRequest) { - this.webSocketUri = webSocketUri; - this.graphQLRequest = graphQLRequest; - this.clientWebSocket.Options.AddSubProtocol("graphql-ws"); - } - - public async void StartAsync(CancellationToken cancellationToken = default) { - await this.clientWebSocket.ConnectAsync(this.webSocketUri, cancellationToken).ConfigureAwait(false); - if (this.clientWebSocket.State == WebSocketState.Open) { - var arraySegment = new ArraySegment(this.buffer); - await this.SendInitialMessageAsync(cancellationToken).ConfigureAwait(false); - while (this.clientWebSocket.State == WebSocketState.Open) { - var webSocketReceiveResult = await this.clientWebSocket.ReceiveAsync(arraySegment, cancellationToken); - var stringResult = Encoding.UTF8.GetString(arraySegment.Array, 0, webSocketReceiveResult.Count); - var webSocketResponse = JsonConvert.DeserializeObject(stringResult); - if (webSocketResponse != null) - { - var response = (GraphQLResponse) webSocketResponse.Payload; - this.LastResponse = response; - this.OnReceive?.Invoke(response); - } - } - } - } - - public async Task StopAsync(CancellationToken cancellationToken = default) { - if (this.clientWebSocket.State == WebSocketState.Open) { - await this.SendCloseMessageAsync(cancellationToken); - } - await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken); - } - - public void Dispose() { - this.StopAsync().Wait(); - this.clientWebSocket.Dispose(); - } - - private Task SendInitialMessageAsync(CancellationToken cancellationToken = default) { - var webSocketRequest = new GraphQLWebSocketRequest { - Id = "1", - Type = GQLWebSocketMessageType.GQL_START, - Payload = this.graphQLRequest - }; - return this.SendGraphQLSubscriptionRequest(webSocketRequest, cancellationToken); - } - - private Task SendCloseMessageAsync(CancellationToken cancellationToken = default) { - var webSocketRequest = new GraphQLWebSocketRequest { - Id = "1", - Type = GQLWebSocketMessageType.GQL_STOP, - Payload = this.graphQLRequest - }; - return this.SendGraphQLSubscriptionRequest(webSocketRequest); - } - - private Task SendGraphQLSubscriptionRequest(GraphQLWebSocketRequest graphQlWebSocketRequest, CancellationToken cancellationToken = default) { - var webSocketRequestString = JsonConvert.SerializeObject(graphQlWebSocketRequest); - var arraySegmentWebSocketRequest = new ArraySegment(Encoding.UTF8.GetBytes(webSocketRequestString)); - return this.clientWebSocket.SendAsync(arraySegmentWebSocketRequest, WebSocketMessageType.Text, true, cancellationToken); - } - } - -} diff --git a/src/GraphQL.Client/Http/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Http/GraphQLHttpWebSocket.cs index 28ae1e7b..f21f4876 100644 --- a/src/GraphQL.Client/Http/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Http/GraphQLHttpWebSocket.cs @@ -15,39 +15,34 @@ namespace GraphQL.Client.Http { + // ReSharper disable once InconsistentNaming internal class GraphQLHttpWebSocket: IDisposable { - private readonly Uri webSocketUri; + private readonly Uri _webSocketUri; private readonly GraphQLHttpClientOptions _options; - private readonly ArraySegment buffer; + private readonly ArraySegment _buffer; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly Subject _requestSubject = new Subject(); + private readonly Subject _exceptionSubject = new Subject(); private Subject _responseSubject; - private Subject _requestSubject = new Subject(); - private Subject _exceptionSubject = new Subject(); - private IDisposable _requestSubscription; - - public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; - - private WebSocket clientWebSocket = null; + private WebSocket _clientWebSocket = null; private int _connectionAttempt = 0; + public IObservable ReceiveErrors => _exceptionSubject.AsObservable(); + public IObservable ResponseStream { get; } + public WebSocketState WebSocketState => _clientWebSocket?.State ?? WebSocketState.None; + public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClientOptions options) { - this.webSocketUri = webSocketUri; + this._webSocketUri = webSocketUri; _options = options; - buffer = new ArraySegment(new byte[8192]); - _responseStream = _createResponseStream(); + _buffer = new ArraySegment(new byte[8192]); + ResponseStream = _createResponseStream(); - _requestSubscription = _requestSubject.Select(request => Observable.FromAsync(() => _sendWebSocketRequest(request))).Concat().Subscribe(); + _requestSubject.Select(request => Observable.FromAsync(() => _sendWebSocketRequest(request))).Concat().Subscribe(); } - public IObservable ReceiveErrors => _exceptionSubject.AsObservable(); - - public IObservable ResponseStream => _responseStream; - public IObservable _responseStream; - //private IDisposable _responseStreamConnection; - public Task SendWebSocketRequest(GraphQLWebSocketRequest request) { _requestSubject.OnNext(request); @@ -66,7 +61,7 @@ private async Task _sendWebSocketRequest(GraphQLWebSocketRequest request) await InitializeWebSocket().ConfigureAwait(false); var webSocketRequestString = JsonConvert.SerializeObject(request); - await this.clientWebSocket.SendAsync( + await this._clientWebSocket.SendAsync( new ArraySegment(Encoding.UTF8.GetBytes(webSocketRequestString)), WebSocketMessageType.Text, true, @@ -79,49 +74,39 @@ await this.clientWebSocket.SendAsync( } } - public Task InitializeWebSocketTask { get; private set; } = Task.CompletedTask; - - private object _initializeLock = new object(); - - #region Private Methods - - private Task _backOff() - { - _connectionAttempt++; - - if(_connectionAttempt == 1) return Task.CompletedTask; - - var delay = _options.BackOffStrategy(_connectionAttempt - 1); - Debug.WriteLine($"connection attempt #{_connectionAttempt}, backing off for {delay.TotalSeconds} s"); - return Task.Delay(delay); - } + private Task _initializeWebSocketTask = Task.CompletedTask; + private readonly object _initializeLock = new object(); + /// + /// Initializes the websocket. If the initialization process is already running, returns the running task. + /// If a healthy websocket connection is already established, a completed task is returned + /// + /// the async task initializing the websocket public Task InitializeWebSocket() { // do not attempt to initialize if cancellation is requested - if(_disposed != null) + if (_disposed != null) throw new OperationCanceledException(); lock (_initializeLock) { // if an initialization task is already running, return that - if(InitializeWebSocketTask != null && - !InitializeWebSocketTask.IsFaulted && - !InitializeWebSocketTask.IsCompleted) - return InitializeWebSocketTask; + if (_initializeWebSocketTask != null && + !_initializeWebSocketTask.IsFaulted && + !_initializeWebSocketTask.IsCompleted) + return _initializeWebSocketTask; // if the websocket is open, return a completed task - if (clientWebSocket != null && clientWebSocket.State == WebSocketState.Open) + if (_clientWebSocket != null && _clientWebSocket.State == WebSocketState.Open) return Task.CompletedTask; // else (re-)create websocket and connect - //_responseStreamConnection?.Dispose(); - clientWebSocket?.Dispose(); + _clientWebSocket?.Dispose(); // fix websocket not supported on win 7 using // https://github.com/PingmanTools/System.Net.WebSockets.Client.Managed - clientWebSocket = SystemClientWebSocket.CreateClientWebSocket(); - switch (clientWebSocket) + _clientWebSocket = SystemClientWebSocket.CreateClientWebSocket(); + switch (_clientWebSocket) { case ClientWebSocket nativeWebSocket: nativeWebSocket.Options.AddSubProtocol("graphql-ws"); @@ -130,12 +115,14 @@ public Task InitializeWebSocket() managedWebSocket.Options.AddSubProtocol("graphql-ws"); break; default: - throw new NotSupportedException($"unknown websocket type {clientWebSocket.GetType().Name}"); + throw new NotSupportedException($"unknown websocket type {_clientWebSocket.GetType().Name}"); } - return InitializeWebSocketTask = _connectAsync(_cancellationTokenSource.Token); + return _initializeWebSocketTask = _connectAsync(_cancellationTokenSource.Token); } } + + #region Private Methods private IObservable _createResponseStream() { @@ -186,10 +173,9 @@ private async Task _connectAsync(CancellationToken token) try { await _backOff().ConfigureAwait(false); - Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); - await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); - Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}"); - //_responseStreamConnection = _responseStream.Connect(); + Debug.WriteLine($"opening websocket {_clientWebSocket.GetHashCode()}"); + await _clientWebSocket.ConnectAsync(_webSocketUri, token).ConfigureAwait(false); + Debug.WriteLine($"connection established on websocket {_clientWebSocket.GetHashCode()}"); _connectionAttempt = 1; } catch (Exception e) @@ -199,9 +185,20 @@ private async Task _connectAsync(CancellationToken token) } } + private Task _backOff() + { + _connectionAttempt++; + + if (_connectionAttempt == 1) return Task.CompletedTask; + + var delay = _options.BackOffStrategy(_connectionAttempt - 1); + Debug.WriteLine($"connection attempt #{_connectionAttempt}, backing off for {delay.TotalSeconds} s"); + return Task.Delay(delay); + } private Task _receiveAsyncTask = null; - private object _receiveTaskLocker = new object(); + private readonly object _receiveTaskLocker = new object(); + /// /// wrapper method to pick up the existing request task if already running /// @@ -223,16 +220,16 @@ private async Task _receiveResultAsync() { try { - Debug.WriteLine($"receiving data on websocket {clientWebSocket.GetHashCode()} ..."); - WebSocketReceiveResult webSocketReceiveResult = null; + Debug.WriteLine($"receiving data on websocket {_clientWebSocket.GetHashCode()} ..."); using (var ms = new MemoryStream()) { + WebSocketReceiveResult webSocketReceiveResult = null; do { _cancellationTokenSource.Token.ThrowIfCancellationRequested(); - webSocketReceiveResult = await clientWebSocket.ReceiveAsync(buffer, CancellationToken.None); - ms.Write(buffer.Array, buffer.Offset, webSocketReceiveResult.Count); + webSocketReceiveResult = await _clientWebSocket.ReceiveAsync(_buffer, CancellationToken.None).ConfigureAwait(false); + ms.Write(_buffer.Array, _buffer.Offset, webSocketReceiveResult.Count); } while (!webSocketReceiveResult.EndOfMessage); @@ -243,8 +240,8 @@ private async Task _receiveResultAsync() { using (var reader = new StreamReader(ms, Encoding.UTF8)) { - var stringResult = await reader.ReadToEndAsync(); - Debug.WriteLine($"data received on websocket {clientWebSocket.GetHashCode()}: {stringResult}"); + var stringResult = await reader.ReadToEndAsync().ConfigureAwait(false); + Debug.WriteLine($"data received on websocket {_clientWebSocket.GetHashCode()}: {stringResult}"); return JsonConvert.DeserializeObject(stringResult); } } @@ -263,20 +260,20 @@ private async Task _receiveResultAsync() private async Task _closeAsync(CancellationToken cancellationToken = default) { - if(clientWebSocket == null) + if(_clientWebSocket == null) return; // don't attempt to close the websocket if it is in a failed state - if (this.clientWebSocket.State != WebSocketState.Open && - this.clientWebSocket.State != WebSocketState.CloseReceived && - this.clientWebSocket.State != WebSocketState.CloseSent) + if (this._clientWebSocket.State != WebSocketState.Open && + this._clientWebSocket.State != WebSocketState.CloseReceived && + this._clientWebSocket.State != WebSocketState.CloseSent) { - Debug.WriteLine($"websocket {clientWebSocket.GetHashCode()} state = {this.clientWebSocket.State}"); + Debug.WriteLine($"websocket {_clientWebSocket.GetHashCode()} state = {this._clientWebSocket.State}"); return; } - Debug.WriteLine($"closing websocket {clientWebSocket.GetHashCode()}"); - await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false); + Debug.WriteLine($"closing websocket {_clientWebSocket.GetHashCode()}"); + await this._clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false); } #endregion @@ -284,7 +281,7 @@ private async Task _closeAsync(CancellationToken cancellationToken = default) #region IDisposable private Task _disposed; - private object _disposedLocker = new object(); + private readonly object _disposedLocker = new object(); public void Dispose() { // Async disposal as recommended by Stephen Cleary (https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html) @@ -296,13 +293,13 @@ public void Dispose() private async Task DisposeAsync() { - Debug.WriteLine($"disposing websocket {clientWebSocket.GetHashCode()}..."); + Debug.WriteLine($"disposing websocket {_clientWebSocket?.GetHashCode()}..."); if (!_cancellationTokenSource.IsCancellationRequested) _cancellationTokenSource.Cancel(); await _closeAsync().ConfigureAwait(false); - clientWebSocket?.Dispose(); + _clientWebSocket?.Dispose(); _cancellationTokenSource.Dispose(); - Debug.WriteLine($"websocket {clientWebSocket.GetHashCode()} disposed"); + Debug.WriteLine($"websocket {_clientWebSocket?.GetHashCode()} disposed"); } #endregion } diff --git a/src/GraphQL.Client/Http/GraphQLHttpSubscriptionHelpers.cs b/src/GraphQL.Client/Http/GraphQLHttpWebsocketHelpers.cs similarity index 68% rename from src/GraphQL.Client/Http/GraphQLHttpSubscriptionHelpers.cs rename to src/GraphQL.Client/Http/GraphQLHttpWebsocketHelpers.cs index ef71a0ce..1eee72bd 100644 --- a/src/GraphQL.Client/Http/GraphQLHttpSubscriptionHelpers.cs +++ b/src/GraphQL.Client/Http/GraphQLHttpWebsocketHelpers.cs @@ -3,6 +3,7 @@ using System.Net.WebSockets; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; using GraphQL.Common; @@ -12,12 +13,12 @@ namespace GraphQL.Client.Http { - public static class GraphQLHttpSubscriptionHelpers + // ReSharper disable once InconsistentNaming + public static class GraphQLHttpWebsocketHelpers { internal static IObservable CreateSubscriptionStream( this GraphQLHttpWebSocket graphQlHttpWebSocket, GraphQLRequest request, - GraphQLHttpClientOptions options, Action exceptionHandler = null, CancellationToken cancellationToken = default) { @@ -36,9 +37,7 @@ internal static IObservable CreateSubscriptionStream( Type = GQLWebSocketMessageType.GQL_STOP }; var observable = graphQlHttpWebSocket.ResponseStream - .Where(response => { - return response != null && response.Id == startRequest.Id; - }) + .Where(response => response != null && response.Id == startRequest.Id) .SelectMany(response => { switch (response.Type) @@ -148,74 +147,73 @@ internal static IObservable CreateSubscriptionStream( .Publish().RefCount(); } - internal static async Task Request( + internal static Task SendRequest( this GraphQLHttpWebSocket graphQlHttpWebSocket, GraphQLRequest request, CancellationToken cancellationToken = default) { - return await Observable.Create(async observer => - { - var websocketRequest = new GraphQLWebSocketRequest + return Observable.Create(async observer => { - Id = Guid.NewGuid().ToString("N"), - Type = GQLWebSocketMessageType.GQL_START, - Payload = request - }; - var observable = graphQlHttpWebSocket.ResponseStream - .Where(response => { - return response != null && response.Id == websocketRequest.Id; - }) - .SelectMany(response => + var websocketRequest = new GraphQLWebSocketRequest { - switch (response.Type) + Id = Guid.NewGuid().ToString("N"), + Type = GQLWebSocketMessageType.GQL_START, + Payload = request + }; + var observable = graphQlHttpWebSocket.ResponseStream + .Where(response => response != null && response.Id == websocketRequest.Id) + .SelectMany(response => { - case GQLWebSocketMessageType.GQL_COMPLETE: - Debug.WriteLine($"received 'complete' message on request {websocketRequest.Id}"); - return Observable.Empty(); - case GQLWebSocketMessageType.GQL_ERROR: - Debug.WriteLine($"received 'error' message on request {websocketRequest.Id}"); - return Observable.Throw( - new GraphQLSubscriptionException(response.Payload)); - default: - Debug.WriteLine($"received response for request {websocketRequest.Id}"); - return Observable.Return(((JObject)response?.Payload) - ?.ToObject()); - } - }); + switch (response.Type) + { + case GQLWebSocketMessageType.GQL_COMPLETE: + Debug.WriteLine($"received 'complete' message on request {websocketRequest.Id}"); + return Observable.Empty(); + case GQLWebSocketMessageType.GQL_ERROR: + Debug.WriteLine($"received 'error' message on request {websocketRequest.Id}"); + return Observable.Throw( + new GraphQLSubscriptionException(response.Payload)); + default: + Debug.WriteLine($"received response for request {websocketRequest.Id}"); + return Observable.Return(((JObject) response?.Payload) + ?.ToObject()); + } + }); - try - { - // intialize websocket (completes immediately if socket is already open) - await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false); - } - catch (Exception e) - { - // subscribe observer to failed observable - return Observable.Throw(e).Subscribe(observer); - } + try + { + // intialize websocket (completes immediately if socket is already open) + await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false); + } + catch (Exception e) + { + // subscribe observer to failed observable + return Observable.Throw(e).Subscribe(observer); + } - var disposable = new CompositeDisposable( - observable.Subscribe(observer) - ); + var disposable = new CompositeDisposable( + observable.Subscribe(observer) + ); - Debug.WriteLine($"submitting request {websocketRequest.Id}"); - // send request - try - { - await graphQlHttpWebSocket.SendWebSocketRequest(websocketRequest).ConfigureAwait(false); - } - catch (Exception e) - { - Console.WriteLine(e); - throw; - } + Debug.WriteLine($"submitting request {websocketRequest.Id}"); + // send request + try + { + await graphQlHttpWebSocket.SendWebSocketRequest(websocketRequest).ConfigureAwait(false); + } + catch (Exception e) + { + Console.WriteLine(e); + throw; + } - return disposable; - }) - // complete sequence on OperationCanceledException, this is triggered by the cancellation token - .Catch(exception => - Observable.Empty()) - .FirstOrDefaultAsync(); + return disposable; + }) + // complete sequence on OperationCanceledException, this is triggered by the cancellation token + .Catch(exception => + Observable.Empty()) + .FirstOrDefaultAsync() + .ToTask(cancellationToken); } } } diff --git a/src/GraphQL.Client/IGraphQLClient.cs b/src/GraphQL.Client/IGraphQLClient.cs index 098d0524..204da12b 100644 --- a/src/GraphQL.Client/IGraphQLClient.cs +++ b/src/GraphQL.Client/IGraphQLClient.cs @@ -27,16 +27,7 @@ public interface IGraphQLClient : IDisposable { /// The Cancellation Token /// The Response Task SendMutationAsync(GraphQLRequest request, CancellationToken cancellationToken = default); - - /// - /// Send a Subscription async - /// - /// The Request - /// The Cancellation Token - /// The Subscription Response - [Obsolete("EXPERIMENTAL")] - Task SendSubscribeAsync(GraphQLRequest request, CancellationToken cancellationToken = default); - + /// /// Creates a subscription to a GraphQL server. The connection is not established until the first actual subscription is made.
/// All subscriptions made to this stream share the same hot observable.
@@ -44,7 +35,6 @@ public interface IGraphQLClient : IDisposable { ///
/// the GraphQL request for this subscription /// an observable stream for the specified subscription - [Obsolete("EXPERIMENTAL")] IObservable CreateSubscriptionStream(GraphQLRequest request); /// @@ -57,7 +47,6 @@ public interface IGraphQLClient : IDisposable { /// the GraphQL request for this subscription /// an external handler for all s occuring within the sequence /// an observable stream for the specified subscription - [Obsolete("EXPERIMENTAL")] IObservable CreateSubscriptionStream(GraphQLRequest request, Action webSocketExceptionHandler); /// @@ -70,13 +59,11 @@ public interface IGraphQLClient : IDisposable { /// the GraphQL request for this subscription /// an external handler for all s occuring within the sequence /// an observable stream for the specified subscription - [Obsolete("EXPERIMENTAL")] IObservable CreateSubscriptionStream(GraphQLRequest request, Action exceptionHandler); /// /// Publishes all exceptions which occur inside the websocket receive stream (i.e. for logging purposes) /// - [Obsolete("EXPERIMENTAL")] IObservable WebSocketReceiveErrors { get; } } diff --git a/src/GraphQL.Client/IGraphQLSubscriptionResult.cs b/src/GraphQL.Client/IGraphQLSubscriptionResult.cs deleted file mode 100644 index cb0626ee..00000000 --- a/src/GraphQL.Client/IGraphQLSubscriptionResult.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; -using GraphQL.Common.Response; - -namespace GraphQL.Client { - - /// - /// Represents the Result of a subscription - /// - [Obsolete("EXPERIMENTAL")] - public interface IGraphQLSubscriptionResult : IDisposable { - - /// - /// Event triggered when a new Response is received - /// - event Action OnReceive; - - /// - /// Last Response Received - /// - GraphQLResponse LastResponse { get; } - - } - -} diff --git a/tests/GraphQL.Client.Tests/GraphQL.Client.Tests.csproj b/tests/GraphQL.Client.Tests/GraphQL.Client.Tests.csproj index 1c9a24d9..c49723ea 100644 --- a/tests/GraphQL.Client.Tests/GraphQL.Client.Tests.csproj +++ b/tests/GraphQL.Client.Tests/GraphQL.Client.Tests.csproj @@ -9,7 +9,7 @@ - + @@ -17,4 +17,8 @@ + + + + diff --git a/tests/GraphQL.Common.Tests/GraphQL.Common.Tests.csproj b/tests/GraphQL.Common.Tests/GraphQL.Common.Tests.csproj index 4aae0487..dbcbc5d4 100644 --- a/tests/GraphQL.Common.Tests/GraphQL.Common.Tests.csproj +++ b/tests/GraphQL.Common.Tests/GraphQL.Common.Tests.csproj @@ -11,5 +11,9 @@ + + + + diff --git a/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj b/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj index 5fc195bc..2fb41339 100644 --- a/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj +++ b/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj @@ -1,17 +1,17 @@  - netcoreapp2.1 + netcoreapp3.0 false - - - - - + + + + + all diff --git a/tests/GraphQL.Integration.Tests/CallbackTester.cs b/tests/GraphQL.Integration.Tests/Helpers/CallbackTester.cs similarity index 100% rename from tests/GraphQL.Integration.Tests/CallbackTester.cs rename to tests/GraphQL.Integration.Tests/Helpers/CallbackTester.cs diff --git a/tests/GraphQL.Integration.Tests/GraphQLClientExtensions.cs b/tests/GraphQL.Integration.Tests/Helpers/GraphQLClientExtensions.cs similarity index 100% rename from tests/GraphQL.Integration.Tests/GraphQLClientExtensions.cs rename to tests/GraphQL.Integration.Tests/Helpers/GraphQLClientExtensions.cs diff --git a/tests/GraphQL.Integration.Tests/NetworkHelpers.cs b/tests/GraphQL.Integration.Tests/Helpers/NetworkHelpers.cs similarity index 100% rename from tests/GraphQL.Integration.Tests/NetworkHelpers.cs rename to tests/GraphQL.Integration.Tests/Helpers/NetworkHelpers.cs diff --git a/tests/GraphQL.Integration.Tests/ObservableTester.cs b/tests/GraphQL.Integration.Tests/Helpers/ObservableTester.cs similarity index 100% rename from tests/GraphQL.Integration.Tests/ObservableTester.cs rename to tests/GraphQL.Integration.Tests/Helpers/ObservableTester.cs diff --git a/tests/GraphQL.Integration.Tests/WebApplicationFactoryExtensions.cs b/tests/GraphQL.Integration.Tests/Helpers/WebApplicationFactoryExtensions.cs similarity index 100% rename from tests/GraphQL.Integration.Tests/WebApplicationFactoryExtensions.cs rename to tests/GraphQL.Integration.Tests/Helpers/WebApplicationFactoryExtensions.cs diff --git a/tests/GraphQL.Integration.Tests/SubscriptionsTest.cs b/tests/GraphQL.Integration.Tests/WebsocketTests.cs similarity index 93% rename from tests/GraphQL.Integration.Tests/SubscriptionsTest.cs rename to tests/GraphQL.Integration.Tests/WebsocketTests.cs index 635a0d1c..955506e9 100644 --- a/tests/GraphQL.Integration.Tests/SubscriptionsTest.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests.cs @@ -13,7 +13,7 @@ namespace GraphQL.Integration.Tests { - public class SubscriptionsTest + public class WebsocketTests { public static IWebHost CreateServer(int port) { @@ -37,16 +37,15 @@ public static IWebHost CreateServer(int port) return host; } - private readonly IWebHost _server; - - public SubscriptionsTest() + public WebsocketTests() { } - private GraphQLHttpClient GetGraphQLClient(int port) + private GraphQLHttpClient GetGraphQLClient(int port, bool queriesViaWebsocket = false) => new GraphQLHttpClient(new GraphQLHttpClientOptions { EndPoint = new Uri($"http://localhost:{port}/graphql"), + UseWebSocketForQueriesAndMutations = queriesViaWebsocket }); @@ -65,6 +64,21 @@ public async void AssertTestingHarness() } } + [Fact] + public async void CanSendQueryViaWebsocket() + { + var port = NetworkHelpers.GetFreeTcpPortNumber(); + using (CreateServer(port)) + { + var client = GetGraphQLClient(port, true); + + const string message = "some random testing message"; + var response = await client.AddMessageAsync(message).ConfigureAwait(false); + + Assert.Equal(message, (string)response.Data.addMessage.content); + } + } + private const string SubscriptionQuery = @" subscription { messageAdded{ diff --git a/tests/IntegrationTestServer/IntegrationTestServer.csproj b/tests/IntegrationTestServer/IntegrationTestServer.csproj index cfba48cd..18a9fc43 100644 --- a/tests/IntegrationTestServer/IntegrationTestServer.csproj +++ b/tests/IntegrationTestServer/IntegrationTestServer.csproj @@ -1,7 +1,7 @@ - + - netcoreapp2.1 + netcoreapp3.0 @@ -15,10 +15,8 @@ - - - - + +