Skip to content

Commit

Permalink
inital checkin for netherite.
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt committed Oct 27, 2020
1 parent 044d234 commit 4e6b170
Show file tree
Hide file tree
Showing 20 changed files with 873 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ internal class DurableClient : IDurableClient,

string IDurableEntityClient.TaskHubName => this.TaskHubName;

public override string ToString()
{
return $"DurableClient[backend={this.config.GetBackendInfo()}]";
}

/// <inheritdoc />
HttpResponseMessage IDurableOrchestrationClient.CreateCheckStatusResponse(HttpRequestMessage request, string instanceId, bool returnInternalServerErrorOnFailure)
{
Expand Down Expand Up @@ -530,7 +535,7 @@ async Task<CleanEntityStorageResult> IDurableEntityClient.CleanEntityStorageAsyn
}

if (removeEmptyEntities && !status.EntityExists && status.LockedBy == null && status.QueueSize == 0
&& now - state.LastUpdatedTime > TimeSpan.FromMinutes(this.config.Options.EntityMessageReorderWindowInMinutes))
&& now - state.LastUpdatedTime > this.config.MessageReorderWindow)
{
tasks.Add(DeleteIdleOrchestrationEntity(state));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Core.History;
Expand All @@ -22,7 +23,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)
{
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.FunctionName = functionName;
this.EntityMessageReorderWindow = TimeSpan.FromMinutes(config.Options.EntityMessageReorderWindowInMinutes);
}

internal DurableTaskExtension Config { get; }
Expand All @@ -41,8 +41,6 @@ internal DurableCommonContext(DurableTaskExtension config, string functionName)

internal string Name => this.FunctionName;

internal TimeSpan EntityMessageReorderWindow { get; private set; }

internal bool ExecutorCalledBack { get; set; }

internal void AddDeferredTask(Func<Task> function)
Expand All @@ -55,5 +53,11 @@ internal async Task RunDeferredTasks()
await Task.WhenAll(this.deferredTasks.Select(x => x()));
this.deferredTasks.Clear();
}

[Conditional("false")]
internal void TraceWorkItemProgress(string format, object arg)
{
// TODO hook this up with tracing in the backend when it is implemented
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ internal void SendOutbox(OrchestrationContext innerContext, bool writeBackSucces
{
if (!operationMessage.EventContent.ScheduledTime.HasValue)
{
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.EntityMessageReorderWindow);
this.State.MessageSorter.LabelOutgoingMessage(operationMessage.EventContent, operationMessage.Target.InstanceId, DateTime.UtcNow, this.Config.MessageReorderWindow);
}

this.Config.TraceHelper.SendingEntityMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ internal void SendEntityMessage(OrchestrationInstance target, object eventConten
requestMessage,
target.InstanceId,
this.InnerContext.CurrentUtcDateTime,
TimeSpan.FromMinutes(this.Config.Options.EntityMessageReorderWindowInMinutes));
this.Config.MessageReorderWindow);

eventName = EntityMessageEventNames.RequestMessageEventName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,7 @@ public interface IDurableOrchestrationContext
/// </summary>
/// <param name="ownedLocks">The collection of owned locks.</param>
/// <remarks>
/// Note that the collection of owned locks can be empty even if the context is locked. This happens
/// if an orchestration calls a suborchestration without lending any locks.
/// Note that the collection of owned locks can be empty even if the context is locked.
/// </remarks>
/// <returns><c>true</c> if the context already holds some locks.</returns>
bool IsLocked(out IReadOnlyList<EntityId> ownedLocks);
Expand Down
19 changes: 17 additions & 2 deletions src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// </summary>
public virtual bool SupportsEntities => false;

/// <summary>
/// Specifies whether the backend's WaitForOrchestration is implemented without polling.
/// </summary>
public virtual bool SupportsPollFreeWait => false;

/// <summary>
/// Specifies whether this backend delivers messages in order.
/// </summary>
public virtual bool GuaranteesOrderedDelivery => false;

/// <summary>
/// JSON representation of configuration to emit in telemetry.
/// </summary>
Expand Down Expand Up @@ -87,6 +97,11 @@ public DurabilityProvider(string storageProviderName, IOrchestrationService serv
/// <inheritdoc/>
public int MaxConcurrentTaskActivityWorkItems => this.GetOrchestrationService().MaxConcurrentTaskActivityWorkItems;

internal string GetBackendInfo()
{
return this.GetOrchestrationService().ToString();
}

private IOrchestrationService GetOrchestrationService()
{
if (this.innerService == null)
Expand Down Expand Up @@ -236,7 +251,7 @@ public Task StopAsync()
}

/// <inheritdoc/>
public Task StopAsync(bool isForced)
public virtual Task StopAsync(bool isForced)
{
return this.GetOrchestrationService().StopAsync(isForced);
}
Expand Down Expand Up @@ -278,7 +293,7 @@ public virtual Task<IList<OrchestrationState>> GetAllOrchestrationStatesWithFilt
/// <returns>Returns a task which completes when the state has been fetched.</returns>
public virtual Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateAsync));
throw this.GetNotImplementedException(nameof(this.GetOrchestrationStateWithInputsAsync));
}

/// <summary>
Expand Down
19 changes: 18 additions & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ public string HubName

internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }

internal TimeSpan MessageReorderWindow
=> this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);

private MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
bool isDefault;
Expand All @@ -215,6 +218,11 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

internal string GetBackendInfo()
{
return this.defaultDurabilityProvider.GetBackendInfo();
}

/// <summary>
/// Internal initialization call from the WebJobs host.
/// </summary>
Expand Down Expand Up @@ -725,14 +733,15 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, entityContext.EntityMessageReorderWindow);
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}

foreach (var message in deliverNow)
{
if (entityContext.State.LockedBy == message.ParentInstanceId)
{
// operation requests from the lock holder are processed immediately
entityContext.TraceWorkItemProgress("processes {entityMessage}", message);
entityShim.AddOperationToBatch(message);
}
else
Expand All @@ -749,6 +758,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F

if (entityContext.State.LockedBy == message.ParentInstanceId)
{
entityContext.TraceWorkItemProgress("processes {entityMessage}", message);

this.TraceHelper.EntityLockReleased(
entityContext.HubName,
entityContext.Name,
Expand All @@ -759,6 +770,10 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F

entityContext.State.LockedBy = null;
}
else
{
entityContext.TraceWorkItemProgress("!!!! drops {entityMessage}", message);
}
}

break;
Expand All @@ -769,6 +784,8 @@ private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, F
while (entityContext.State.LockedBy == null
&& entityContext.State.TryDequeue(out var request))
{
entityContext.TraceWorkItemProgress("processes {entityMessage}", request);

if (request.IsLockRequest)
{
entityShim.AddLockRequestToBatch(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder, Actio
return builder;
}

/// <summary>
/// Override the AzureStorageDurabilityProvider specification that was done in AddDurableTask.
/// </summary>
/// <param name="services">The <see cref="IServiceCollection"/> to configure, usually from the Functions app's FunctionsStartup.</param>
/// <returns>Returns the provided <see cref="IServiceCollection"/>.</returns>
public static IServiceCollection UseEventSourcedDurabilityProvider(this IServiceCollection services)
=> services.AddSingleton<IDurabilityProviderFactory, EventSourcedDurabilityProviderFactory>();

#else
/// <summary>
/// Enable running durable orchestrations implemented as functions.
Expand Down
121 changes: 121 additions & 0 deletions src/WebJobs.Extensions.DurableTask/EventSourcedDurabilityProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.EventSourced;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class EventSourcedDurabilityProvider : DurabilityProvider
{
private readonly EventSourcedOrchestrationService serviceClient;

internal EventSourcedDurabilityProvider(EventSourcedOrchestrationService service, EventSourcedOrchestrationServiceSettings settings)
: base("EventSourced", service, service, "StorageConnectionString")
{
this.serviceClient = service;
this.Settings = settings;
}

public EventSourcedOrchestrationServiceSettings Settings { get; private set; }

public override bool SupportsEntities => true;

public override bool SupportsPollFreeWait => true;

public override bool GuaranteesOrderedDelivery => true;

public override TimeSpan MaximumDelayTime { get; set; } = TimeSpan.MaxValue;

/// <summary>
/// The app setting containing the Azure Storage connection string.
/// </summary>
public override string ConnectionName => "StorageConnectionString"; // TODO this needs to be refactored to work across providers

/// <inheritdoc/>
public override async Task StopAsync(bool isForced)
{
if (!this.Settings.KeepServiceRunning)
{
await this.serviceClient.StopAsync(isForced);
EventSourcedDurabilityProviderFactory.RemoveDurabilityProvider(this);
}
else
{
await this.PurgeHistoryByFilters(default, default, null);
}
}

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
var instanceId = EntityId.GetSchedulerIdFromEntityId(entityId);
OrchestrationState state = await this.serviceClient.GetOrchestrationStateAsync(instanceId, true, true);

if (state != null
&& state.OrchestrationInstance != null
&& state.Input != null)
{
var schedulerState = JsonConvert.DeserializeObject<SchedulerState>(state.Input, serializerSettings);

if (schedulerState.EntityExists)
{
return schedulerState.EntityState;
}
}

return null;
}

/// <inheritdoc/>
public async override Task<IList<OrchestrationState>> GetOrchestrationStateWithInputsAsync(string instanceId, bool showInput = true)
{
var result = new List<OrchestrationState>();
var state = await this.serviceClient.GetOrchestrationStateAsync(instanceId, showInput, true);
if (state != null)
{
result.Add(state);
}

return result;
}

/// <inheritdoc/>
public async override Task<PurgeHistoryResult> PurgeInstanceHistoryByInstanceId(string instanceId)
{
var numberInstancesDeleted = await this.serviceClient.PurgeInstanceHistoryAsync(instanceId);
return new PurgeHistoryResult(numberInstancesDeleted);
}

/// <inheritdoc/>
public override Task<int> PurgeHistoryByFilters(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus)
{
return this.serviceClient.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus);
}

/// <inheritdoc/>
public async override Task<OrchestrationStatusQueryResult> GetOrchestrationStateWithPagination(OrchestrationStatusQueryCondition condition, CancellationToken cancellationToken)
{
var instanceQuery = new InstanceQuery(
runtimeStatus: condition.RuntimeStatus?.Select(p => (OrchestrationStatus)Enum.Parse(typeof(OrchestrationStatus), p.ToString())).ToArray(),
createdTimeFrom: (condition.CreatedTimeFrom == default) ? (DateTime?)null : condition.CreatedTimeFrom.ToUniversalTime(),
createdTimeTo: (condition.CreatedTimeTo == default) ? (DateTime?)null : condition.CreatedTimeTo.ToUniversalTime(),
instanceIdPrefix: condition.InstanceIdPrefix,
fetchInput: condition.ShowInput);

InstanceQueryResult result = await this.serviceClient.QueryOrchestrationStatesAsync(instanceQuery, condition.PageSize, condition.ContinuationToken, cancellationToken);

return new OrchestrationStatusQueryResult()
{
DurableOrchestrationState = result.Instances.Select(ostate => DurableClient.ConvertOrchestrationStateToStatus(ostate)).ToList(),
ContinuationToken = result.ContinuationToken,
};
}
}
}
Loading

0 comments on commit 4e6b170

Please sign in to comment.