diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs
index bd9f47498..c1f0302e1 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs
@@ -83,6 +83,12 @@ internal class DurableClient : IDurableClient,
string IDurableEntityClient.TaskHubName => this.TaskHubName;
+ ///
+ public override string ToString()
+ {
+ return $"DurableClient[backend={this.config.GetBackendInfo()}]";
+ }
+
///
HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure)
{
@@ -541,10 +547,14 @@ async Task IDurableEntityClient.CleanEntityStorageAsyn
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
}
- if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0
- && now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes))
+ if (removeEmptyEntities)
{
- tasks.Add(DeleteIdleOrchestrationEntity(state));
+ bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;
+ bool safeToRemoveWithoutBreakingMessageSorterLogic = now - state.LastUpdatedTime > this.config.MessageReorderWindow;
+ if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
+ {
+ tasks.Add(DeleteIdleOrchestrationEntity(state));
+ }
}
}
diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs
index 8e4a5fdca..85b1dc00f 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs
@@ -22,7 +22,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
{
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.FunctionName = functionName;
- this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes);
}
internal DurableTaskExtension Config { get; }
@@ -41,8 +40,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
internal string Name => this.FunctionName;
- internal TimeSpan EntityMessageReorderWindow { get; private set; }
-
internal bool ExecutorCalledBack { get; set; }
internal void AddDeferredTask(Func function)
diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs
index 14e5751fb..9c7711117 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs
@@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
{
if (!operationMessage.EventContent.ScheduledTime.HasValue)
{
- this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow);
+ this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow);
}
this.Config.TraceHelper.SendingEntityMessage(
diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs
index 6f5710d57..63d4757bc 100644
--- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs
+++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs
@@ -1117,7 +1117,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten
requestMessage,
target.InstanceId,
this.InnerContext.CurrentUtcDateTime,
- TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes));
+ this.Config.MessageReorderWindow);
eventName = EntityMessageEventNames.RequestMessageEventName;
}
diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
index 60d2ef972..856507a9b 100644
--- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
@@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
///
public virtual bool SupportsEntities => false;
+ ///
+ /// Specifies whether the backend's WaitForOrchestration is implemented without polling.
+ ///
+ public virtual bool SupportsPollFreeWait => false;
+
+ ///
+ /// Specifies whether this backend delivers messages in order.
+ ///
+ public virtual bool GuaranteesOrderedDelivery => false;
+
///
/// JSON representation of configuration to emit in telemetry.
///
@@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
///
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;
+ internal string GetBackendInfo()
+ {
+ return this.GetOrchestrationService().ToString();
+ }
+
private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
@@ -278,7 +293,7 @@ public virtual Task> GetAllOrchestrationStatesWithFilt
/// Returns a task which completes when the state has been fetched.
public virtual Task> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
- throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync));
+ throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}
///
@@ -418,5 +433,51 @@ internal virtual bool ConnectionNameMatches(DurabilityProvider durabilityProvide
{
return this.ConnectionName.Equals(durabilityProvider.ConnectionName);
}
+
+ ///
+ /// Returns the instance id of the entity scheduler for a given entity id.
+ ///
+ /// The entity id.
+ /// The instance id of the scheduler.
+ protected string GetSchedulerIdFromEntityId(EntityId entityId)
+ {
+ return EntityId.GetSchedulerIdFromEntityId(entityId);
+ }
+
+ ///
+ /// Reads the state of an entity from the serialized entity scheduler state.
+ ///
+ /// The orchestration state of the scheduler.
+ /// The serializer settings.
+ /// The serialized state of the entity.
+ /// true if the entity exists, false otherwise.
+ protected bool TryGetEntityStateFromSerializedSchedulerState(OrchestrationState state, JsonSerializerSettings serializerSettings, out string result)
+ {
+ if (state != null
+ && state.OrchestrationInstance != null
+ && state.Input != null)
+ {
+ var schedulerState = JsonConvert.DeserializeObject(state.Input, serializerSettings);
+
+ if (schedulerState.EntityExists)
+ {
+ result = schedulerState.EntityState;
+ return true;
+ }
+ }
+
+ result = null;
+ return false;
+ }
+
+ ///
+ /// Converts the DTFx representation of the orchestration state into the DF representation.
+ ///
+ /// The orchestration state.
+ /// The orchestration status.
+ protected DurableOrchestrationStatus ConvertOrchestrationStateToStatus(OrchestrationState orchestrationState)
+ {
+ return DurableClient.ConvertOrchestrationStateToStatus(orchestrationState);
+ }
}
}
diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
index c8845d312..53ffb849e 100644
--- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
@@ -189,6 +189,9 @@ public string HubName
internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }
+ internal TimeSpan MessageReorderWindow
+ => this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);
+
internal static MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
bool isDefault;
@@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}
+ internal string GetBackendInfo()
+ {
+ return this.defaultDurabilityProvider.GetBackendInfo();
+ }
+
///
/// Internal initialization call from the WebJobs host.
///
@@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
else
{
// run this through the message sorter to help with reordering and duplicate filtering
- deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow);
+ deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}
foreach (var message in deliverNow)
diff --git a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
index 891b46f6e..148e679bc 100644
--- a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
+++ b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
@@ -181,9 +181,34 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()
IDurableOrchestrationClient client = this.GetClient(request);
Stopwatch stopwatch = Stopwatch.StartNew();
+
+ // This retry loop completes either when the
+ // orchestration has completed, or when the timeout is reached.
while (true)
{
- DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
+ DurableOrchestrationStatus status = null;
+
+ if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
+ {
+ // For durability providers that support efficient (poll-free) waiting, we take advantage of that API
+ try
+ {
+ var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None);
+ status = DurableClient.ConvertOrchestrationStateToStatus(state);
+ }
+ catch (TimeoutException)
+ {
+ // The orchestration did not complete.
+ // Depending on the implementation of the backend, we may get here before the full timeout has elapsed,
+ // so we recheck how much time has elapsed below, and retry if there is time left.
+ }
+ }
+ else
+ {
+ // For durability providers that do not support efficient (poll-free) waiting, we do explicit retries.
+ status = await client.GetStatusAsync(instanceId);
+ }
+
if (status != null)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
@@ -706,6 +731,12 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
TimeSpan? timeout = GetTimeSpan(request, "timeout");
TimeSpan? pollingInterval = GetTimeSpan(request, "pollingInterval");
+ // for durability providers that support poll-free waiting, we override the specified polling interval
+ if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
+ {
+ pollingInterval = timeout;
+ }
+
if (timeout.HasValue && pollingInterval.HasValue)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);