diff --git a/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebSocketMessageType.cs b/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebSocketMessageType.cs index daa83114..3b0e0499 100644 --- a/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebSocketMessageType.cs +++ b/src/GraphQL.Client.Abstractions.Websocket/GraphQLWebSocketMessageType.cs @@ -38,8 +38,7 @@ public static class GraphQLWebSocketMessageType public const string GQL_CONNECTION_KEEP_ALIVE = "ka"; // Server -> Client /// - /// Client sends this message in order to stop a running GraphQL operation execution (for example: unsubscribe) - /// id: string : operation id + /// Client sends this message to terminate the connection. /// public const string GQL_CONNECTION_TERMINATE = "connection_terminate"; // Client -> Server diff --git a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs index 28af82d6..350aae6a 100644 --- a/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs +++ b/src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs @@ -8,6 +8,7 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; +using System.Text; using System.Threading; using System.Threading.Tasks; using GraphQL.Client.Abstractions.Websocket; @@ -111,12 +112,6 @@ public IObservable> CreateSubscriptionStream>(o => IncomingMessageStream @@ -187,20 +182,8 @@ public IObservable> CreateSubscriptionStream SendWebSocketRequestAsync(GraphQLWebSocketRequest reque } await InitializeWebSocket(); - var requestBytes = _client.JsonSerializer.SerializeToBytes(request); - await _clientWebSocket.SendAsync( - new ArraySegment(requestBytes), - WebSocketMessageType.Text, - true, - _internalCancellationToken); + await SendWebSocketMessageAsync(request, _internalCancellationToken); request.SendCompleted(); } catch (Exception e) @@ -369,6 +347,16 @@ await _clientWebSocket.SendAsync( return Unit.Default; } + private async Task SendWebSocketMessageAsync(GraphQLWebSocketRequest request, CancellationToken cancellationToken = default) + { + var requestBytes = _client.JsonSerializer.SerializeToBytes(request); + await _clientWebSocket.SendAsync( + new ArraySegment(requestBytes), + WebSocketMessageType.Text, + true, + cancellationToken); + } + #endregion public Task InitializeWebSocket() @@ -469,9 +457,38 @@ private async Task ConnectAsync(CancellationToken token) Debug.WriteLine($"new incoming message stream {_incomingMessages.GetHashCode()} created"); _incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection); + + var initRequest = new GraphQLWebSocketRequest + { + Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT, + Payload = Options.ConfigureWebSocketConnectionInitPayload(Options) + }; + + // setup task to await connection_ack message + var ackTask = _incomingMessages + .Where(response => response != null ) + .TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK || + response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ERROR) + .FirstAsync() + .ToTask(); + + // send connection init + Debug.WriteLine($"sending connection init message"); + await SendWebSocketMessageAsync(initRequest); + var response = await ackTask; + + if (response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK) + Debug.WriteLine($"connection acknowledged: {Encoding.UTF8.GetString(response.MessageBytes)}"); + else + { + var errorPayload = Encoding.UTF8.GetString(response.MessageBytes); + Debug.WriteLine($"connection error received: {errorPayload}"); + throw new GraphQLWebsocketConnectionException(errorPayload); + } } catch (Exception e) { + Debug.WriteLine($"failed to establish websocket connection"); _stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); _exceptionSubject.OnNext(e); throw; @@ -600,6 +617,9 @@ private async Task CloseAsync() return; } + Debug.WriteLine($"send \"connection_terminate\" message"); + await SendWebSocketMessageAsync(new GraphQLWebSocketRequest{Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE}); + Debug.WriteLine($"closing websocket {_clientWebSocket.GetHashCode()}"); await _clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); _stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected); diff --git a/src/GraphQL.Client/Websocket/GraphQLWebsocketConnectionException.cs b/src/GraphQL.Client/Websocket/GraphQLWebsocketConnectionException.cs new file mode 100644 index 00000000..c6b2e831 --- /dev/null +++ b/src/GraphQL.Client/Websocket/GraphQLWebsocketConnectionException.cs @@ -0,0 +1,25 @@ +using System; +using System.Runtime.Serialization; + +namespace GraphQL.Client.Http.Websocket +{ + [Serializable] + public class GraphQLWebsocketConnectionException: Exception + { + public GraphQLWebsocketConnectionException() + { + } + + protected GraphQLWebsocketConnectionException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + + public GraphQLWebsocketConnectionException(string message) : base(message) + { + } + + public GraphQLWebsocketConnectionException(string message, Exception innerException) : base(message, innerException) + { + } + } +}