-
-
Notifications
You must be signed in to change notification settings - Fork 723
/
OperationExecutor.Observable.cs
142 lines (125 loc) · 5.12 KB
/
OperationExecutor.Observable.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
using System;
using System.Threading;
using System.Threading.Tasks;
namespace StrawberryShake;
public partial class OperationExecutor<TData, TResult>
{
private sealed class OperationExecutorObservable : IObservable<IOperationResult<TResult>>
{
private readonly IConnection<TData> _connection;
private readonly IOperationStore _operationStore;
private readonly Func<IOperationResultBuilder<TData, TResult>> _resultBuilder;
private readonly Func<IResultPatcher<TData>> _resultPatcher;
private readonly OperationRequest _request;
private readonly ExecutionStrategy _strategy;
public OperationExecutorObservable(
IConnection<TData> connection,
IOperationStore operationStore,
Func<IOperationResultBuilder<TData, TResult>> resultBuilder,
Func<IResultPatcher<TData>> resultPatcher,
OperationRequest request,
ExecutionStrategy strategy)
{
_connection = connection;
_operationStore = operationStore;
_resultBuilder = resultBuilder;
_resultPatcher = resultPatcher;
_request = request;
_strategy = strategy;
}
public IDisposable Subscribe(IObserver<IOperationResult<TResult>> observer)
{
if (_strategy is ExecutionStrategy.NetworkOnly ||
_request.Document.Kind is OperationKind.Subscription)
{
var observerSession = new ObserverSession();
BeginExecute(observer, observerSession);
return observerSession;
}
var hasResultInStore = false;
if ((_strategy == ExecutionStrategy.CacheFirst ||
_strategy == ExecutionStrategy.CacheAndNetwork) &&
_operationStore.TryGet(_request, out IOperationResult<TResult>? result))
{
hasResultInStore = true;
observer.OnNext(result);
}
IDisposable session = _operationStore.Watch<TResult>(_request).Subscribe(observer);
if (_strategy is not ExecutionStrategy.CacheFirst || !hasResultInStore)
{
var observerSession = new ObserverSession();
observerSession.SetStoreSession(session);
BeginExecute(observer, observerSession);
return observerSession;
}
return session;
}
private void BeginExecute(
IObserver<IOperationResult<TResult>> observer,
ObserverSession session) =>
Task.Run(() => ExecuteAsync(observer, session));
private async Task ExecuteAsync(
IObserver<IOperationResult<TResult>> observer,
ObserverSession session)
{
try
{
CancellationToken token = session.RequestSession.Token;
IOperationResultBuilder<TData, TResult> resultBuilder = _resultBuilder();
IResultPatcher<TData> resultPatcher = _resultPatcher();
await foreach (Response<TData>? response in
_connection.ExecuteAsync(_request)
.WithCancellation(token)
.ConfigureAwait(false))
{
if (token.IsCancellationRequested)
{
return;
}
IOperationResult<TResult>? result;
if (response.IsPatch)
{
Response<TData> patched = resultPatcher.PatchResponse(response);
result = resultBuilder.Build(patched);
}
else
{
resultPatcher.SetResponse(response);
result = resultBuilder.Build(response);
_operationStore.Set(_request, result);
}
if (!session.HasStoreSession)
{
observer.OnNext(result);
IDisposable storeSession =
_operationStore
.Watch<TResult>(_request)
.Subscribe(observer);
try
{
session.SetStoreSession(storeSession);
}
catch (ObjectDisposedException)
{
storeSession.Dispose();
throw;
}
}
}
}
catch (Exception ex)
{
observer.OnError(ex);
}
finally
{
// call observer's OnCompleted method to notify observer
// there is no further data is available.
observer.OnCompleted();
// after all the transport logic is finished we will dispose
// the request session.
session.RequestSession.Dispose();
}
}
}
}