-
-
Notifications
You must be signed in to change notification settings - Fork 725
/
RemoteRequestExecutor.cs
154 lines (134 loc) · 5.33 KB
/
RemoteRequestExecutor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GreenDonut;
using HotChocolate.Execution;
namespace HotChocolate.Stitching.Requests
{
internal sealed class RemoteRequestExecutor
: IRemoteRequestExecutor
, IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly List<BufferedRequest> _bufferedRequests = new List<BufferedRequest>();
private readonly IBatchScheduler _batchScheduler;
private readonly IRequestExecutor _executor;
private bool _taskRegistered;
public RemoteRequestExecutor(
IBatchScheduler batchScheduler,
IRequestExecutor executor)
{
_batchScheduler = batchScheduler ??
throw new ArgumentNullException(nameof(batchScheduler));
_executor = executor ??
throw new ArgumentNullException(nameof(executor));
}
/// <inheritdoc />
public ISchema Schema => _executor.Schema;
/// <inheritdoc />
public IServiceProvider Services => _executor.Services;
/// <inheritdoc />
public Task<IExecutionResult> ExecuteAsync(
IQueryRequest request,
CancellationToken cancellationToken = default)
{
var bufferRequest = BufferedRequest.Create(request, Schema);
_semaphore.Wait(cancellationToken);
try
{
_bufferedRequests.Add(bufferRequest);
if (!_taskRegistered)
{
_batchScheduler.Schedule(() => ExecuteRequestsInternal(cancellationToken));
_taskRegistered = true;
}
}
finally
{
_semaphore.Release();
}
return bufferRequest.Promise.Task;
}
private async ValueTask ExecuteRequestsInternal(CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_bufferedRequests.Count == 1)
{
await ExecuteSingleRequestAsync(cancellationToken).ConfigureAwait(false);
}
if (_bufferedRequests.Count > 1)
{
await ExecuteBufferedRequestBatchAsync(cancellationToken).ConfigureAwait(false);
}
// reset the states so that we are ready for new requests to be buffered.
_taskRegistered = false;
_bufferedRequests.Clear();
}
finally
{
_semaphore.Release();
}
}
private async ValueTask ExecuteSingleRequestAsync(
CancellationToken cancellationToken)
{
BufferedRequest request = _bufferedRequests[0];
IExecutionResult result = await _executor
.ExecuteAsync(request.Request, cancellationToken)
.ConfigureAwait(false);
if (result is IQueryResult queryResult)
{
request.Promise.SetResult(queryResult);
}
else
{
// since we only support query/mutation at this point we will just fail
// in the event that something else was returned.
request.Promise.SetException(new NotSupportedException(
"Only IQueryResult is supported when batching."));
}
}
private async ValueTask ExecuteBufferedRequestBatchAsync(
CancellationToken cancellationToken)
{
// first we take all buffered requests and merge them into a single request.
// we however have to group requests by operation type. This means we should
// end up with one or two requests (query and mutation).
foreach ((IQueryRequest Merged, IEnumerable<BufferedRequest> Requests) batch in
MergeRequestHelper.MergeRequests(_bufferedRequests))
{
// now we take this merged request and run it against the executor.
IExecutionResult result = await _executor
.ExecuteAsync(batch.Merged, cancellationToken)
.ConfigureAwait(false);
if (result is IQueryResult queryResult)
{
// last we will extract the results for the original buffered requests
// and fulfil the promises.
MergeRequestHelper.DispatchResults(queryResult, batch.Requests);
}
else
{
// since we only support query/mutation at this point we will just fail
// in the event that something else was returned.
foreach (BufferedRequest request in batch.Requests)
{
request.Promise.SetException(new NotSupportedException(
"Only IQueryResult is supported when batching."));
}
}
}
}
public void Dispose()
{
_semaphore.Dispose();
if (_executor is IDisposable d)
{
d.Dispose();
}
}
}
}