Skip to content

Commit

Permalink
merge with latest dev and squash, to fix diff issues
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt committed Nov 12, 2020
1 parent 6c7873d commit fe94721
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ internal class DurableClient : IDurableClient,

string IDurableEntityClient.TaskHubName => this.TaskHubName;

/// <inheritdoc/>
public override string ToString()
{
return $"DurableClient[backend={this.config.GetBackendInfo()}]";
}

/// <inheritdoc />
HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure)
{
Expand Down Expand Up @@ -541,10 +547,14 @@ async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsyn
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
}

if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0
&& now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes))
if (removeEmptyEntities)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.QueueSize == 0;
bool safeToRemoveWithoutBreakingMessageSorterLogic = now - state.LastUpdatedTime > this.config.MessageReorderWindow;
if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
{
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.FunctionName = functionName;
this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes);
}

internal DurableTaskExtension Config { get; }
Expand All @@ -41,8 +40,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)

internal string Name => this.FunctionName;

internal TimeSpan EntityMessageReorderWindow { get; private set; }

internal bool ExecutorCalledBack { get; set; }

internal void AddDeferredTask(Func<Task> function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
{
if (!operationMessage.EventContent.ScheduledTime.HasValue)
{
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow);
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow);
}

this.Config.TraceHelper.SendingEntityMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten
requestMessage,
target.InstanceId,
this.InnerContext.CurrentUtcDateTime,
TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes));
this.Config.MessageReorderWindow);

eventName = EntityMessageEventNames.RequestMessageEventName;
}
Expand Down
63 changes: 62 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// </summary>
public virtual bool SupportsEntities => false;

/// <summary>
/// Specifies whether the backend's WaitForOrchestration is implemented without polling.
/// </summary>
public virtual bool SupportsPollFreeWait => false;

/// <summary>
/// Specifies whether this backend delivers messages in order.
/// </summary>
public virtual bool GuaranteesOrderedDelivery => false;

/// <summary>
/// JSON representation of configuration to emit in telemetry.
/// </summary>
Expand Down Expand Up @@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// <inheritdoc/>
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;

internal string GetBackendInfo()
{
return this.GetOrchestrationService().ToString();
}

private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
Expand Down Expand Up @@ -278,7 +293,7 @@ public virtual Task<IList<OrchestrationState>> GetAllOrchestrationStatesWithFilt
/// <returns>Returns a task which completes when the state has been fetched.</returns>
public virtual Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync));
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}

/// <summary>
Expand Down Expand Up @@ -418,5 +433,51 @@ internal virtual bool ConnectionNameMatches(DurabilityProvider durabilityProvide
{
return this.ConnectionName.Equals(durabilityProvider.ConnectionName);
}

/// <summary>
/// Returns the instance id of the entity scheduler for a given entity id.
/// </summary>
/// <param name="entityId">The entity id.</param>
/// <returns>The instance id of the scheduler.</returns>
protected string GetSchedulerIdFromEntityId(EntityId entityId)
{
return EntityId.GetSchedulerIdFromEntityId(entityId);
}

/// <summary>
/// Reads the state of an entity from the serialized entity scheduler state.
/// </summary>
/// <param name="state">The orchestration state of the scheduler.</param>
/// <param name="serializerSettings">The serializer settings.</param>
/// <param name="result">The serialized state of the entity.</param>
/// <returns>true if the entity exists, false otherwise.</returns>
protected bool TryGetEntityStateFromSerializedSchedulerState(OrchestrationState state, JsonSerializerSettings serializerSettings, out string result)
{
if (state != null
&& state.OrchestrationInstance != null
&& state.Input != null)
{
var schedulerState = JsonConvert.DeserializeObject<SchedulerState>(state.Input, serializerSettings);

if (schedulerState.EntityExists)
{
result = schedulerState.EntityState;
return true;
}
}

result = null;
return false;
}

/// <summary>
/// Converts the DTFx representation of the orchestration state into the DF representation.
/// </summary>
/// <param name="orchestrationState">The orchestration state.</param>
/// <returns>The orchestration status.</returns>
protected DurableOrchestrationStatus ConvertOrchestrationStateToStatus(OrchestrationState orchestrationState)
{
return DurableClient.ConvertOrchestrationStateToStatus(orchestrationState);
}
}
}
10 changes: 9 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ public string HubName

internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }

internal TimeSpan MessageReorderWindow
=> this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);

internal static MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
bool isDefault;
Expand All @@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

internal string GetBackendInfo()
{
return this.defaultDurabilityProvider.GetBackendInfo();
}

/// <summary>
/// Internal initialization call from the WebJobs host.
/// </summary>
Expand Down Expand Up @@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow);
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}

foreach (var message in deliverNow)
Expand Down
33 changes: 32 additions & 1 deletion src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,34 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()

IDurableOrchestrationClient client = this.GetClient(request);
Stopwatch stopwatch = Stopwatch.StartNew();

// This retry loop completes either when the
// orchestration has completed, or when the timeout is reached.
while (true)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
DurableOrchestrationStatus status = null;

if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
{
// For durability providers that support efficient (poll-free) waiting, we take advantage of that API
try
{
var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None);
status = DurableClient.ConvertOrchestrationStateToStatus(state);
}
catch (TimeoutException)
{
// The orchestration did not complete.
// Depending on the implementation of the backend, we may get here before the full timeout has elapsed,
// so we recheck how much time has elapsed below, and retry if there is time left.
}
}
else
{
// For durability providers that do not support efficient (poll-free) waiting, we do explicit retries.
status = await client.GetStatusAsync(instanceId);
}

if (status != null)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
Expand Down Expand Up @@ -706,6 +731,12 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
TimeSpan? timeout = GetTimeSpan(request, "timeout");
TimeSpan? pollingInterval = GetTimeSpan(request, "pollingInterval");

// for durability providers that support poll-free waiting, we override the specified polling interval
if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
{
pollingInterval = timeout;
}

if (timeout.HasValue && pollingInterval.HasValue)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);
Expand Down

0 comments on commit fe94721

Please sign in to comment.