-
Notifications
You must be signed in to change notification settings - Fork 263
/
DurabilityProvider.cs
564 lines (489 loc) · 25 KB
/
DurabilityProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// The backend storage provider that provides the actual durability of Durable Functions.
/// This is functionally a superset of <see cref="IOrchestrationService"/> and <see cref="IOrchestrationServiceClient"/>.
/// If the storage provider does not any of the Durable Functions specific operations, they can use this class
/// directly with the expectation that only those interfaces will be implemented. All of the Durable Functions specific
/// methods/operations are virtual and can be overwritten by creating a subclass.
/// </summary>
public class DurabilityProvider :
IOrchestrationService,
IOrchestrationServiceClient,
IOrchestrationServiceQueryClient,
IOrchestrationServicePurgeClient
{
internal const string NoConnectionDetails = "default";
private static readonly JObject EmptyConfig = new JObject();
private readonly string name;
private readonly IOrchestrationService innerService;
private readonly IOrchestrationServiceClient innerServiceClient;
private readonly string connectionName;
/// <summary>
/// Creates the default <see cref="DurabilityProvider"/>.
/// </summary>
/// <param name="storageProviderName">The name of the storage backend providing the durability.</param>
/// <param name="service">The internal <see cref="IOrchestrationService"/> that provides functionality
/// for this classes implementions of <see cref="IOrchestrationService"/>.</param>
/// <param name="serviceClient">The internal <see cref="IOrchestrationServiceClient"/> that provides functionality
/// for this classes implementions of <see cref="IOrchestrationServiceClient"/>.</param>
/// <param name="connectionName">The name of the app setting that stores connection details for the storage provider.</param>
public DurabilityProvider(string storageProviderName, IOrchestrationService service, IOrchestrationServiceClient serviceClient, string connectionName)
{
this.name = storageProviderName ?? throw new ArgumentNullException(nameof(storageProviderName));
this.innerService = service ?? throw new ArgumentNullException(nameof(service));
this.innerServiceClient = serviceClient ?? throw new ArgumentNullException(nameof(serviceClient));
this.connectionName = connectionName ?? throw new ArgumentNullException(connectionName);
}
/// <summary>
/// The name of the environment variable that contains connection details for how to connect to storage providers.
/// Corresponds to the <see cref="DurableClientAttribute.ConnectionName"/> for binding data.
/// </summary>
public virtual string ConnectionName => this.connectionName;
/// <summary>
/// Specifies whether the durability provider supports Durable Entities.
/// </summary>
public virtual bool SupportsEntities => false;
/// <summary>
/// Specifies whether the backend's WaitForOrchestration is implemented without polling.
/// </summary>
public virtual bool SupportsPollFreeWait => false;
/// <summary>
/// Specifies whether this backend delivers messages in order.
/// </summary>
public virtual bool GuaranteesOrderedDelivery => false;
/// <summary>
/// Specifies whether this backend supports implicit deletion of entities.
/// </summary>
public virtual bool SupportsImplicitEntityDeletion => false;
/// <summary>
/// JSON representation of configuration to emit in telemetry.
/// </summary>
public virtual JObject ConfigurationJson => EmptyConfig;
/// <summary>
/// Value of maximum durable timer delay. Used for long running durable timers.
/// </summary>
public virtual TimeSpan MaximumDelayTime { get; set; }
/// <summary>
/// Interval time used for long running timers.
/// </summary>
public virtual TimeSpan LongRunningTimerIntervalLength { get; set; }
/// <summary>
/// Event source name (e.g. DurableTask-AzureStorage).
/// </summary>
public virtual string EventSourceName { get; set; }
/// <inheritdoc/>
public int TaskOrchestrationDispatcherCount => this.GetOrchestrationService().TaskOrchestrationDispatcherCount;
/// <inheritdoc/>
public int MaxConcurrentTaskOrchestrationWorkItems => this.GetOrchestrationService().MaxConcurrentTaskOrchestrationWorkItems;
/// <inheritdoc/>
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => this.GetOrchestrationService().EventBehaviourForContinueAsNew;
/// <inheritdoc/>
public int TaskActivityDispatcherCount => this.GetOrchestrationService().TaskOrchestrationDispatcherCount;
/// <inheritdoc/>
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;
internal string GetBackendInfo()
{
return this.GetOrchestrationService().ToString();
}
private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
{
throw new NotSupportedException($"This storage provider was not provided an {nameof(IOrchestrationService)} instance, so it cannot call methods on that interface.");
}
return this.innerService;
}
private IOrchestrationServiceClient GetOrchestrationServiceClient()
{
if (this.innerServiceClient == null)
{
throw new NotSupportedException($"This storage provider was not provided an {nameof(IOrchestrationServiceClient)} instance, so it cannot call methods on that interface.");
}
return this.innerServiceClient;
}
/// <inheritdoc/>
public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
return this.GetOrchestrationService().AbandonTaskActivityWorkItemAsync(workItem);
}
/// <inheritdoc/>
public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
return this.GetOrchestrationService().AbandonTaskOrchestrationWorkItemAsync(workItem);
}
/// <inheritdoc/>
public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
{
return this.GetOrchestrationService().CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
}
/// <inheritdoc/>
public Task CompleteTaskOrchestrationWorkItemAsync(
TaskOrchestrationWorkItem workItem,
OrchestrationRuntimeState newOrchestrationRuntimeState,
IList<TaskMessage> outboundMessages,
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
{
return this.GetOrchestrationService().CompleteTaskOrchestrationWorkItemAsync(
workItem,
newOrchestrationRuntimeState,
outboundMessages,
orchestratorMessages,
timerMessages,
continuedAsNewMessage,
orchestrationState);
}
/// <inheritdoc/>
public Task CreateAsync()
{
return this.GetOrchestrationService().CreateAsync();
}
/// <inheritdoc/>
public Task CreateAsync(bool recreateInstanceStore)
{
return this.GetOrchestrationService().CreateAsync(recreateInstanceStore);
}
/// <inheritdoc/>
public Task CreateIfNotExistsAsync()
{
return this.GetOrchestrationService().CreateIfNotExistsAsync();
}
/// <inheritdoc/>
public Task DeleteAsync()
{
return this.GetOrchestrationService().DeleteAsync();
}
/// <inheritdoc/>
public Task DeleteAsync(bool deleteInstanceStore)
{
return this.GetOrchestrationService().DeleteAsync(deleteInstanceStore);
}
/// <inheritdoc/>
public int GetDelayInSecondsAfterOnFetchException(Exception exception)
{
return this.GetOrchestrationService().GetDelayInSecondsAfterOnFetchException(exception);
}
/// <inheritdoc/>
public int GetDelayInSecondsAfterOnProcessException(Exception exception)
{
return this.GetOrchestrationService().GetDelayInSecondsAfterOnProcessException(exception);
}
/// <inheritdoc/>
public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
{
return this.GetOrchestrationService().IsMaxMessageCountExceeded(currentMessageCount, runtimeState);
}
/// <inheritdoc/>
public Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
return this.GetOrchestrationService().LockNextTaskActivityWorkItem(receiveTimeout, cancellationToken);
}
/// <inheritdoc/>
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
return this.GetOrchestrationService().LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
/// <inheritdoc/>
public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
return this.GetOrchestrationService().ReleaseTaskOrchestrationWorkItemAsync(workItem);
}
/// <inheritdoc/>
public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
{
return this.GetOrchestrationService().RenewTaskActivityWorkItemLockAsync(workItem);
}
/// <inheritdoc/>
public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
{
return this.GetOrchestrationService().RenewTaskOrchestrationWorkItemLockAsync(workItem);
}
/// <inheritdoc/>
public Task StartAsync()
{
return this.GetOrchestrationService().StartAsync();
}
/// <inheritdoc/>
public Task StopAsync()
{
return this.GetOrchestrationService().StopAsync();
}
/// <inheritdoc/>
public Task StopAsync(bool isForced)
{
return this.GetOrchestrationService().StopAsync(isForced);
}
private NotImplementedException GetNotImplementedException(string methodName)
{
return new NotImplementedException($"The method {methodName} is not supported by the {this.name} storage provider.");
}
/// <summary>
/// Gets the status of all orchestration instances.
/// </summary>
/// <param name="cancellationToken">A token to cancel the request.</param>
/// <returns>Returns a task which completes when the status has been fetched.</returns>
public virtual Task<IList<OrchestrationState>> GetAllOrchestrationStates(CancellationToken cancellationToken)
{
throw this.GetNotImplementedException(nameof(this.GetAllOrchestrationStates));
}
/// <summary>
/// Gets the status of all orchestration instances within the specified parameters.
/// </summary>
/// <param name="createdTimeFrom">Return orchestration instances which were created after this DateTime.</param>
/// <param name="createdTimeTo">Return orchestration instances which were created before this DateTime.</param>
/// <param name="runtimeStatus">Return orchestration instances which matches the runtimeStatus.</param>
/// <param name="cancellationToken">A token to cancel the request.</param>
/// <returns>Returns a task which completes when the status has been fetched.</returns>
[Obsolete]
public virtual Task<IList<OrchestrationState>> GetAllOrchestrationStatesWithFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationRuntimeStatus> runtimeStatus, CancellationToken cancellationToken)
{
throw this.GetNotImplementedException(nameof(this.GetAllOrchestrationStatesWithFilters));
}
/// <summary>
/// Gets the state of the specified orchestration instance.
/// </summary>
/// <param name="instanceId">The ID of the orchestration instance to query.</param>
/// <param name="showInput">If set, fetch and return the input for the orchestration instance.</param>
/// <returns>Returns a task which completes when the state has been fetched.</returns>
public virtual Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}
/// <summary>
/// Gets paginated result of all orchestration instances that match query status parameters.
/// </summary>
/// <param name="condition">The filtering conditions of the query.</param>
/// <param name="cancellationToken">A token to cancel the request.</param>
/// <returns>Paginated result of orchestration state.</returns>
public virtual Task<OrchestrationStatusQueryResult> GetOrchestrationStateWithPagination(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken)
{
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithPagination));
}
/// <summary>
/// Purges history that meet the required parameters.
/// </summary>
/// <param name="createdTimeFrom">Purge the history of orchestration instances which were created after this DateTime.</param>
/// <param name="createdTimeTo">Purge the history of orchestration instances which were created before this DateTime.</param>
/// <param name="runtimeStatus">Purge the history of orchestration instances which matches the runtimeStatus.</param>
/// <returns>The number of instances purged.</returns>
public virtual Task<int> PurgeHistoryByFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
{
throw this.GetNotImplementedException(nameof(this.PurgeHistoryByFilters));
}
/// <summary>
/// Purges the instance history for the provided instance id.
/// </summary>
/// <param name="instanceId">The instance id for the instance history to purge.</param>
/// <returns>The number of instances purged.</returns>
public virtual Task<PurgeHistoryResult> PurgeInstanceHistoryByInstanceId(string instanceId)
{
throw this.GetNotImplementedException(nameof(this.PurgeInstanceHistoryByInstanceId));
}
/// <summary>
/// Retrieves the state for a serialized entity.
/// </summary>
/// <param name="entityId">Entity id to fetch state for.</param>
/// <param name="serializierSettings">JsonSerializerSettings for custom deserialization.</param>
/// <returns>State for the entity.</returns>
public virtual Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializierSettings)
{
throw this.GetNotImplementedException(nameof(this.RetrieveSerializedEntityState));
}
/// <summary>
/// Rewinds the specified failed orchestration instance with a reason.
/// </summary>
/// <param name="instanceId">The ID of the orchestration instance to rewind.</param>
/// <param name="reason">The reason for rewinding the orchestration instance.</param>
/// <returns>A task that completes when the rewind message is enqueued.</returns>
public virtual Task RewindAsync(string instanceId, string reason)
{
throw this.GetNotImplementedException(nameof(this.RewindAsync));
}
/// <summary>
/// Makes the current app the primary app, if it isn't already. Must be using the AppLease feature.
/// </summary>
/// <returns>A task that completes when the operation has started.</returns>
public virtual Task MakeCurrentAppPrimaryAsync()
{
throw this.GetNotImplementedException(nameof(this.MakeCurrentAppPrimaryAsync));
}
/// <inheritdoc />
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
{
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage);
}
/// <inheritdoc />
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
}
/// <inheritdoc />
public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
{
return this.GetOrchestrationServiceClient().SendTaskOrchestrationMessageAsync(message);
}
/// <inheritdoc />
public Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
{
return this.GetOrchestrationServiceClient().SendTaskOrchestrationMessageBatchAsync(messages);
}
/// <inheritdoc />
public Task<OrchestrationState> WaitForOrchestrationAsync(string instanceId, string executionId, TimeSpan timeout, CancellationToken cancellationToken)
{
return this.GetOrchestrationServiceClient().WaitForOrchestrationAsync(instanceId, executionId, timeout, cancellationToken);
}
/// <inheritdoc />
public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
{
return this.GetOrchestrationServiceClient().ForceTerminateTaskOrchestrationAsync(instanceId, reason);
}
/// <summary>
/// Suspend the specified orchestration instance with a reason.
/// </summary>
/// <param name="instanceId">Instance to suspend.</param>
/// <param name="reason">Reason for suspending the instance.</param>
/// <returns>A task that completes when the suspend message is enqueued.</returns>
public Task SuspendTaskOrchestrationAsync(string instanceId, string reason)
{
var taskMessage = new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
Event = new ExecutionSuspendedEvent(-1, reason),
};
return this.GetOrchestrationServiceClient().SendTaskOrchestrationMessageAsync(taskMessage);
}
/// <summary>
/// Resume the specified orchestration instance with a reason.
/// </summary>
/// <param name="instanceId">Instance to resume.</param>
/// <param name="reason">Reason for resuming the instance.</param>
/// <returns>A task that completes when the resume message is enqueued.</returns>
public Task ResumeTaskOrchestrationAsync(string instanceId, string reason)
{
var taskMessage = new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
Event = new ExecutionResumedEvent(-1, reason),
};
return this.GetOrchestrationServiceClient().SendTaskOrchestrationMessageAsync(taskMessage);
}
/// <inheritdoc />
public Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
{
return this.GetOrchestrationServiceClient().GetOrchestrationStateAsync(instanceId, allExecutions);
}
/// <inheritdoc />
public Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
{
return this.GetOrchestrationServiceClient().GetOrchestrationStateAsync(instanceId, executionId);
}
/// <inheritdoc />
public Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
{
return this.GetOrchestrationServiceClient().GetOrchestrationHistoryAsync(instanceId, executionId);
}
/// <inheritdoc />
public Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
return this.GetOrchestrationServiceClient().PurgeOrchestrationHistoryAsync(thresholdDateTimeUtc, timeRangeFilterType);
}
// The next few IOrchestrationServiceXXXClient methods are called by gRPC-based out-of-proc implementations
/// <inheritdoc />
Task<OrchestrationQueryResult> IOrchestrationServiceQueryClient.GetOrchestrationWithQueryAsync(
OrchestrationQuery query,
CancellationToken cancellationToken)
{
if (this.innerServiceClient is IOrchestrationServiceQueryClient queryClient)
{
return queryClient.GetOrchestrationWithQueryAsync(query, cancellationToken);
}
else
{
throw new NotSupportedException($"{this.innerServiceClient.GetType().Name} doesn't support query operations.");
}
}
/// <inheritdoc />
Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId)
{
if (this.innerServiceClient is IOrchestrationServicePurgeClient purgeClient)
{
return purgeClient.PurgeInstanceStateAsync(instanceId);
}
else
{
throw new NotSupportedException($"{this.innerServiceClient.GetType().Name} doesn't support purge operations.");
}
}
/// <inheritdoc />
Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
{
if (this.innerServiceClient is IOrchestrationServicePurgeClient purgeClient)
{
return purgeClient.PurgeInstanceStateAsync(purgeInstanceFilter);
}
else
{
throw new NotSupportedException($"{this.innerServiceClient.GetType().Name} doesn't support purge operations.");
}
}
/// <summary>
/// Uses durability provider specific logic to verify whether a timespan for a timer, timeout
/// or retry interval is allowed by the provider.
/// </summary>
/// <param name="timespan">The timespan that the code will have to wait for.</param>
/// <param name="errorMessage">The error message if the timespan is invalid.</param>
/// <returns>A boolean indicating whether the time interval is valid.</returns>
public virtual bool ValidateDelayTime(TimeSpan timespan, out string errorMessage)
{
errorMessage = null;
return true;
}
/// <summary>
/// Returns true if the stored connection string, ConnectionName, matches the input DurabilityProvider ConnectionName.
/// </summary>
/// <param name="durabilityProvider">The DurabilityProvider used to check for matching connection string names.</param>
/// <returns>A boolean indicating whether the connection names match.</returns>
internal virtual bool ConnectionNameMatches(DurabilityProvider durabilityProvider)
{
return this.ConnectionName.Equals(durabilityProvider.ConnectionName);
}
#if !FUNCTIONS_V1
/// <summary>
/// Tries to obtain a scale monitor for autoscaling.
/// </summary>
/// <param name="functionId">Function id.</param>
/// <param name="functionName">Function name.</param>
/// <param name="hubName">Task hub name.</param>
/// <param name="connectionName">The name of the storage-specific connection settings.</param>
/// <param name="scaleMonitor">The scale monitor.</param>
/// <returns>True if autoscaling is supported, false otherwise.</returns>
public virtual bool TryGetScaleMonitor(
string functionId,
string functionName,
string hubName,
string connectionName,
out IScaleMonitor scaleMonitor)
{
scaleMonitor = null;
return false;
}
#endif
}
}