Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public static class GraphQLWebSocketMessageType
public const string GQL_CONNECTION_KEEP_ALIVE = "ka"; // Server -> Client

/// <summary>
/// Client sends this message in order to stop a running GraphQL operation execution (for example: unsubscribe)
/// id: string : operation id
/// Client sends this message to terminate the connection.
/// </summary>
public const string GQL_CONNECTION_TERMINATE = "connection_terminate"; // Client -> Server

Expand Down
72 changes: 46 additions & 26 deletions src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Client.Abstractions.Websocket;
Expand Down Expand Up @@ -111,12 +112,6 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
Id = startRequest.Id,
Type = GraphQLWebSocketMessageType.GQL_STOP
};
var initRequest = new GraphQLWebSocketRequest
{
Id = startRequest.Id,
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
Payload = Options.ConfigureWebSocketConnectionInitPayload(Options)
};

var observable = Observable.Create<GraphQLResponse<TResponse>>(o =>
IncomingMessageStream
Expand Down Expand Up @@ -187,20 +182,8 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
catch (OperationCanceledException) { }
})
);

// send connection init
Debug.WriteLine($"sending connection init on subscription {startRequest.Id}");
try
{
await QueueWebSocketRequest(initRequest);
}
catch (Exception e)
{
Debug.WriteLine(e);
throw;
}

Debug.WriteLine($"sending initial message on subscription {startRequest.Id}");

Debug.WriteLine($"sending start message on subscription {startRequest.Id}");
// send subscription request
try
{
Expand Down Expand Up @@ -354,12 +337,7 @@ private async Task<Unit> SendWebSocketRequestAsync(GraphQLWebSocketRequest reque
}

await InitializeWebSocket();
var requestBytes = _client.JsonSerializer.SerializeToBytes(request);
await _clientWebSocket.SendAsync(
new ArraySegment<byte>(requestBytes),
WebSocketMessageType.Text,
true,
_internalCancellationToken);
await SendWebSocketMessageAsync(request, _internalCancellationToken);
request.SendCompleted();
}
catch (Exception e)
Expand All @@ -369,6 +347,16 @@ await _clientWebSocket.SendAsync(
return Unit.Default;
}

private async Task SendWebSocketMessageAsync(GraphQLWebSocketRequest request, CancellationToken cancellationToken = default)
{
var requestBytes = _client.JsonSerializer.SerializeToBytes(request);
await _clientWebSocket.SendAsync(
new ArraySegment<byte>(requestBytes),
WebSocketMessageType.Text,
true,
cancellationToken);
}

#endregion

public Task InitializeWebSocket()
Expand Down Expand Up @@ -469,9 +457,38 @@ private async Task ConnectAsync(CancellationToken token)
Debug.WriteLine($"new incoming message stream {_incomingMessages.GetHashCode()} created");

_incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection);

var initRequest = new GraphQLWebSocketRequest
{
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
Payload = Options.ConfigureWebSocketConnectionInitPayload(Options)
};

// setup task to await connection_ack message
var ackTask = _incomingMessages
.Where(response => response != null )
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK ||
response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ERROR)
.FirstAsync()
.ToTask();

// send connection init
Debug.WriteLine($"sending connection init message");
await SendWebSocketMessageAsync(initRequest);
var response = await ackTask;

if (response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK)
Debug.WriteLine($"connection acknowledged: {Encoding.UTF8.GetString(response.MessageBytes)}");
else
{
var errorPayload = Encoding.UTF8.GetString(response.MessageBytes);
Debug.WriteLine($"connection error received: {errorPayload}");
throw new GraphQLWebsocketConnectionException(errorPayload);
}
}
catch (Exception e)
{
Debug.WriteLine($"failed to establish websocket connection");
_stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
_exceptionSubject.OnNext(e);
throw;
Expand Down Expand Up @@ -600,6 +617,9 @@ private async Task CloseAsync()
return;
}

Debug.WriteLine($"send \"connection_terminate\" message");
await SendWebSocketMessageAsync(new GraphQLWebSocketRequest{Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE});

Debug.WriteLine($"closing websocket {_clientWebSocket.GetHashCode()}");
await _clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
_stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Runtime.Serialization;

namespace GraphQL.Client.Http.Websocket
{
[Serializable]
public class GraphQLWebsocketConnectionException: Exception
{
public GraphQLWebsocketConnectionException()
{
}

protected GraphQLWebsocketConnectionException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}

public GraphQLWebsocketConnectionException(string message) : base(message)
{
}

public GraphQLWebsocketConnectionException(string message, Exception innerException) : base(message, innerException)
{
}
}
}