diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs index bd9f47498..c1f0302e1 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs @@ -83,6 +83,12 @@ internal class DurableClient : IDurableClient, string IDurableEntityClient.TaskHubName => this.TaskHubName; + /// + public override string ToString() + { + return $"DurableClient[backend={this.config.GetBackendInfo()}]"; + } + /// HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure) { @@ -541,10 +547,14 @@ async Task IDurableEntityClient.CleanEntityStorageAsyn tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy)); } - if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0 - && now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes)) + if (removeEmptyEntities) { - tasks.Add(DeleteIdleOrchestrationEntity(state)); + bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0; + bool safeToRemoveWithoutBreakingMessageSorterLogic = now - state.LastUpdatedTime > this.config.MessageReorderWindow; + if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic) + { + tasks.Add(DeleteIdleOrchestrationEntity(state)); + } } } diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs index 8e4a5fdca..85b1dc00f 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs @@ -22,7 +22,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName) { this.Config = config ?? throw new ArgumentNullException(nameof(config)); this.FunctionName = functionName; - this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes); } internal DurableTaskExtension Config { get; } @@ -41,8 +40,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName) internal string Name => this.FunctionName; - internal TimeSpan EntityMessageReorderWindow { get; private set; } - internal bool ExecutorCalledBack { get; set; } internal void AddDeferredTask(Func function) diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs index 14e5751fb..9c7711117 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs @@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces { if (!operationMessage.EventContent.ScheduledTime.HasValue) { - this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow); + this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow); } this.Config.TraceHelper.SendingEntityMessage( diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs index 6f5710d57..63d4757bc 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs @@ -1117,7 +1117,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten requestMessage, target.InstanceId, this.InnerContext.CurrentUtcDateTime, - TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes)); + this.Config.MessageReorderWindow); eventName = EntityMessageEventNames.RequestMessageEventName; } diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index 60d2ef972..702b4f047 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv /// public virtual bool SupportsEntities => false; + /// + /// Specifies whether the backend's WaitForOrchestration is implemented without polling. + /// + public virtual bool SupportsPollFreeWait => false; + + /// + /// Specifies whether this backend delivers messages in order. + /// + public virtual bool GuaranteesOrderedDelivery => false; + /// /// JSON representation of configuration to emit in telemetry. /// @@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv /// public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems; + internal string GetBackendInfo() + { + return this.GetOrchestrationService().ToString(); + } + private IOrchestrationService GetOrchestrationService() { if (this.innerService == null) @@ -278,7 +293,7 @@ public virtual Task> GetAllOrchestrationStatesWithFilt /// Returns a task which completes when the state has been fetched. public virtual Task> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true) { - throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync)); + throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync)); } /// diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index c8845d312..53ffb849e 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -189,6 +189,9 @@ public string HubName internal MessagePayloadDataConverter ErrorDataConverter { get; private set; } + internal TimeSpan MessageReorderWindow + => this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes); + internal static MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory) { bool isDefault; @@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault); } + internal string GetBackendInfo() + { + return this.defaultDurabilityProvider.GetBackendInfo(); + } + /// /// Internal initialization call from the WebJobs host. /// @@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F else { // run this through the message sorter to help with reordering and duplicate filtering - deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow); + deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow); } foreach (var message in deliverNow) diff --git a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs index 891b46f6e..148e679bc 100644 --- a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs +++ b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs @@ -181,9 +181,34 @@ private static TemplateMatcher GetInstanceRaiseEventRoute() IDurableOrchestrationClient client = this.GetClient(request); Stopwatch stopwatch = Stopwatch.StartNew(); + + // This retry loop completes either when the + // orchestration has completed, or when the timeout is reached. while (true) { - DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId); + DurableOrchestrationStatus status = null; + + if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait) + { + // For durability providers that support efficient (poll-free) waiting, we take advantage of that API + try + { + var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None); + status = DurableClient.ConvertOrchestrationStateToStatus(state); + } + catch (TimeoutException) + { + // The orchestration did not complete. + // Depending on the implementation of the backend, we may get here before the full timeout has elapsed, + // so we recheck how much time has elapsed below, and retry if there is time left. + } + } + else + { + // For durability providers that do not support efficient (poll-free) waiting, we do explicit retries. + status = await client.GetStatusAsync(instanceId); + } + if (status != null) { if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed) @@ -706,6 +731,12 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString TimeSpan? timeout = GetTimeSpan(request, "timeout"); TimeSpan? pollingInterval = GetTimeSpan(request, "pollingInterval"); + // for durability providers that support poll-free waiting, we override the specified polling interval + if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait) + { + pollingInterval = timeout; + } + if (timeout.HasValue && pollingInterval.HasValue) { return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value); diff --git a/src/WebJobs.Extensions.DurableTask/ProviderUtils.cs b/src/WebJobs.Extensions.DurableTask/ProviderUtils.cs new file mode 100644 index 000000000..cf1668a76 --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/ProviderUtils.cs @@ -0,0 +1,65 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using DurableTask.Core; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + /// + /// Provides access to internal functionality for the purpose of implementing durability providers. + /// + public static class ProviderUtils + { + /// + /// Returns the instance id of the entity scheduler for a given entity id. + /// + /// The entity id. + /// The instance id of the scheduler. + public static string GetSchedulerIdFromEntityId(EntityId entityId) + { + return EntityId.GetSchedulerIdFromEntityId(entityId); + } + + /// + /// Reads the state of an entity from the serialized entity scheduler state. + /// + /// The orchestration state of the scheduler. + /// The serializer settings. + /// The serialized state of the entity. + /// true if the entity exists, false otherwise. + public static bool TryGetEntityStateFromSerializedSchedulerState(OrchestrationState state, JsonSerializerSettings serializerSettings, out string result) + { + if (state != null + && state.OrchestrationInstance != null + && state.Input != null) + { + var schedulerState = JsonConvert.DeserializeObject(state.Input, serializerSettings); + + if (schedulerState.EntityExists) + { + result = schedulerState.EntityState; + return true; + } + } + + result = null; + return false; + } + + /// + /// Converts the DTFx representation of the orchestration state into the DF representation. + /// + /// The orchestration state. + /// The orchestration status. + public static DurableOrchestrationStatus ConvertOrchestrationStateToStatus(OrchestrationState orchestrationState) + { + return DurableClient.ConvertOrchestrationStateToStatus(orchestrationState); + } + } +}