-
Notifications
You must be signed in to change notification settings - Fork 273
/
Copy pathAzureStorageDurabilityProvider.cs
274 lines (238 loc) · 11.3 KB
/
AzureStorageDurabilityProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using AzureStorage = DurableTask.AzureStorage;
using DTCore = DurableTask.Core;
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// The Azure Storage implementation of additional methods not required by IOrchestrationService.
/// </summary>
internal class AzureStorageDurabilityProvider : DurabilityProvider
{
// Azure Storage orchestration service that the query, purge, rewind and lease methods of this class call into.
private readonly AzureStorageOrchestrationService serviceClient;
// Resolves storage account details from a connection name when building scale monitors and target scalers.
private readonly IStorageAccountProvider storageAccountProvider;
// Backing value for the ConnectionName property.
private readonly string connectionName;
// JSON form of the AzureStorageOptions, produced once in the constructor and returned by ConfigurationJson.
private readonly JObject storageOptionsJson;
// Logger passed to the metrics provider, scale monitor and target scaler created by this class.
private readonly ILogger logger;
/// <summary>
/// Initializes a new instance of the <see cref="AzureStorageDurabilityProvider"/> class.
/// </summary>
/// <param name="service">The Azure Storage orchestration service; it is also handed to the base constructor for both of its service arguments.</param>
/// <param name="storageAccountProvider">Provider used later to look up storage account details by connection name.</param>
/// <param name="connectionName">The connection name exposed through <see cref="ConnectionName"/>.</param>
/// <param name="options">The options object that is serialized into <see cref="ConfigurationJson"/>.</param>
/// <param name="logger">The logger used by the scale-related objects this provider creates.</param>
public AzureStorageDurabilityProvider(
    AzureStorageOrchestrationService service,
    IStorageAccountProvider storageAccountProvider,
    string connectionName,
    AzureStorageOptions options,
    ILogger logger)
    : base("Azure Storage", service, service, connectionName)
{
    // The options snapshot uses camelCase property names and writes enum values as strings.
    var optionsSerializer = new JsonSerializer
    {
        ContractResolver = new CamelCasePropertyNamesContractResolver(),
    };
    optionsSerializer.Converters.Add(new StringEnumConverter());

    this.logger = logger;
    this.connectionName = connectionName;
    this.storageAccountProvider = storageAccountProvider;
    this.serviceClient = service;
    this.storageOptionsJson = JObject.FromObject(options, optionsSerializer);
}
/// <summary>
/// Always <c>true</c> for the Azure Storage provider.
/// </summary>
public override bool CheckStatusBeforeRaiseEvent
{
    get { return true; }
}

/// <summary>
/// The app setting containing the Azure Storage connection string.
/// </summary>
public override string ConnectionName
{
    get { return this.connectionName; }
}

/// <summary>
/// The Azure Storage options, as serialized to JSON when this provider was constructed.
/// </summary>
public override JObject ConfigurationJson
{
    get { return this.storageOptionsJson; }
}

/// <summary>
/// The longest delay accepted by <see cref="ValidateDelayTime"/>. Defaults to six days.
/// </summary>
public override TimeSpan MaximumDelayTime { get; set; } = TimeSpan.FromDays(6);

/// <summary>
/// Interval length for long-running timers. Defaults to three days.
/// </summary>
public override TimeSpan LongRunningTimerIntervalLength { get; set; } = TimeSpan.FromDays(3);

/// <summary>
/// The event source name. Defaults to "DurableTask-AzureStorage".
/// </summary>
public override string EventSourceName { get; set; } = "DurableTask-AzureStorage";
/// <inheritdoc/>
/// <remarks>Forwards the call, including the cancellation token, to the Azure Storage orchestration service.</remarks>
public async override Task<IList<OrchestrationState>> GetAllOrchestrationStates(CancellationToken cancellationToken)
{
    IList<OrchestrationState> allStates =
        await this.serviceClient.GetOrchestrationStateAsync(cancellationToken);
    return allStates;
}
/// <inheritdoc/>
/// <remarks>
/// Queries the Azure Storage orchestration service with <c>allExecutions: false</c>;
/// <paramref name="showInput"/> is passed through as the <c>fetchInput</c> argument.
/// </remarks>
public async override Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
    IList<OrchestrationState> instanceStates = await this.serviceClient.GetOrchestrationStateAsync(
        instanceId,
        allExecutions: false,
        fetchInput: showInput);
    return instanceStates;
}
/// <inheritdoc/>
/// <remarks>Delegates to <c>RewindTaskOrchestrationAsync</c> on the Azure Storage orchestration service.</remarks>
public async override Task RewindAsync(string instanceId, string reason)
{
    var pendingRewind = this.serviceClient.RewindTaskOrchestrationAsync(instanceId, reason);
    await pendingRewind;
}
/// <inheritdoc/>
/// <remarks>
/// Each <see cref="OrchestrationRuntimeStatus"/> value is cast directly to <see cref="OrchestrationStatus"/>
/// before the filtered query is sent to the Azure Storage orchestration service.
/// </remarks>
[Obsolete]
public async override Task<IList<OrchestrationState>> GetAllOrchestrationStatesWithFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationRuntimeStatus> runtimeStatus, CancellationToken cancellationToken)
{
    // Deferred projection: the casts run when the service enumerates the sequence.
    IEnumerable<OrchestrationStatus> coreStatuses = runtimeStatus.Select(status => (OrchestrationStatus)status);

    IList<OrchestrationState> filteredStates = await this.serviceClient.GetOrchestrationStateAsync(
        createdTimeFrom,
        createdTimeTo,
        coreStatuses,
        cancellationToken);
    return filteredStates;
}
/// <inheritdoc/>
/// <remarks>
/// Uses the orchestration service's entity backend queries when they are available;
/// otherwise falls back to reading the state from the entity's scheduler instance.
/// </remarks>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
    var entityService = this.serviceClient as IEntityOrchestrationService;
    EntityBackendQueries nativeQueries = entityService?.EntityBackendQueries;

    if (nativeQueries == null)
    {
        // Entity queries are not natively supported: fall back to the old implementation.
        return await this.LegacyImplementationOfRetrieveSerializedEntityState(entityId, serializerSettings);
    }

    // Entity queries are natively supported.
    // NOTE(review): only the id and the cancellation token are passed here; confirm that
    // GetEntityAsync populates SerializedState with its default arguments.
    var coreEntityId = new DTCore.Entities.EntityId(entityId.EntityName, entityId.EntityKey);
    var entity = await nativeQueries.GetEntityAsync(coreEntityId, cancellation: default);

    return entity == null ? null : entity.Value.SerializedState;
}
/// <summary>
/// Reads an entity's serialized state from the input of its scheduler orchestration instance.
/// Used as the fallback when the orchestration service has no native entity queries.
/// </summary>
/// <param name="entityId">The entity whose state is requested.</param>
/// <param name="serializerSettings">Settings used to deserialize the scheduler state.</param>
/// <returns>
/// The serialized entity state, or <c>null</c> when no instance state or input is found,
/// the scheduler state cannot be materialized, or the scheduler state reports that the entity does not exist.
/// </returns>
private async Task<string> LegacyImplementationOfRetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
    var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
    IList<OrchestrationState> stateList = await this.serviceClient.GetOrchestrationStateAsync(instanceId, false);
    OrchestrationState state = stateList?.FirstOrDefault();

    if (state?.OrchestrationInstance == null || state.Input == null)
    {
        return null;
    }

    // An input that starts with "http" is downloaded as a blob instead of being used inline.
    // The prefix is machine-generated, so compare ordinally rather than with the current culture.
    string serializedState = state.Input.StartsWith("http", StringComparison.Ordinal)
        ? await this.serviceClient.DownloadBlobAsync(state.Input)
        : state.Input;

    var schedulerState = JsonConvert.DeserializeObject<SchedulerState>(serializedState, serializerSettings);

    // DeserializeObject returns null for an empty or literal "null" payload; treat that as "no state"
    // instead of failing with a NullReferenceException.
    return (schedulerState != null && schedulerState.EntityExists)
        ? schedulerState.EntityState
        : null;
}
/// <inheritdoc/>
/// <remarks>Wraps the Azure Storage purge result's deleted-instance count in the extension's <see cref="PurgeHistoryResult"/>.</remarks>
public async override Task<PurgeHistoryResult> PurgeInstanceHistoryByInstanceId(string instanceId)
{
    var storageResult = await this.serviceClient.PurgeInstanceHistoryAsync(instanceId);
    return new PurgeHistoryResult(storageResult.InstancesDeleted);
}
/// <inheritdoc/>
/// <remarks>Returns the number of instances the Azure Storage orchestration service reports as deleted.</remarks>
public async override Task<int> PurgeHistoryByFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
{
    var storageResult = await this.serviceClient.PurgeInstanceHistoryAsync(
        createdTimeFrom,
        createdTimeTo,
        runtimeStatus);

    return storageResult.InstancesDeleted;
}
/// <inheritdoc/>
/// <remarks>
/// Translates the query condition into its Azure Storage form, runs the paged query using the
/// condition's page size and continuation token, and converts the result back for the caller.
/// </remarks>
public async override Task<OrchestrationStatusQueryResult> GetOrchestrationStateWithPagination(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken)
{
    OrchestrationInstanceStatusQueryCondition storageCondition =
        ConvertWebjobsDurableConditionToAzureStorageCondition(condition);

    DurableStatusQueryResult page = await this.serviceClient.GetOrchestrationStateAsync(
        storageCondition,
        condition.PageSize,
        condition.ContinuationToken,
        cancellationToken);

    return this.ConvertFrom(page);
}
/// <summary>
/// Checks a requested delay against <see cref="MaximumDelayTime"/> before applying the base validation.
/// </summary>
/// <param name="timespan">The requested delay.</param>
/// <param name="errorMessage">Set to a description of the limit when the delay is too long; otherwise set by the base implementation.</param>
/// <returns><c>false</c> when the delay exceeds <see cref="MaximumDelayTime"/>; otherwise the base implementation's result.</returns>
public override bool ValidateDelayTime(TimeSpan timespan, out string errorMessage)
{
    bool withinProviderLimit = timespan <= this.MaximumDelayTime;
    if (withinProviderLimit)
    {
        return base.ValidateDelayTime(timespan, out errorMessage);
    }

    errorMessage = $"The Azure Storage provider supports a maximum of {this.MaximumDelayTime.TotalDays} days for time-based delays";
    return false;
}
/// <inheritdoc/>
/// <remarks>Delegates to <c>ForceChangeAppLeaseAsync</c> on the Azure Storage orchestration service.</remarks>
public async override Task MakeCurrentAppPrimaryAsync()
{
    var leaseChange = this.serviceClient.ForceChangeAppLeaseAsync();
    await leaseChange;
}
/// <summary>
/// Converts an Azure Storage status query result into the extension's query result type.
/// </summary>
/// <param name="statusContext">The query result returned by the Azure Storage orchestration service.</param>
/// <returns>A result holding one converted status per orchestration state, plus the same continuation token.</returns>
private OrchestrationStatusQueryResult ConvertFrom(DurableStatusQueryResult statusContext)
{
    var convertedStatuses = new List<DurableOrchestrationStatus>();
    foreach (var orchestrationState in statusContext.OrchestrationState)
    {
        DurableOrchestrationStatus status = DurableClient.ConvertOrchestrationStateToStatus(orchestrationState);
        convertedStatuses.Add(status);
    }

    return new OrchestrationStatusQueryResult
    {
        DurableOrchestrationState = convertedStatuses,
        ContinuationToken = statusContext.ContinuationToken,
    };
}
/// <summary>
/// Builds the Azure Storage query condition that corresponds to an extension-level query condition.
/// </summary>
/// <param name="condition">The condition to translate.</param>
/// <returns>
/// A condition with the same time range, task hub names, instance ID prefix and input flag.
/// Runtime statuses are mapped to <see cref="OrchestrationStatus"/> by enum member name,
/// and stay <c>null</c> when the source has none.
/// </returns>
internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableConditionToAzureStorageCondition(OrchestrationStatusQueryCondition condition)
{
    // Statuses are matched by name (ToString + Enum.Parse), not by numeric value.
    // The projection is deferred, so parsing runs when the sequence is enumerated.
    IEnumerable<OrchestrationStatus> storageStatuses = condition.RuntimeStatus?.Select(
        status => (OrchestrationStatus)Enum.Parse(typeof(OrchestrationStatus), status.ToString()));

    var storageCondition = new OrchestrationInstanceStatusQueryCondition
    {
        RuntimeStatus = storageStatuses,
        CreatedTimeFrom = condition.CreatedTimeFrom,
        CreatedTimeTo = condition.CreatedTimeTo,
        TaskHubNames = condition.TaskHubNames,
        InstanceIdPrefix = condition.InstanceIdPrefix,
        FetchInput = condition.ShowInput,
    };

    return storageCondition;
}
#if !FUNCTIONS_V1
/// <summary>
/// Creates a metrics provider for the given function, task hub and storage account.
/// </summary>
/// <param name="functionName">The function name passed to the metrics provider.</param>
/// <param name="hubName">The task hub name passed to the metrics provider.</param>
/// <param name="storageAccount">The storage account passed to the metrics provider.</param>
/// <param name="logger">The logger passed to the metrics provider.</param>
/// <returns>A new <see cref="DurableTaskMetricsProvider"/> constructed without a performance monitor.</returns>
internal DurableTaskMetricsProvider GetMetricsProvider(
    string functionName,
    string hubName,
    CloudStorageAccount storageAccount,
    ILogger logger)
    => new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
/// <inheritdoc/>
/// <remarks>
/// Resolves the storage account for <paramref name="connectionName"/>, builds a metrics provider for it,
/// and always returns <c>true</c> with a new <see cref="DurableTaskScaleMonitor"/>.
/// </remarks>
public override bool TryGetScaleMonitor(
    string functionId,
    string functionName,
    string hubName,
    string connectionName,
    out IScaleMonitor scaleMonitor)
{
    var accountDetails = this.storageAccountProvider.GetStorageAccountDetails(connectionName);
    CloudStorageAccount account = accountDetails.ToCloudStorageAccount();
    DurableTaskMetricsProvider metrics = this.GetMetricsProvider(functionName, hubName, account, this.logger);

    scaleMonitor = new DurableTaskScaleMonitor(functionId, functionName, hubName, account, this.logger, metrics);
    return true;
}
#endif
#if FUNCTIONS_V3_OR_GREATER
/// <summary>
/// Creates a target scaler backed by a metrics provider for the storage account behind <paramref name="connectionName"/>.
/// </summary>
/// <param name="functionId">The function ID passed to the target scaler.</param>
/// <param name="functionName">The function name passed to the metrics provider.</param>
/// <param name="hubName">The task hub name passed to the metrics provider.</param>
/// <param name="connectionName">The connection name used to resolve the storage account.</param>
/// <param name="targetScaler">Receives the new <see cref="DurableTaskTargetScaler"/>.</param>
/// <returns>Always <c>true</c>.</returns>
public override bool TryGetTargetScaler(
    string functionId,
    string functionName,
    string hubName,
    string connectionName,
    out ITargetScaler targetScaler)
{
    // This is only called by the ScaleController, it doesn't run in the Functions Host process.
    var accountDetails = this.storageAccountProvider.GetStorageAccountDetails(connectionName);
    CloudStorageAccount account = accountDetails.ToCloudStorageAccount();
    DurableTaskMetricsProvider metrics = this.GetMetricsProvider(functionName, hubName, account, this.logger);

    targetScaler = new DurableTaskTargetScaler(functionId, metrics, this, this.logger);
    return true;
}
#endif
}
}