-
-
Notifications
You must be signed in to change notification settings - Fork 724
/
MultiPartResponseStreamFormatter.cs
155 lines (137 loc) · 5.73 KB
/
MultiPartResponseStreamFormatter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
using System;
using System.IO;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.Utilities;
namespace HotChocolate.Execution.Serialization;
// https://github.com/graphql/graphql-over-http/blob/master/rfcs/IncrementalDelivery.md
public sealed partial class MultiPartResponseStreamFormatter : IResponseStreamFormatter
{
private readonly IQueryResultFormatter _payloadFormatter;
/// <summary>
/// Creates a new instance of <see cref="MultiPartResponseStreamFormatter" />.
/// </summary>
/// <param name="indented">
/// Defines whether the underlying <see cref="Utf8JsonWriter"/>
/// should pretty print the JSON which includes:
/// indenting nested JSON tokens, adding new lines, and adding
/// white space between property names and values.
/// By default, the JSON is written without any extra white space.
/// </param>
/// <param name="encoder">
/// Gets or sets the encoder to use when escaping strings, or null to use the default encoder.
/// </param>
public MultiPartResponseStreamFormatter(
bool indented = false,
JavaScriptEncoder? encoder = null)
{
_payloadFormatter = new JsonQueryResultFormatter(indented, encoder);
}
/// <summary>
/// Creates a new instance of <see cref="MultiPartResponseStreamFormatter" />.
/// </summary>
/// <param name="queryResultFormatter">
/// The serializer that shall be used to serialize query results.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="queryResultFormatter"/> is <c>null</c>.
/// </exception>
public MultiPartResponseStreamFormatter(
IQueryResultFormatter queryResultFormatter)
{
_payloadFormatter = queryResultFormatter ??
throw new ArgumentNullException(nameof(queryResultFormatter));
}
public Task FormatAsync(
IResponseStream responseStream,
Stream outputStream,
CancellationToken cancellationToken = default)
{
if (responseStream is null)
{
throw new ArgumentNullException(nameof(responseStream));
}
if (outputStream is null)
{
throw new ArgumentNullException(nameof(outputStream));
}
return WriteResponseStreamAsync(responseStream, outputStream, cancellationToken);
}
private async Task WriteResponseStreamAsync(
IResponseStream responseStream,
Stream outputStream,
CancellationToken ct = default)
{
await WriteNextAsync(outputStream, ct).ConfigureAwait(false);
await foreach (IQueryResult result in
responseStream.ReadResultsAsync().WithCancellation(ct).ConfigureAwait(false))
{
try
{
await WriteResultAsync(result, outputStream, ct).ConfigureAwait(false);
if (result.HasNext ?? false)
{
await WriteNextAsync(outputStream, ct).ConfigureAwait(false);
await outputStream.FlushAsync(ct).ConfigureAwait(false);
}
else
{
// we will exit the foreach even if there are more items left
// since we were signaled that there are no more items
break;
}
}
finally
{
await result.DisposeAsync().ConfigureAwait(false);
}
}
await WriteEndAsync(outputStream, ct).ConfigureAwait(false);
await outputStream.FlushAsync(ct).ConfigureAwait(false);
}
private async Task WriteResultAsync(
IQueryResult result,
Stream outputStream,
CancellationToken ct)
{
using var writer = new ArrayWriter();
_payloadFormatter.Format(result, writer);
await WriteResultHeaderAsync(outputStream, ct).ConfigureAwait(false);
// The payload is sent, followed by a CRLF.
var buffer = writer.GetInternalBuffer();
await outputStream.WriteAsync(buffer, 0, writer.Length, ct).ConfigureAwait(false);
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}
private static async Task WriteResultHeaderAsync(
Stream outputStream,
CancellationToken ct)
{
// Each part of the multipart response must contain a Content-Type header.
// Similar to the GraphQL specification this specification does not require
// a specific serialization format. For consistency and ease of notation,
// examples of the response are given in JSON throughout the spec.
await outputStream.WriteAsync(ContentType, 0, ContentType.Length, ct).ConfigureAwait(false);
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
// After all headers, an additional CRLF is sent.
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}
private static async Task WriteNextAsync(
Stream outputStream,
CancellationToken ct)
{
// Each part of the multipart response must start with --- and a CRLF
await outputStream.WriteAsync(Start, 0, Start.Length, ct).ConfigureAwait(false);
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}
private static async Task WriteEndAsync(
Stream outputStream,
CancellationToken ct)
{
// After the last part of the multipart response is sent, the terminating
// boundary ----- is sent, followed by a CRLF
await outputStream.WriteAsync(End, 0, End.Length, ct).ConfigureAwait(false);
await outputStream.WriteAsync(CrLf, 0, CrLf.Length, ct).ConfigureAwait(false);
}
}