Skip to content

Commit

Permalink
Added GraphQL-SSE support (#5390)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Sep 12, 2022
1 parent 44a7ced commit a5b84b3
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 17 deletions.
Expand Up @@ -25,4 +25,9 @@ public enum HttpRequestKind
/// HTTP POST GraphQL MultiPart Request.
/// </summary>
HttpMultiPart,

/// <summary>
/// HTTP POST GraphQL-SSE
/// </summary>
HttpSse
}
Expand Up @@ -19,9 +19,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter
{
private readonly JsonQueryResultFormatter _jsonFormatter;
private readonly MultiPartResponseStreamFormatter _multiPartFormatter;

// TODO : implement this one!
private readonly IExecutionResultFormatter _eventStreamFormatter = default!;
private readonly EventStreamFormatter _eventStreamFormatter;

/// <summary>
/// Creates a new instance of <see cref="DefaultHttpResponseFormatter" />.
Expand All @@ -42,6 +40,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter
{
_jsonFormatter = new JsonQueryResultFormatter(indented, encoder);
_multiPartFormatter = new MultiPartResponseStreamFormatter(_jsonFormatter);
_eventStreamFormatter = new EventStreamFormatter(indented, encoder);
}

public GraphQLRequestFlags CreateRequestFlags(
Expand Down Expand Up @@ -320,7 +319,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter
}

if (resultKind is ResultKind.Stream or ResultKind.Single &&
mediaType.Kind is MultiPartMixed or AllMultiPart)
mediaType.Kind is MultiPartMixed or AllMultiPart or All)
{
formatInfo = new FormatInfo(
ContentType.MultiPartMixed,
Expand Down Expand Up @@ -374,7 +373,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter
}

if (resultKind is ResultKind.Stream or ResultKind.Single &&
mediaType.Kind is MultiPartMixed or AllMultiPart)
mediaType.Kind is MultiPartMixed or AllMultiPart or All)
{
// if the result is a stream we consider this a perfect match and
// will use this format.
Expand All @@ -400,7 +399,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter
}
}

if (mediaType.Kind is EventStream)
if (mediaType.Kind is EventStream or All)
{
// if the result is a subscription we consider this a perfect match and
// will use this format.
Expand Down
Expand Up @@ -6,7 +6,7 @@

namespace HotChocolate.AspNetCore;

public class WebSocketSubscriptionMiddleware : MiddlewareBase
public sealed class WebSocketSubscriptionMiddleware : MiddlewareBase
{
private readonly IServerDiagnosticEvents _diagnosticEvents;

Expand Down
Expand Up @@ -149,6 +149,55 @@ public async Task Legacy_Query_No_Streams_3()
"}}]}");
}

/// <summary>
/// This request does not specify a accept header.
/// expected response content-type: multipart/mixed
/// expected status code: 200
/// </summary>
[Fact]
public async Task Legacy_With_Stream_1()
{
// arrange
var server = CreateStarWarsServer();
var client = server.CreateClient();

// act
using var request = new HttpRequestMessage(HttpMethod.Post, _url)
{
Content = JsonContent.Create(
new ClientQueryRequest
{
Query = "{ ... @defer { __typename } }"
})
};

using var response = await client.SendAsync(request);

// assert
// expected response content-type: multipart/mixed
// expected status code: 200
Snapshot
.Create()
.Add(response)
.MatchInline(
@"Headers:
Content-Type: multipart/mixed; boundary=""-""; charset=utf-8
-------------------------->
Status Code: OK
-------------------------->
---
Content-Type: application/json; charset=utf-8
{""data"":{},""hasNext"":true}
---
Content-Type: application/json; charset=utf-8
{""path"":[],""data"":{""__typename"":""Query""},""hasNext"":false}
-----
");
}

/// <summary>
/// This request specifies the application/graphql-response+json accept header.
/// expected response content-type: application/graphql-response+json
Expand Down Expand Up @@ -679,5 +728,63 @@ public async Task New_Query_With_Streams_2()
-------------------------->
{""errors"":[{""message"":""The specified operation kind is not allowed.""}]}");
}

/// <summary>
/// This request specifies the application/graphql-response+json and
/// the multipart/mixed content type as accept header value.
/// expected response content-type: multipart/mixed
/// expected status code: 200
/// </summary>
[Fact]
public async Task New_Query_With_Streams_3()
{
// arrange
var server = CreateStarWarsServer();
var client = server.CreateClient();

// act
using var request = new HttpRequestMessage(HttpMethod.Post, _url)
{
Content = JsonContent.Create(
new ClientQueryRequest
{
Query = "{ ... @defer { __typename } }"
}),
Headers =
{
{ "Accept", new[]
{
ContentType.EventStream
}
}
}
};

using var response = await client.SendAsync(request, ResponseHeadersRead);

// assert
// expected response content-type: multipart/mixed
// expected status code: 200
Snapshot
.Create()
.Add(response)
.MatchInline(
@"Headers:
Content-Type: multipart/mixed; boundary=""-""; charset=utf-8
-------------------------->
Status Code: OK
-------------------------->
---
Content-Type: application/json; charset=utf-8
{""data"":{},""hasNext"":true}
---
Content-Type: application/json; charset=utf-8
{""path"":[],""data"":{""__typename"":""Query""},""hasNext"":false}
-----
");
}
}
#endif
Expand Up @@ -183,7 +183,7 @@ await ExecuteOperationAsync(context, batchDispatcher, context.Operation)
context.Schema.MutationType!,
ref _cachedMutation);

private bool IsOperationAllowed(IOperation operation, IQueryRequest request)
private static bool IsOperationAllowed(IOperation operation, IQueryRequest request)
{
if (request.Flags is AllowAll)
{
Expand Down
@@ -0,0 +1,145 @@
using System;
using System.IO;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using static HotChocolate.Execution.ExecutionResultKind;

namespace HotChocolate.Execution.Serialization;

/// <summary>
/// The default GraphQL-SSE formatter for <see cref="IExecutionResult"/>.
/// https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md
/// </summary>
public sealed class EventStreamFormatter : IExecutionResultFormatter
{
private static ReadOnlySpan<byte> EventField
=> new[] { (byte)'e', (byte)'v', (byte)'e', (byte)'n', (byte)'t' };
private static ReadOnlySpan<byte> DataField
=> new[] { (byte)'d', (byte)'a', (byte)'t', (byte)'a' };
private static ReadOnlySpan<byte> NextEvent
=> new[] { (byte)'n', (byte)'e', (byte)'x', (byte)'t' };
private static ReadOnlySpan<byte> CompleteEvent
=> new[]
{
(byte)'c', (byte)'o', (byte)'m', (byte)'p',
(byte)'l', (byte)'e', (byte)'t', (byte)'e'
};
private static readonly byte[] _newLine = new byte[] { (byte)'\n' };

private readonly JsonQueryResultFormatter _payloadFormatter;
private readonly JsonWriterOptions _options;

/// <summary>
/// Creates a new instance of <see cref="EventStreamFormatter" />.
/// </summary>
/// <param name="indented">
/// Defines whether the underlying <see cref="Utf8JsonWriter"/>
/// should pretty print the JSON which includes:
/// indenting nested JSON tokens, adding new lines, and adding
/// white space between property names and values.
/// By default, the JSON is written without any extra white space.
/// </param>
/// <param name="encoder">
/// Gets or sets the encoder to use when escaping strings, or null to use the default encoder.
/// </param>
public EventStreamFormatter(
bool indented = false,
JavaScriptEncoder? encoder = null)
{
_options = new JsonWriterOptions { Indented = indented, Encoder = encoder };
_payloadFormatter = new JsonQueryResultFormatter(indented, encoder);
}

/// <inheritdoc cref="IExecutionResultFormatter.FormatAsync(IExecutionResult, Stream, CancellationToken)" />
public ValueTask FormatAsync(
IExecutionResult result,
Stream outputStream,
CancellationToken cancellationToken = default)
{
if (result is null)
{
throw new ArgumentNullException(nameof(result));
}

if (outputStream is null)
{
throw new ArgumentNullException(nameof(outputStream));
}

return FormatInternalAsync(result, outputStream, cancellationToken);
}

private async ValueTask FormatInternalAsync(
IExecutionResult result,
Stream outputStream,
CancellationToken ct)
{
if (result.Kind is SingleResult)
{
await WriteNextMessageAsync((IQueryResult)result, outputStream).ConfigureAwait(false);
await WriteCompleteMessage(outputStream).ConfigureAwait(false);
}
else if (result.Kind is DeferredResult or BatchResult or SubscriptionResult)
{
var responseStream = (IResponseStream)result;

await foreach (var queryResult in responseStream.ReadResultsAsync()
.WithCancellation(ct).ConfigureAwait(false))
{
try
{
await WriteNextMessageAsync(queryResult, outputStream)
.ConfigureAwait(false);
}
finally
{
await queryResult.DisposeAsync().ConfigureAwait(false);
}

await WriteNewLineAndFlushAsync(outputStream, ct);
}

await WriteCompleteMessage(outputStream).ConfigureAwait(false);
await WriteNewLineAndFlushAsync(outputStream, ct);
}
else
{
throw new NotSupportedException();
}
}

private async ValueTask WriteNextMessageAsync(IQueryResult result, Stream outputStream)
{
await using var writer = new Utf8JsonWriter(outputStream, _options);

writer.WriteStartObject();

writer.WriteString(EventField, NextEvent);

writer.WritePropertyName(DataField);
_payloadFormatter.Format(result, writer);

writer.WriteEndObject();
}

private async ValueTask WriteCompleteMessage(Stream outputStream)
{
await using var writer = new Utf8JsonWriter(outputStream, _options);

writer.WriteStartObject();

writer.WriteString(EventField, CompleteEvent);

writer.WriteEndObject();
}

private static async ValueTask WriteNewLineAndFlushAsync(
Stream outputStream,
CancellationToken ct)
{
await outputStream.WriteAsync(_newLine, ct).ConfigureAwait(false);
await outputStream.FlushAsync(ct);
}
}

0 comments on commit a5b84b3

Please sign in to comment.