Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
546b304
add StringEnumConverter to JsonSerializerSettings
rose-a Nov 29, 2018
a8cff74
fix dependencies
rose-a Nov 29, 2018
3b51211
Merge branch 'serialize-enums-to-string-by-default' into subscription…
rose-a Nov 29, 2018
e3a2154
change version
rose-a Nov 29, 2018
eca5d24
fix version
rose-a Nov 29, 2018
2911345
fix CloseAsync
rose-a Dec 3, 2018
af78196
lock dispose
rose-a Dec 3, 2018
43cb3dd
implement external exception handling for subscription streams
rose-a Dec 4, 2018
f1deb7f
fix doc comment
rose-a Dec 4, 2018
1a28d69
add back-off delay for automatic reconnects
rose-a Dec 4, 2018
7b19ab5
create GraphQLHttpWebSocket class
rose-a Dec 4, 2018
ad2c31f
get single socket working
rose-a Dec 4, 2018
ae491f2
fix subscription test, cleanup code
rose-a Dec 5, 2018
98190e9
more code cleanup
rose-a Dec 5, 2018
9aec685
add test CanConnectMultipleSubscriptionsSimultaneously
rose-a Dec 5, 2018
24be6a5
dynamically find free ports for integration test
rose-a Dec 5, 2018
4189799
add WebSocketExceptionHandler to clientOptions
rose-a Dec 5, 2018
492c440
move reconnect-logic into subscriptions
rose-a Dec 7, 2018
293c351
move external exception handler declaration to CreateSubscriptionStre…
rose-a Dec 7, 2018
e6c469a
fix exception on disposing disconnected subscription
rose-a Dec 7, 2018
7bcf0e4
fix tests, add console app for testing reconnect after server restart
rose-a Dec 10, 2018
b6e84bf
Update GraphQL.Client.Tests.csproj
Dec 11, 2018
cadd54f
Nullable check (#86)
Dec 11, 2018
3b761e2
Update README.md
Dec 11, 2018
a4c7e60
Merge branch 'subscriptions-api' of github.com:graphql-dotnet/graphql…
rose-a Dec 18, 2018
16a9bd8
create "queue" for requests so that only one send operation is perfor…
rose-a Dec 18, 2018
c37fe83
fix websockets on win 7
rose-a Dec 18, 2018
0562eb8
remove websockets4net dependency
rose-a Dec 18, 2018
881f36d
Update root.props
Jan 7, 2019
6da6ea2
Update config.yml
Jan 7, 2019
a555224
IsTestProject
Jan 7, 2019
b6a26c7
Benchmar
Jan 7, 2019
7f8c05c
benchmark
Jan 10, 2019
ea4a0e8
Update GraphQLLocation.cs
Jan 10, 2019
53c9f81
new Spec
Jan 10, 2019
1add607
mrege from upstream/master
rose-a Jan 22, 2019
bd9bf5e
fix suff for dotnet core 3.0
rose-a Jan 22, 2019
72ba90d
check response for 'null' in where clause
rose-a Jan 22, 2019
f8e1865
add obsolete parameterless constructor to GraphQLRequest
rose-a Jan 22, 2019
632a45d
add debug output in websocket receive
rose-a Jan 22, 2019
f8021af
reuse observables for identical requests
rose-a Jan 22, 2019
95d3319
store changes to consoleapp
rose-a Jan 23, 2019
abd9caf
fix StackOverflowException in GraphQLRequest.GetHashCode()
rose-a Jan 23, 2019
6ab3205
mrege fix-gethashcode
rose-a Jan 23, 2019
93fae18
fix receiving long messages via websocket
rose-a Jan 24, 2019
34db7a4
add obsolete constructor
rose-a Jan 24, 2019
ca4216d
make ReceiveResultStream observable Hot
rose-a Jan 25, 2019
acb36a1
fix multiple subscriptions by using subject in GraphQLHttpWebSocket
rose-a Jan 25, 2019
e70f7a4
fix overlapping subscriptions
rose-a Jan 25, 2019
2fbd83d
fix subscription test to work arround https://github.com/graphql-dotn…
rose-a Jan 28, 2019
e1ba20a
add an observable which publishes all websocket exceptions
rose-a Jan 28, 2019
7984248
fix OnNext calls of exceptionSubject
rose-a Jan 28, 2019
ef37543
fix GraphQLHttpClient(Uri endPoint, GraphQLHttpClientOptions options)…
rose-a Jan 28, 2019
309a8e3
test requests via websockets
rose-a Jan 28, 2019
b9c1443
bump pre-release version
rose-a Jan 28, 2019
535e1e0
Merge branch 'subscriptions-api' of github.com:graphql-dotnet/graphql…
rose-a Feb 1, 2019
30bd0b8
Merge branch 'subscriptions-api' of github.com:graphql-dotnet/graphql…
rose-a Feb 14, 2019
81bf039
use JsonSerializerSettings from options in websocket requests
rose-a Mar 6, 2019
b0ab9b9
fix query errors via websocket
rose-a Mar 6, 2019
234923d
fix sequence termination and error handling in websocket streams
rose-a Mar 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion root.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/graphql-dotnet/graphql-client.git</RepositoryUrl>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<Version>2.0.0-alpha.4.subscription-api.8</Version>
<Version>2.0.0-alpha.4.subscription-api.10</Version>
<WarningLevel>4</WarningLevel>
</PropertyGroup>

Expand Down
79 changes: 38 additions & 41 deletions src/GraphQL.Client/Http/GraphQLHttpSubscriptionHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Net.WebSockets;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using GraphQL.Common;
Expand Down Expand Up @@ -35,31 +36,39 @@ internal static IObservable<GraphQLResponse> CreateSubscriptionStream(
Id = startRequest.Id,
Type = GQLWebSocketMessageType.GQL_STOP
};
var observable = graphQlHttpWebSocket.ResponseStream
.Where(response => {
return response != null && response.Id == startRequest.Id;
})
.SelectMany(response =>
{
switch (response.Type)

var observable = Observable.Create<GraphQLResponse>(o =>
graphQlHttpWebSocket.ResponseStream.Subscribe(response =>
{
case GQLWebSocketMessageType.GQL_COMPLETE:
// ignore null values and messages for other requests
if (response == null || response.Id != startRequest.Id) return;

// terminate the sequence when a 'complete' message is received
if (response.Type == GQLWebSocketMessageType.GQL_COMPLETE)
{
Debug.WriteLine($"received 'complete' message on subscription {startRequest.Id}");
return Observable.Empty<GraphQLResponse>();
case GQLWebSocketMessageType.GQL_ERROR:
Debug.WriteLine($"received 'error' message on subscription {startRequest.Id}");
return Observable.Throw<GraphQLResponse>(
new GraphQLSubscriptionException(response.Payload));
default:
Debug.WriteLine($"received payload on subscription {startRequest.Id}");
return Observable.Return(((JObject) response?.Payload)
?.ToObject<GraphQLResponse>());
}
});
o.OnCompleted();
return;
}

// post the GraphQLResponse to the stream (even if a GraphQL error occurred)
Debug.WriteLine($"received payload on subscription {startRequest.Id}");
o.OnNext(((JObject)response.Payload)?.ToObject<GraphQLResponse>());

// in case of a GraphQL error, terminate the sequence after the response has been posted
if (response.Type == GQLWebSocketMessageType.GQL_ERROR)
{
Debug.WriteLine($"terminating subscription {startRequest.Id} because of a GraphQL error");
o.OnCompleted();
}
},
o.OnError,
o.OnCompleted)
);

try
{
// intialize websocket (completes immediately if socket is already open)
// initialize websocket (completes immediately if socket is already open)
await graphQlHttpWebSocket.InitializeWebSocket().ConfigureAwait(false);
}
catch (Exception e)
Expand Down Expand Up @@ -148,12 +157,12 @@ internal static IObservable<GraphQLResponse> CreateSubscriptionStream(
.Publish().RefCount();
}

internal static async Task<GraphQLResponse> Request(
internal static Task<GraphQLResponse> Request(
this GraphQLHttpWebSocket graphQlHttpWebSocket,
GraphQLRequest request,
CancellationToken cancellationToken = default)
{
return await Observable.Create<GraphQLResponse>(async observer =>
return Observable.Create<GraphQLResponse>(async observer =>
{
var websocketRequest = new GraphQLWebSocketRequest
{
Expand All @@ -162,25 +171,12 @@ internal static async Task<GraphQLResponse> Request(
Payload = request
};
var observable = graphQlHttpWebSocket.ResponseStream
.Where(response => {
return response != null && response.Id == websocketRequest.Id;
})
.SelectMany(response =>
.Where(response => response != null && response.Id == websocketRequest.Id)
.TakeUntil(response => response.Type == GQLWebSocketMessageType.GQL_COMPLETE)
.Select(response =>
{
switch (response.Type)
{
case GQLWebSocketMessageType.GQL_COMPLETE:
Debug.WriteLine($"received 'complete' message on request {websocketRequest.Id}");
return Observable.Empty<GraphQLResponse>();
case GQLWebSocketMessageType.GQL_ERROR:
Debug.WriteLine($"received 'error' message on request {websocketRequest.Id}");
return Observable.Throw<GraphQLResponse>(
new GraphQLSubscriptionException(response.Payload));
default:
Debug.WriteLine($"received response for request {websocketRequest.Id}");
return Observable.Return(((JObject)response?.Payload)
?.ToObject<GraphQLResponse>());
}
Debug.WriteLine($"received response for request {websocketRequest.Id}");
return ((JObject) response?.Payload)?.ToObject<GraphQLResponse>();
});

try
Expand Down Expand Up @@ -215,7 +211,8 @@ internal static async Task<GraphQLResponse> Request(
// complete sequence on OperationCanceledException, this is triggered by the cancellation token
.Catch<GraphQLResponse, OperationCanceledException>(exception =>
Observable.Empty<GraphQLResponse>())
.FirstOrDefaultAsync();
.FirstOrDefaultAsync()
.ToTask(cancellationToken);
}
}
}
4 changes: 2 additions & 2 deletions src/GraphQL.Client/Http/GraphQLHttpWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private async Task _sendWebSocketRequest(GraphQLWebSocketRequest request)
}

await InitializeWebSocket().ConfigureAwait(false);
var webSocketRequestString = JsonConvert.SerializeObject(request);
var webSocketRequestString = JsonConvert.SerializeObject(request, _options.JsonSerializerSettings);
await this.clientWebSocket.SendAsync(
new ArraySegment<byte>(Encoding.UTF8.GetBytes(webSocketRequestString)),
WebSocketMessageType.Text,
Expand Down Expand Up @@ -245,7 +245,7 @@ private async Task<GraphQLWebSocketResponse> _receiveResultAsync()
{
var stringResult = await reader.ReadToEndAsync();
Debug.WriteLine($"data received on websocket {clientWebSocket.GetHashCode()}: {stringResult}");
return JsonConvert.DeserializeObject<GraphQLWebSocketResponse>(stringResult);
return JsonConvert.DeserializeObject<GraphQLWebSocketResponse>(stringResult, _options.JsonSerializerSettings);
}
}
else
Expand Down
89 changes: 88 additions & 1 deletion tests/GraphQL.Integration.Tests/SubscriptionsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ public SubscriptionsTest()
{
}

private GraphQLHttpClient GetGraphQLClient(int port)
private GraphQLHttpClient GetGraphQLClient(int port, bool requestsViaWebsocket = false)
=> new GraphQLHttpClient(new GraphQLHttpClientOptions
{
EndPoint = new Uri($"http://localhost:{port}/graphql"),
UseWebSocketForQueriesAndMutations = requestsViaWebsocket
});


Expand All @@ -65,6 +66,34 @@ public async void AssertTestingHarness()
}
}

[Fact]
public async void CanSendRequestViaWebsocket()
{
var port = NetworkHelpers.GetFreeTcpPortNumber();
using (CreateServer(port))
{
var client = GetGraphQLClient(port, true);
const string message = "some random testing message";
var response = await client.AddMessageAsync(message).ConfigureAwait(false);

Assert.Equal(message, (string)response.Data.addMessage.content);
}
}

[Fact]
public async void CanHandleRequestErrorViaWebsocket()
{
var port = NetworkHelpers.GetFreeTcpPortNumber();
using (CreateServer(port))
{
var client = GetGraphQLClient(port, true);
const string message = "some random testing message";
var response = await client.SendQueryAsync(new GraphQLRequest("this query is formatted quite badly")).ConfigureAwait(false);

Assert.Single(response.Errors);
}
}

private const string SubscriptionQuery = @"
subscription {
messageAdded{
Expand Down Expand Up @@ -272,5 +301,63 @@ public async void CanHandleConnectionTimeout()
tester.ShouldHaveCompleted(TimeSpan.FromSeconds(5));
server.Dispose();
}

[Fact]
public async void CanHandleSubscriptionError()
{
var port = NetworkHelpers.GetFreeTcpPortNumber();
using (CreateServer(port))
{
var client = GetGraphQLClient(port);
Debug.WriteLine("creating subscription stream");
IObservable<GraphQLResponse> observable = client.CreateSubscriptionStream(
new GraphQLRequest(@"
subscription {
failImmediately {
content
}
}")
);

Debug.WriteLine("subscribing...");
var tester = observable.SubscribeTester();
tester.ShouldHaveReceivedUpdate(gqlResponse =>
{
Assert.Single(gqlResponse.Errors);
});
tester.ShouldHaveCompleted();

client.Dispose();
}
}

[Fact]
public async void CanHandleQueryErrorInSubscription()
{
var port = NetworkHelpers.GetFreeTcpPortNumber();
using (CreateServer(port))
{
var client = GetGraphQLClient(port);
Debug.WriteLine("creating subscription stream");
IObservable<GraphQLResponse> observable = client.CreateSubscriptionStream(
new GraphQLRequest(@"
subscription {
fieldDoesNotExist {
content
}
}")
);

Debug.WriteLine("subscribing...");
var tester = observable.SubscribeTester();
tester.ShouldHaveReceivedUpdate(gqlResponse =>
{
Assert.Single(gqlResponse.Errors);
});
tester.ShouldHaveCompleted();

client.Dispose();
}
}
}
}
9 changes: 9 additions & 0 deletions tests/IntegrationTestServer/ChatSchema/ChatSubscriptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public ChatSubscriptions(IChat chat)
Resolver = new FuncFieldResolver<MessageFrom>(context => context.Source as MessageFrom),
Subscriber = new EventStreamResolver<MessageFrom>(context => _chat.UserJoined())
});


AddField(new EventStreamFieldType
{
Name = "failImmediately",
Type = typeof(MessageType),
Resolver = new FuncFieldResolver<Message>(ResolveMessage),
Subscriber = new EventStreamResolver<Message>(context => throw new NotSupportedException("this is supposed to fail"))
});
}

private IObservable<Message> SubscribeById(ResolveEventStreamContext context)
Expand Down