diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs index 4c73ec38f..d4d6d1140 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs @@ -71,6 +71,11 @@ internal class DurableClient : IDurableClient, string IDurableEntityClient.TaskHubName => this.TaskHubName; + public override string ToString() + { + return $"DurableClient[backend={this.config.GetBackendInfo()}]"; + } + /// HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure) { @@ -530,7 +535,7 @@ async Task IDurableEntityClient.CleanEntityStorageAsyn } if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0 - && now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes)) + && now - state.LastUpdatedTime > this.config.MessageReorderWindow) { tasks.Add(DeleteIdleOrchestrationEntity(state)); } diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs index 8e4a5fdca..85b1dc00f 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableCommonContext.cs @@ -22,7 +22,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName) { this.Config = config ?? throw new ArgumentNullException(nameof(config)); this.FunctionName = functionName; - this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes); } internal DurableTaskExtension Config { get; } @@ -41,8 +40,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName) internal string Name => this.FunctionName; - internal TimeSpan EntityMessageReorderWindow { get; private set; } - internal bool ExecutorCalledBack { get; set; } internal void AddDeferredTask(Func function) diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs index 14e5751fb..9c7711117 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs @@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces { if (!operationMessage.EventContent.ScheduledTime.HasValue) { - this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow); + this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow); } this.Config.TraceHelper.SendingEntityMessage( diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs index f87a5bfed..883794e8c 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs @@ -1132,7 +1132,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten requestMessage, target.InstanceId, this.InnerContext.CurrentUtcDateTime, - TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes)); + this.Config.MessageReorderWindow); eventName = EntityMessageEventNames.RequestMessageEventName; } diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index 60d2ef972..856507a9b 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv /// public virtual bool SupportsEntities => false; + /// + /// Specifies whether the backend's WaitForOrchestration is implemented without polling. + /// + public virtual bool SupportsPollFreeWait => false; + + /// + /// Specifies whether this backend delivers messages in order. + /// + public virtual bool GuaranteesOrderedDelivery => false; + /// /// JSON representation of configuration to emit in telemetry. /// @@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv /// public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems; + internal string GetBackendInfo() + { + return this.GetOrchestrationService().ToString(); + } + private IOrchestrationService GetOrchestrationService() { if (this.innerService == null) @@ -278,7 +293,7 @@ public virtual Task> GetAllOrchestrationStatesWithFilt /// Returns a task which completes when the state has been fetched. public virtual Task> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true) { - throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync)); + throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync)); } /// @@ -418,5 +433,51 @@ internal virtual bool ConnectionNameMatches(DurabilityProvider durabilityProvide { return this.ConnectionName.Equals(durabilityProvider.ConnectionName); } + + /// + /// Returns the instance id of the entity scheduler for a given entity id. + /// + /// The entity id. + /// The instance id of the scheduler. + protected string GetSchedulerIdFromEntityId(EntityId entityId) + { + return EntityId.GetSchedulerIdFromEntityId(entityId); + } + + /// + /// Reads the state of an entity from the serialized entity scheduler state. + /// + /// The orchestration state of the scheduler. + /// The serializer settings. + /// The serialized state of the entity. + /// true if the entity exists, false otherwise. + protected bool TryGetEntityStateFromSerializedSchedulerState(OrchestrationState state, JsonSerializerSettings serializerSettings, out string result) + { + if (state != null + && state.OrchestrationInstance != null + && state.Input != null) + { + var schedulerState = JsonConvert.DeserializeObject(state.Input, serializerSettings); + + if (schedulerState.EntityExists) + { + result = schedulerState.EntityState; + return true; + } + } + + result = null; + return false; + } + + /// + /// Converts the DTFx representation of the orchestration state into the DF representation. + /// + /// The orchestration state. + /// The orchestration status. + protected DurableOrchestrationStatus ConvertOrchestrationStateToStatus(OrchestrationState orchestrationState) + { + return DurableClient.ConvertOrchestrationStateToStatus(orchestrationState); + } } } diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 56bb497b6..600d43e97 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -189,6 +189,9 @@ public string HubName internal MessagePayloadDataConverter ErrorDataConverter { get; private set; } + internal TimeSpan MessageReorderWindow + => this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes); + private MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory) { bool isDefault; @@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault); } + internal string GetBackendInfo() + { + return this.defaultDurabilityProvider.GetBackendInfo(); + } + /// /// Internal initialization call from the WebJobs host. /// @@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F else { // run this through the message sorter to help with reordering and duplicate filtering - deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow); + deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow); } foreach (var message in deliverNow) diff --git a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs index 5c3894eea..fca091437 100644 --- a/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs +++ b/src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs @@ -169,7 +169,24 @@ private static TemplateMatcher GetInstanceRaiseEventRoute() Stopwatch stopwatch = Stopwatch.StartNew(); while (true) { - DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId); + DurableOrchestrationStatus status = null; + + if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait) + { + try + { + var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None); + status = DurableClient.ConvertOrchestrationStateToStatus(state); + } + catch (TimeoutException) + { + } + } + else + { + status = await client.GetStatusAsync(instanceId); + } + if (status != null) { if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed) @@ -704,6 +721,10 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString { return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value); } + else if (timeout.HasValue && client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait) + { + return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, timeout.Value); + } else { return client.CreateCheckStatusResponse(request, id);