Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion GraphQL.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".circleci", ".circleci", "{
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{9413EC62-CDDE-4E77-9784-E1136EA5D1EE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GraphQL.Client.Sample", "samples\GraphQL.Client.Sample\GraphQL.Client.Sample.csproj", "{B21E97C3-F328-473F-A054-A4BF272B63F0}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GraphQL.Client.Sample", "samples\GraphQL.Client.Sample\GraphQL.Client.Sample.csproj", "{B21E97C3-F328-473F-A054-A4BF272B63F0}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GraphQL.Integration.Tests", "tests\GraphQL.Integration.Tests\GraphQL.Integration.Tests.csproj", "{86BC3878-6549-4EF1-9672-B7C15A3FDF46}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IntegrationTestServer", "tests\IntegrationTestServer\IntegrationTestServer.csproj", "{618653E5-41C2-4F17-BE4F-F904267500D4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -71,6 +75,14 @@ Global
{B21E97C3-F328-473F-A054-A4BF272B63F0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B21E97C3-F328-473F-A054-A4BF272B63F0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B21E97C3-F328-473F-A054-A4BF272B63F0}.Release|Any CPU.Build.0 = Release|Any CPU
{86BC3878-6549-4EF1-9672-B7C15A3FDF46}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{86BC3878-6549-4EF1-9672-B7C15A3FDF46}.Debug|Any CPU.Build.0 = Debug|Any CPU
{86BC3878-6549-4EF1-9672-B7C15A3FDF46}.Release|Any CPU.ActiveCfg = Release|Any CPU
{86BC3878-6549-4EF1-9672-B7C15A3FDF46}.Release|Any CPU.Build.0 = Release|Any CPU
{618653E5-41C2-4F17-BE4F-F904267500D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{618653E5-41C2-4F17-BE4F-F904267500D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{618653E5-41C2-4F17-BE4F-F904267500D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{618653E5-41C2-4F17-BE4F-F904267500D4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -83,6 +95,8 @@ Global
{6326E0E2-3F48-4BAF-80D3-47AED5EB647C} = {63F75859-4698-4EDE-8B70-4ACBB8BC425A}
{C1406F03-650F-4633-887D-312943251919} = {63F75859-4698-4EDE-8B70-4ACBB8BC425A}
{B21E97C3-F328-473F-A054-A4BF272B63F0} = {9413EC62-CDDE-4E77-9784-E1136EA5D1EE}
{86BC3878-6549-4EF1-9672-B7C15A3FDF46} = {0B0EDB0F-FF67-4B78-A8DB-B5C23E1FEE8C}
{618653E5-41C2-4F17-BE4F-F904267500D4} = {0B0EDB0F-FF67-4B78-A8DB-B5C23E1FEE8C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {387AC1AC-F90C-4EF8-955A-04D495C75AF4}
Expand Down
5 changes: 3 additions & 2 deletions src/GraphQL.Client/GraphQL.Client.csproj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="../src.props" />

<PropertyGroup>
<Description>A GraphQL Client</Description>
<TargetFrameworks>netstandard1.3;netstandard2.0</TargetFrameworks>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand All @@ -14,6 +14,7 @@

<ItemGroup>
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.2" />
<PackageReference Include="System.Reactive" Version="4.1.2" />
</ItemGroup>

<!-- Allow method ReadAsAsync<T>
Expand Down
37 changes: 30 additions & 7 deletions src/GraphQL.Client/Http/GraphQLHttpClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Client.Http.Internal;
Expand Down Expand Up @@ -40,6 +41,7 @@ public GraphQLHttpClientOptions Options {
#endregion

internal readonly GraphQLHttpHandler graphQLHttpHandler;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

/// <summary>
/// Initializes a new instance
Expand Down Expand Up @@ -117,23 +119,44 @@ public Task<IGraphQLSubscriptionResult> SendSubscribeAsync(string query, Cancell
this.SendSubscribeAsync(new GraphQLRequest { Query = query }, cancellationToken);

[Obsolete("EXPERIMENTAL API")]
public Task<IGraphQLSubscriptionResult> SendSubscribeAsync(GraphQLRequest request, CancellationToken cancellationToken = default) {
public Task<IGraphQLSubscriptionResult> SendSubscribeAsync(GraphQLRequest request, CancellationToken cancellationToken = default)
{
GraphQLHttpSubscriptionResult graphQLSubscriptionResult = _createSubscription(request, cancellationToken);
return Task.FromResult<IGraphQLSubscriptionResult>(graphQLSubscriptionResult);
}

private GraphQLHttpSubscriptionResult _createSubscription(GraphQLRequest request, CancellationToken cancellationToken)
{
if (request == null) { throw new ArgumentNullException(nameof(request)); }
if (request.Query == null) { throw new ArgumentNullException(nameof(request.Query)); }

var webSocketSchema = this.EndPoint.Scheme == "https" ? "wss" : "ws";
var webSocketUri = new Uri($"{webSocketSchema}://{this.EndPoint.Host}:{this.EndPoint.Port}{this.EndPoint.AbsolutePath}");
var graphQLSubscriptionResult = new GraphQLHttpSubscriptionResult(webSocketUri, request);
var graphQLSubscriptionResult = new GraphQLHttpSubscriptionResult(_getWebSocketUri(), request);
graphQLSubscriptionResult.StartAsync(cancellationToken);
return Task.FromResult<IGraphQLSubscriptionResult>(graphQLSubscriptionResult);
return graphQLSubscriptionResult;
}

private Uri _getWebSocketUri()
{
var webSocketSchema = this.EndPoint.Scheme == "https" ? "wss" : "ws";
return new Uri($"{webSocketSchema}://{this.EndPoint.Host}:{this.EndPoint.Port}{this.EndPoint.AbsolutePath}");
}

[Obsolete("EXPERIMENTAL API")]
public IObservable<GraphQLResponse> CreateSubscriptionStream(GraphQLRequest request)
{
var observable = GraphQLHttpObservableSubscription.GetSubscriptionStream(_getWebSocketUri(), request, _cancellationTokenSource.Token);
return observable;
}

/// <summary>
/// Releases unmanaged resources
/// </summary>
public void Dispose() =>
public void Dispose()
{
this.graphQLHttpHandler.Dispose();

_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
}
}

}
268 changes: 268 additions & 0 deletions src/GraphQL.Client/Http/GraphQLHttpObservableSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
using System;
using System.Diagnostics;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Common.Request;
using GraphQL.Common.Response;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace GraphQL.Client.Http {

/// <summary>
/// Represents the result of a subscription query
/// </summary>
[Obsolete("EXPERIMENTAL API")]
public class GraphQLHttpObservableSubscription : IDisposable {

private readonly ClientWebSocket clientWebSocket = new ClientWebSocket();
private readonly Uri webSocketUri;
private readonly GraphQLRequest graphQLRequest;
private readonly byte[] buffer = new byte[1024 * 1024];
private readonly ArraySegment<byte> arraySegment;
private readonly CancellationTokenSource _cancellationTokenSource;

private GraphQLHttpObservableSubscription(Uri webSocketUri, GraphQLRequest graphQLRequest, CancellationToken cancellationToken = default) {
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_cancellationTokenSource.Token.Register(Dispose);
this.webSocketUri = webSocketUri;
this.graphQLRequest = graphQLRequest;
this.clientWebSocket.Options.AddSubProtocol("graphql-ws");
arraySegment = new ArraySegment<byte>(buffer);
}

public async Task ConnectAsync(CancellationToken token)
{
Debug.WriteLine($"opening websocket on subscription {this.GetHashCode()}");
await clientWebSocket.ConnectAsync(webSocketUri, token).ConfigureAwait(false);
Debug.WriteLine($"connection established on subscription {this.GetHashCode()}");
}

public async Task<GraphQLResponse> ReceiveResultAsync()
{
var webSocketReceiveResult = await clientWebSocket.ReceiveAsync(arraySegment, _cancellationTokenSource.Token).ConfigureAwait(false);
var stringResult = Encoding.UTF8.GetString(arraySegment.Array, 0, webSocketReceiveResult.Count);
var webSocketResponse = JsonConvert.DeserializeObject<GraphQLSubscriptionResponse>(stringResult);
switch (webSocketResponse.Type)
{
case GQLWebSocketMessageType.GQL_COMPLETE:
Debug.WriteLine($"received 'complete' message on subscription {this.GetHashCode()}");
Dispose();
break;
case GQLWebSocketMessageType.GQL_ERROR:
Debug.WriteLine($"received 'error' message on subscription {this.GetHashCode()}");
throw new GQLSubscriptionException(webSocketResponse.Payload);
default:
Debug.WriteLine($"received payload on subscription {this.GetHashCode()}");
break;
}

return ((JObject)webSocketResponse?.Payload).ToObject<GraphQLResponse>();
}

public async Task CloseAsync(CancellationToken cancellationToken = default)
{
Debug.WriteLine($"closing websocket on subscription {this.GetHashCode()}");
if (this.clientWebSocket.State == WebSocketState.Open) {
await SendCloseMessageAsync(cancellationToken).ConfigureAwait(false);
}
await this.clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", cancellationToken).ConfigureAwait(false);
}

private Task SendInitialMessageAsync(CancellationToken cancellationToken = default)
{
Debug.WriteLine($"sending initial message on subscription {this.GetHashCode()}");
var webSocketRequest = new GraphQLSubscriptionRequest
{
Id = "1",
Type = GQLWebSocketMessageType.GQL_START,
Payload = this.graphQLRequest
};
return this.SendGraphQLSubscriptionRequest(webSocketRequest, cancellationToken);
}

private Task SendCloseMessageAsync(CancellationToken cancellationToken = default)
{
Debug.WriteLine($"sending close message on subscription {this.GetHashCode()}");
var webSocketRequest = new GraphQLSubscriptionRequest
{
Id = "1",
Type = GQLWebSocketMessageType.GQL_STOP,
Payload = this.graphQLRequest
};
return this.SendGraphQLSubscriptionRequest(webSocketRequest, cancellationToken);
}

private Task SendGraphQLSubscriptionRequest(GraphQLSubscriptionRequest graphQLSubscriptionRequest, CancellationToken cancellationToken = default)
{
var webSocketRequestString = JsonConvert.SerializeObject(graphQLSubscriptionRequest);
var arraySegmentWebSocketRequest = new ArraySegment<byte>(Encoding.UTF8.GetBytes(webSocketRequestString));
return this.clientWebSocket.SendAsync(arraySegmentWebSocketRequest, WebSocketMessageType.Text, true, cancellationToken);
}

#region IDisposable

public void Dispose()
{
// Async disposal as recommended by Stephen Cleary (https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html)
if(Disposed == null) Disposed = DisposeAsync();
}

public Task Disposed { get; private set; }

private async Task DisposeAsync()
{
Debug.WriteLine($"disposing subscription {this.GetHashCode()}...");
if(!_cancellationTokenSource.IsCancellationRequested)
_cancellationTokenSource.Cancel();
await CloseAsync().ConfigureAwait(false);
clientWebSocket?.Dispose();
_cancellationTokenSource.Dispose();
Debug.WriteLine($"subscription {this.GetHashCode()} disposed");
}

#endregion

#region Static Factories

public static IObservable<GraphQLResponse> GetSubscriptionStream(Uri webSocketUri, GraphQLRequest graphQLRequest, CancellationToken cancellationToken = default)
{
return Observable.Using(
token => CreateSubscription(webSocketUri, graphQLRequest, cancellationToken),
InitializeSubscription
).Publish().RefCount();
}


private static Task<GraphQLHttpObservableSubscription> CreateSubscription(Uri webSocketUri, GraphQLRequest graphQLRequest, CancellationToken cancellationToken = default)
{
var subscription = new GraphQLHttpObservableSubscription(webSocketUri, graphQLRequest, cancellationToken);
return Task.FromResult(subscription);
}

private static async Task<IObservable<GraphQLResponse>> InitializeSubscription(GraphQLHttpObservableSubscription observableSubscription, CancellationToken cancelToken)
{
await observableSubscription.ConnectAsync(cancelToken).ConfigureAwait(false);
await observableSubscription.SendInitialMessageAsync(cancelToken).ConfigureAwait(false);
return Observable.Defer(() => observableSubscription.ReceiveResultAsync().ToObservable()).Repeat();
}

#endregion

[Serializable]
public class GQLSubscriptionException : Exception
{
//
// For guidelines regarding the creation of new exception types, see
// http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp
// and
// http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp
//

public GQLSubscriptionException()
{
}

public GQLSubscriptionException(object error) : base(error.ToString())
{
}

protected GQLSubscriptionException(
SerializationInfo info,
StreamingContext context) : base(info, context)
{
}
}

public static class GQLWebSocketMessageType
{

/// <summary>
/// Client sends this message after plain websocket connection to start the communication with the server
/// The server will response only with GQL_CONNECTION_ACK + GQL_CONNECTION_KEEP_ALIVE(if used) or GQL_CONNECTION_ERROR
/// to this message.
/// payload: Object : optional parameters that the client specifies in connectionParams
/// </summary>
public const string GQL_CONNECTION_INIT = "connection_init";

/// <summary>
/// The server may responses with this message to the GQL_CONNECTION_INIT from client, indicates the server accepted
/// the connection.
/// </summary>
public const string GQL_CONNECTION_ACK = "connection_ack"; // Server -> Client

/// <summary>
/// The server may responses with this message to the GQL_CONNECTION_INIT from client, indicates the server rejected
/// the connection.
/// It server also respond with this message in case of a parsing errors of the message (which does not disconnect the
/// client, just ignore the message).
/// payload: Object: the server side error
/// </summary>
public const string GQL_CONNECTION_ERROR = "connection_error"; // Server -> Client

/// <summary>
/// Server message that should be sent right after each GQL_CONNECTION_ACK processed and then periodically to keep the
/// client connection alive.
/// The client starts to consider the keep alive message only upon the first received keep alive message from the
/// server.
/// <remarks>
/// NOTE: This one here don't follow the standard due to connection optimization
/// </remarks>
/// </summary>
public const string GQL_CONNECTION_KEEP_ALIVE = "ka"; // Server -> Client

/// <summary>
/// Client sends this message in order to stop a running GraphQL operation execution (for example: unsubscribe)
/// id: string : operation id
/// </summary>
public const string GQL_CONNECTION_TERMINATE = "connection_terminate"; // Client -> Server

/// <summary>
/// Client sends this message to execute GraphQL operation
/// id: string : The id of the GraphQL operation to start
/// payload: Object:
/// query: string : GraphQL operation as string or parsed GraphQL document node
/// variables?: Object : Object with GraphQL variables
/// operationName?: string : GraphQL operation name
/// </summary>
public const string GQL_START = "start";

/// <summary>
/// The server sends this message to transfer the GraphQL execution result from the server to the client, this message
/// is a response for GQL_START message.
/// For each GraphQL operation send with GQL_START, the server will respond with at least one GQL_DATA message.
/// id: string : ID of the operation that was successfully set up
/// payload: Object :
/// data: any: Execution result
/// errors?: Error[] : Array of resolvers errors
/// </summary>
public const string GQL_DATA = "data"; // Server -> Client

/// <summary>
/// Server sends this message upon a failing operation, before the GraphQL execution, usually due to GraphQL validation
/// errors (resolver errors are part of GQL_DATA message, and will be added as errors array)
/// payload: Error : payload with the error attributed to the operation failing on the server
/// id: string : operation ID of the operation that failed on the server
/// </summary>
public const string GQL_ERROR = "error"; // Server -> Client

/// <summary>
/// Server sends this message to indicate that a GraphQL operation is done, and no more data will arrive for the
/// specific operation.
/// id: string : operation ID of the operation that completed
/// </summary>
public const string GQL_COMPLETE = "complete"; // Server -> Client

/// <summary>
/// Client sends this message in order to stop a running GraphQL operation execution (for example: unsubscribe)
/// id: string : operation id
/// </summary>
public const string GQL_STOP = "stop"; // Client -> Server
}
}
}
Loading