Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Netherite provider to be implemented separately #1541

Merged
merged 2 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ internal class DurableClient : IDurableClient,

string IDurableEntityClient.TaskHubName => this.TaskHubName;

/// <inheritdoc/>
public override string ToString()
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved
{
return $"DurableClient[backend={this.config.GetBackendInfo()}]";
}

/// <inheritdoc />
HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure)
{
Expand Down Expand Up @@ -541,10 +547,14 @@ async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsyn
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
}

if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0
&& now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes))
if (removeEmptyEntities)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;
bool safeToRemoveWithoutBreakingMessageSorterLogic = now - state.LastUpdatedTime > this.config.MessageReorderWindow;
if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
{
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.FunctionName = functionName;
this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes);
}

internal DurableTaskExtension Config { get; }
Expand All @@ -41,8 +40,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)

internal string Name => this.FunctionName;

internal TimeSpan EntityMessageReorderWindow { get; private set; }

internal bool ExecutorCalledBack { get; set; }

internal void AddDeferredTask(Func<Task> function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
{
if (!operationMessage.EventContent.ScheduledTime.HasValue)
{
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow);
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow);
}

this.Config.TraceHelper.SendingEntityMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten
requestMessage,
target.InstanceId,
this.InnerContext.CurrentUtcDateTime,
TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes));
this.Config.MessageReorderWindow);

eventName = EntityMessageEventNames.RequestMessageEventName;
}
Expand Down
17 changes: 16 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// </summary>
public virtual bool SupportsEntities => false;

/// <summary>
/// Specifies whether the backend's WaitForOrchestration is implemented without polling.
/// </summary>
public virtual bool SupportsPollFreeWait => false;

/// <summary>
/// Specifies whether this backend delivers messages in order.
/// </summary>
public virtual bool GuaranteesOrderedDelivery => false;

/// <summary>
/// JSON representation of configuration to emit in telemetry.
/// </summary>
Expand Down Expand Up @@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// <inheritdoc/>
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;

internal string GetBackendInfo()
{
return this.GetOrchestrationService().ToString();
}

private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
Expand Down Expand Up @@ -278,7 +293,7 @@ public virtual Task<IList<OrchestrationState>> GetAllOrchestrationStatesWithFilt
/// <returns>Returns a task which completes when the state has been fetched.</returns>
public virtual Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync));
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}

/// <summary>
Expand Down
10 changes: 9 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ public string HubName

internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }

internal TimeSpan MessageReorderWindow
=> this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);

internal static MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
bool isDefault;
Expand All @@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

internal string GetBackendInfo()
{
return this.defaultDurabilityProvider.GetBackendInfo();
}

/// <summary>
/// Internal initialization call from the WebJobs host.
/// </summary>
Expand Down Expand Up @@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow);
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}

foreach (var message in deliverNow)
Expand Down
33 changes: 32 additions & 1 deletion src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,34 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()

IDurableOrchestrationClient client = this.GetClient(request);
Stopwatch stopwatch = Stopwatch.StartNew();

// This retry loop completes either when the
// orchestration has completed, or when the timeout is reached.
while (true)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
DurableOrchestrationStatus status = null;

if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved
{
// For durability providers that support efficient (poll-free) waiting, we take advantage of that API
try
{
var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None);
status = DurableClient.ConvertOrchestrationStateToStatus(state);
}
catch (TimeoutException)
{
// The orchestration did not complete.
// Depending on the implementation of the backend, we may get here before the full timeout has elapsed,
// so we recheck how much time has elapsed below, and retry if there is time left.
}
}
else
{
// For durability providers that do not support efficient (poll-free) waiting, we do explicit retries.
status = await client.GetStatusAsync(instanceId);
}

if (status != null)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
Expand Down Expand Up @@ -706,6 +731,12 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
TimeSpan? timeout = GetTimeSpan(request, "timeout");
TimeSpan? pollingInterval = GetTimeSpan(request, "pollingInterval");

// for durability providers that support poll-free waiting, we override the specified polling interval
if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
{
pollingInterval = timeout;
}

if (timeout.HasValue && pollingInterval.HasValue)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);
Expand Down
65 changes: 65 additions & 0 deletions src/WebJobs.Extensions.DurableTask/ProviderUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// Provides access to internal functionality for the purpose of implementing durability providers.
/// </summary>
public static class ProviderUtils
{
/// <summary>
/// Returns the instance id of the entity scheduler for a given entity id.
/// </summary>
/// <param name="entityId">The entity id.</param>
/// <returns>The instance id of the scheduler.</returns>
public static string GetSchedulerIdFromEntityId(EntityId entityId)
{
return EntityId.GetSchedulerIdFromEntityId(entityId);
}

/// <summary>
/// Reads the state of an entity from the serialized entity scheduler state.
/// </summary>
/// <param name="state">The orchestration state of the scheduler.</param>
/// <param name="serializerSettings">The serializer settings.</param>
/// <param name="result">The serialized state of the entity.</param>
/// <returns>true if the entity exists, false otherwise.</returns>
public static bool TryGetEntityStateFromSerializedSchedulerState(OrchestrationState state, JsonSerializerSettings serializerSettings, out string result)
{
if (state != null
&& state.OrchestrationInstance != null
&& state.Input != null)
{
var schedulerState = JsonConvert.DeserializeObject<SchedulerState>(state.Input, serializerSettings);

if (schedulerState.EntityExists)
{
result = schedulerState.EntityState;
return true;
}
}

result = null;
return false;
}

/// <summary>
/// Converts the DTFx representation of the orchestration state into the DF representation.
/// </summary>
/// <param name="orchestrationState">The orchestration state.</param>
/// <returns>The orchestration status.</returns>
public static DurableOrchestrationStatus ConvertOrchestrationStateToStatus(OrchestrationState orchestrationState)
{
return DurableClient.ConvertOrchestrationStateToStatus(orchestrationState);
}
}
}