/
GraphQLHttpSubscriptionResult.cs
174 lines (148 loc) · 7.49 KB
/
GraphQLHttpSubscriptionResult.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Common.Request;
using GraphQL.Common.Response;
using Newtonsoft.Json;
namespace GraphQL.Client.Http {
/// <summary>
/// Represents the result of a subscription query. The instance connects a WebSocket to the
/// given URI using the <c>graphql-ws</c> sub-protocol, sends the subscription request and
/// raises <see cref="OnReceive"/> for every message that is received afterwards.
/// </summary>
[Obsolete("EXPERIMENTAL API")]
public class GraphQLHttpSubscriptionResult : IGraphQLSubscriptionResult {

    /// <summary>
    /// Operation id sent with both the start and the stop message of this subscription.
    /// </summary>
    private const string OperationId = "1";

    /// <summary>
    /// Raised with the payload of every message that could be deserialized.
    /// Handlers run on whichever thread completes the receive operation.
    /// </summary>
    public event Action<GraphQLResponse> OnReceive;

    /// <summary>
    /// The payload of the most recently received message.
    /// </summary>
    public GraphQLResponse LastResponse { get; private set; }

    private readonly ClientWebSocket clientWebSocket = new ClientWebSocket();

    private readonly Uri webSocketUri;

    private readonly GraphQLRequest graphQLRequest;

    // Receive buffer for a single WebSocket frame; messages spanning several frames are
    // reassembled in ReceiveLoopAsync.
    private readonly byte[] buffer = new byte[1024 * 1024];

    /// <summary>
    /// Creates a subscription result; no connection is opened until <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="webSocketUri">The WebSocket endpoint to connect to.</param>
    /// <param name="graphQLRequest">The subscription request sent as payload of the start message.</param>
    /// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
    internal GraphQLHttpSubscriptionResult(Uri webSocketUri, GraphQLRequest graphQLRequest) {
        this.webSocketUri = webSocketUri ?? throw new ArgumentNullException(nameof(webSocketUri));
        this.graphQLRequest = graphQLRequest ?? throw new ArgumentNullException(nameof(graphQLRequest));
        this.clientWebSocket.Options.AddSubProtocol("graphql-ws");
    }

    /// <summary>
    /// Connects to the server, sends the start message and then receives messages until the
    /// socket leaves the open state or <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// The method is <c>async void</c>, so the caller cannot await it or observe its exceptions.
    /// The signature is kept for compatibility (presumably it is dictated by
    /// <see cref="IGraphQLSubscriptionResult"/> - confirm before changing it).
    /// </remarks>
    /// <param name="cancellationToken">Cancels connecting, sending and receiving.</param>
    public async void StartAsync(CancellationToken cancellationToken = default) {
        try {
            await this.clientWebSocket.ConnectAsync(this.webSocketUri, cancellationToken).ConfigureAwait(false);
            if (this.clientWebSocket.State != WebSocketState.Open) {
                return;
            }

            await this.SendInitialMessageAsync(cancellationToken).ConfigureAwait(false);
            await this.ReceiveLoopAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
            // The caller asked for cancellation. Because this method returns void the exception
            // could not be observed by anyone and would surface as an unhandled exception.
        }
    }

    /// <summary>
    /// Sends the stop message (if the socket is still open) and performs the WebSocket close handshake.
    /// Does nothing when the socket was never connected or is already closed.
    /// </summary>
    /// <param name="cancellationToken">Cancels sending the stop message and the close handshake.</param>
    public async Task StopAsync(CancellationToken cancellationToken = default) {
        if (this.clientWebSocket.State == WebSocketState.Open) {
            await this.SendCloseMessageAsync(cancellationToken).ConfigureAwait(false);
        }

        // Re-read the state: it may have changed while the stop message was being sent.
        if (IsCloseHandshakePossible(this.clientWebSocket.State)) {
            await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Stops the subscription and releases the underlying WebSocket.
    /// </summary>
    public void Dispose() {
        try {
            // Dispose is synchronous, so the stop has to be waited for here. StopAsync awaits
            // with ConfigureAwait(false) and therefore does not need the caller's context to finish.
            this.StopAsync().Wait();
        }
        finally {
            // Release the socket even when the graceful stop failed.
            this.clientWebSocket.Dispose();
        }
    }

    /// <summary>
    /// Receives messages while the socket is open, reassembles messages that arrive in several
    /// frames and publishes every deserialized payload.
    /// </summary>
    private async Task ReceiveLoopAsync(CancellationToken cancellationToken) {
        var receiveSegment = new ArraySegment<byte>(this.buffer);
        using (var messageBytes = new System.IO.MemoryStream()) {
            while (this.clientWebSocket.State == WebSocketState.Open) {
                messageBytes.SetLength(0);

                WebSocketReceiveResult frame;
                do {
                    frame = await this.clientWebSocket.ReceiveAsync(receiveSegment, cancellationToken).ConfigureAwait(false);
                    if (frame.MessageType == WebSocketMessageType.Close) {
                        // The server started the close handshake; StopAsync/Dispose completes it.
                        return;
                    }
                    messageBytes.Write(this.buffer, 0, frame.Count);
                } while (!frame.EndOfMessage);

                // Decode only after the whole message is available so that neither the JSON
                // document nor a multi-byte UTF-8 sequence is cut at a frame boundary.
                var json = Encoding.UTF8.GetString(messageBytes.GetBuffer(), 0, (int)messageBytes.Length);
                var webSocketResponse = JsonConvert.DeserializeObject<GraphQLSubscriptionResponse>(json);
                if (webSocketResponse != null) {
                    this.LastResponse = webSocketResponse.Payload;
                    this.OnReceive?.Invoke(webSocketResponse.Payload);
                }
            }
        }
    }

    /// <summary>
    /// Tells whether <see cref="WebSocket.CloseAsync"/> may be called in the given state.
    /// A socket that never connected, or that is already closed or aborted, cannot be closed again.
    /// </summary>
    private static bool IsCloseHandshakePossible(WebSocketState state) {
        return state == WebSocketState.Open
            || state == WebSocketState.CloseReceived
            || state == WebSocketState.CloseSent;
    }

    /// <summary>
    /// Sends the start message that carries the subscription request.
    /// </summary>
    private Task SendInitialMessageAsync(CancellationToken cancellationToken = default) {
        var webSocketRequest = new GraphQLSubscriptionRequest {
            Id = OperationId,
            Type = GQLWebSocketMessageType.GQL_START,
            Payload = this.graphQLRequest
        };
        return this.SendGraphQLSubscriptionRequest(webSocketRequest, cancellationToken);
    }

    /// <summary>
    /// Sends the stop message for the operation started by <see cref="SendInitialMessageAsync"/>.
    /// </summary>
    private Task SendCloseMessageAsync(CancellationToken cancellationToken = default) {
        var webSocketRequest = new GraphQLSubscriptionRequest {
            Id = OperationId,
            Type = GQLWebSocketMessageType.GQL_STOP,
            Payload = this.graphQLRequest
        };
        return this.SendGraphQLSubscriptionRequest(webSocketRequest, cancellationToken);
    }

    /// <summary>
    /// Serializes the request to JSON and sends it as a single UTF-8 text message.
    /// </summary>
    private Task SendGraphQLSubscriptionRequest(GraphQLSubscriptionRequest graphQLSubscriptionRequest, CancellationToken cancellationToken = default) {
        var webSocketRequestString = JsonConvert.SerializeObject(graphQLSubscriptionRequest);
        var arraySegmentWebSocketRequest = new ArraySegment<byte>(Encoding.UTF8.GetBytes(webSocketRequestString));
        return this.clientWebSocket.SendAsync(arraySegmentWebSocketRequest, WebSocketMessageType.Text, true, cancellationToken);
    }

    /// <summary>
    /// Message type identifiers of the <c>graphql-ws</c> sub-protocol.
    /// </summary>
    private static class GQLWebSocketMessageType {

        /// <summary>
        /// The client sends this message after the plain WebSocket connection is established to start
        /// the communication with the server.
        /// The server responds to this message only with GQL_CONNECTION_ACK + GQL_CONNECTION_KEEP_ALIVE
        /// (if used) or GQL_CONNECTION_ERROR.
        /// payload: Object : optional parameters that the client specifies in connectionParams
        /// </summary>
        public const string GQL_CONNECTION_INIT = "connection_init"; // Client -> Server

        /// <summary>
        /// The server may respond with this message to the GQL_CONNECTION_INIT from the client;
        /// it indicates that the server accepted the connection.
        /// </summary>
        public const string GQL_CONNECTION_ACK = "connection_ack"; // Server -> Client

        /// <summary>
        /// The server may respond with this message to the GQL_CONNECTION_INIT from the client;
        /// it indicates that the server rejected the connection.
        /// The server also responds with this message in case of a parsing error of a message
        /// (which does not disconnect the client, the message is just ignored).
        /// payload: Object : the server side error
        /// </summary>
        public const string GQL_CONNECTION_ERROR = "connection_error"; // Server -> Client

        /// <summary>
        /// Server message that should be sent right after each GQL_CONNECTION_ACK is processed and then
        /// periodically to keep the client connection alive.
        /// The client starts to consider the keep alive message only upon the first keep alive message
        /// received from the server.
        /// </summary>
        /// <remarks>
        /// NOTE: The value of this message type does not follow the standard of the other message
        /// types due to connection optimization.
        /// </remarks>
        public const string GQL_CONNECTION_KEEP_ALIVE = "ka"; // Server -> Client

        /// <summary>
        /// The client sends this message to terminate the connection.
        /// </summary>
        public const string GQL_CONNECTION_TERMINATE = "connection_terminate"; // Client -> Server

        /// <summary>
        /// The client sends this message to execute a GraphQL operation.
        /// id: string : the id of the GraphQL operation to start
        /// payload: Object :
        /// query: string : GraphQL operation as string or parsed GraphQL document node
        /// variables?: Object : object with GraphQL variables
        /// operationName?: string : GraphQL operation name
        /// </summary>
        public const string GQL_START = "start"; // Client -> Server

        /// <summary>
        /// The server sends this message to transfer the GraphQL execution result to the client;
        /// this message is a response to the GQL_START message.
        /// For each GraphQL operation sent with GQL_START, the server responds with at least one
        /// GQL_DATA message.
        /// id: string : id of the operation that was successfully set up
        /// payload: Object :
        /// data: any : execution result
        /// errors?: Error[] : array of resolver errors
        /// </summary>
        public const string GQL_DATA = "data"; // Server -> Client

        /// <summary>
        /// The server sends this message when an operation fails before the GraphQL execution, usually
        /// due to GraphQL validation errors (resolver errors are part of the GQL_DATA message and are
        /// added to its errors array).
        /// payload: Error : payload with the error attributed to the operation failing on the server
        /// id: string : id of the operation that failed on the server
        /// </summary>
        public const string GQL_ERROR = "error"; // Server -> Client

        /// <summary>
        /// The server sends this message to indicate that a GraphQL operation is done and no more data
        /// will arrive for the specific operation.
        /// id: string : id of the operation that completed
        /// </summary>
        public const string GQL_COMPLETE = "complete"; // Server -> Client

        /// <summary>
        /// The client sends this message in order to stop a running GraphQL operation execution
        /// (for example: unsubscribe).
        /// id: string : operation id
        /// </summary>
        public const string GQL_STOP = "stop"; // Client -> Server
    }
}
}