From a5b84b38e9c72f65f420d3dd18243cb7088663e4 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Mon, 12 Sep 2022 09:01:25 +0200 Subject: [PATCH] Added GraphQL-SSE support (#5390) --- .../Instrumentation/HttpRequestKind.cs | 5 + .../DefaultHttpResponseFormatter.cs | 11 +- .../WebSocketSubscriptionMiddleware.cs | 2 +- .../GraphQLOverHttpSpecTests.cs | 107 +++++++++++++ .../Pipeline/OperationExecutionMiddleware.cs | 2 +- .../Serialization/EventStreamFormatter.cs | 145 ++++++++++++++++++ .../MultiPartResponseStreamFormatter.cs | 42 +++-- 7 files changed, 297 insertions(+), 17 deletions(-) create mode 100644 src/HotChocolate/Core/src/Execution/Serialization/EventStreamFormatter.cs diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Instrumentation/HttpRequestKind.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Instrumentation/HttpRequestKind.cs index 1919fb58e50..b3b6094f45f 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Instrumentation/HttpRequestKind.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Instrumentation/HttpRequestKind.cs @@ -25,4 +25,9 @@ public enum HttpRequestKind /// HTTP POST GraphQL MultiPart Request. /// HttpMultiPart, + + /// + /// HTTP POST GraphQL-SSE + /// + HttpSse } diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Serialization/DefaultHttpResponseFormatter.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Serialization/DefaultHttpResponseFormatter.cs index 71626e21035..b06f98fc099 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Serialization/DefaultHttpResponseFormatter.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Serialization/DefaultHttpResponseFormatter.cs @@ -19,9 +19,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter { private readonly JsonQueryResultFormatter _jsonFormatter; private readonly MultiPartResponseStreamFormatter _multiPartFormatter; - - // TODO : implement this one! - private readonly IExecutionResultFormatter _eventStreamFormatter = default!; + private readonly EventStreamFormatter _eventStreamFormatter; /// /// Creates a new instance of . @@ -42,6 +40,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter { _jsonFormatter = new JsonQueryResultFormatter(indented, encoder); _multiPartFormatter = new MultiPartResponseStreamFormatter(_jsonFormatter); + _eventStreamFormatter = new EventStreamFormatter(indented, encoder); } public GraphQLRequestFlags CreateRequestFlags( @@ -320,7 +319,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter } if (resultKind is ResultKind.Stream or ResultKind.Single && - mediaType.Kind is MultiPartMixed or AllMultiPart) + mediaType.Kind is MultiPartMixed or AllMultiPart or All) { formatInfo = new FormatInfo( ContentType.MultiPartMixed, @@ -374,7 +373,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter } if (resultKind is ResultKind.Stream or ResultKind.Single && - mediaType.Kind is MultiPartMixed or AllMultiPart) + mediaType.Kind is MultiPartMixed or AllMultiPart or All) { // if the result is a stream we consider this a perfect match and // will use this format. @@ -400,7 +399,7 @@ public class DefaultHttpResponseFormatter : IHttpResponseFormatter } } - if (mediaType.Kind is EventStream) + if (mediaType.Kind is EventStream or All) { // if the result is a subscription we consider this a perfect match and // will use this format. diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/WebSocketSubscriptionMiddleware.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/WebSocketSubscriptionMiddleware.cs index 6fa208583f2..d9cf268a27e 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/WebSocketSubscriptionMiddleware.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/WebSocketSubscriptionMiddleware.cs @@ -6,7 +6,7 @@ namespace HotChocolate.AspNetCore; -public class WebSocketSubscriptionMiddleware : MiddlewareBase +public sealed class WebSocketSubscriptionMiddleware : MiddlewareBase { private readonly IServerDiagnosticEvents _diagnosticEvents; diff --git a/src/HotChocolate/AspNetCore/test/AspNetCore.Tests/GraphQLOverHttpSpecTests.cs b/src/HotChocolate/AspNetCore/test/AspNetCore.Tests/GraphQLOverHttpSpecTests.cs index 4e74302bdcb..8f08c4c5d8b 100644 --- a/src/HotChocolate/AspNetCore/test/AspNetCore.Tests/GraphQLOverHttpSpecTests.cs +++ b/src/HotChocolate/AspNetCore/test/AspNetCore.Tests/GraphQLOverHttpSpecTests.cs @@ -149,6 +149,55 @@ public async Task Legacy_Query_No_Streams_3() "}}]}"); } + /// + /// This request does not specify a accept header. + /// expected response content-type: multipart/mixed + /// expected status code: 200 + /// + [Fact] + public async Task Legacy_With_Stream_1() + { + // arrange + var server = CreateStarWarsServer(); + var client = server.CreateClient(); + + // act + using var request = new HttpRequestMessage(HttpMethod.Post, _url) + { + Content = JsonContent.Create( + new ClientQueryRequest + { + Query = "{ ... @defer { __typename } }" + }) + }; + + using var response = await client.SendAsync(request); + + // assert + // expected response content-type: multipart/mixed + // expected status code: 200 + Snapshot + .Create() + .Add(response) + .MatchInline( + @"Headers: + Content-Type: multipart/mixed; boundary=""-""; charset=utf-8 + --------------------------> + Status Code: OK + --------------------------> + + --- + Content-Type: application/json; charset=utf-8 + + {""data"":{},""hasNext"":true} + --- + Content-Type: application/json; charset=utf-8 + + {""path"":[],""data"":{""__typename"":""Query""},""hasNext"":false} + ----- + "); + } + /// /// This request specifies the application/graphql-response+json accept header. /// expected response content-type: application/graphql-response+json @@ -679,5 +728,63 @@ public async Task New_Query_With_Streams_2() --------------------------> {""errors"":[{""message"":""The specified operation kind is not allowed.""}]}"); } + + /// + /// This request specifies the application/graphql-response+json and + /// the multipart/mixed content type as accept header value. + /// expected response content-type: multipart/mixed + /// expected status code: 200 + /// + [Fact] + public async Task New_Query_With_Streams_3() + { + // arrange + var server = CreateStarWarsServer(); + var client = server.CreateClient(); + + // act + using var request = new HttpRequestMessage(HttpMethod.Post, _url) + { + Content = JsonContent.Create( + new ClientQueryRequest + { + Query = "{ ... @defer { __typename } }" + }), + Headers = + { + { "Accept", new[] + { + ContentType.EventStream + } + } + } + }; + + using var response = await client.SendAsync(request, ResponseHeadersRead); + + // assert + // expected response content-type: multipart/mixed + // expected status code: 200 + Snapshot + .Create() + .Add(response) + .MatchInline( + @"Headers: + Content-Type: multipart/mixed; boundary=""-""; charset=utf-8 + --------------------------> + Status Code: OK + --------------------------> + + --- + Content-Type: application/json; charset=utf-8 + + {""data"":{},""hasNext"":true} + --- + Content-Type: application/json; charset=utf-8 + + {""path"":[],""data"":{""__typename"":""Query""},""hasNext"":false} + ----- + "); + } } #endif diff --git a/src/HotChocolate/Core/src/Execution/Pipeline/OperationExecutionMiddleware.cs b/src/HotChocolate/Core/src/Execution/Pipeline/OperationExecutionMiddleware.cs index 9298127c9fd..6d19c7d916f 100644 --- a/src/HotChocolate/Core/src/Execution/Pipeline/OperationExecutionMiddleware.cs +++ b/src/HotChocolate/Core/src/Execution/Pipeline/OperationExecutionMiddleware.cs @@ -183,7 +183,7 @@ await ExecuteOperationAsync(context, batchDispatcher, context.Operation) context.Schema.MutationType!, ref _cachedMutation); - private bool IsOperationAllowed(IOperation operation, IQueryRequest request) + private static bool IsOperationAllowed(IOperation operation, IQueryRequest request) { if (request.Flags is AllowAll) { diff --git a/src/HotChocolate/Core/src/Execution/Serialization/EventStreamFormatter.cs b/src/HotChocolate/Core/src/Execution/Serialization/EventStreamFormatter.cs new file mode 100644 index 00000000000..441f3c1c226 --- /dev/null +++ b/src/HotChocolate/Core/src/Execution/Serialization/EventStreamFormatter.cs @@ -0,0 +1,145 @@ +using System; +using System.IO; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using static HotChocolate.Execution.ExecutionResultKind; + +namespace HotChocolate.Execution.Serialization; + +/// +/// The default GraphQL-SSE formatter for . +/// https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md +/// +public sealed class EventStreamFormatter : IExecutionResultFormatter +{ + private static ReadOnlySpan EventField + => new[] { (byte)'e', (byte)'v', (byte)'e', (byte)'n', (byte)'t' }; + private static ReadOnlySpan DataField + => new[] { (byte)'d', (byte)'a', (byte)'t', (byte)'a' }; + private static ReadOnlySpan NextEvent + => new[] { (byte)'n', (byte)'e', (byte)'x', (byte)'t' }; + private static ReadOnlySpan CompleteEvent + => new[] + { + (byte)'c', (byte)'o', (byte)'m', (byte)'p', + (byte)'l', (byte)'e', (byte)'t', (byte)'e' + }; + private static readonly byte[] _newLine = new byte[] { (byte)'\n' }; + + private readonly JsonQueryResultFormatter _payloadFormatter; + private readonly JsonWriterOptions _options; + + /// + /// Creates a new instance of . + /// + /// + /// Defines whether the underlying + /// should pretty print the JSON which includes: + /// indenting nested JSON tokens, adding new lines, and adding + /// white space between property names and values. + /// By default, the JSON is written without any extra white space. + /// + /// + /// Gets or sets the encoder to use when escaping strings, or null to use the default encoder. + /// + public EventStreamFormatter( + bool indented = false, + JavaScriptEncoder? encoder = null) + { + _options = new JsonWriterOptions { Indented = indented, Encoder = encoder }; + _payloadFormatter = new JsonQueryResultFormatter(indented, encoder); + } + + /// + public ValueTask FormatAsync( + IExecutionResult result, + Stream outputStream, + CancellationToken cancellationToken = default) + { + if (result is null) + { + throw new ArgumentNullException(nameof(result)); + } + + if (outputStream is null) + { + throw new ArgumentNullException(nameof(outputStream)); + } + + return FormatInternalAsync(result, outputStream, cancellationToken); + } + + private async ValueTask FormatInternalAsync( + IExecutionResult result, + Stream outputStream, + CancellationToken ct) + { + if (result.Kind is SingleResult) + { + await WriteNextMessageAsync((IQueryResult)result, outputStream).ConfigureAwait(false); + await WriteCompleteMessage(outputStream).ConfigureAwait(false); + } + else if (result.Kind is DeferredResult or BatchResult or SubscriptionResult) + { + var responseStream = (IResponseStream)result; + + await foreach (var queryResult in responseStream.ReadResultsAsync() + .WithCancellation(ct).ConfigureAwait(false)) + { + try + { + await WriteNextMessageAsync(queryResult, outputStream) + .ConfigureAwait(false); + } + finally + { + await queryResult.DisposeAsync().ConfigureAwait(false); + } + + await WriteNewLineAndFlushAsync(outputStream, ct); + } + + await WriteCompleteMessage(outputStream).ConfigureAwait(false); + await WriteNewLineAndFlushAsync(outputStream, ct); + } + else + { + throw new NotSupportedException(); + } + } + + private async ValueTask WriteNextMessageAsync(IQueryResult result, Stream outputStream) + { + await using var writer = new Utf8JsonWriter(outputStream, _options); + + writer.WriteStartObject(); + + writer.WriteString(EventField, NextEvent); + + writer.WritePropertyName(DataField); + _payloadFormatter.Format(result, writer); + + writer.WriteEndObject(); + } + + private async ValueTask WriteCompleteMessage(Stream outputStream) + { + await using var writer = new Utf8JsonWriter(outputStream, _options); + + writer.WriteStartObject(); + + writer.WriteString(EventField, CompleteEvent); + + writer.WriteEndObject(); + } + + private static async ValueTask WriteNewLineAndFlushAsync( + Stream outputStream, + CancellationToken ct) + { + await outputStream.WriteAsync(_newLine, ct).ConfigureAwait(false); + await outputStream.FlushAsync(ct); + } +} diff --git a/src/HotChocolate/Core/src/Execution/Serialization/MultiPartResponseStreamFormatter.cs b/src/HotChocolate/Core/src/Execution/Serialization/MultiPartResponseStreamFormatter.cs index b709778dce4..7766bd24ce9 100644 --- a/src/HotChocolate/Core/src/Execution/Serialization/MultiPartResponseStreamFormatter.cs +++ b/src/HotChocolate/Core/src/Execution/Serialization/MultiPartResponseStreamFormatter.cs @@ -125,23 +125,39 @@ public sealed partial class MultiPartResponseStreamFormatter : IExecutionResultF Stream outputStream, CancellationToken ct = default) { - await foreach (var result in - responseStream.ReadResultsAsync().WithCancellation(ct).ConfigureAwait(false)) + // first we create the iterator. + await using var enumerator = responseStream.ReadResultsAsync().GetAsyncEnumerator(ct); + + // next we write a leading CRLF + await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); + + while (await enumerator.MoveNextAsync().ConfigureAwait(false)) { try { + // Before each part of the multi-part response, a boundary (---, CRLF) + // is sent. await WriteNextAsync(outputStream, ct).ConfigureAwait(false); - await WriteResultAsync(result, outputStream, ct).ConfigureAwait(false); + + // Now we can write the header and body of the part. + await WriteResultAsync(enumerator.Current, outputStream, ct).ConfigureAwait(false); + + // after each result we write a CRLF signaling the next or final part. + await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); + + // we flush to make sure that the result is written to the network stream. await outputStream.FlushAsync(ct).ConfigureAwait(false); } finally { // The result objects use pooled memory so we need to ensure that they // return the memory by disposing them. - await result.DisposeAsync().ConfigureAwait(false); + await enumerator.Current.DisposeAsync().ConfigureAwait(false); } } + // After the final payload, the terminating boundary of + // ----- followed by CRLF is sent. await WriteEndAsync(outputStream, ct).ConfigureAwait(false); await outputStream.FlushAsync(ct).ConfigureAwait(false); } @@ -151,17 +167,30 @@ public sealed partial class MultiPartResponseStreamFormatter : IExecutionResultF Stream outputStream, CancellationToken ct = default) { + // first we write a leading CRLF + await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); + + // Before each part of the multi-part response, a boundary (---, CRLF) + // is sent. await WriteNextAsync(outputStream, ct).ConfigureAwait(false); try { + // Now we can write the header and body of the part. await WriteResultAsync(queryResult, outputStream, ct).ConfigureAwait(false); + + // after each result we write a CRLF signaling the next or final part. + await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); } finally { + // The result objects use pooled memory so we need to ensure that they + // return the memory by disposing them. await queryResult.DisposeAsync().ConfigureAwait(false); } + // After the final payload, the terminating boundary of + // ----- followed by CRLF is sent. await WriteEndAsync(outputStream, ct).ConfigureAwait(false); await outputStream.FlushAsync(ct).ConfigureAwait(false); } @@ -200,8 +229,6 @@ public sealed partial class MultiPartResponseStreamFormatter : IExecutionResultF Stream outputStream, CancellationToken ct) { - // Before each part of the multi-part response, a boundary (CRLF, ---, CRLF) is sent. - await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); await outputStream.WriteAsync(Start, 0, Start.Length, ct).ConfigureAwait(false); await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); } @@ -210,9 +237,6 @@ public sealed partial class MultiPartResponseStreamFormatter : IExecutionResultF Stream outputStream, CancellationToken ct) { - // After the final payload, the terminating boundary of CRLF followed by - // ----- followed by CRLF is sent. - await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); await outputStream.WriteAsync(End, 0, End.Length, ct).ConfigureAwait(false); await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false); }