diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs
index 4c73ec38f..d4d6d1140 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs
@@ -71,6 +71,11 @@ internal class DurableClient : IDurableClient,
string IDurableEntityClient.TaskHubName => this.TaskHubName;
+ public override string ToString()
+ {
+ return $"DurableClient[backend={this.config.GetBackendInfo()}]";
+ }
+
///
HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure)
{
@@ -530,7 +535,7 @@ async Task IDurableEntityClient.CleanEntityStorageAsyn
}
if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0
- && now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes))
+ && now - state.LastUpdatedTime > this.config.MessageReorderWindow)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs
index 8e4a5fdca..36705a80d 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs
@@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core.History;
@@ -22,7 +23,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
{
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.FunctionName = functionName;
- this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes);
}
internal DurableTaskExtension Config { get; }
@@ -41,8 +41,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
internal string Name => this.FunctionName;
- internal TimeSpan EntityMessageReorderWindow { get; private set; }
-
internal bool ExecutorCalledBack { get; set; }
internal void AddDeferredTask(Func function)
@@ -55,5 +53,11 @@ internal async Task RunDeferredTasks()
await Task.WhenAll(this.deferredTasks.Select(x => x()));
this.deferredTasks.Clear();
}
+
+ [Conditional("false")]
+ internal void TraceWorkItemProgress(string format, object arg)
+ {
+ // TODO hook this up with tracing in the backend when it is implemented
+ }
}
}
\ No newline at end of file
diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs
index 14e5751fb..9c7711117 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs
@@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
{
if (!operationMessage.EventContent.ScheduledTime.HasValue)
{
- this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow);
+ this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow);
}
this.Config.TraceHelper.SendingEntityMessage(
diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs
index f87a5bfed..883794e8c 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs
@@ -1132,7 +1132,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten
requestMessage,
target.InstanceId,
this.InnerContext.CurrentUtcDateTime,
- TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes));
+ this.Config.MessageReorderWindow);
eventName = EntityMessageEventNames.RequestMessageEventName;
}
diff --git a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs
index bff3125f3..93b5ffbed 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableOrchestrationContext.cs
@@ -437,8 +437,7 @@ public interface IDurableOrchestrationContext
///
/// The collection of owned locks.
///
- /// Note that the collection of owned locks can be empty even if the context is locked. This happens
- /// if an orchestration calls a suborchestration without lending any locks.
+ /// Note that the collection of owned locks can be empty even if the context is locked.
///
/// true if the context already holds some locks.
bool IsLocked(out IReadOnlyList ownedLocks);
diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
index 60d2ef972..0395364c3 100644
--- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
@@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
///
public virtual bool SupportsEntities => false;
+ ///
+ /// Specifies whether the backend's WaitForOrchestration is implemented without polling.
+ ///
+ public virtual bool SupportsPollFreeWait => false;
+
+ ///
+ /// Specifies whether this backend delivers messages in order.
+ ///
+ public virtual bool GuaranteesOrderedDelivery => false;
+
///
/// JSON representation of configuration to emit in telemetry.
///
@@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
///
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;
+ internal string GetBackendInfo()
+ {
+ return this.GetOrchestrationService().ToString();
+ }
+
private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
@@ -236,7 +251,7 @@ public Task StopAsync()
}
///
- public Task StopAsync(bool isForced)
+ public virtual Task StopAsync(bool isForced)
{
return this.GetOrchestrationService().StopAsync(isForced);
}
@@ -278,7 +293,7 @@ public virtual Task> GetAllOrchestrationStatesWithFilt
/// Returns a task which completes when the state has been fetched.
public virtual Task> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
- throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync));
+ throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}
///
diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
index 56bb497b6..ae8fc4886 100644
--- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
@@ -189,6 +189,9 @@ public string HubName
internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }
+ internal TimeSpan MessageReorderWindow
+ => this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);
+
private MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
bool isDefault;
@@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}
+ internal string GetBackendInfo()
+ {
+ return this.defaultDurabilityProvider.GetBackendInfo();
+ }
+
///
/// Internal initialization call from the WebJobs host.
///
@@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
else
{
// run this through the message sorter to help with reordering and duplicate filtering
- deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow);
+ deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}
foreach (var message in deliverNow)
@@ -733,6 +741,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
if (entityContext.State.LockedBy == message.ParentInstanceId)
{
// operation requests from the lock holder are processed immediately
+ entityContext.TraceWorkItemProgress("processes {entityMessage}", message);
entityShim.AddOperationToBatch(message);
}
else
@@ -749,6 +758,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
if (entityContext.State.LockedBy == message.ParentInstanceId)
{
+ entityContext.TraceWorkItemProgress("processes {entityMessage}", message);
+
this.TraceHelper.EntityLockReleased(
entityContext.HubName,
entityContext.Name,
@@ -759,6 +770,10 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
entityContext.State.LockedBy = null;
}
+ else
+ {
+ entityContext.TraceWorkItemProgress("!!!! drops {entityMessage}", message);
+ }
}
break;
@@ -769,6 +784,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
while (entityContext.State.LockedBy == null
&& entityContext.State.TryDequeue(out var request))
{
+ entityContext.TraceWorkItemProgress("processes {entityMessage}", request);
+
if (request.IsLockRequest)
{
entityShim.AddLockRequestToBatch(request);
diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs
index 2f9113bfd..681243df2 100644
--- a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs
@@ -94,6 +94,14 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder, Actio
return builder;
}
+ ///
+ /// Override the AzureStorageDurabilityProvider specification that was done in AddDurableTask.
+ ///
+ /// The to configure, usually from the Functions app's FunctionsStartup.
+ /// Returns the provided .
+ public static IServiceCollection UseEventSourcedDurabilityProvider(this IServiceCollection services)
+ => services.AddSingleton();
+
#else
///
/// Enable running durable orchestrations implemented as functions.
diff --git a/src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProvider.cs
new file mode 100644
index 000000000..74f9f9b7b
--- /dev/null
+++ b/src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProvider.cs
@@ -0,0 +1,121 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using DurableTask.Core;
+using DurableTask.EventSourced;
+using Newtonsoft.Json;
+
+namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
+{
+ internal class EventSourcedDurabilityProvider : DurabilityProvider
+ {
+ private readonly EventSourcedOrchestrationService serviceClient;
+
+ internal EventSourcedDurabilityProvider(EventSourcedOrchestrationService service, EventSourcedOrchestrationServiceSettings settings)
+ : base("EventSourced", service, service, "StorageConnectionString")
+ {
+ this.serviceClient = service;
+ this.Settings = settings;
+ }
+
+ public EventSourcedOrchestrationServiceSettings Settings { get; private set; }
+
+ public override bool SupportsEntities => true;
+
+ public override bool SupportsPollFreeWait => true;
+
+ public override bool GuaranteesOrderedDelivery => true;
+
+ public override TimeSpan MaximumDelayTime { get; set; } = TimeSpan.MaxValue;
+
+ ///
+ /// The app setting containing the Azure Storage connection string.
+ ///
+ public override string ConnectionName => "StorageConnectionString"; // TODO this needs to be refactored to work across providers
+
+ ///
+ public override async Task StopAsync(bool isForced)
+ {
+ if (!this.Settings.KeepServiceRunning)
+ {
+ await this.serviceClient.StopAsync(isForced);
+ EventSourcedDurabilityProviderFactory.RemoveDurabilityProvider(this);
+ }
+ else
+ {
+ await this.PurgeHistoryByFilters(default, default, null);
+ }
+ }
+
+ ///
+ public async override Task RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
+ {
+ var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
+ OrchestrationState state = await this.serviceClient.GetOrchestrationStateAsync(instanceId, true, true);
+
+ if (state != null
+ && state.OrchestrationInstance != null
+ && state.Input != null)
+ {
+ var schedulerState = JsonConvert.DeserializeObject(state.Input, serializerSettings);
+
+ if (schedulerState.EntityExists)
+ {
+ return schedulerState.EntityState;
+ }
+ }
+
+ return null;
+ }
+
+ ///
+ public async override Task> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
+ {
+ var result = new List();
+ var state = await this.serviceClient.GetOrchestrationStateAsync(instanceId, showInput, true);
+ if (state != null)
+ {
+ result.Add(state);
+ }
+
+ return result;
+ }
+
+ ///
+ public async override Task PurgeInstanceHistoryByInstanceId(string instanceId)
+ {
+ var numberInstancesDeleted = await this.serviceClient.PurgeInstanceHistoryAsync(instanceId);
+ return new PurgeHistoryResult(numberInstancesDeleted);
+ }
+
+ ///
+ public override Task PurgeHistoryByFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus)
+ {
+ return this.serviceClient.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus);
+ }
+
+ ///
+ public async override Task GetOrchestrationStateWithPagination(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken)
+ {
+ var instanceQuery = new InstanceQuery(
+ runtimeStatus: condition.RuntimeStatus?.Select(p => (OrchestrationStatus)Enum.Parse(typeof(OrchestrationStatus), p.ToString())).ToArray(),
+ createdTimeFrom: (condition.CreatedTimeFrom == default) ? (DateTime?)null : condition.CreatedTimeFrom.ToUniversalTime(),
+ createdTimeTo: (condition.CreatedTimeTo == default) ? (DateTime?)null : condition.CreatedTimeTo.ToUniversalTime(),
+ instanceIdPrefix: condition.InstanceIdPrefix,
+ fetchInput: condition.ShowInput);
+
+ InstanceQueryResult result = await this.serviceClient.QueryOrchestrationStatesAsync(instanceQuery, condition.PageSize, condition.ContinuationToken, cancellationToken);
+
+ return new OrchestrationStatusQueryResult()
+ {
+ DurableOrchestrationState = result.Instances.Select(ostate => DurableClient.ConvertOrchestrationStateToStatus(ostate)).ToList(),
+ ContinuationToken = result.ContinuationToken,
+ };
+ }
+ }
+}
diff --git a/src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProviderFactory.cs b/src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProviderFactory.cs
new file mode 100644
index 000000000..8dcc773c1
--- /dev/null
+++ b/src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProviderFactory.cs
@@ -0,0 +1,399 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+
+using System;
+using System.Collections.Concurrent;
+using System.IO;
+using System.Threading;
+using DurableTask.EventSourced;
+using Microsoft.Azure.Storage;
+using Microsoft.Azure.Storage.Blob;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+
+namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
+{
+ internal class EventSourcedDurabilityProviderFactory : IDurabilityProviderFactory
+ {
+ private static ConcurrentDictionary cachedProviders = new ConcurrentDictionary();
+
+ private readonly DurableTaskOptions options;
+ private readonly IConnectionStringResolver connectionStringResolver;
+
+ private readonly bool reuseTaskHubForAllTests;
+ private readonly bool traceToConsole;
+ private readonly bool traceToEtwExtension;
+ private readonly bool traceToBlob;
+
+ private ILoggerFactory loggerFactory;
+ private static BlobLogger blobLogger;
+ private EventSourcedDurabilityProvider defaultProvider;
+
+ // the following are boolean options that can be specified in the json,
+ // but are not passed on to the backend
+ public const string ReuseTaskHubForTests = "ReuseTaskHubForTests";
+ public const string TraceToConsole = "TraceToConsole";
+ public const string TraceToEtwExtension = "TraceToEtwExtension";
+ public const string TraceToBlob = "TraceToBlob";
+
+ public EventSourcedDurabilityProviderFactory(
+ IOptions options,
+ IConnectionStringResolver connectionStringResolver,
+ ILoggerFactory loggerFactory)
+ {
+ // for debugging
+ // System.Threading.Thread.Sleep(5000);
+
+ this.options = options.Value;
+ this.connectionStringResolver = connectionStringResolver;
+ this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
+
+ bool ReadBooleanSetting(string name) => this.options.StorageProvider.TryGetValue(name, out object objValue)
+ && objValue is string stringValue && bool.TryParse(stringValue, out bool boolValue) && boolValue;
+
+ this.reuseTaskHubForAllTests = ReadBooleanSetting(ReuseTaskHubForTests);
+ this.traceToConsole = ReadBooleanSetting(TraceToConsole);
+ this.traceToEtwExtension = ReadBooleanSetting(TraceToEtwExtension);
+ this.traceToBlob = ReadBooleanSetting(TraceToBlob);
+ }
+
+ private EventSourcedOrchestrationServiceSettings GetEventSourcedSettings(string taskHubNameOverride = null)
+ {
+ var eventSourcedSettings = new EventSourcedOrchestrationServiceSettings();
+
+ // override DTFx defaults to the defaults we want to use in DF
+ eventSourcedSettings.ThrowExceptionOnInvalidDedupeStatus = true;
+
+ // copy all applicable fields from both the options and the storageProvider options
+ JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), eventSourcedSettings);
+ JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options.StorageProvider), eventSourcedSettings);
+
+ // resolve any indirection in the specification of the two connection strings
+ eventSourcedSettings.StorageConnectionString = this.ResolveIndirection(
+ eventSourcedSettings.StorageConnectionString,
+ nameof(EventSourcedOrchestrationServiceSettings.StorageConnectionString));
+ eventSourcedSettings.EventHubsConnectionString = this.ResolveIndirection(
+ eventSourcedSettings.EventHubsConnectionString,
+ nameof(EventSourcedOrchestrationServiceSettings.EventHubsConnectionString));
+
+ // if worker id is specified in environment, it overrides the configured setting
+ string workerId = Environment.GetEnvironmentVariable("WorkerId");
+ if (!string.IsNullOrEmpty(workerId))
+ {
+ eventSourcedSettings.WorkerId = workerId;
+ }
+
+ eventSourcedSettings.HubName = this.options.HubName;
+
+ if (taskHubNameOverride != null)
+ {
+ eventSourcedSettings.HubName = taskHubNameOverride;
+ }
+
+ if (this.reuseTaskHubForAllTests)
+ {
+ eventSourcedSettings.HubName = "test-taskhub";
+ eventSourcedSettings.KeepServiceRunning = true;
+ }
+
+ // TODO sanitize hubname in the same way as AzureStorage does
+
+ return eventSourcedSettings;
+ }
+
+ public void CreateDefaultProvider()
+ {
+ var settings = this.GetEventSourcedSettings();
+
+ if (this.traceToBlob && blobLogger == null)
+ {
+ blobLogger = blobLogger ?? new BlobLogger(settings.StorageConnectionString, settings.WorkerId);
+ }
+
+ if (this.traceToConsole || this.traceToEtwExtension || this.traceToBlob)
+ {
+ // capture trace events generated in the backend and redirect them to generate an ETW event, or to trace to console
+ this.loggerFactory = new LoggerFactoryWrapper(this.loggerFactory, settings.HubName, settings.WorkerId, this);
+ }
+
+ // var providerFactoryName = nameof(EventSourcedDurabilityProviderFactory);
+ // ILogger logger = this.loggerFactory.CreateLogger(providerFactoryName);
+ // var traceHelper = new EndToEndTraceHelper(logger, false);
+ // traceHelper.ExtensionWarningEvent(this.options.HubName, string.Empty, string.Empty, $"{providerFactoryName} instantiated");
+
+ var key = new DurableClientAttribute()
+ {
+ TaskHub = settings.HubName,
+ ConnectionName = settings.StorageConnectionString,
+ };
+
+ if (this.reuseTaskHubForAllTests && cachedProviders.TryGetValue(key, out var cachedProviderFromLastTest))
+ {
+ // We simply use the cached orchestration service, which is still running,
+ // but change the extended sessions setting, which is dynamically checked by the implementation.
+ cachedProviderFromLastTest.Settings.ExtendedSessionsEnabled = settings.ExtendedSessionsEnabled;
+ this.defaultProvider = cachedProviderFromLastTest;
+ }
+ else
+ {
+ var service = new EventSourcedOrchestrationService(settings, this.loggerFactory);
+ this.defaultProvider = new EventSourcedDurabilityProvider(service, settings);
+ cachedProviders[key] = this.defaultProvider;
+ }
+ }
+
+ public DurabilityProvider GetDurabilityProvider(DurableClientAttribute attribute)
+ {
+ EventSourcedOrchestrationServiceSettings settings = this.GetEventSourcedSettings(attribute.TaskHub);
+
+ if (string.Equals(this.defaultProvider.Settings.HubName, settings.HubName, StringComparison.OrdinalIgnoreCase) &&
+ string.Equals(this.defaultProvider.Settings.StorageConnectionString, settings.StorageConnectionString, StringComparison.OrdinalIgnoreCase))
+ {
+ return this.defaultProvider;
+ }
+
+ DurableClientAttribute key = new DurableClientAttribute()
+ {
+ TaskHub = settings.HubName,
+ ConnectionName = settings.StorageConnectionString,
+ };
+
+ return cachedProviders.GetOrAdd(key, _ =>
+ {
+ var service = new EventSourcedOrchestrationService(settings, this.loggerFactory);
+ return new EventSourcedDurabilityProvider(service, settings);
+ });
+ }
+
+ public static bool RemoveDurabilityProvider(EventSourcedDurabilityProvider provider)
+ {
+ return cachedProviders.TryRemove(
+ new DurableClientAttribute()
+ {
+ TaskHub = provider.Settings.HubName,
+ ConnectionName = provider.Settings.StorageConnectionString,
+ },
+ out _);
+ }
+
+ private string ResolveIndirection(string value, string propertyName)
+ {
+ string envName;
+ string setting;
+
+ if (string.IsNullOrEmpty(value))
+ {
+ envName = propertyName;
+ }
+ else if (value.StartsWith("$"))
+ {
+ envName = value.Substring(1);
+ }
+ else if (value.StartsWith("%") && value.EndsWith("%"))
+ {
+ envName = value.Substring(1, value.Length - 2);
+ }
+ else
+ {
+ envName = null;
+ }
+
+ if (envName != null)
+ {
+ setting = this.connectionStringResolver.Resolve(envName);
+ }
+ else
+ {
+ setting = value;
+ }
+
+ if (string.IsNullOrEmpty(setting))
+ {
+ throw new InvalidOperationException($"Could not resolve '{envName}' for required property '{propertyName}' in EventSourced storage provider settings.");
+ }
+ else
+ {
+ return setting;
+ }
+ }
+
+ internal string GetDefaultStorageConnectionString()
+ => this.defaultProvider.Settings.StorageConnectionString;
+
+ public DurabilityProvider GetDurabilityProvider()
+ {
+ if (this.defaultProvider == null)
+ {
+ this.CreateDefaultProvider();
+ }
+
+ return this.defaultProvider;
+ }
+
+ private class LoggerFactoryWrapper : ILoggerFactory
+ {
+ private readonly ILoggerFactory loggerFactory;
+ private readonly EventSourcedDurabilityProviderFactory providerFactory;
+ private readonly string hubName;
+ private readonly string workerId;
+
+ public LoggerFactoryWrapper(ILoggerFactory loggerFactory, string hubName, string workerId, EventSourcedDurabilityProviderFactory providerFactory)
+ {
+ this.hubName = hubName;
+ this.workerId = workerId;
+ this.loggerFactory = loggerFactory;
+ this.providerFactory = providerFactory;
+ }
+
+ public void AddProvider(ILoggerProvider provider)
+ {
+ this.loggerFactory.AddProvider(provider);
+ }
+
+ public ILogger CreateLogger(string categoryName)
+ {
+ var logger = this.loggerFactory.CreateLogger(categoryName);
+ return new LoggerWrapper(logger, categoryName, this.hubName, this.workerId, this.providerFactory);
+ }
+
+ public void Dispose()
+ {
+ this.loggerFactory.Dispose();
+ }
+ }
+
+ private class LoggerWrapper : ILogger
+ {
+ private static readonly string ExtensionVersion = System.Diagnostics.FileVersionInfo.GetVersionInfo(typeof(DurableTaskExtension).Assembly.Location).FileVersion;
+ private readonly ILogger logger;
+ private readonly string prefix;
+ private readonly string hubName;
+ private readonly EventSourcedDurabilityProviderFactory providerFactory;
+ private readonly bool fullTracing;
+
+ public LoggerWrapper(ILogger logger, string category, string hubName, string workerId, EventSourcedDurabilityProviderFactory providerFactory)
+ {
+ this.logger = logger;
+ this.prefix = $"{workerId} [{category}]";
+ this.hubName = hubName;
+ this.providerFactory = providerFactory;
+ this.fullTracing = this.providerFactory.traceToBlob || this.providerFactory.traceToConsole || this.providerFactory.traceToEtwExtension;
+ }
+
+ public IDisposable BeginScope(TState state)
+ {
+ return this.logger.BeginScope(state);
+ }
+
+ public bool IsEnabled(Microsoft.Extensions.Logging.LogLevel logLevel)
+ {
+ return this.fullTracing || this.logger.IsEnabled(logLevel);
+ }
+
+ public void Log(Microsoft.Extensions.Logging.LogLevel logLevel, Microsoft.Extensions.Logging.EventId eventId, TState state, Exception exception, Func formatter)
+ {
+ if (this.IsEnabled(logLevel))
+ {
+ this.logger.Log(logLevel, eventId, state, exception, formatter);
+
+ if (this.providerFactory.traceToEtwExtension)
+ {
+ EtwEventSource.Instance.ExtensionInformationalEvent(
+ this.hubName,
+ EndToEndTraceHelper.LocalAppName,
+ EndToEndTraceHelper.LocalSlotName,
+ string.Empty,
+ string.Empty,
+ $"{logLevel,-11} {this.prefix} {formatter(state, exception)}",
+ ExtensionVersion);
+ }
+
+ if (this.providerFactory.traceToConsole || this.providerFactory.traceToBlob)
+ {
+ string formattedString = $"{DateTime.UtcNow:o} {this.prefix}s{(int)logLevel} {formatter(state, exception)}";
+
+ if (this.providerFactory.traceToConsole)
+ {
+ System.Console.WriteLine(formattedString);
+ }
+
+ EventSourcedDurabilityProviderFactory.blobLogger?.WriteLine(formattedString);
+ }
+ }
+ }
+ }
+
+ private class BlobLogger
+ {
+ private readonly DateTime starttime;
+ private readonly CloudAppendBlob blob;
+ private readonly object flushLock = new object();
+ private readonly object lineLock = new object();
+ private readonly Timer timer;
+ private MemoryStream memoryStream;
+ private StreamWriter writer;
+
+ public BlobLogger(string storageConnectionString, string workerId)
+ {
+ this.starttime = DateTime.UtcNow;
+
+ var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
+ var client = storageAccount.CreateCloudBlobClient();
+ var container = client.GetContainerReference("logs");
+ container.CreateIfNotExists();
+ this.blob = container.GetAppendBlobReference($"{workerId}.{this.starttime:o}.log");
+ this.blob.CreateOrReplace();
+
+ this.memoryStream = new MemoryStream();
+ this.writer = new StreamWriter(this.memoryStream);
+
+ var interval = 14000 + new Random().Next(1000);
+ this.timer = new Timer(this.Flush, null, interval, interval);
+ }
+
+ public void WriteLine(string line)
+ {
+ lock (this.lineLock)
+ {
+ this.writer.WriteLine(line);
+ }
+ }
+
+ public void Flush(object ignored)
+ {
+ if (Monitor.TryEnter(this.flushLock))
+ {
+ try
+ {
+ MemoryStream toSave = null;
+
+ // grab current buffer and create new one
+ lock (this.lineLock)
+ {
+ this.writer.Flush();
+ if (this.memoryStream.Position > 0)
+ {
+ toSave = this.memoryStream;
+ this.memoryStream = new MemoryStream();
+ this.writer = new StreamWriter(this.memoryStream);
+ }
+ }
+
+ if (toSave != null)
+ {
+ // save to storage
+ toSave.Seek(0, SeekOrigin.Begin);
+ this.blob.AppendFromStream(toSave);
+ toSave.Dispose();
+ }
+ }
+ finally
+ {
+ Monitor.Exit(this.flushLock);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
index 5c3894eea..fca091437 100644
--- a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
+++ b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
@@ -169,7 +169,24 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()
Stopwatch stopwatch = Stopwatch.StartNew();
while (true)
{
- DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
+ DurableOrchestrationStatus status = null;
+
+ if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
+ {
+ try
+ {
+ var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None);
+ status = DurableClient.ConvertOrchestrationStateToStatus(state);
+ }
+ catch (TimeoutException)
+ {
+ }
+ }
+ else
+ {
+ status = await client.GetStatusAsync(instanceId);
+ }
+
if (status != null)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
@@ -704,6 +721,10 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);
}
+ else if (timeout.HasValue && client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
+ {
+ return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, timeout.Value);
+ }
else
{
return client.CreateCheckStatusResponse(request, id);
diff --git a/src/WebJobs.Extensions.DurableTask/Options/AzureStorageOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/AzureStorageOptions.cs
index 53e3f8ef2..4644b4c37 100644
--- a/src/WebJobs.Extensions.DurableTask/Options/AzureStorageOptions.cs
+++ b/src/WebJobs.Extensions.DurableTask/Options/AzureStorageOptions.cs
@@ -5,7 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
-using Microsoft.WindowsAzure.Storage;
+using Microsoft.Azure.Storage;
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
index 189fafd7a..6e818983d 100644
--- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
+++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
@@ -77,6 +77,7 @@
+
diff --git a/test/Common/DurableHttpTests.cs b/test/Common/DurableHttpTests.cs
index 578ea6f6f..4e8cb05e4 100644
--- a/test/Common/DurableHttpTests.cs
+++ b/test/Common/DurableHttpTests.cs
@@ -63,8 +63,11 @@ private void StartLogCapture()
this.eventSourceListener.OnTraceLog += this.OnEventSourceListenerTraceLog;
- string sessionName = "DTFxTrace" + Guid.NewGuid().ToString("N");
- this.eventSourceListener.CaptureLogs(sessionName, traceConfig);
+ if (TestHelpers.CaptureETWInTestOutput)
+ {
+ string sessionName = "DTFxTrace" + Guid.NewGuid().ToString("N");
+ this.eventSourceListener.CaptureLogs(sessionName, traceConfig);
+ }
}
}
diff --git a/test/Common/DurableTaskEndToEndTests.cs b/test/Common/DurableTaskEndToEndTests.cs
index 07e1cd0c8..5f5e0976f 100644
--- a/test/Common/DurableTaskEndToEndTests.cs
+++ b/test/Common/DurableTaskEndToEndTests.cs
@@ -71,8 +71,11 @@ private void StartLogCapture()
this.eventSourceListener.OnTraceLog += this.OnEventSourceListenerTraceLog;
- string sessionName = "DTFxTrace" + Guid.NewGuid().ToString("N");
- this.eventSourceListener.CaptureLogs(sessionName, traceConfig);
+ if (TestHelpers.CaptureETWInTestOutput)
+ {
+ string sessionName = "DTFxTrace" + Guid.NewGuid().ToString("N");
+ this.eventSourceListener.CaptureLogs(sessionName, traceConfig);
+ }
}
}
@@ -693,7 +696,7 @@ public async Task OutputsValidJSONLogs()
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [MemberData(nameof(TestDataGenerator.GetAllSupportedExtendedSessionWithStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
public async Task HelloWorldOrchestration_Activity(bool extendedSessions, string storageProvider)
{
await this.HelloWorldOrchestration_Activity_Main_Logic(nameof(this.HelloWorldOrchestration_Activity), extendedSessions, storageProvider);
@@ -894,7 +897,7 @@ private async Task HelloWorldOrchestration_Activity_Main_Logic(string taskHubNam
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -945,7 +948,7 @@ public async Task HelloWorldOrchestration_Activity_CustomStatus(bool extendedSes
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1026,7 +1029,7 @@ public async Task SequentialOrchestration(bool extendedSessions, string storageP
}
// Assert log entry count
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
var logger = this.loggerProvider.CreatedLoggers.Single(l => l.Category == TestHelpers.LogCategory);
var logMessages = logger.LogMessages.Where(
@@ -1123,7 +1126,7 @@ public async Task ActorOrchestration(bool extendedSessions, string storageProvid
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1355,7 +1358,7 @@ public async Task TerminateOrchestration(bool extendedSessions, string storagePr
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1374,7 +1377,7 @@ public async Task TerminateOrchestration(bool extendedSessions, string storagePr
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
[Trait("Category", PlatformSpecificHelpers.TestCategory + "_BVT")]
- [MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ [MemberData(nameof(TestDataGenerator.GetStorageProviderWithRewindOptions), MemberType = typeof(TestDataGenerator))]
public async Task RewindOrchestration(string storageProvider)
{
string[] orchestratorFunctionNames =
@@ -1411,7 +1414,7 @@ public async Task RewindOrchestration(string storageProvider)
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1457,7 +1460,7 @@ public async Task TimerCancellation(bool extendedSessions, string storageProvide
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1503,7 +1506,7 @@ public async Task TimerExpiration(bool extendedSessions, string storageProvider)
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1744,7 +1747,7 @@ public async Task UnhandledOrchestrationException(bool extendedSessions, string
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1791,7 +1794,7 @@ public async Task Orchestration_Activity(bool extendedSessions, string storagePr
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -1821,7 +1824,7 @@ public async Task SubOrchestration_ComplexType(string storageProvider)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ [MemberData(nameof(TestDataGenerator.GetHistoryStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
public async Task SubOrchestration_ComplexType_History(string storageProvider)
{
await this.SubOrchestration_ComplexType_Main_Logic(nameof(this.SubOrchestration_ComplexType_History), storageProvider, showHistory: true);
@@ -1832,7 +1835,7 @@ public async Task SubOrchestration_ComplexType_History(string storageProvider)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ [MemberData(nameof(TestDataGenerator.GetHistoryStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
public async Task SubOrchestration_ComplexType_HistoryInputOutput(string storageProvider)
{
await this.SubOrchestration_ComplexType_Main_Logic(nameof(this.SubOrchestration_ComplexType_HistoryInputOutput), storageProvider, showHistory: true, showHistoryOutput: true);
@@ -2043,7 +2046,7 @@ public async Task UnhandledOrchestrationExceptionWithRetry(bool extendedSessions
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.UnhandledOrchesterationExceptionWithRetry_AssertLogMessageSequence(
this.output,
@@ -2132,7 +2135,7 @@ public async Task UnhandledActivityException(bool extendedSessions, string stora
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -2230,7 +2233,7 @@ public async Task UnhandledActivityExceptionWithRetry(bool extendedSessions, str
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -2380,7 +2383,7 @@ public async Task Orchestration_OnUnregisteredActivity(bool extendedSessions, st
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -2443,7 +2446,7 @@ public async Task Orchestration_OnValidOrchestrator(bool extendedSessions, strin
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -2549,7 +2552,7 @@ public async Task Orchestration_OnUnregisteredOrchestrator(bool extendedSessions
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -2824,7 +2827,7 @@ public async Task Activity_Gets_HttpManagementPayload(bool extendedSessions, str
Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);
HttpManagementPayload httpManagementPayload = status.Output.ToObject();
- ValidateHttpManagementPayload(httpManagementPayload, extendedSessions, "ActivityGetsHttpManagementPayload");
+ ValidateHttpManagementPayload(httpManagementPayload, extendedSessions, storageProvider, "ActivityGetsHttpManagementPayload");
await host.StopAsync();
}
@@ -2856,7 +2859,7 @@ public async Task OrchestrationClient_Gets_HttpManagementPayload(bool extendedSe
var status = await client.WaitForCompletionAsync(this.output);
HttpManagementPayload httpManagementPayload = client.InnerClient.CreateHttpManagementPayload(status.InstanceId);
- ValidateHttpManagementPayload(httpManagementPayload, extendedSessions, "OrchestrationClientGetsHttpManagementPayload");
+ ValidateHttpManagementPayload(httpManagementPayload, extendedSessions, storageProvider, "OrchestrationClientGetsHttpManagementPayload");
Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);
Assert.Equal("World", status?.Input);
@@ -2864,7 +2867,7 @@ public async Task OrchestrationClient_Gets_HttpManagementPayload(bool extendedSe
await host.StopAsync();
- if (this.useTestLogger)
+ if (this.useTestLogger && storageProvider == TestHelpers.AzureStorageProviderType)
{
TestHelpers.AssertLogMessageSequence(
this.output,
@@ -2961,9 +2964,8 @@ public async Task LegacyBaseClasses()
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_SignalAndCallStringStore(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_SignalAndCallStringStore(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -2973,7 +2975,8 @@ public async Task DurableEntity_SignalAndCallStringStore(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_SignalAndCallStringStore),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -2999,9 +3002,8 @@ public async Task DurableEntity_SignalAndCallStringStore(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_StringStoreWithCreateDelete(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_StringStoreWithCreateDelete(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3011,7 +3013,8 @@ public async Task DurableEntity_StringStoreWithCreateDelete(bool extendedSession
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_StringStoreWithCreateDelete),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3110,9 +3113,8 @@ public async Task DurableEntity_RollbackSignalsOnExceptions(bool extendedSession
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_SignalThenPoll(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_SignalThenPoll(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3122,7 +3124,8 @@ public async Task DurableEntity_SignalThenPoll(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_SignalThenPoll),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3181,9 +3184,8 @@ public async Task DurableEntity_EntityFireAndForget(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_LargeEntity(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_LargeEntity(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3193,7 +3195,8 @@ public async Task DurableEntity_LargeEntity(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_LargeEntity),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3221,15 +3224,15 @@ public async Task DurableEntity_LargeEntity(bool extendedSessions)
/// At the end, it validates that all of the appends are reflected in the final state.
///
[Theory]
- [Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_EntityToAndFromBlob(bool extendedSessions)
+ [Trait("Category", PlatformSpecificHelpers.TestCategory + "_UnpublishedDependencies")]
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_EntityToAndFromBlob(bool extendedSessions, string storageProvider)
{
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_EntityToAndFromBlob),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3468,9 +3471,8 @@ public async Task DurableEntity_SelfSchedulingEntity(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_LockedIncrements(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_LockedIncrements(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3479,7 +3481,8 @@ public async Task DurableEntity_LockedIncrements(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_LockedIncrements),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3521,14 +3524,14 @@ public async Task DurableEntity_LockedIncrements(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_SingleLockedTransfer(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_SingleLockedTransfer(bool extendedSessions, string storageProvider)
{
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_SingleLockedTransfer),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3563,10 +3566,11 @@ public async Task DurableEntity_SingleLockedTransfer(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true, 5)]
- [InlineData(false, 5)]
- public async Task DurableEntity_MultipleLockedTransfers(bool extendedSessions, int numberEntities)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_MultipleLockedTransfers(bool extendedSessions, string storageProvider)
{
+ var numberEntities = 5;
+
string[] orchestratorFunctionNames =
{
nameof(TestOrchestrations.LockedTransfer),
@@ -3574,7 +3578,8 @@ public async Task DurableEntity_MultipleLockedTransfers(bool extendedSessions, i
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_MultipleLockedTransfers),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3638,14 +3643,16 @@ public async Task DurableEntity_MultipleLockedTransfers(bool extendedSessions, i
///
/// Test which validates that actors can safely make async I/O calls.
///
- [Fact]
+ [Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- public async Task DurableEntity_AsyncIO()
+ [MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_AsyncIO(string storageProvider)
{
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_AsyncIO),
- enableExtendedSessions: false))
+ enableExtendedSessions: false,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3682,9 +3689,8 @@ public async Task DurableEntity_AsyncIO()
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_EntityNameCaseInsensitivity(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_EntityNameCaseInsensitivity(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3694,7 +3700,8 @@ public async Task DurableEntity_EntityNameCaseInsensitivity(bool extendedSession
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_EntityNameCaseInsensitivity),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3803,9 +3810,8 @@ public async Task AzureStorage_EventTimeoutLimitHit_ThrowsException()
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_BasicObjects(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_BasicObjects(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3814,7 +3820,8 @@ public async Task DurableEntity_BasicObjects(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_BasicObjects),
- extendedSessions))
+ extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3836,9 +3843,8 @@ public async Task DurableEntity_BasicObjects(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_EntityProxy(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_EntityProxy(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3847,7 +3853,8 @@ public async Task DurableEntity_EntityProxy(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_EntityProxy),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -3943,9 +3950,8 @@ public async Task DurableEntity_EntityProxy_UsesBindings(bool extendedSessions)
///
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [InlineData(true)]
- [InlineData(false)]
- public async Task DurableEntity_EntityProxy_NameResolve(bool extendedSessions)
+ [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ public async Task DurableEntity_EntityProxy_NameResolve(bool extendedSessions, string storageProvider)
{
string[] orchestratorFunctionNames =
{
@@ -3954,7 +3960,8 @@ public async Task DurableEntity_EntityProxy_NameResolve(bool extendedSessions)
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_EntityProxy_NameResolve),
- extendedSessions))
+ enableExtendedSessions: extendedSessions,
+ storageProviderType: storageProvider))
{
await host.StartAsync();
@@ -4089,16 +4096,22 @@ public async Task Purge_Single_Instance_History(bool extendedSessions, string st
Assert.Equal(instanceId, orchestrationStatus.InstanceId);
Assert.True(orchestrationStatus.History.Count > 0);
- int blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", instanceId);
- Assert.True(blobCount > 0);
+ if (storageProvider == TestHelpers.AzureStorageProviderType)
+ {
+ int blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", instanceId);
+ Assert.True(blobCount > 0);
+ }
await client.InnerClient.PurgeInstanceHistoryAsync(instanceId);
orchestrationStatus = await client.GetStatusAsync(true);
Assert.Null(orchestrationStatus);
- blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", instanceId);
- Assert.Equal(0, blobCount);
+ if (storageProvider == TestHelpers.AzureStorageProviderType)
+ {
+ int blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", instanceId);
+ Assert.Equal(0, blobCount);
+ }
await host.StopAsync();
}
@@ -4156,10 +4169,13 @@ public async Task Purge_All_History_By_TimePeriod(bool extendedSessions, string
status = await client.InnerClient.GetStatusAsync(fourthInstanceId, true);
Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);
Assert.True(status.History.Count > 0);
- await ValidateBlobUrlAsync(client.TaskHubName, client.InstanceId, (string)status.Output);
- int blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", fourthInstanceId);
- Assert.True(blobCount > 0);
+ if (storageProvider == TestHelpers.AzureStorageProviderType)
+ {
+ await ValidateBlobUrlAsync(client.TaskHubName, client.InstanceId, (string)status.Output);
+ int blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", fourthInstanceId);
+ Assert.True(blobCount > 0);
+ }
await client.InnerClient.PurgeInstanceHistoryAsync(
startDateTime,
@@ -4183,8 +4199,11 @@ public async Task Purge_All_History_By_TimePeriod(bool extendedSessions, string
status = await client.InnerClient.GetStatusAsync(fourthInstanceId, true);
Assert.Null(status);
- blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", fourthInstanceId);
- Assert.Equal(0, blobCount);
+ if (storageProvider == TestHelpers.AzureStorageProviderType)
+ {
+ var blobCount = await GetBlobCount($"{client.TaskHubName.ToLowerInvariant()}-largemessages", fourthInstanceId);
+ Assert.Equal(0, blobCount);
+ }
await host.StopAsync();
}
@@ -4361,7 +4380,7 @@ public async Task DurableEntity_ListEntitiesAsync_FetchState(bool fetchState, st
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
- [MemberData(nameof(TestDataGenerator.GetFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))]
+ [MemberData(nameof(TestDataGenerator.GetStorageProviderWithPagedQueries), MemberType = typeof(TestDataGenerator))]
public async Task DurableEntity_ListEntitiesAsync_Paging(string storageProvider)
{
var yesterday = DateTime.UtcNow.Subtract(TimeSpan.FromDays(1));
@@ -5297,25 +5316,36 @@ private static async Task ValidateBlobUrlAsync(string taskHubName, string instan
Assert.True(await blob.ExistsAsync(), $"Blob named {blob.Uri} is expected to exist.");
}
- private static void ValidateHttpManagementPayload(HttpManagementPayload httpManagementPayload, bool extendedSessions, string defaultTaskHubName)
+ private static void ValidateHttpManagementPayload(HttpManagementPayload httpManagementPayload, bool extendedSessions, string storageProvider, string defaultTaskHubName)
{
Assert.NotNull(httpManagementPayload);
Assert.NotEmpty(httpManagementPayload.Id);
string instanceId = httpManagementPayload.Id;
string notificationUrl = TestConstants.NotificationUrlBase;
+ string storageName;
+
string taskHubName = extendedSessions
- ? $"{defaultTaskHubName}EX"
- : defaultTaskHubName;
+ ? $"{defaultTaskHubName}EX"
+ : defaultTaskHubName;
taskHubName += PlatformSpecificHelpers.VersionSuffix;
+ if (storageProvider == TestHelpers.EventSourcedProviderType)
+ {
+ storageName = "StorageConnectionString";
+ }
+ else
+ {
+ storageName = "AzureWebJobsStorage";
+ }
+
Assert.Equal(
- $"{notificationUrl}/instances/{instanceId}?taskHub={taskHubName}&connection=AzureWebJobsStorage&code=mykey",
+ $"{notificationUrl}/instances/{instanceId}?taskHub={taskHubName}&connection={storageName}&code=mykey",
httpManagementPayload.StatusQueryGetUri);
Assert.Equal(
- $"{notificationUrl}/instances/{instanceId}/raiseEvent/{{eventName}}?taskHub={taskHubName}&connection=AzureWebJobsStorage&code=mykey",
+ $"{notificationUrl}/instances/{instanceId}/raiseEvent/{{eventName}}?taskHub={taskHubName}&connection={storageName}&code=mykey",
httpManagementPayload.SendEventPostUri);
Assert.Equal(
- $"{notificationUrl}/instances/{instanceId}/terminate?reason={{text}}&taskHub={taskHubName}&connection=AzureWebJobsStorage&code=mykey",
+ $"{notificationUrl}/instances/{instanceId}/terminate?reason={{text}}&taskHub={taskHubName}&connection={storageName}&code=mykey",
httpManagementPayload.TerminatePostUri);
}
diff --git a/test/Common/TestDataGenerator.cs b/test/Common/TestDataGenerator.cs
index 8d20d0d1a..769a5a07a 100644
--- a/test/Common/TestDataGenerator.cs
+++ b/test/Common/TestDataGenerator.cs
@@ -2,16 +2,26 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System.Collections.Generic;
+using System.Linq;
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
{
public class TestDataGenerator
{
private static readonly object[] BoolOptions = new object[] { true, false };
- private static readonly object[] FullFeaturedStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
#if !FUNCTIONS_V1
- private static readonly object[] AllStorageProviders = new object[] { TestHelpers.AzureStorageProviderType, TestHelpers.RedisProviderType };
+ private static readonly object[] ExtendedSessionsStorageProviders = new object[] { TestHelpers.AzureStorageProviderType, TestHelpers.EventSourcedProviderType };
+ private static readonly object[] HistoryStorageProviders = new object[] { TestHelpers.AzureStorageProviderType, TestHelpers.EventSourcedProviderType };
+ private static readonly object[] FullFeaturedStorageProviders = new object[] { TestHelpers.AzureStorageProviderType, TestHelpers.EventSourcedProviderType };
+ private static readonly object[] RewindStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
+ private static readonly object[] PagedQueriesStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
+ private static readonly object[] AllStorageProviders = new object[] { TestHelpers.AzureStorageProviderType, TestHelpers.RedisProviderType, TestHelpers.EventSourcedProviderType };
#else
+ private static readonly object[] ExtendedSessionsStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
+ private static readonly object[] HistoryStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
+ private static readonly object[] FullFeaturedStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
+ private static readonly object[] RewindStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
+ private static readonly object[] PagedQueriesStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
private static readonly object[] AllStorageProviders = new object[] { TestHelpers.AzureStorageProviderType };
#endif
@@ -29,7 +39,48 @@ public static IEnumerable