diff --git a/.github/workflows/branches-windows.yml b/.github/workflows/branches-windows.yml index 430b2121..fdf2b81c 100644 --- a/.github/workflows/branches-windows.yml +++ b/.github/workflows/branches-windows.yml @@ -1,5 +1,5 @@ name: Branch workflow -on: +on: push: branches-ignore: - '**' diff --git a/.gitignore b/.gitignore index f173174b..be040ad5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .idea/ .vs/ +.vscode/ bin/ obj/ *.user diff --git a/src/GraphQL.Client.Abstractions.Websocket/IGraphQLWebsocketJsonSerializer.cs b/src/GraphQL.Client.Abstractions.Websocket/IGraphQLWebsocketJsonSerializer.cs index e2f445f5..376f45b6 100644 --- a/src/GraphQL.Client.Abstractions.Websocket/IGraphQLWebsocketJsonSerializer.cs +++ b/src/GraphQL.Client.Abstractions.Websocket/IGraphQLWebsocketJsonSerializer.cs @@ -10,7 +10,7 @@ namespace GraphQL.Client.Abstractions.Websocket public interface IGraphQLWebsocketJsonSerializer: IGraphQLJsonSerializer { byte[] SerializeToBytes(GraphQLWebSocketRequest request); - Task DeserializeToWebsocketResponseWrapperAsync(Stream stream); + Task DeserializeToWebsocketResponseWrapperAsync(Stream stream); GraphQLWebSocketResponse> DeserializeToWebsocketResponse(byte[] bytes); } diff --git a/src/GraphQL.Client.Abstractions.Websocket/WebsocketResponseWrapper.cs b/src/GraphQL.Client.Abstractions.Websocket/WebsocketMessageWrapper.cs similarity index 69% rename from src/GraphQL.Client.Abstractions.Websocket/WebsocketResponseWrapper.cs rename to src/GraphQL.Client.Abstractions.Websocket/WebsocketMessageWrapper.cs index 5e90a38f..2a27677e 100644 --- a/src/GraphQL.Client.Abstractions.Websocket/WebsocketResponseWrapper.cs +++ b/src/GraphQL.Client.Abstractions.Websocket/WebsocketMessageWrapper.cs @@ -1,7 +1,7 @@ using System.Runtime.Serialization; namespace GraphQL.Client.Abstractions.Websocket { - public class WebsocketResponseWrapper : GraphQLWebSocketResponse { + public class WebsocketMessageWrapper : GraphQLWebSocketResponse { [IgnoreDataMember] public byte[] MessageBytes { get; set; } diff --git a/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs b/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs index d73e2cc6..6624020c 100644 --- a/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs +++ b/src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs @@ -76,11 +76,11 @@ public IObservable> CreateSubscriptionStream> ExecuteQueryAsync(GraphQLRequest request, CancellationToken cancellationToken) { - var executionResult = await ExecuteAsync(request, cancellationToken).ConfigureAwait(false); - return await ExecutionResultToGraphQLResponse(executionResult, cancellationToken).ConfigureAwait(false); + var executionResult = await ExecuteAsync(request, cancellationToken); + return await ExecutionResultToGraphQLResponse(executionResult, cancellationToken); } private async Task>> ExecuteSubscriptionAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { - var result = await ExecuteAsync(request, cancellationToken).ConfigureAwait(false); + var result = await ExecuteAsync(request, cancellationToken); return ((SubscriptionExecutionResult)result).Streams?.Values.SingleOrDefault()? .SelectMany(executionResult => Observable.FromAsync(token => ExecutionResultToGraphQLResponse(executionResult, token))); } @@ -100,7 +100,7 @@ private async Task ExecuteAsync(GraphQLRequest request, Cancell options.Query = request.Query; options.Inputs = inputs; options.CancellationToken = cancellationToken; - }).ConfigureAwait(false); + }); return result; } diff --git a/src/GraphQL.Client.Serializer.Newtonsoft/NewtonsoftJsonSerializer.cs b/src/GraphQL.Client.Serializer.Newtonsoft/NewtonsoftJsonSerializer.cs index a40ad456..7194a547 100644 --- a/src/GraphQL.Client.Serializer.Newtonsoft/NewtonsoftJsonSerializer.cs +++ b/src/GraphQL.Client.Serializer.Newtonsoft/NewtonsoftJsonSerializer.cs @@ -42,8 +42,8 @@ public byte[] SerializeToBytes(Abstractions.Websocket.GraphQLWebSocketRequest re return Encoding.UTF8.GetBytes(json); } - public Task DeserializeToWebsocketResponseWrapperAsync(Stream stream) { - return DeserializeFromUtf8Stream(stream); + public Task DeserializeToWebsocketResponseWrapperAsync(Stream stream) { + return DeserializeFromUtf8Stream(stream); } public GraphQLWebSocketResponse> DeserializeToWebsocketResponse(byte[] bytes) { diff --git a/src/GraphQL.Client.Serializer.SystemTextJson/SystemTextJsonSerializer.cs b/src/GraphQL.Client.Serializer.SystemTextJson/SystemTextJsonSerializer.cs index 420a8522..62be2e37 100644 --- a/src/GraphQL.Client.Serializer.SystemTextJson/SystemTextJsonSerializer.cs +++ b/src/GraphQL.Client.Serializer.SystemTextJson/SystemTextJsonSerializer.cs @@ -45,8 +45,8 @@ public byte[] SerializeToBytes(Abstractions.Websocket.GraphQLWebSocketRequest re return JsonSerializer.SerializeToUtf8Bytes(new GraphQLWebSocketRequest(request), Options); } - public Task DeserializeToWebsocketResponseWrapperAsync(Stream stream) { - return JsonSerializer.DeserializeAsync(stream, Options).AsTask(); + public Task DeserializeToWebsocketResponseWrapperAsync(Stream stream) { + return JsonSerializer.DeserializeAsync(stream, Options).AsTask(); } public GraphQLWebSocketResponse> DeserializeToWebsocketResponse(byte[] bytes) { diff --git a/src/GraphQL.Client/GraphQLHttpClient.cs b/src/GraphQL.Client/GraphQLHttpClient.cs index eb25ebc0..30e85fd0 100644 --- a/src/GraphQL.Client/GraphQLHttpClient.cs +++ b/src/GraphQL.Client/GraphQLHttpClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Diagnostics; using System.Net.Http; using System.Text; using System.Threading; @@ -65,7 +66,7 @@ public GraphQLHttpClient(GraphQLHttpClientOptions options, HttpClient httpClient /// public Task> SendQueryAsync(GraphQLRequest request, CancellationToken cancellationToken = default) { return Options.UseWebSocketForQueriesAndMutations - ? this.graphQlHttpWebSocket.SendRequest(request, this, cancellationToken) + ? this.graphQlHttpWebSocket.SendRequest(request, cancellationToken) : this.SendHttpPostRequestAsync(request, cancellationToken); } @@ -84,7 +85,7 @@ public IObservable> CreateSubscriptionStream>)subscriptionStreams[key]; - var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, this, cancellationToken: cancellationTokenSource.Token); + var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request); subscriptionStreams.TryAdd(key, observable); return observable; @@ -100,7 +101,7 @@ public IObservable> CreateSubscriptionStream>)subscriptionStreams[key]; - var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, this, exceptionHandler, cancellationTokenSource.Token); + var observable = graphQlHttpWebSocket.CreateSubscriptionStream(request, exceptionHandler); subscriptionStreams.TryAdd(key, observable); return observable; } @@ -164,9 +165,10 @@ public void Dispose() { private void _dispose() { disposed = true; + Debug.WriteLine($"disposing GraphQLHttpClient on endpoint {Options.EndPoint}"); + cancellationTokenSource.Cancel(); this.HttpClient.Dispose(); this.graphQlHttpWebSocket.Dispose(); - cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); } diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 832fb4cf..ec3d5a18 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -3,6 +3,7 @@ using System.IO; using System.Net.Http; using System.Net.WebSockets; +using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -13,19 +14,29 @@ namespace GraphQL.Client.Http.Websocket { internal class GraphQLHttpWebSocket : IDisposable { + + #region Private fields + private readonly Uri webSocketUri; private readonly GraphQLHttpClient client; private readonly ArraySegment buffer; - private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationTokenSource internalCancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationToken internalCancellationToken; private readonly Subject requestSubject = new Subject(); private readonly Subject exceptionSubject = new Subject(); private readonly BehaviorSubject stateSubject = new BehaviorSubject(GraphQLWebsocketConnectionState.Disconnected); private readonly IDisposable requestSubscription; + private readonly EventLoopScheduler receiveLoopScheduler = new EventLoopScheduler(); + private readonly EventLoopScheduler sendLoopScheduler = new EventLoopScheduler(); private int connectionAttempt = 0; - private Subject responseSubject; + private IConnectableObservable incomingMessages; + private IDisposable incomingMessagesConnection; private GraphQLHttpClientOptions Options => client.Options; + + private Task initializeWebSocketTask = Task.CompletedTask; + private readonly object initializeLock = new object(); #if NETFRAMEWORK private WebSocket clientWebSocket = null; @@ -33,44 +44,283 @@ internal class GraphQLHttpWebSocket : IDisposable { private ClientWebSocket clientWebSocket = null; #endif + #endregion + + + #region Public properties + /// + /// The current websocket state + /// public WebSocketState WebSocketState => clientWebSocket?.State ?? WebSocketState.None; + + /// + /// Publishes all errors which occur within the receive pipeline + /// public IObservable ReceiveErrors => exceptionSubject.AsObservable(); + + /// + /// Publishes the connection state of the + /// public IObservable ConnectionState => stateSubject.DistinctUntilChanged(); - public IObservable ResponseStream { get; } + /// + /// Publishes all messages which are received on the websocket + /// + public IObservable IncomingMessageStream { get; } + #endregion + + public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClient client) { + internalCancellationToken = internalCancellationTokenSource.Token; this.webSocketUri = webSocketUri; this.client = client; buffer = new ArraySegment(new byte[8192]); - ResponseStream = _createResponseStream(); + IncomingMessageStream = GetMessageStream(); + receiveLoopScheduler.Schedule(() => + Debug.WriteLine($"receive loop scheduler thread id: {Thread.CurrentThread.ManagedThreadId}")); - requestSubscription = requestSubject.Select(request => Observable.FromAsync(() => _sendWebSocketRequest(request))).Concat().Subscribe(); + requestSubscription = requestSubject + .ObserveOn(sendLoopScheduler) + .Subscribe(async request => await SendWebSocketRequest(request)); } #region Send requests - public Task SendWebSocketRequest(GraphQLWebSocketRequest request) { + /// + /// Create a new subscription stream + /// + /// the response type + /// the to start the subscription + /// Optional: exception handler for handling exceptions within the receive pipeline + /// a which represents the subscription + public IObservable> CreateSubscriptionStream(GraphQLRequest request, Action exceptionHandler = null) { + return Observable.Defer(() => + Observable.Create>(async observer => { + Debug.WriteLine($"Create observable thread id: {Thread.CurrentThread.ManagedThreadId}"); + await client.Options.PreprocessRequest(request, client); + var startRequest = new GraphQLWebSocketRequest { + Id = Guid.NewGuid().ToString("N"), + Type = GraphQLWebSocketMessageType.GQL_START, + Payload = request + }; + var closeRequest = new GraphQLWebSocketRequest { + Id = startRequest.Id, + Type = GraphQLWebSocketMessageType.GQL_STOP + }; + var initRequest = new GraphQLWebSocketRequest { + Id = startRequest.Id, + Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT, + }; + + var observable = Observable.Create>(o => + IncomingMessageStream + // ignore null values and messages for other requests + .Where(response => response != null && response.Id == startRequest.Id) + .Subscribe(response => { + // terminate the sequence when a 'complete' message is received + if (response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) { + Debug.WriteLine($"received 'complete' message on subscription {startRequest.Id}"); + o.OnCompleted(); + return; + } + + // post the GraphQLResponse to the stream (even if a GraphQL error occurred) + Debug.WriteLine($"received payload on subscription {startRequest.Id} (thread {Thread.CurrentThread.ManagedThreadId})"); + var typedResponse = + client.Options.JsonSerializer.DeserializeToWebsocketResponse( + response.MessageBytes); + o.OnNext(typedResponse.Payload); + + // in case of a GraphQL error, terminate the sequence after the response has been posted + if (response.Type == GraphQLWebSocketMessageType.GQL_ERROR) { + Debug.WriteLine($"terminating subscription {startRequest.Id} because of a GraphQL error"); + o.OnCompleted(); + } + }, + e => { + Debug.WriteLine($"response stream for subscription {startRequest.Id} failed: {e}"); + o.OnError(e); + }, + () => { + Debug.WriteLine($"response stream for subscription {startRequest.Id} completed"); + o.OnCompleted(); + }) + ); + + try { + // initialize websocket (completes immediately if socket is already open) + await InitializeWebSocket(); + } + catch (Exception e) { + // subscribe observer to failed observable + return Observable.Throw>(e).Subscribe(observer); + } + + var disposable = new CompositeDisposable( + observable.Subscribe(observer), + Disposable.Create(async () => { + // only try to send close request on open websocket + if (WebSocketState != WebSocketState.Open) return; + + try { + Debug.WriteLine($"sending close message on subscription {startRequest.Id}"); + await QueueWebSocketRequest(closeRequest); + } + // do not break on disposing + catch (OperationCanceledException) { } + }) + ); + + // send connection init + Debug.WriteLine($"sending connection init on subscription {startRequest.Id}"); + try { + await QueueWebSocketRequest(initRequest); + } + catch (Exception e) { + Console.WriteLine(e); + throw; + } + + Debug.WriteLine($"sending initial message on subscription {startRequest.Id}"); + // send subscription request + try { + await QueueWebSocketRequest(startRequest); + } + catch (Exception e) { + Console.WriteLine(e); + throw; + } + + return disposable; + })) + // complete sequence on OperationCanceledException, this is triggered by the cancellation token + .Catch, OperationCanceledException>(exception => + Observable.Empty>()) + // wrap results + .Select(response => new Tuple, Exception>(response, null)) + // do exception handling + .Catch, Exception>, Exception>(e => { + try { + if (exceptionHandler == null) { + // if the external handler is not set, propagate all exceptions except WebSocketExceptions + // this will ensure that the client tries to re-establish subscriptions on connection loss + if (!(e is WebSocketException)) throw e; + } + else { + // exceptions thrown by the handler will propagate to OnError() + exceptionHandler?.Invoke(e); + } + + // throw exception on the observable to be caught by Retry() or complete sequence if cancellation was requested + if (internalCancellationToken.IsCancellationRequested) + return Observable.Empty, Exception>>(); + else { + Debug.WriteLine($"Catch handler thread id: {Thread.CurrentThread.ManagedThreadId}"); + return Observable.Throw, Exception>>(e); + } + } + catch (Exception exception) { + // wrap all other exceptions to be propagated behind retry + return Observable.Return(new Tuple, Exception>(null, exception)); + } + }) + // attempt to recreate the websocket for rethrown exceptions + .Retry() + // unwrap and push results or throw wrapped exceptions + .SelectMany(t => { + Debug.WriteLine($"unwrap exception thread id: {Thread.CurrentThread.ManagedThreadId}"); + // if the result contains an exception, throw it on the observable + if (t.Item2 != null) + return Observable.Throw>(t.Item2); + + return t.Item1 == null + ? Observable.Empty>() + : Observable.Return(t.Item1); + }) + // transform to hot observable and auto-connect + .Publish().RefCount(); + } + + /// + /// Send a regular GraphQL request (query, mutation) via websocket + /// + /// the response type + /// the to send + /// the token to cancel the request + /// + public Task> SendRequest(GraphQLRequest request, CancellationToken cancellationToken = default) { + return Observable.Create>(async observer => { + await client.Options.PreprocessRequest(request, client); + var websocketRequest = new GraphQLWebSocketRequest { + Id = Guid.NewGuid().ToString("N"), + Type = GraphQLWebSocketMessageType.GQL_START, + Payload = request + }; + var observable = IncomingMessageStream + .Where(response => response != null && response.Id == websocketRequest.Id) + .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) + .Select(response => { + Debug.WriteLine($"received response for request {websocketRequest.Id}"); + var typedResponse = + client.Options.JsonSerializer.DeserializeToWebsocketResponse( + response.MessageBytes); + return typedResponse.Payload; + }); + + try { + // initialize websocket (completes immediately if socket is already open) + await InitializeWebSocket(); + } + catch (Exception e) { + // subscribe observer to failed observable + return Observable.Throw>(e).Subscribe(observer); + } + + var disposable = new CompositeDisposable( + observable.Subscribe(observer) + ); + + Debug.WriteLine($"submitting request {websocketRequest.Id}"); + // send request + try { + await QueueWebSocketRequest(websocketRequest); + } + catch (Exception e) { + Console.WriteLine(e); + throw; + } + + return disposable; + }) + // complete sequence on OperationCanceledException, this is triggered by the cancellation token + .Catch, OperationCanceledException>(exception => + Observable.Empty>()) + .FirstAsync() + .ToTask(cancellationToken); + } + + private Task QueueWebSocketRequest(GraphQLWebSocketRequest request) { requestSubject.OnNext(request); return request.SendTask(); } - private async Task _sendWebSocketRequest(GraphQLWebSocketRequest request) { + private async Task SendWebSocketRequest(GraphQLWebSocketRequest request) { try { - if (cancellationTokenSource.Token.IsCancellationRequested) { + if (internalCancellationToken.IsCancellationRequested) { request.SendCanceled(); return; } - await InitializeWebSocket().ConfigureAwait(false); + await InitializeWebSocket(); var requestBytes = Options.JsonSerializer.SerializeToBytes(request); await this.clientWebSocket.SendAsync( new ArraySegment(requestBytes), WebSocketMessageType.Text, true, - cancellationTokenSource.Token).ConfigureAwait(false); + internalCancellationToken); request.SendCompleted(); } catch (Exception e) { @@ -79,9 +329,6 @@ await this.clientWebSocket.SendAsync( } #endregion - - private Task initializeWebSocketTask = Task.CompletedTask; - private readonly object initializeLock = new object(); public Task InitializeWebSocket() { // do not attempt to initialize if cancellation is requested @@ -126,21 +373,49 @@ public Task InitializeWebSocket() { clientWebSocket.Options.ClientCertificates = ((HttpClientHandler)Options.HttpMessageHandler).ClientCertificates; clientWebSocket.Options.UseDefaultCredentials = ((HttpClientHandler)Options.HttpMessageHandler).UseDefaultCredentials; #endif - return initializeWebSocketTask = _connectAsync(cancellationTokenSource.Token); + return initializeWebSocketTask = ConnectAsync(internalCancellationToken); } } - private async Task _connectAsync(CancellationToken token) { + private async Task ConnectAsync(CancellationToken token) { try { - await _backOff().ConfigureAwait(false); + await BackOff(); stateSubject.OnNext(GraphQLWebsocketConnectionState.Connecting); - Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}"); - await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false); + Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})"); + await clientWebSocket.ConnectAsync(webSocketUri, token); stateSubject.OnNext(GraphQLWebsocketConnectionState.Connected); Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}, invoking Options.OnWebsocketConnected()"); await (Options.OnWebsocketConnected?.Invoke(client) ?? Task.CompletedTask); Debug.WriteLine($"invoking Options.OnWebsocketConnected() on websocket {clientWebSocket.GetHashCode()}"); connectionAttempt = 1; + + // create receiving observable + incomingMessages = Observable + .Defer(() => GetReceiveTask().ToObservable().ObserveOn(receiveLoopScheduler)) + .Repeat() + // complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal + .Catch(exception => Observable.Empty()) + .Publish(); + + // subscribe maintenance + var maintenanceSubscription = incomingMessages.Subscribe(_ => { }, ex => { + Debug.WriteLine($"incoming message stream {incomingMessages.GetHashCode()} received an error: {ex}"); + exceptionSubject.OnNext(ex); + incomingMessagesConnection?.Dispose(); + stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); + }, + () => { + Debug.WriteLine($"incoming message stream {incomingMessages.GetHashCode()} completed"); + incomingMessagesConnection?.Dispose(); + stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); + }); + + + // connect observable + var connection = incomingMessages.Connect(); + Debug.WriteLine($"new incoming message stream {incomingMessages.GetHashCode()} created"); + + incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection); } catch (Exception e) { stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); @@ -153,70 +428,50 @@ private async Task _connectAsync(CancellationToken token) { /// delay the next connection attempt using /// /// - private Task _backOff() { + private Task BackOff() { connectionAttempt++; if (connectionAttempt == 1) return Task.CompletedTask; var delay = Options.BackOffStrategy?.Invoke(connectionAttempt - 1) ?? TimeSpan.FromSeconds(5); Debug.WriteLine($"connection attempt #{connectionAttempt}, backing off for {delay.TotalSeconds} s"); - return Task.Delay(delay); + return Task.Delay(delay, internalCancellationToken); } - - private IObservable _createResponseStream() { - return Observable.Create(_createResultStream) - // complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal - .Catch(exception => - Observable.Empty()); + private IObservable GetMessageStream() { + return Observable.Using(() => new EventLoopScheduler(), scheduler => + Observable.Create(async observer => { + // make sure the websocket ist connected + await InitializeWebSocket(); + // subscribe observer to message stream + var subscription = new CompositeDisposable(incomingMessages.ObserveOn(scheduler).Subscribe(observer)); + // register the observer's OnCompleted method with the cancellation token to complete the sequence on disposal + subscription.Add(internalCancellationTokenSource.Token.Register(observer.OnCompleted)); + + // add some debug output + var hashCode = subscription.GetHashCode(); + subscription.Add(Disposable.Create(() => { + Debug.WriteLine($"incoming message subscription {hashCode} disposed"); + })); + Debug.WriteLine($"new incoming message subscription {hashCode} created"); + + return subscription; + })); } - private async Task _createResultStream(IObserver observer, CancellationToken token) { - if (responseSubject == null || responseSubject.IsDisposed) { - // create new response subject - responseSubject = new Subject(); - - // initialize and connect websocket - await InitializeWebSocket().ConfigureAwait(false); - - // loop the receive task and subscribe the created subject to the results - Observable.Defer(() => _getReceiveTask().ToObservable()).Repeat().Subscribe(responseSubject); - - // dispose the subject on any error or completion (will be recreated) - responseSubject.Subscribe(_ => { }, ex => { - exceptionSubject.OnNext(ex); - responseSubject?.Dispose(); - responseSubject = null; - stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); - }, - () => { - responseSubject?.Dispose(); - responseSubject = null; - stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); - }); - } - - return new CompositeDisposable - ( - responseSubject.Subscribe(observer), - Disposable.Create(() => { - Debug.WriteLine("response stream disposed"); - }) - ); - } - - private Task receiveAsyncTask = null; + private Task receiveAsyncTask = null; private readonly object receiveTaskLocker = new object(); /// /// wrapper method to pick up the existing request task if already running /// /// - private Task _getReceiveTask() { + private Task GetReceiveTask() { lock (receiveTaskLocker) { + internalCancellationToken.ThrowIfCancellationRequested(); if (receiveAsyncTask == null || receiveAsyncTask.IsFaulted || receiveAsyncTask.IsCompleted) - receiveAsyncTask = _receiveResultAsync(); + receiveAsyncTask = ReceiveWebsocketMessagesAsync(); } return receiveAsyncTask; @@ -226,30 +481,32 @@ private Task _getReceiveTask() { /// read a single message from the websocket /// /// - private async Task _receiveResultAsync() { + private async Task ReceiveWebsocketMessagesAsync() { + internalCancellationToken.ThrowIfCancellationRequested(); + try { - Debug.WriteLine($"receiving data on websocket {clientWebSocket.GetHashCode()} ..."); - - using (var ms = new MemoryStream()) { - WebSocketReceiveResult webSocketReceiveResult = null; - do { - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - webSocketReceiveResult = await clientWebSocket.ReceiveAsync(buffer, CancellationToken.None); - ms.Write(buffer.Array, buffer.Offset, webSocketReceiveResult.Count); - } - while (!webSocketReceiveResult.EndOfMessage); + Debug.WriteLine($"waiting for data on websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})..."); + + using var ms = new MemoryStream(); + WebSocketReceiveResult webSocketReceiveResult = null; + do { + // cancellation is done implicitly via the close method + webSocketReceiveResult = await clientWebSocket.ReceiveAsync(buffer, CancellationToken.None); + ms.Write(buffer.Array, buffer.Offset, webSocketReceiveResult.Count); + } + while (!webSocketReceiveResult.EndOfMessage && !internalCancellationToken.IsCancellationRequested); - cancellationTokenSource.Token.ThrowIfCancellationRequested(); - ms.Seek(0, SeekOrigin.Begin); + internalCancellationToken.ThrowIfCancellationRequested(); + ms.Seek(0, SeekOrigin.Begin); - if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text) { - var response = await Options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms); - response.MessageBytes = ms.ToArray(); - return response; - } - else { - throw new NotSupportedException("binary websocket messages are not supported"); - } + if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text) { + var response = await Options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms); + response.MessageBytes = ms.ToArray(); + Debug.WriteLine($"{response.MessageBytes.Length} bytes received for id {response.Id} on websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})..."); + return response; + } + else { + throw new NotSupportedException("binary websocket messages are not supported"); } } catch (Exception e) { @@ -258,7 +515,7 @@ private async Task _receiveResultAsync() { } } - private async Task _closeAsync(CancellationToken cancellationToken = default) { + private async Task CloseAsync() { if (clientWebSocket == null) return; @@ -271,7 +528,7 @@ private async Task _closeAsync(CancellationToken cancellationToken = default) { } Debug.WriteLine($"closing websocket {clientWebSocket.GetHashCode()}"); - await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false); + await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); } @@ -296,12 +553,25 @@ public void Complete() { private readonly object completedLocker = new object(); private async Task CompleteAsync() { Debug.WriteLine($"disposing websocket {clientWebSocket.GetHashCode()}..."); - if (!cancellationTokenSource.IsCancellationRequested) - cancellationTokenSource.Cancel(); - await _closeAsync().ConfigureAwait(false); + incomingMessagesConnection?.Dispose(); + + if (!internalCancellationTokenSource.IsCancellationRequested) + internalCancellationTokenSource.Cancel(); + + await CloseAsync(); requestSubscription?.Dispose(); clientWebSocket?.Dispose(); - cancellationTokenSource.Dispose(); + + stateSubject?.OnCompleted(); + stateSubject?.Dispose(); + + exceptionSubject?.OnCompleted(); + exceptionSubject?.Dispose(); + internalCancellationTokenSource.Dispose(); + + sendLoopScheduler?.Dispose(); + receiveLoopScheduler?.Dispose(); + Debug.WriteLine($"websocket {clientWebSocket.GetHashCode()} disposed"); } #endregion diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebsocketHelpers.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebsocketHelpers.cs deleted file mode 100644 index 9148a727..00000000 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebsocketHelpers.cs +++ /dev/null @@ -1,210 +0,0 @@ -using System; -using System.Diagnostics; -using System.Net.WebSockets; -using System.Reactive.Disposables; -using System.Reactive.Linq; -using System.Reactive.Threading.Tasks; -using System.Threading; -using System.Threading.Tasks; -using GraphQL.Client.Abstractions.Websocket; - -namespace GraphQL.Client.Http.Websocket { - public static class GraphQLHttpWebsocketHelpers { - internal static IObservable> CreateSubscriptionStream( - this GraphQLHttpWebSocket graphQlHttpWebSocket, - GraphQLRequest request, - GraphQLHttpClient client, - Action exceptionHandler = null, - CancellationToken cancellationToken = default) { - return Observable.Defer(() => - Observable.Create>(async observer => { - await client.Options.PreprocessRequest(request, client); - var startRequest = new GraphQLWebSocketRequest { - Id = Guid.NewGuid().ToString("N"), - Type = GraphQLWebSocketMessageType.GQL_START, - Payload = request - }; - var closeRequest = new GraphQLWebSocketRequest { - Id = startRequest.Id, - Type = GraphQLWebSocketMessageType.GQL_STOP - }; - var initRequest = new GraphQLWebSocketRequest { - Id = startRequest.Id, - Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT, - }; - - var observable = Observable.Create>(o => - graphQlHttpWebSocket.ResponseStream - // ignore null values and messages for other requests - .Where(response => response != null && response.Id == startRequest.Id) - .Subscribe(response => { - // terminate the sequence when a 'complete' message is received - if (response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) { - Debug.WriteLine($"received 'complete' message on subscription {startRequest.Id}"); - o.OnCompleted(); - return; - } - - // post the GraphQLResponse to the stream (even if a GraphQL error occurred) - Debug.WriteLine($"received payload on subscription {startRequest.Id}"); - var typedResponse = - client.Options.JsonSerializer.DeserializeToWebsocketResponse( - response.MessageBytes); - o.OnNext(typedResponse.Payload); - - // in case of a GraphQL error, terminate the sequence after the response has been posted - if (response.Type == GraphQLWebSocketMessageType.GQL_ERROR) { - Debug.WriteLine($"terminating subscription {startRequest.Id} because of a GraphQL error"); - o.OnCompleted(); - } - }, - o.OnError, - o.OnCompleted) - ); - - try { - // initialize websocket (completes immediately if socket is already open) - await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false); - } - catch (Exception e) { - // subscribe observer to failed observable - return Observable.Throw>(e).Subscribe(observer); - } - - var disposable = new CompositeDisposable( - observable.Subscribe(observer), - Disposable.Create(async () => { - // only try to send close request on open websocket - if (graphQlHttpWebSocket.WebSocketState != WebSocketState.Open) return; - - try { - Debug.WriteLine($"sending close message on subscription {startRequest.Id}"); - await graphQlHttpWebSocket.SendWebSocketRequest(closeRequest).ConfigureAwait(false); - } - // do not break on disposing - catch (OperationCanceledException) { } - }) - ); - - // send connection init - Debug.WriteLine($"sending connection init on subscription {startRequest.Id}"); - try { - await graphQlHttpWebSocket.SendWebSocketRequest(initRequest).ConfigureAwait(false); - } - catch (Exception e) { - Console.WriteLine(e); - throw; - } - - Debug.WriteLine($"sending initial message on subscription {startRequest.Id}"); - // send subscription request - try { - await graphQlHttpWebSocket.SendWebSocketRequest(startRequest).ConfigureAwait(false); - } - catch (Exception e) { - Console.WriteLine(e); - throw; - } - - return disposable; - })) - // complete sequence on OperationCanceledException, this is triggered by the cancellation token - .Catch, OperationCanceledException>(exception => - Observable.Empty>()) - // wrap results - .Select(response => new Tuple, Exception>(response, null)) - // do exception handling - .Catch, Exception>, Exception>(e => { - try { - if (exceptionHandler == null) { - // if the external handler is not set, propagate all exceptions except WebSocketExceptions - // this will ensure that the client tries to re-establish subscriptions on connection loss - if (!(e is WebSocketException)) throw e; - } - else { - // exceptions thrown by the handler will propagate to OnError() - exceptionHandler?.Invoke(e); - } - - // throw exception on the observable to be caught by Retry() or complete sequence if cancellation was requested - return cancellationToken.IsCancellationRequested - ? Observable.Empty, Exception>>() - : Observable.Throw, Exception>>(e); - } - catch (Exception exception) { - // wrap all other exceptions to be propagated behind retry - return Observable.Return(new Tuple, Exception>(null, exception)); - } - }) - // attempt to recreate the websocket for rethrown exceptions - .Retry() - // unwrap and push results or throw wrapped exceptions - .SelectMany(t => { - // if the result contains an exception, throw it on the observable - if (t.Item2 != null) - return Observable.Throw>(t.Item2); - - return t.Item1 == null - ? Observable.Empty>() - : Observable.Return(t.Item1); - }) - // transform to hot observable and auto-connect - .Publish().RefCount(); - } - - internal static Task> SendRequest( - this GraphQLHttpWebSocket graphQlHttpWebSocket, - GraphQLRequest request, - GraphQLHttpClient client, - CancellationToken cancellationToken = default) { - return Observable.Create>(async observer => { - await client.Options.PreprocessRequest(request, client); - var websocketRequest = new GraphQLWebSocketRequest { - Id = Guid.NewGuid().ToString("N"), - Type = GraphQLWebSocketMessageType.GQL_START, - Payload = request - }; - var observable = graphQlHttpWebSocket.ResponseStream - .Where(response => response != null && response.Id == websocketRequest.Id) - .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE) - .Select(response => { - Debug.WriteLine($"received response for request {websocketRequest.Id}"); - var typedResponse = - client.Options.JsonSerializer.DeserializeToWebsocketResponse( - response.MessageBytes); - return typedResponse.Payload; - }); - - try { - // intialize websocket (completes immediately if socket is already open) - await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false); - } - catch (Exception e) { - // subscribe observer to failed observable - return Observable.Throw>(e).Subscribe(observer); - } - - var disposable = new CompositeDisposable( - observable.Subscribe(observer) - ); - - Debug.WriteLine($"submitting request {websocketRequest.Id}"); - // send request - try { - await graphQlHttpWebSocket.SendWebSocketRequest(websocketRequest).ConfigureAwait(false); - } - catch (Exception e) { - Console.WriteLine(e); - throw; - } - - return disposable; - }) - // complete sequence on OperationCanceledException, this is triggered by the cancellation token - .Catch, OperationCanceledException>(exception => - Observable.Empty>()) - .FirstOrDefaultAsync() - .ToTask(cancellationToken); - } - } -} diff --git a/tests/GraphQL.Client.Serializer.Tests/BaseSerializerTest.cs b/tests/GraphQL.Client.Serializer.Tests/BaseSerializerTest.cs index a07f38f1..579e756e 100644 --- a/tests/GraphQL.Client.Serializer.Tests/BaseSerializerTest.cs +++ b/tests/GraphQL.Client.Serializer.Tests/BaseSerializerTest.cs @@ -46,7 +46,7 @@ public async void CanDeserializeExtensions() { var response = await ChatClient.SendQueryAsync(new GraphQLRequest("query { extensionsTest }"), () => new { extensionsTest = "" }) - .ConfigureAwait(false); + ; response.Errors.Should().NotBeNull(); response.Errors.Should().ContainSingle(); @@ -76,7 +76,7 @@ query Droid($id: String!) { "Human"); var response = await StarWarsClient.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }) - .ConfigureAwait(false); + ; Assert.Null(response.Errors); Assert.Equal(name, response.Data.Human.Name); @@ -85,7 +85,7 @@ query Droid($id: String!) { [Fact] public async void CanDoSerializationWithPredefinedTypes() { const string message = "some random testing message"; - var response = await ChatClient.AddMessageAsync(message).ConfigureAwait(false); + var response = await ChatClient.AddMessageAsync(message); Assert.Equal(message, response.Data.AddMessage.Content); } diff --git a/tests/GraphQL.Client.Tests.Common/Chat/Schema/ChatQuery.cs b/tests/GraphQL.Client.Tests.Common/Chat/Schema/ChatQuery.cs index 641ca262..0816fd8c 100644 --- a/tests/GraphQL.Client.Tests.Common/Chat/Schema/ChatQuery.cs +++ b/tests/GraphQL.Client.Tests.Common/Chat/Schema/ChatQuery.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; +using System.Threading; using GraphQL.Types; namespace GraphQL.Client.Tests.Common.Chat.Schema { @@ -12,6 +12,12 @@ public class ChatQuery : ObjectGraphType { {"another extension", 4711} }; + // properties for unit testing + + public readonly ManualResetEventSlim LongRunningQueryBlocker = new ManualResetEventSlim(); + public readonly ManualResetEventSlim WaitingOnQueryBlocker = new ManualResetEventSlim(); + + public ChatQuery(IChat chat) { Name = "ChatQuery"; @@ -26,8 +32,10 @@ public ChatQuery(IChat chat) { Field() .Name("longRunning") - .ResolveAsync(async context => { - await Task.Delay(TimeSpan.FromSeconds(5)); + .Resolve(context => { + WaitingOnQueryBlocker.Set(); + LongRunningQueryBlocker.Wait(); + WaitingOnQueryBlocker.Reset(); return "finally returned"; }); } diff --git a/tests/GraphQL.Client.Tests.Common/Chat/Schema/IChat.cs b/tests/GraphQL.Client.Tests.Common/Chat/Schema/IChat.cs index 2e31e8b4..9ebfbd1a 100644 --- a/tests/GraphQL.Client.Tests.Common/Chat/Schema/IChat.cs +++ b/tests/GraphQL.Client.Tests.Common/Chat/Schema/IChat.cs @@ -16,10 +16,10 @@ public interface IChat { Message AddMessage(ReceivedMessage message); } - + public class Chat : IChat { - private readonly ISubject _messageStream = new ReplaySubject(1); - private readonly ISubject _userJoined = new Subject(); + private readonly ISubject messageStream = new ReplaySubject(1); + private readonly ISubject userJoined = new Subject(); public Chat() { AllMessages = new ConcurrentStack(); @@ -29,9 +29,9 @@ public Chat() { }; } - public ConcurrentDictionary Users { get; set; } + public ConcurrentDictionary Users { get; private set; } - public ConcurrentStack AllMessages { get; } + public ConcurrentStack AllMessages { get; private set; } public Message AddMessage(ReceivedMessage message) { if (!Users.TryGetValue(message.FromId, out var displayName)) { @@ -50,7 +50,7 @@ public Message AddMessage(ReceivedMessage message) { public Message AddMessage(Message message) { AllMessages.Push(message); - _messageStream.OnNext(message); + messageStream.OnNext(message); return message; } @@ -64,12 +64,12 @@ public MessageFrom Join(string userId) { DisplayName = displayName }; - _userJoined.OnNext(joinedUser); + userJoined.OnNext(joinedUser); return joinedUser; } public IObservable Messages(string user) { - return _messageStream + return messageStream .Select(message => { message.Sub = user; return message; @@ -78,11 +78,11 @@ public IObservable Messages(string user) { } public void AddError(Exception exception) { - _messageStream.OnError(exception); + messageStream.OnError(exception); } public IObservable UserJoined() { - return _userJoined.AsObservable(); + return userJoined.AsObservable(); } } diff --git a/tests/GraphQL.Client.Tests.Common/Common.cs b/tests/GraphQL.Client.Tests.Common/Common.cs index e912d907..05673f23 100644 --- a/tests/GraphQL.Client.Tests.Common/Common.cs +++ b/tests/GraphQL.Client.Tests.Common/Common.cs @@ -5,9 +5,11 @@ namespace GraphQL.Client.Tests.Common { - public static class Common - { - public static StarWarsSchema GetStarWarsSchema() { + public static class Common { + public const string StarWarsEndpoint = "/graphql/starwars"; + public const string ChatEndpoint = "/graphql/chat"; + + public static StarWarsSchema GetStarWarsSchema() { var services = new ServiceCollection(); services.AddTransient(provider => new FuncDependencyResolver(provider.GetService)); services.AddStarWarsSchema(); @@ -33,7 +35,9 @@ public static void AddStarWarsSchema(this IServiceCollection services) { } public static void AddChatSchema(this IServiceCollection services) { - services.AddSingleton(); + var chat = new Chat.Schema.Chat(); + services.AddSingleton(chat); + services.AddSingleton(chat); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs b/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs index 4b623ac6..2a27a259 100644 --- a/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs +++ b/tests/GraphQL.Client.Tests.Common/Helpers/CallbackMonitor.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using FluentAssertions; using FluentAssertions.Execution; @@ -24,40 +25,10 @@ public class CallbackMonitor { public void Invoke(T param) { LastPayload = param; + Debug.WriteLine($"CallbackMonitor invoke handler thread id: {Thread.CurrentThread.ManagedThreadId}"); callbackInvoked.Set(); } - - /// - /// Asserts that a new update has been pushed to the within the configured since the last . - /// If supplied, the action is executed on the submitted payload. - /// - /// action to assert the contents of the payload - public void CallbackShouldHaveBeenInvoked(Action assertPayload = null, TimeSpan? timeout = null) { - try { - callbackInvoked.Wait(timeout ?? Timeout).Should().BeTrue("because the callback method should have been invoked (timeout: {0} s)", - (timeout ?? Timeout).TotalSeconds); - - assertPayload?.Invoke(LastPayload); - } - finally { - Reset(); - } - } - - /// - /// Asserts that no new update has been pushed within the given since the last - /// - /// the time in ms in which no new update must be pushed to the . defaults to 100 - public void CallbackShouldNotHaveBeenInvoked(TimeSpan? timeout = null) { - if (!timeout.HasValue) timeout = TimeSpan.FromMilliseconds(100); - try { - callbackInvoked.Wait(timeout.Value).Should().BeFalse("because the callback method should not have been invoked"); - } - finally { - Reset(); - } - } - + /// /// Resets the tester class. Should be called before triggering the potential update /// @@ -65,8 +36,7 @@ public void Reset() { LastPayload = default(T); callbackInvoked.Reset(); } - - + public CallbackAssertions Should() { return new CallbackAssertions(this); } @@ -82,7 +52,10 @@ public AndWhichConstraint, TPayload> HaveBeenInvoke string because = "", params object[] becauseArgs) { Execute.Assertion .BecauseOf(because, becauseArgs) - .Given(() => Subject.callbackInvoked.Wait(timeout)) + .Given(() => { + Debug.WriteLine($"HaveBeenInvokedWithPayload thread id: {Thread.CurrentThread.ManagedThreadId}"); + return Subject.callbackInvoked.Wait(timeout); + }) .ForCondition(isSet => isSet) .FailWith("Expected {context:callback} to be invoked{reason}, but did not receive a call within {0}", timeout); diff --git a/tests/GraphQL.Client.Tests.Common/Helpers/ConcurrentTaskWrapper.cs b/tests/GraphQL.Client.Tests.Common/Helpers/ConcurrentTaskWrapper.cs new file mode 100644 index 00000000..64ea0554 --- /dev/null +++ b/tests/GraphQL.Client.Tests.Common/Helpers/ConcurrentTaskWrapper.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading.Tasks; + +namespace GraphQL.Client.Tests.Common.Helpers { + + public class ConcurrentTaskWrapper { + public static ConcurrentTaskWrapper New(Func> createTask) { + return new ConcurrentTaskWrapper(createTask); + } + + private readonly Func createTask; + private Task internalTask = null; + + public ConcurrentTaskWrapper(Func createTask) { + this.createTask = createTask; + } + + public Task Invoke() { + if (internalTask != null) + return internalTask; + + return internalTask = createTask(); + } + } + + public class ConcurrentTaskWrapper { + private readonly Func> createTask; + private Task internalTask = null; + + public ConcurrentTaskWrapper(Func> createTask) { + this.createTask = createTask; + } + + public Task Invoke() { + if (internalTask != null) + return internalTask; + + return internalTask = createTask(); + } + + public void Start() { + if (internalTask == null) + internalTask = createTask(); + } + + public Func> Invoking() { + return Invoke; + } + + public void Clear() { + internalTask = null; + } + } +} diff --git a/tests/GraphQL.Client.Tests.Common/Helpers/ObservableTester.cs b/tests/GraphQL.Client.Tests.Common/Helpers/ObservableTester.cs index 43bcbe18..05ed7e38 100644 --- a/tests/GraphQL.Client.Tests.Common/Helpers/ObservableTester.cs +++ b/tests/GraphQL.Client.Tests.Common/Helpers/ObservableTester.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; @@ -12,11 +13,12 @@ public class ObservableTester : IDisposable { private readonly ManualResetEventSlim updateReceived = new ManualResetEventSlim(); private readonly ManualResetEventSlim completed = new ManualResetEventSlim(); private readonly ManualResetEventSlim error = new ManualResetEventSlim(); + private readonly EventLoopScheduler observeScheduler = new EventLoopScheduler(); /// /// The timeout for . Defaults to 1 s /// - public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(1); + public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(3); /// /// Indicates that an update has been received since the last @@ -34,17 +36,25 @@ public class ObservableTester : IDisposable { /// /// the under test public ObservableTester(IObservable observable) { - subscription = observable.ObserveOn(TaskPoolScheduler.Default).Subscribe( + + observeScheduler.Schedule(() => + Debug.WriteLine($"Observe scheduler thread id: {Thread.CurrentThread.ManagedThreadId}")); + + subscription = observable.ObserveOn(observeScheduler).Subscribe( obj => { + Debug.WriteLine($"observable tester {GetHashCode()}: payload received"); LastPayload = obj; updateReceived.Set(); }, ex => { + Debug.WriteLine($"observable tester {GetHashCode()} error received: {ex}"); Error = ex; error.Set(); }, - () => completed.Set() - ); + () => { + Debug.WriteLine($"observable tester {GetHashCode()}: completed"); + completed.Set(); + }); } /// @@ -57,6 +67,7 @@ private void Reset() { /// public void Dispose() { subscription?.Dispose(); + observeScheduler.Dispose(); } public SubscriptionAssertions Should() { @@ -74,7 +85,12 @@ public AndWhichConstraint, TPayload> HaveReceiv string because = "", params object[] becauseArgs) { Execute.Assertion .BecauseOf(because, becauseArgs) - .Given(() => Subject.updateReceived.Wait(timeout)) + .Given(() => { + var isSet = Subject.updateReceived.Wait(timeout); + if(!isSet) + Debug.WriteLine($"waiting for payload on thread {Thread.CurrentThread.ManagedThreadId} timed out!"); + return isSet; + }) .ForCondition(isSet => isSet) .FailWith("Expected {context:Subscription} to receive new payload{reason}, but did not receive an update within {0}", timeout); @@ -124,6 +140,19 @@ public AndConstraint> HaveCompleted(TimeSpan ti } public AndConstraint> HaveCompleted(string because = "", params object[] becauseArgs) => HaveCompleted(Subject.Timeout, because, becauseArgs); + + public AndConstraint> NotHaveCompleted(TimeSpan timeout, + string because = "", params object[] becauseArgs) { + Execute.Assertion + .BecauseOf(because, becauseArgs) + .Given(() => Subject.completed.Wait(timeout)) + .ForCondition(isSet => !isSet) + .FailWith("Expected {context:Subscription} not to complete within {0}{reason}, but it did!", timeout); + + return new AndConstraint>(this); + } + public AndConstraint> NotHaveCompleted(string because = "", params object[] becauseArgs) + => NotHaveCompleted(Subject.Timeout, because, becauseArgs); } } diff --git a/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj b/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj index 3273adae..e2e2d726 100644 --- a/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj +++ b/tests/GraphQL.Integration.Tests/GraphQL.Integration.Tests.csproj @@ -10,6 +10,7 @@ + diff --git a/tests/GraphQL.Integration.Tests/Helpers/IntegrationServerTestFixture.cs b/tests/GraphQL.Integration.Tests/Helpers/IntegrationServerTestFixture.cs new file mode 100644 index 00000000..e8344a13 --- /dev/null +++ b/tests/GraphQL.Integration.Tests/Helpers/IntegrationServerTestFixture.cs @@ -0,0 +1,56 @@ +using System; +using System.Threading.Tasks; +using GraphQL.Client.Abstractions.Websocket; +using GraphQL.Client.Http; +using GraphQL.Client.Serializer.Newtonsoft; +using GraphQL.Client.Serializer.SystemTextJson; +using GraphQL.Client.Tests.Common; +using GraphQL.Client.Tests.Common.Helpers; +using Microsoft.AspNetCore.Hosting; + +namespace GraphQL.Integration.Tests.Helpers { + public abstract class IntegrationServerTestFixture { + public int Port { get; private set; } + public IWebHost Server { get; private set; } + public abstract IGraphQLWebsocketJsonSerializer Serializer { get; } + + public IntegrationServerTestFixture() + { + Port = NetworkHelpers.GetFreeTcpPortNumber(); + } + + public async Task CreateServer() { + if (Server != null) return; + Server = await WebHostHelpers.CreateServer(Port); + } + + public async Task ShutdownServer() { + if (Server == null) + return; + + await Server.StopAsync(); + Server.Dispose(); + Server = null; + } + + public GraphQLHttpClient GetStarWarsClient(bool requestsViaWebsocket = false) + => GetGraphQLClient(Common.StarWarsEndpoint, requestsViaWebsocket); + + public GraphQLHttpClient GetChatClient(bool requestsViaWebsocket = false) + => GetGraphQLClient(Common.ChatEndpoint, requestsViaWebsocket); + + private GraphQLHttpClient GetGraphQLClient(string endpoint, bool requestsViaWebsocket = false) { + if(Serializer == null) + throw new InvalidOperationException("JSON serializer not configured"); + return WebHostHelpers.GetGraphQLClient(Port, endpoint, requestsViaWebsocket, Serializer); + } + } + + public class NewtonsoftIntegrationServerTestFixture: IntegrationServerTestFixture { + public override IGraphQLWebsocketJsonSerializer Serializer { get; } = new NewtonsoftJsonSerializer(); + } + + public class SystemTextJsonIntegrationServerTestFixture : IntegrationServerTestFixture { + public override IGraphQLWebsocketJsonSerializer Serializer { get; } = new SystemTextJsonSerializer(); + } +} diff --git a/tests/GraphQL.Integration.Tests/Helpers/WebHostHelpers.cs b/tests/GraphQL.Integration.Tests/Helpers/WebHostHelpers.cs index 0ee54a08..923c11d1 100644 --- a/tests/GraphQL.Integration.Tests/Helpers/WebHostHelpers.cs +++ b/tests/GraphQL.Integration.Tests/Helpers/WebHostHelpers.cs @@ -1,18 +1,23 @@ using System; -using GraphQL.Client; +using System.Reactive.Disposables; +using System.Threading.Tasks; using GraphQL.Client.Abstractions.Websocket; using GraphQL.Client.Http; using GraphQL.Client.Serializer.Newtonsoft; +using GraphQL.Client.Tests.Common; using GraphQL.Client.Tests.Common.Helpers; +using IntegrationTestServer; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace GraphQL.Integration.Tests.Helpers { public static class WebHostHelpers { - public static IWebHost CreateServer(int port) where TStartup : class + public static async Task CreateServer(int port) { var configBuilder = new ConfigurationBuilder(); configBuilder.AddInMemoryCollection(); @@ -25,41 +30,48 @@ public static IWebHost CreateServer(int port) where TStartup : class }) .UseConfiguration(config) .UseKestrel() - .UseStartup() + .UseStartup() .Build(); - host.Start(); - + var tcs = new TaskCompletionSource(); + host.Services.GetService().ApplicationStarted.Register(() => tcs.TrySetResult(true)); + await host.StartAsync(); + await tcs.Task; return host; } - - - public static GraphQLHttpClient GetGraphQLClient(int port, bool requestsViaWebsocket = false, IGraphQLWebsocketJsonSerializer serializer = null) + + public static GraphQLHttpClient GetGraphQLClient(int port, string endpoint, bool requestsViaWebsocket = false, IGraphQLWebsocketJsonSerializer serializer = null) => new GraphQLHttpClient(new GraphQLHttpClientOptions { - EndPoint = new Uri($"http://localhost:{port}/graphql"), + EndPoint = new Uri($"http://localhost:{port}{endpoint}"), UseWebSocketForQueriesAndMutations = requestsViaWebsocket, JsonSerializer = serializer ?? new NewtonsoftJsonSerializer() }); - - public static TestServerSetup SetupTest(bool requestsViaWebsocket = false, IGraphQLWebsocketJsonSerializer serializer = null) - where TStartup : class - { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - return new TestServerSetup { - Server = CreateServer(port), - Client = GetGraphQLClient(port, requestsViaWebsocket, serializer) - }; - } } public class TestServerSetup : IDisposable { + public TestServerSetup(IGraphQLWebsocketJsonSerializer serializer) { + this.serializer = serializer; + Port = NetworkHelpers.GetFreeTcpPortNumber(); + } + + public int Port { get; } public IWebHost Server { get; set; } - public GraphQLHttpClient Client { get; set; } + public IGraphQLWebsocketJsonSerializer serializer { get; set; } + + public GraphQLHttpClient GetStarWarsClient(bool requestsViaWebsocket = false) + => GetGraphQLClient(Common.StarWarsEndpoint, requestsViaWebsocket); + + public GraphQLHttpClient GetChatClient(bool requestsViaWebsocket = false) + => GetGraphQLClient(Common.ChatEndpoint, requestsViaWebsocket); + + private GraphQLHttpClient GetGraphQLClient(string endpoint, bool requestsViaWebsocket = false) { + return WebHostHelpers.GetGraphQLClient(Port, endpoint, requestsViaWebsocket); + } + public void Dispose() { Server?.Dispose(); - Client?.Dispose(); } } } diff --git a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs index 304e2982..e60c26f0 100644 --- a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs +++ b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Base.cs @@ -1,41 +1,49 @@ -using System; using System.Net.Http; using System.Threading; using System.Threading.Tasks; using FluentAssertions; +using FluentAssertions.Extensions; using GraphQL.Client.Abstractions; -using GraphQL.Client.Abstractions.Websocket; using GraphQL.Client.Http; +using GraphQL.Client.Tests.Common.Chat.Schema; using GraphQL.Client.Tests.Common.Helpers; using GraphQL.Client.Tests.Common.StarWars; using GraphQL.Integration.Tests.Helpers; -using IntegrationTestServer; +using Microsoft.Extensions.DependencyInjection; using Xunit; namespace GraphQL.Integration.Tests.QueryAndMutationTests { - public abstract class Base { + public abstract class Base: IAsyncLifetime { - protected IGraphQLWebsocketJsonSerializer serializer; + protected IntegrationServerTestFixture Fixture; + protected GraphQLHttpClient StarWarsClient; + protected GraphQLHttpClient ChatClient; - private TestServerSetup SetupTest(bool requestsViaWebsocket = false) => WebHostHelpers.SetupTest(requestsViaWebsocket, serializer); + protected Base(IntegrationServerTestFixture fixture) { + Fixture = fixture; + } + + public async Task InitializeAsync() { + await Fixture.CreateServer(); + StarWarsClient = Fixture.GetStarWarsClient(); + ChatClient = Fixture.GetChatClient(); + } - protected Base(IGraphQLWebsocketJsonSerializer serializer) { - this.serializer = serializer; + public Task DisposeAsync() { + ChatClient?.Dispose(); + StarWarsClient?.Dispose(); + return Task.CompletedTask; } [Theory] [ClassData(typeof(StarWarsHumans))] public async void QueryTheory(int id, string name) { var graphQLRequest = new GraphQLRequest($"{{ human(id: \"{id}\") {{ name }} }}"); + var response = await StarWarsClient.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty }}); - using (var setup = SetupTest()) { - var response = await setup.Client.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty }}) - .ConfigureAwait(false); - - Assert.Null(response.Errors); - Assert.Equal(name, response.Data.Human.Name); - } + Assert.Null(response.Errors); + Assert.Equal(name, response.Data.Human.Name); } [Theory] @@ -43,13 +51,10 @@ public async void QueryTheory(int id, string name) { public async void QueryWithDynamicReturnTypeTheory(int id, string name) { var graphQLRequest = new GraphQLRequest($"{{ human(id: \"{id}\") {{ name }} }}"); - using (var setup = SetupTest()) { - var response = await setup.Client.SendQueryAsync(graphQLRequest) - .ConfigureAwait(false); + var response = await StarWarsClient.SendQueryAsync(graphQLRequest); - Assert.Null(response.Errors); - Assert.Equal(name, response.Data.human.name.ToString()); - } + Assert.Null(response.Errors); + Assert.Equal(name, response.Data.human.name.ToString()); } [Theory] @@ -63,13 +68,10 @@ query Human($id: String!){ }", new {id = id.ToString()}); - using (var setup = SetupTest()) { - var response = await setup.Client.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }) - .ConfigureAwait(false); + var response = await StarWarsClient.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }); - Assert.Null(response.Errors); - Assert.Equal(name, response.Data.Human.Name); - } + Assert.Null(response.Errors); + Assert.Equal(name, response.Data.Human.Name); } [Theory] @@ -90,13 +92,10 @@ query Droid($id: String!) { new { id = id.ToString() }, "Human"); - using (var setup = SetupTest()) { - var response = await setup.Client.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }) - .ConfigureAwait(false); + var response = await StarWarsClient.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }); - Assert.Null(response.Errors); - Assert.Equal(name, response.Data.Human.Name); - } + Assert.Null(response.Errors); + Assert.Equal(name, response.Data.Human.Name); } [Fact] @@ -118,27 +117,23 @@ query Human($id: String!){ } }"); - using (var setup = SetupTest()) { - var mutationResponse = await setup.Client.SendMutationAsync(mutationRequest, () => new { - createHuman = new { - Id = "", - Name = "", - HomePlanet = "" - } - }) - .ConfigureAwait(false); - - Assert.Null(mutationResponse.Errors); - Assert.Equal("Han Solo", mutationResponse.Data.createHuman.Name); - Assert.Equal("Corellia", mutationResponse.Data.createHuman.HomePlanet); - - queryRequest.Variables = new {id = mutationResponse.Data.createHuman.Id}; - var queryResponse = await setup.Client.SendQueryAsync(queryRequest, () => new { Human = new { Name = string.Empty } }) - .ConfigureAwait(false); - - Assert.Null(queryResponse.Errors); - Assert.Equal("Han Solo", queryResponse.Data.Human.Name); - } + var mutationResponse = await StarWarsClient.SendMutationAsync(mutationRequest, () => new { + createHuman = new { + Id = "", + Name = "", + HomePlanet = "" + } + }); + + Assert.Null(mutationResponse.Errors); + Assert.Equal("Han Solo", mutationResponse.Data.createHuman.Name); + Assert.Equal("Corellia", mutationResponse.Data.createHuman.HomePlanet); + + queryRequest.Variables = new {id = mutationResponse.Data.createHuman.Id}; + var queryResponse = await StarWarsClient.SendQueryAsync(queryRequest, () => new { Human = new { Name = string.Empty } }); + + Assert.Null(queryResponse.Errors); + Assert.Equal("Han Solo", queryResponse.Data.Human.Name); } [Fact] @@ -148,16 +143,11 @@ public async void PreprocessHttpRequestMessageIsCalled() { PreprocessHttpRequestMessage = callbackTester.Invoke }; - using (var setup = SetupTest()) { - var defaultHeaders = setup.Client.HttpClient.DefaultRequestHeaders; - var response = await setup.Client.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }) - .ConfigureAwait(false); - callbackTester.CallbackShouldHaveBeenInvoked(message => { - Assert.Equal(defaultHeaders, message.Headers); - }); - Assert.Null(response.Errors); - Assert.Equal("Luke", response.Data.Human.Name); - } + var defaultHeaders = StarWarsClient.HttpClient.DefaultRequestHeaders; + var response = await StarWarsClient.SendQueryAsync(graphQLRequest, () => new { Human = new { Name = string.Empty } }); + callbackTester.Should().HaveBeenInvokedWithPayload().Which.Headers.Should().BeEquivalentTo(defaultHeaders); + Assert.Null(response.Errors); + Assert.Equal("Luke", response.Data.Human.Name); } [Fact] @@ -167,15 +157,35 @@ query Long { longRunning }"); - using (var setup = WebHostHelpers.SetupTest(false, serializer)) { - var cancellationTimeout = TimeSpan.FromSeconds(1); - var cts = new CancellationTokenSource(cancellationTimeout); - - Func requestTask = () => setup.Client.SendQueryAsync(graphQLRequest, () => new {longRunning = string.Empty}, cts.Token); - Action timeMeasurement = () => requestTask.Should().Throw(); - - timeMeasurement.ExecutionTime().Should().BeCloseTo(cancellationTimeout, TimeSpan.FromMilliseconds(50)); - } + var chatQuery = Fixture.Server.Services.GetService(); + var cts = new CancellationTokenSource(); + + var request = + ConcurrentTaskWrapper.New(() => ChatClient.SendQueryAsync(graphQLRequest, () => new { longRunning = string.Empty }, cts.Token)); + + // Test regular request + // start request + request.Start(); + // wait until the query has reached the server + chatQuery.WaitingOnQueryBlocker.Wait(1000).Should().BeTrue("because the request should have reached the server by then"); + // unblock the query + chatQuery.LongRunningQueryBlocker.Set(); + // check execution time + request.Invoke().Result.Data.longRunning.Should().Be("finally returned"); + + // reset stuff + chatQuery.LongRunningQueryBlocker.Reset(); + request.Clear(); + + // cancellation test + request.Start(); + chatQuery.WaitingOnQueryBlocker.Wait(1000).Should().BeTrue("because the request should have reached the server by then"); + cts.Cancel(); + request.Invoking().Should().Throw("because the request was cancelled"); + + // let the server finish its query + chatQuery.LongRunningQueryBlocker.Set(); } + } } diff --git a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Newtonsoft.cs b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Newtonsoft.cs index 1046ed6d..57f86f64 100644 --- a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Newtonsoft.cs +++ b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/Newtonsoft.cs @@ -1,8 +1,10 @@ using GraphQL.Client.Serializer.Newtonsoft; +using GraphQL.Integration.Tests.Helpers; +using Xunit; namespace GraphQL.Integration.Tests.QueryAndMutationTests { - public class Newtonsoft: Base { - public Newtonsoft() : base(new NewtonsoftJsonSerializer()) + public class Newtonsoft: Base, IClassFixture { + public Newtonsoft(NewtonsoftIntegrationServerTestFixture fixture) : base(fixture) { } } diff --git a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/SystemTextJson.cs b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/SystemTextJson.cs index dd725b4d..e74d5012 100644 --- a/tests/GraphQL.Integration.Tests/QueryAndMutationTests/SystemTextJson.cs +++ b/tests/GraphQL.Integration.Tests/QueryAndMutationTests/SystemTextJson.cs @@ -1,8 +1,10 @@ using GraphQL.Client.Serializer.SystemTextJson; +using GraphQL.Integration.Tests.Helpers; +using Xunit; namespace GraphQL.Integration.Tests.QueryAndMutationTests { - public class SystemTextJson: Base { - public SystemTextJson() : base(new SystemTextJsonSerializer()) + public class SystemTextJson: Base, IClassFixture { + public SystemTextJson(SystemTextJsonIntegrationServerTestFixture fixture) : base(fixture) { } } diff --git a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs index 77045a92..59f57ceb 100644 --- a/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs @@ -5,79 +5,102 @@ using System.Threading; using System.Threading.Tasks; using FluentAssertions; +using FluentAssertions.Extensions; using GraphQL.Client.Abstractions; using GraphQL.Client.Abstractions.Websocket; +using GraphQL.Client.Http; using GraphQL.Client.Tests.Common.Chat; +using GraphQL.Client.Tests.Common.Chat.Schema; using GraphQL.Client.Tests.Common.Helpers; using GraphQL.Integration.Tests.Helpers; -using IntegrationTestServer; -using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; using Xunit; using Xunit.Abstractions; namespace GraphQL.Integration.Tests.WebsocketTests { - public abstract class Base { + public abstract class Base: IAsyncLifetime { protected readonly ITestOutputHelper Output; - protected readonly IGraphQLWebsocketJsonSerializer Serializer; - protected IWebHost CreateServer(int port) => WebHostHelpers.CreateServer(port); - - protected Base(ITestOutputHelper output, IGraphQLWebsocketJsonSerializer serializer) { + protected readonly IntegrationServerTestFixture Fixture; + protected GraphQLHttpClient ChatClient; + + protected Base(ITestOutputHelper output, IntegrationServerTestFixture fixture) { this.Output = output; - this.Serializer = serializer; + this.Fixture = fixture; } - - [Fact] - public async void AssertTestingHarness() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - const string message = "some random testing message"; - var response = await client.AddMessageAsync(message).ConfigureAwait(false); + protected static ReceivedMessage InitialMessage = new ReceivedMessage { + Content = "initial message", + SentAt = DateTime.Now, + FromId = "1" + }; + + public async Task InitializeAsync() { + await Fixture.CreateServer(); + // make sure the buffer always contains the same message + Fixture.Server.Services.GetService().AddMessage(InitialMessage); - Assert.Equal(message, response.Data.AddMessage.Content); + if (ChatClient == null) { + // then create the chat client + ChatClient = Fixture.GetChatClient(true); } } + public Task DisposeAsync() { + ChatClient?.Dispose(); + return Task.CompletedTask; + } + [Fact] public async void CanSendRequestViaWebsocket() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, true, Serializer); - const string message = "some random testing message"; - var response = await client.AddMessageAsync(message).ConfigureAwait(false); - - Assert.Equal(message, response.Data.AddMessage.Content); - } + await ChatClient.InitializeWebsocketConnection(); + const string message = "some random testing message"; + var response = await ChatClient.AddMessageAsync(message); + response.Data.AddMessage.Content.Should().Be(message); } [Fact] - public void WebsocketRequestCanBeCancelled() { + public async void WebsocketRequestCanBeCancelled() { var graphQLRequest = new GraphQLRequest(@" query Long { longRunning }"); - using (var setup = WebHostHelpers.SetupTest(true, Serializer)) { - var cancellationTimeout = TimeSpan.FromSeconds(1); - var cts = new CancellationTokenSource(cancellationTimeout); - - Func requestTask = () => setup.Client.SendQueryAsync(graphQLRequest, () => new {longRunning = string.Empty}, cts.Token); - Action timeMeasurement = () => requestTask.Should().Throw(); - - timeMeasurement.ExecutionTime().Should().BeCloseTo(cancellationTimeout, TimeSpan.FromMilliseconds(50)); - } + var chatQuery = Fixture.Server.Services.GetService(); + var cts = new CancellationTokenSource(); + + await ChatClient.InitializeWebsocketConnection(); + var request = + ConcurrentTaskWrapper.New(() => ChatClient.SendQueryAsync(graphQLRequest, () => new { longRunning = string.Empty }, cts.Token)); + + // Test regular request + // start request + request.Start(); + // wait until the query has reached the server + chatQuery.WaitingOnQueryBlocker.Wait(1000).Should().BeTrue("because the request should have reached the server by then"); + // unblock the query + chatQuery.LongRunningQueryBlocker.Set(); + // check execution time + request.Invoke().Result.Data.longRunning.Should().Be("finally returned"); + + // reset stuff + chatQuery.LongRunningQueryBlocker.Reset(); + request.Clear(); + + // cancellation test + request.Start(); + chatQuery.WaitingOnQueryBlocker.Wait(1000).Should().BeTrue("because the request should have reached the server by then"); + cts.Cancel(); + request.Invoking().Should().Throw("because the request was cancelled"); + + // let the server finish its query + chatQuery.LongRunningQueryBlocker.Set(); } - + [Fact] public async void CanHandleRequestErrorViaWebsocket() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, true, Serializer); - var response = await client.SendQueryAsync("this query is formatted quite badly").ConfigureAwait(false); - - Assert.Single(response.Errors); - } + await ChatClient.InitializeWebsocketConnection(); + var response = await ChatClient.SendQueryAsync("this query is formatted quite badly"); + response.Errors.Should().ContainSingle("because the query is invalid"); } private const string SubscriptionQuery = @" @@ -92,36 +115,30 @@ public async void CanHandleRequestErrorViaWebsocket() { [Fact] public async void CanCreateObservableSubscription() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)){ - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); - await client.InitializeWebsocketConnection(); - callbackMonitor.Should().HaveBeenInvokedWithPayload(); - - Debug.WriteLine("creating subscription stream"); - IObservable> observable = client.CreateSubscriptionStream(SubscriptionRequest); - - Debug.WriteLine("subscribing..."); - using (var tester = observable.Monitor()) { - const string message1 = "Hello World"; - - var response = await client.AddMessageAsync(message1).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message1); - tester.Should().HaveReceivedPayload(TimeSpan.FromSeconds(3)) - .Which.Data.MessageAdded.Content.Should().Be(message1); - - const string message2 = "lorem ipsum dolor si amet"; - response = await client.AddMessageAsync(message2).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message2); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message2); - - // disposing the client should throw a TaskCanceledException on the subscription - client.Dispose(); - tester.Should().HaveCompleted(); - } - } + var callbackMonitor = ChatClient.ConfigureMonitorForOnWebsocketConnected(); + await ChatClient.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); + + Debug.WriteLine("creating subscription stream"); + var observable = ChatClient.CreateSubscriptionStream(SubscriptionRequest); + + Debug.WriteLine("subscribing..."); + using var tester = observable.Monitor(); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(InitialMessage.Content); + + const string message1 = "Hello World"; + var response = await ChatClient.AddMessageAsync(message1); + response.Data.AddMessage.Content.Should().Be(message1); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message1); + + const string message2 = "lorem ipsum dolor si amet"; + response = await ChatClient.AddMessageAsync(message2); + response.Data.AddMessage.Content.Should().Be(message2); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message2); + + // disposing the client should throw a TaskCanceledException on the subscription + ChatClient.Dispose(); + tester.Should().HaveCompleted(); } public class MessageAddedSubscriptionResult { @@ -135,48 +152,46 @@ public class MessageAddedContent { [Fact] public async void CanReconnectWithSameObservable() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); - - Debug.WriteLine("creating subscription stream"); - var observable = client.CreateSubscriptionStream(SubscriptionRequest); - - Debug.WriteLine("subscribing..."); - var tester = observable.Monitor(); - callbackMonitor.Should().HaveBeenInvokedWithPayload(); - - const string message1 = "Hello World"; - var response = await client.AddMessageAsync(message1).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message1); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message1); - - const string message2 = "How are you?"; - response = await client.AddMessageAsync(message2).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message2); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message2); - - Debug.WriteLine("disposing subscription..."); - tester.Dispose(); // does not close the websocket connection - - Debug.WriteLine("creating new subscription..."); - tester = observable.Monitor(); - tester.Should().HaveReceivedPayload(TimeSpan.FromSeconds(10)) - .Which.Data.MessageAdded.Content.Should().Be(message2); - - const string message3 = "lorem ipsum dolor si amet"; - response = await client.AddMessageAsync(message3).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message3); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message3); - - // disposing the client should complete the subscription - client.Dispose(); - tester.Should().HaveCompleted(); - } + var callbackMonitor = ChatClient.ConfigureMonitorForOnWebsocketConnected(); + + Debug.WriteLine("creating subscription stream"); + var observable = ChatClient.CreateSubscriptionStream(SubscriptionRequest); + + Debug.WriteLine("subscribing..."); + var tester = observable.Monitor(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); + await ChatClient.InitializeWebsocketConnection(); + Debug.WriteLine("websocket connection initialized"); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(InitialMessage.Content); + + const string message1 = "Hello World"; + Debug.WriteLine($"adding message {message1}"); + var response = await ChatClient.AddMessageAsync(message1).ConfigureAwait(true); + response.Data.AddMessage.Content.Should().Be(message1); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message1); + + const string message2 = "How are you?"; + response = await ChatClient.AddMessageAsync(message2).ConfigureAwait(true); + response.Data.AddMessage.Content.Should().Be(message2); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message2); + + Debug.WriteLine("disposing subscription..."); + tester.Dispose(); // does not close the websocket connection + + Debug.WriteLine($"creating new subscription from thread {Thread.CurrentThread.ManagedThreadId} ..."); + var tester2 = observable.Monitor(); + Debug.WriteLine($"waiting for payload on {Thread.CurrentThread.ManagedThreadId} ..."); + tester2.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message2); + + const string message3 = "lorem ipsum dolor si amet"; + response = await ChatClient.AddMessageAsync(message3).ConfigureAwait(true); + response.Data.AddMessage.Content.Should().Be(message3); + tester2.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message3); + + // disposing the client should complete the subscription + ChatClient.Dispose(); + tester2.Should().HaveCompleted(); + tester2.Dispose(); } private const string SubscriptionQuery2 = @" @@ -205,75 +220,73 @@ public async void CanConnectTwoSubscriptionsSimultaneously() { var port = NetworkHelpers.GetFreeTcpPortNumber(); var callbackTester = new CallbackMonitor(); var callbackTester2 = new CallbackMonitor(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); - await client.InitializeWebsocketConnection(); - callbackMonitor.Should().HaveBeenInvokedWithPayload(); - Debug.WriteLine("creating subscription stream"); - IObservable> observable1 = - client.CreateSubscriptionStream(SubscriptionRequest, callbackTester.Invoke); - IObservable> observable2 = - client.CreateSubscriptionStream(SubscriptionRequest2, callbackTester2.Invoke); - - Debug.WriteLine("subscribing..."); - var tester = observable1.Monitor(); - var tester2 = observable2.Monitor(); - - const string message1 = "Hello World"; - var response = await client.AddMessageAsync(message1).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message1); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message1); - - var joinResponse = await client.JoinDeveloperUser().ConfigureAwait(false); - joinResponse.Data.Join.DisplayName.Should().Be("developer", "because that's the display name of user \"1\""); - - var payload = tester2.Should().HaveReceivedPayload().Subject; - payload.Data.UserJoined.Id.Should().Be("1", "because that's the id we sent with our mutation request"); - payload.Data.UserJoined.DisplayName.Should().Be("developer", "because that's the display name of user \"1\""); - - Debug.WriteLine("disposing subscription..."); - tester2.Dispose(); - - const string message3 = "lorem ipsum dolor si amet"; - response = await client.AddMessageAsync(message3).ConfigureAwait(false); - response.Data.AddMessage.Content.Should().Be(message3); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message3); - - // disposing the client should complete the subscription - client.Dispose(); - tester.Should().HaveCompleted(); - } + var callbackMonitor = ChatClient.ConfigureMonitorForOnWebsocketConnected(); + await ChatClient.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); + + Debug.WriteLine("creating subscription stream"); + var observable1 = ChatClient.CreateSubscriptionStream(SubscriptionRequest, callbackTester.Invoke); + var observable2 = ChatClient.CreateSubscriptionStream(SubscriptionRequest2, callbackTester2.Invoke); + + Debug.WriteLine("subscribing..."); + var messagesMonitor = observable1.Monitor(); + var joinedMonitor = observable2.Monitor(); + + messagesMonitor.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(InitialMessage.Content); + + const string message1 = "Hello World"; + var response = await ChatClient.AddMessageAsync(message1); + response.Data.AddMessage.Content.Should().Be(message1); + messagesMonitor.Should().HaveReceivedPayload() + .Which.Data.MessageAdded.Content.Should().Be(message1); + joinedMonitor.Should().NotHaveReceivedPayload(); + + var joinResponse = await ChatClient.JoinDeveloperUser(); + joinResponse.Data.Join.DisplayName.Should().Be("developer", "because that's the display name of user \"1\""); + + var payload = joinedMonitor.Should().HaveReceivedPayload().Subject; + payload.Data.UserJoined.Id.Should().Be("1", "because that's the id we sent with our mutation request"); + payload.Data.UserJoined.DisplayName.Should().Be("developer", "because that's the display name of user \"1\""); + messagesMonitor.Should().NotHaveReceivedPayload(); + + Debug.WriteLine("disposing subscription..."); + joinedMonitor.Dispose(); + + const string message3 = "lorem ipsum dolor si amet"; + response = await ChatClient.AddMessageAsync(message3); + response.Data.AddMessage.Content.Should().Be(message3); + messagesMonitor.Should().HaveReceivedPayload() + .Which.Data.MessageAdded.Content.Should().Be(message3); + + // disposing the client should complete the subscription + ChatClient.Dispose(); + messagesMonitor.Should().HaveCompleted(); } [Fact] public async void CanHandleConnectionTimeout() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - var server = CreateServer(port); var errorMonitor = new CallbackMonitor(); var reconnectBlocker = new ManualResetEventSlim(false); - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); + var callbackMonitor = ChatClient.ConfigureMonitorForOnWebsocketConnected(); // configure back-off strategy to allow it to be controlled from within the unit test - client.Options.BackOffStrategy = i => { + ChatClient.Options.BackOffStrategy = i => { + Debug.WriteLine("back-off strategy: waiting on reconnect blocker"); reconnectBlocker.Wait(); + Debug.WriteLine("back-off strategy: reconnecting..."); return TimeSpan.Zero; }; var websocketStates = new ConcurrentQueue(); - using (client.WebsocketConnectionState.Subscribe(websocketStates.Enqueue)) { + using (ChatClient.WebsocketConnectionState.Subscribe(websocketStates.Enqueue)) { websocketStates.Should().ContainSingle(state => state == GraphQLWebsocketConnectionState.Disconnected); + Debug.WriteLine($"Test method thread id: {Thread.CurrentThread.ManagedThreadId}"); Debug.WriteLine("creating subscription stream"); - IObservable> observable = - client.CreateSubscriptionStream(SubscriptionRequest, - errorMonitor.Invoke); + var observable = ChatClient.CreateSubscriptionStream(SubscriptionRequest, errorMonitor.Invoke); Debug.WriteLine("subscribing..."); var tester = observable.Monitor(); @@ -286,95 +299,91 @@ public async void CanHandleConnectionTimeout() { // clear the collection so the next tests on the collection work as expected websocketStates.Clear(); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(InitialMessage.Content); + const string message1 = "Hello World"; - var response = await client.AddMessageAsync(message1).ConfigureAwait(false); + var response = await ChatClient.AddMessageAsync(message1); response.Data.AddMessage.Content.Should().Be(message1); - tester.Should().HaveReceivedPayload() - .Which.Data.MessageAdded.Content.Should().Be(message1); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message1); Debug.WriteLine("stopping web host..."); - await server.StopAsync(CancellationToken.None).ConfigureAwait(false); - server.Dispose(); - Debug.WriteLine("web host stopped..."); + await Fixture.ShutdownServer(); + Debug.WriteLine("web host stopped"); - errorMonitor.Should().HaveBeenInvokedWithPayload(TimeSpan.FromSeconds(10)) + errorMonitor.Should().HaveBeenInvokedWithPayload(10.Seconds()) .Which.Should().BeOfType(); websocketStates.Should().Contain(GraphQLWebsocketConnectionState.Disconnected); - server = CreateServer(port); + Debug.WriteLine("restarting web host..."); + await InitializeAsync(); + Debug.WriteLine("web host started"); reconnectBlocker.Set(); - callbackMonitor.Should().HaveBeenInvokedWithPayload(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(3.Seconds()); + tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(InitialMessage.Content); + websocketStates.Should().ContainInOrder( GraphQLWebsocketConnectionState.Disconnected, GraphQLWebsocketConnectionState.Connecting, GraphQLWebsocketConnectionState.Connected); // disposing the client should complete the subscription - client.Dispose(); - tester.Should().HaveCompleted(TimeSpan.FromSeconds(5)); - server.Dispose(); + ChatClient.Dispose(); + tester.Should().HaveCompleted(5.Seconds()); } } [Fact] public async void CanHandleSubscriptionError() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); - await client.InitializeWebsocketConnection(); - callbackMonitor.Should().HaveBeenInvokedWithPayload(); - Debug.WriteLine("creating subscription stream"); - IObservable> observable = client.CreateSubscriptionStream( - new GraphQLRequest(@" - subscription { - failImmediately { - content - } - }") - ); + var callbackMonitor = ChatClient.ConfigureMonitorForOnWebsocketConnected(); + await ChatClient.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); + Debug.WriteLine("creating subscription stream"); + IObservable> observable = ChatClient.CreateSubscriptionStream( + new GraphQLRequest(@" + subscription { + failImmediately { + content + } + }") + ); - Debug.WriteLine("subscribing..."); - using (var tester = observable.Monitor()) { - tester.Should().HaveReceivedPayload(TimeSpan.FromSeconds(3)) - .Which.Errors.Should().ContainSingle(); - tester.Should().HaveCompleted(); - client.Dispose(); - } + Debug.WriteLine("subscribing..."); + using (var tester = observable.Monitor()) { + tester.Should().HaveReceivedPayload(TimeSpan.FromSeconds(3)) + .Which.Errors.Should().ContainSingle(); + tester.Should().HaveCompleted(); + ChatClient.Dispose(); } + } [Fact] public async void CanHandleQueryErrorInSubscription() { - var port = NetworkHelpers.GetFreeTcpPortNumber(); - using (CreateServer(port)) { - - var test = new GraphQLRequest("tset", new { test = "blaa" }); - - var client = WebHostHelpers.GetGraphQLClient(port, serializer: Serializer); - var callbackMonitor = client.ConfigureMonitorForOnWebsocketConnected(); - await client.InitializeWebsocketConnection(); - callbackMonitor.Should().HaveBeenInvokedWithPayload(); - Debug.WriteLine("creating subscription stream"); - IObservable> observable = client.CreateSubscriptionStream( - new GraphQLRequest(@" - subscription { - fieldDoesNotExist { - content - } - }") - ); - - Debug.WriteLine("subscribing..."); - using (var tester = observable.Monitor()) { - tester.Should().HaveReceivedPayload() - .Which.Errors.Should().ContainSingle(); - tester.Should().HaveCompleted(); - client.Dispose(); - } + var test = new GraphQLRequest("tset", new { test = "blaa" }); + + var callbackMonitor = ChatClient.ConfigureMonitorForOnWebsocketConnected(); + await ChatClient.InitializeWebsocketConnection(); + callbackMonitor.Should().HaveBeenInvokedWithPayload(); + Debug.WriteLine("creating subscription stream"); + IObservable> observable = ChatClient.CreateSubscriptionStream( + new GraphQLRequest(@" + subscription { + fieldDoesNotExist { + content + } + }") + ); + + Debug.WriteLine("subscribing..."); + using (var tester = observable.Monitor()) { + tester.Should().HaveReceivedPayload() + .Which.Errors.Should().ContainSingle(); + tester.Should().HaveCompleted(); + ChatClient.Dispose(); } } + } } diff --git a/tests/GraphQL.Integration.Tests/WebsocketTests/Newtonsoft.cs b/tests/GraphQL.Integration.Tests/WebsocketTests/Newtonsoft.cs index 02a0b030..76169663 100644 --- a/tests/GraphQL.Integration.Tests/WebsocketTests/Newtonsoft.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests/Newtonsoft.cs @@ -1,10 +1,11 @@ using GraphQL.Client.Serializer.Newtonsoft; +using GraphQL.Integration.Tests.Helpers; +using Xunit; using Xunit.Abstractions; namespace GraphQL.Integration.Tests.WebsocketTests { - public class Newtonsoft: Base { - public Newtonsoft(ITestOutputHelper output) : base(output, new NewtonsoftJsonSerializer()) - { - } + public class Newtonsoft: Base, IClassFixture { + public Newtonsoft(ITestOutputHelper output, NewtonsoftIntegrationServerTestFixture fixture) : base(output, fixture) + { } } } diff --git a/tests/GraphQL.Integration.Tests/WebsocketTests/SystemTextJson.cs b/tests/GraphQL.Integration.Tests/WebsocketTests/SystemTextJson.cs index 3a7882e4..fc7698ed 100644 --- a/tests/GraphQL.Integration.Tests/WebsocketTests/SystemTextJson.cs +++ b/tests/GraphQL.Integration.Tests/WebsocketTests/SystemTextJson.cs @@ -1,9 +1,10 @@ -using GraphQL.Client.Serializer.SystemTextJson; +using GraphQL.Integration.Tests.Helpers; +using Xunit; using Xunit.Abstractions; namespace GraphQL.Integration.Tests.WebsocketTests { - public class SystemTextJson: Base { - public SystemTextJson(ITestOutputHelper output) : base(output, new SystemTextJsonSerializer()) + public class SystemTextJson: Base, IClassFixture { + public SystemTextJson(ITestOutputHelper output, SystemTextJsonIntegrationServerTestFixture fixture) : base(output, fixture) { } } diff --git a/tests/IntegrationTestServer/Program.cs b/tests/IntegrationTestServer/Program.cs index 67cbd96b..ec3530e1 100644 --- a/tests/IntegrationTestServer/Program.cs +++ b/tests/IntegrationTestServer/Program.cs @@ -10,7 +10,7 @@ public static void Main(string[] args) { public static IWebHostBuilder CreateWebHostBuilder(string[] args) => WebHost.CreateDefaultBuilder(args) - .UseStartup() + .UseStartup() .ConfigureLogging((ctx, logging) => logging.SetMinimumLevel(LogLevel.Debug)); } } diff --git a/tests/IntegrationTestServer/Startup.cs b/tests/IntegrationTestServer/Startup.cs index 0ae7280e..2f25f733 100644 --- a/tests/IntegrationTestServer/Startup.cs +++ b/tests/IntegrationTestServer/Startup.cs @@ -1,8 +1,11 @@ using GraphQL; +using GraphQL.Client.Tests.Common; +using GraphQL.Client.Tests.Common.Chat.Schema; using GraphQL.Server; using GraphQL.Server.Ui.GraphiQL; -using GraphQL.Server.Ui.Voyager; using GraphQL.Server.Ui.Playground; +using GraphQL.StarWars; +using GraphQL.Types; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Server.Kestrel.Core; @@ -11,8 +14,8 @@ using Microsoft.Extensions.Hosting; namespace IntegrationTestServer { - public abstract class Startup { - protected Startup(IConfiguration configuration, IWebHostEnvironment environment) { + public class Startup { + public Startup(IConfiguration configuration, IWebHostEnvironment environment) { Configuration = configuration; Environment = environment; } @@ -29,9 +32,8 @@ public void ConfigureServices(IServiceCollection services) { }); services.AddTransient(provider => new FuncDependencyResolver(provider.GetService)); - - ConfigureGraphQLSchemaServices(services); - + services.AddChatSchema(); + services.AddStarWarsSchema(); services.AddGraphQL(options => { options.EnableMetrics = true; options.ExposeExceptions = Environment.IsDevelopment(); @@ -39,9 +41,6 @@ public void ConfigureServices(IServiceCollection services) { .AddWebSockets(); } - public abstract void ConfigureGraphQLSchemaServices(IServiceCollection services); - - // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { @@ -50,21 +49,23 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { app.UseWebSockets(); - ConfigureGraphQLSchema(app); + ConfigureGraphQLSchema(app, Common.ChatEndpoint); + ConfigureGraphQLSchema(app, Common.StarWarsEndpoint); app.UseGraphiQLServer(new GraphiQLOptions { GraphiQLPath = "/ui/graphiql", - GraphQLEndPoint = "/graphql" - }); - app.UseGraphQLVoyager(new GraphQLVoyagerOptions() { - GraphQLEndPoint = "/graphql", - Path = "/ui/voyager" + GraphQLEndPoint = Common.StarWarsEndpoint }); app.UseGraphQLPlayground(new GraphQLPlaygroundOptions { - Path = "/ui/playground" + Path = "/ui/playground", + GraphQLEndPoint = Common.ChatEndpoint }); } - public abstract void ConfigureGraphQLSchema(IApplicationBuilder app); + private void ConfigureGraphQLSchema(IApplicationBuilder app, string endpoint) where TSchema: Schema + { + app.UseGraphQLWebSockets(endpoint); + app.UseGraphQL(endpoint); + } } } diff --git a/tests/IntegrationTestServer/StartupChat.cs b/tests/IntegrationTestServer/StartupChat.cs deleted file mode 100644 index 2a782991..00000000 --- a/tests/IntegrationTestServer/StartupChat.cs +++ /dev/null @@ -1,22 +0,0 @@ -using GraphQL.Client.Tests.Common; -using GraphQL.Client.Tests.Common.Chat.Schema; -using GraphQL.Server; -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -namespace IntegrationTestServer { - public class StartupChat: Startup { - public StartupChat(IConfiguration configuration, IWebHostEnvironment environment): base(configuration, environment) { } - - public override void ConfigureGraphQLSchemaServices(IServiceCollection services) { - services.AddChatSchema(); - } - - public override void ConfigureGraphQLSchema(IApplicationBuilder app) { - app.UseGraphQLWebSockets("/graphql"); - app.UseGraphQL("/graphql"); - } - } -} diff --git a/tests/IntegrationTestServer/StartupStarWars.cs b/tests/IntegrationTestServer/StartupStarWars.cs deleted file mode 100644 index 15609515..00000000 --- a/tests/IntegrationTestServer/StartupStarWars.cs +++ /dev/null @@ -1,22 +0,0 @@ -using GraphQL.Client.Tests.Common; -using GraphQL.Server; -using GraphQL.StarWars; -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -namespace IntegrationTestServer { - public class StartupStarWars: Startup { - public StartupStarWars(IConfiguration configuration, IWebHostEnvironment environment): base(configuration, environment) { } - - public override void ConfigureGraphQLSchemaServices(IServiceCollection services) { - services.AddStarWarsSchema(); - } - - public override void ConfigureGraphQLSchema(IApplicationBuilder app) { - app.UseGraphQLWebSockets("/graphql"); - app.UseGraphQL("/graphql"); - } - } -}