Skip to content

Commit

Permalink
revise initial check in, so it contains only the changes needed for i…
Browse files Browse the repository at this point in the history
…mplementing the Netherite provider externally.
  • Loading branch information
sebastianburckhardt committed Oct 29, 2020
1 parent 044d234 commit e2d768a
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ internal class DurableClient : IDurableClient,

string IDurableEntityClient.TaskHubName => this.TaskHubName;

public override string ToString()
{
return $"DurableClient[backend={this.config.GetBackendInfo()}]";
}

/// <inheritdoc />
HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure)
{
Expand Down Expand Up @@ -530,7 +535,7 @@ async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsyn
}

if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0
&& now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes))
&& now - state.LastUpdatedTime > this.config.MessageReorderWindow)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
{
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.FunctionName = functionName;
this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes);
}

internal DurableTaskExtension Config { get; }
Expand All @@ -41,8 +40,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)

internal string Name => this.FunctionName;

internal TimeSpan EntityMessageReorderWindow { get; private set; }

internal bool ExecutorCalledBack { get; set; }

internal void AddDeferredTask(Func<Task> function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
{
if (!operationMessage.EventContent.ScheduledTime.HasValue)
{
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow);
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow);
}

this.Config.TraceHelper.SendingEntityMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten
requestMessage,
target.InstanceId,
this.InnerContext.CurrentUtcDateTime,
TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes));
this.Config.MessageReorderWindow);

eventName = EntityMessageEventNames.RequestMessageEventName;
}
Expand Down
63 changes: 62 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// </summary>
public virtual bool SupportsEntities => false;

/// <summary>
/// Specifies whether the backend's WaitForOrchestration is implemented without polling.
/// </summary>
public virtual bool SupportsPollFreeWait => false;

/// <summary>
/// Specifies whether this backend delivers messages in order.
/// </summary>
public virtual bool GuaranteesOrderedDelivery => false;

/// <summary>
/// JSON representation of configuration to emit in telemetry.
/// </summary>
Expand Down Expand Up @@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// <inheritdoc/>
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;

internal string GetBackendInfo()
{
return this.GetOrchestrationService().ToString();
}

private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
Expand Down Expand Up @@ -278,7 +293,7 @@ public virtual Task<IList<OrchestrationState>> GetAllOrchestrationStatesWithFilt
/// <returns>Returns a task which completes when the state has been fetched.</returns>
public virtual Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync));
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}

/// <summary>
Expand Down Expand Up @@ -418,5 +433,51 @@ internal virtual bool ConnectionNameMatches(DurabilityProvider durabilityProvide
{
return this.ConnectionName.Equals(durabilityProvider.ConnectionName);
}

/// <summary>
/// Returns the instance id of the entity scheduler for a given entity id.
/// </summary>
/// <param name="entityId">The entity id.</param>
/// <returns>The instance id of the scheduler.</returns>
protected string GetSchedulerIdFromEntityId(EntityId entityId)
{
return EntityId.GetSchedulerIdFromEntityId(entityId);
}

/// <summary>
/// Reads the state of an entity from the serialized entity scheduler state.
/// </summary>
/// <param name="state">The orchestration state of the scheduler.</param>
/// <param name="serializerSettings">The serializer settings.</param>
/// <param name="result">The serialized state of the entity.</param>
/// <returns>true if the entity exists, false otherwise.</returns>
protected bool TryGetEntityStateFromSerializedSchedulerState(OrchestrationState state, JsonSerializerSettings serializerSettings, out string result)
{
if (state != null
&& state.OrchestrationInstance != null
&& state.Input != null)
{
var schedulerState = JsonConvert.DeserializeObject<SchedulerState>(state.Input, serializerSettings);

if (schedulerState.EntityExists)
{
result = schedulerState.EntityState;
return true;
}
}

result = null;
return false;
}

/// <summary>
/// Converts the DTFx representation of the orchestration state into the DF representation.
/// </summary>
/// <param name="orchestrationState">The orchestration state.</param>
/// <returns>The orchestration status.</returns>
protected DurableOrchestrationStatus ConvertOrchestrationStateToStatus(OrchestrationState orchestrationState)
{
return DurableClient.ConvertOrchestrationStateToStatus(orchestrationState);
}
}
}
10 changes: 9 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ public string HubName

internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }

internal TimeSpan MessageReorderWindow
=> this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);

private MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
bool isDefault;
Expand All @@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

internal string GetBackendInfo()
{
return this.defaultDurabilityProvider.GetBackendInfo();
}

/// <summary>
/// Internal initialization call from the WebJobs host.
/// </summary>
Expand Down Expand Up @@ -725,7 +733,7 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow);
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}

foreach (var message in deliverNow)
Expand Down
23 changes: 22 additions & 1 deletion src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,24 @@ private static TemplateMatcher GetInstanceRaiseEventRoute()
Stopwatch stopwatch = Stopwatch.StartNew();
while (true)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
DurableOrchestrationStatus status = null;

if (client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
{
try
{
var state = await durableClient.DurabilityProvider.WaitForOrchestrationAsync(instanceId, null, timeout, CancellationToken.None);
status = DurableClient.ConvertOrchestrationStateToStatus(state);
}
catch (TimeoutException)
{
}
}
else
{
status = await client.GetStatusAsync(instanceId);
}

if (status != null)
{
if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
Expand Down Expand Up @@ -704,6 +721,10 @@ private static bool TryGetIntQueryParameterValue(NameValueCollection queryString
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);
}
else if (timeout.HasValue && client is DurableClient durableClient && durableClient.DurabilityProvider.SupportsPollFreeWait)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, timeout.Value);
}
else
{
return client.CreateCheckStatusResponse(request, id);
Expand Down

0 comments on commit e2d768a

Please sign in to comment.