-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
TaskExecution.cs
245 lines (205 loc) · 8.44 KB
/
TaskExecution.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
using Polly.Hedging.Utils;
using Polly.Telemetry;
namespace Polly.Hedging.Controller;
#pragma warning disable CA1031 // Do not catch general exception types
/// <summary>
/// Holds a single hedging attempt together with the resources the attempt needs while it runs:
///
/// <list type="bullet">
/// <item>
/// A dedicated <see cref="ResilienceContext"/> obtained from the shared pool and initialized
/// from the primary context every time the execution is initialized.
/// </item>
/// <item>
/// The cancellation token source whose token is handed to that context.
/// </item>
/// </list>
/// </summary>
/// <typeparam name="T">The type of result produced by the hedged callback.</typeparam>
internal sealed class TaskExecution<T>
{
    // Taken from the shared pool once per instance and re-initialized on every InitializeAsync call.
    private readonly ResilienceContext _cachedContext = ResilienceContextPool.Shared.Get();
    private readonly CancellationTokenSourcePool _cancellationTokenSourcePool;
    private readonly TimeProvider _timeProvider;
    private readonly ResilienceStrategyTelemetry _telemetry;
    private readonly HedgingHandler<T> _handler;

    private CancellationTokenSource? _cancellationSource;
    private CancellationTokenRegistration? _cancellationRegistration;

    // Non-null only between InitializeAsync and ResetAsync.
    private ResilienceContext? _activeContext;
    private long _startExecutionTimestamp;
    private long _stopExecutionTimestamp;

    /// <summary>
    /// Initializes a new instance of the <see cref="TaskExecution{T}"/> class.
    /// </summary>
    /// <param name="handler">Supplies the action generator, the handling predicate and the optional on-hedging callback.</param>
    /// <param name="cancellationTokenSourcePool">The pool cancellation token sources are taken from and returned to.</param>
    /// <param name="timeProvider">Used to take the start/stop timestamps of the execution.</param>
    /// <param name="telemetry">The telemetry sink for hedging and attempt events.</param>
    public TaskExecution(HedgingHandler<T> handler, CancellationTokenSourcePool cancellationTokenSourcePool, TimeProvider timeProvider, ResilienceStrategyTelemetry telemetry)
    {
        _handler = handler;
        _cancellationTokenSourcePool = cancellationTokenSourcePool;
        _timeProvider = timeProvider;
        _telemetry = telemetry;
    }

    /// <summary>
    /// Gets the task that tracks the execution of the hedged callback.
    /// </summary>
    /// <remarks>
    /// The task is assigned by <see cref="InitializeAsync{TState}"/> whenever that method returns <see langword="true"/>.
    /// Exceptions thrown by the executed callback are captured and stored in <see cref="Outcome"/>
    /// instead of surfacing from this task.
    /// </remarks>
    public Task? ExecutionTaskSafe { get; private set; }

    /// <summary>
    /// Gets the outcome recorded once the execution has finished.
    /// </summary>
    public Outcome<T> Outcome { get; private set; }

    /// <summary>
    /// Gets a value indicating whether the handler's predicate reported the recorded outcome as handled.
    /// </summary>
    public bool IsHandled { get; private set; }

    /// <summary>
    /// Gets a value indicating whether <see cref="AcceptOutcome"/> has been called since the last reset.
    /// </summary>
    public bool IsAccepted { get; private set; }

    /// <summary>
    /// Gets the context of the running execution.
    /// </summary>
    /// <exception cref="InvalidOperationException">The execution is not initialized.</exception>
    public ResilienceContext Context
    {
        get
        {
            if (_activeContext is { } active)
            {
                return active;
            }

            throw new InvalidOperationException("TaskExecution is not initialized.");
        }
    }

    /// <summary>
    /// Gets or sets the kind of hedged task this execution represents.
    /// </summary>
    public HedgedTaskType Type { get; set; }

    /// <summary>
    /// Gets or sets the callback invoked at the very beginning of <see cref="ResetAsync"/>.
    /// </summary>
    public Action<TaskExecution<T>>? OnReset { get; set; }

    /// <summary>
    /// Gets the time elapsed between the recorded start and stop timestamps.
    /// </summary>
    public TimeSpan ExecutionTime
    {
        get { return _timeProvider.GetElapsedTime(_startExecutionTimestamp, _stopExecutionTimestamp); }
    }

    /// <summary>
    /// Gets the attempt number passed to the last <see cref="InitializeAsync{TState}"/> call.
    /// </summary>
    public int AttemptNumber { get; private set; }

    /// <summary>
    /// Marks the outcome of this execution as accepted.
    /// </summary>
    /// <exception cref="InvalidOperationException">The execution task is missing or has not completed yet.</exception>
    public void AcceptOutcome()
    {
        if (ExecutionTaskSafe is not { IsCompleted: true })
        {
            throw new InvalidOperationException("Unable to accept outcome for a task that is not completed.");
        }

        IsAccepted = true;
    }

    /// <summary>
    /// Cancels the execution unless its outcome has already been accepted.
    /// </summary>
    public void Cancel()
    {
        if (IsAccepted)
        {
            return;
        }

        _cancellationSource!.Cancel();
    }

    /// <summary>
    /// Prepares this instance for a new attempt and starts executing it.
    /// </summary>
    /// <typeparam name="TState">The type of the state passed to the primary callback.</typeparam>
    /// <param name="type">Whether this is the primary or a secondary attempt.</param>
    /// <param name="primaryContext">The context of the original execution; the attempt's own context is initialized from it.</param>
    /// <param name="primaryCallback">The user callback of the original execution.</param>
    /// <param name="state">The state forwarded to <paramref name="primaryCallback"/>.</param>
    /// <param name="attemptNumber">The number of this attempt.</param>
    /// <returns>
    /// <see langword="false"/> when this is a secondary attempt and the handler's action generator returned no action
    /// (the instance is reset in that case); otherwise <see langword="true"/>.
    /// </returns>
    public async ValueTask<bool> InitializeAsync<TState>(
        HedgedTaskType type,
        ResilienceContext primaryContext,
        Func<ResilienceContext, TState, ValueTask<Outcome<T>>> primaryCallback,
        TState state,
        int attemptNumber)
    {
        AttemptNumber = attemptNumber;
        Type = type;

        _cancellationSource = _cancellationTokenSourcePool.Get(System.Threading.Timeout.InfiniteTimeSpan);
        _startExecutionTimestamp = _timeProvider.GetTimestamp();

        _activeContext = _cachedContext;
        _activeContext.InitializeFrom(primaryContext, _cancellationSource!.Token);

        // Propagate cancellation of the primary context to this attempt's own token source.
        var callerToken = primaryContext.CancellationToken;
        if (callerToken.CanBeCanceled)
        {
            _cancellationRegistration = callerToken.Register(o => ((CancellationTokenSource)o!).Cancel(), _cancellationSource);
        }

        if (type != HedgedTaskType.Secondary)
        {
            ExecutionTaskSafe = ExecutePrimaryActionAsync(primaryCallback, state);
            return true;
        }

        Func<ValueTask<Outcome<T>>>? secondaryAction = null;

        try
        {
            secondaryAction = _handler.GenerateAction(CreateArguments(primaryCallback, primaryContext, state, attemptNumber));
            if (secondaryAction is null)
            {
                // No action was generated for this attempt; release everything acquired above.
                await ResetAsync().ConfigureAwait(false);
                return false;
            }
        }
        catch (Exception e)
        {
            // The failure becomes the outcome of this attempt instead of being thrown to the caller.
            _stopExecutionTimestamp = _timeProvider.GetTimestamp();
            ExecutionTaskSafe = ExecuteCreateActionException(e);
            return true;
        }

        // NOTE(review): the on-hedging arguments receive attemptNumber - 1; presumably the number
        // of the preceding attempt - confirm against OnHedgingArguments.
        await HandleOnHedgingAsync(primaryContext, attemptNumber - 1).ConfigureAwait(Context.ContinueOnCapturedContext);

        ExecutionTaskSafe = ExecuteSecondaryActionAsync(secondaryAction);
        return true;
    }

    /// <summary>
    /// Reports the on-hedging telemetry event and awaits the handler's on-hedging callback when one is set.
    /// </summary>
    /// <param name="primaryContext">The context of the original execution.</param>
    /// <param name="attemptNumber">The attempt number placed into the on-hedging arguments.</param>
    private async Task HandleOnHedgingAsync(ResilienceContext primaryContext, int attemptNumber)
    {
        var args = new OnHedgingArguments<T>(primaryContext, Context, attemptNumber);

        _telemetry.Report(new(ResilienceEventSeverity.Warning, HedgingConstants.OnHedgingEventName), Context, args);

        if (_handler.OnHedging is not { } onHedging)
        {
            return;
        }

        await onHedging(args).ConfigureAwait(Context.ContinueOnCapturedContext);
    }

    /// <summary>
    /// Builds the arguments passed to the handler's action generator.
    /// </summary>
    /// <remarks>
    /// The callback stored in the arguments invokes <paramref name="primaryCallback"/> with the captured <paramref name="state"/>.
    /// </remarks>
    private HedgingActionGeneratorArguments<TResult> CreateArguments<TResult, TState>(
        Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> primaryCallback,
        ResilienceContext primaryContext,
        TState state,
        int attempt)
    {
        return new HedgingActionGeneratorArguments<TResult>(
            primaryContext,
            Context,
            attempt,
            context => primaryCallback(context, state));
    }

    /// <summary>
    /// Releases the resources held by this execution and clears its per-attempt state so the instance can be initialized again.
    /// </summary>
    /// <returns>A task that completes once the reset has finished.</returns>
    public async ValueTask ResetAsync()
    {
        OnReset?.Invoke(this);

        if (_cancellationRegistration.HasValue)
        {
            var registration = _cancellationRegistration.Value;
#if NETCOREAPP
            await registration.DisposeAsync().ConfigureAwait(false);
#else
            registration.Dispose();
#endif
        }

        _cancellationRegistration = null;

        if (IsAccepted)
        {
            // An accepted outcome means the cancellation source was most likely not cancelled,
            // so it can be returned to the pool.
            _cancellationTokenSourcePool.Return(_cancellationSource!);
        }
        else
        {
            await DisposeHelper.TryDisposeSafeAsync(Outcome.Result!, Context.IsSynchronous).ConfigureAwait(false);

            // Executions that were not accepted are always cancelled, so the cancellation source
            // must be disposed instead of being returned to the pool.
            _cancellationSource!.Dispose();
        }

        IsAccepted = false;
        Outcome = default;
        IsHandled = false;
        OnReset = null;
        AttemptNumber = 0;
        _activeContext = null;
        _cachedContext.Reset();
        _cancellationSource = null!;
        _startExecutionTimestamp = 0;
        _stopExecutionTimestamp = 0;
    }

    /// <summary>
    /// Runs a generated secondary action, converting any exception it throws into an outcome, and records the result.
    /// </summary>
    /// <param name="action">The action produced by the handler's action generator.</param>
    private async Task ExecuteSecondaryActionAsync(Func<ValueTask<Outcome<T>>> action)
    {
        Outcome<T> result;

        try
        {
            var pending = action();
            result = await pending.ConfigureAwait(Context.ContinueOnCapturedContext);
        }
        catch (Exception e)
        {
            result = Polly.Outcome.FromException<T>(e);
        }

        _stopExecutionTimestamp = _timeProvider.GetTimestamp();

        var recording = UpdateOutcomeAsync(result);
        await recording.ConfigureAwait(Context.ContinueOnCapturedContext);
    }

    /// <summary>
    /// Records the exception thrown while generating the secondary action as the outcome of this execution.
    /// </summary>
    /// <param name="e">The exception thrown by the action generator.</param>
    private async Task ExecuteCreateActionException(Exception e)
    {
        var failure = Polly.Outcome.FromException<T>(e);
        await UpdateOutcomeAsync(failure).ConfigureAwait(Context.ContinueOnCapturedContext);
    }

    /// <summary>
    /// Runs the primary callback with this execution's context, converting any exception it throws into an outcome,
    /// and records the result.
    /// </summary>
    /// <typeparam name="TState">The type of the state passed to the callback.</typeparam>
    /// <param name="primaryCallback">The user callback of the original execution.</param>
    /// <param name="state">The state forwarded to <paramref name="primaryCallback"/>.</param>
    private async Task ExecutePrimaryActionAsync<TState>(Func<ResilienceContext, TState, ValueTask<Outcome<T>>> primaryCallback, TState state)
    {
        Outcome<T> result;

        try
        {
            var pending = primaryCallback(Context, state);
            result = await pending.ConfigureAwait(Context.ContinueOnCapturedContext);
        }
        catch (Exception e)
        {
            result = Polly.Outcome.FromException<T>(e);
        }

        _stopExecutionTimestamp = _timeProvider.GetTimestamp();

        var recording = UpdateOutcomeAsync(result);
        await recording.ConfigureAwait(Context.ContinueOnCapturedContext);
    }

    /// <summary>
    /// Stores the outcome, asks the handler whether it is handled and reports the execution attempt to telemetry.
    /// </summary>
    /// <param name="outcome">The outcome produced by the attempt.</param>
    private async Task UpdateOutcomeAsync(Outcome<T> outcome)
    {
        // The arguments are built before the outcome is stored, so reading Context happens first.
        var predicateArgs = new HedgingPredicateArguments<T>(Context, outcome);
        Outcome = outcome;

        var handled = await _handler.ShouldHandle(predicateArgs).ConfigureAwait(Context.ContinueOnCapturedContext);
        IsHandled = handled;

        TelemetryUtil.ReportExecutionAttempt(_telemetry, Context, outcome, AttemptNumber, ExecutionTime, IsHandled);
    }
}