Skip to content

Commit

Permalink
Merge Main into Azure Storage v12 Branch (#911)
Browse files Browse the repository at this point in the history
  • Loading branch information
wsugarman committed May 31, 2023
1 parent 2ce73c5 commit cd477c7
Show file tree
Hide file tree
Showing 16 changed files with 401 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Microsoft.Azure.DurableTask.AzureServiceFabric</PackageId>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>2.3.7</Version>
<Version>2.3.9</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Title>Azure Service Fabric provider extension for the Durable Task Framework.</Title>
Expand Down
127 changes: 101 additions & 26 deletions src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace DurableTask.AzureServiceFabric
using DurableTask.AzureServiceFabric.TaskHelpers;
using DurableTask.AzureServiceFabric.Tracing;
using Microsoft.ServiceFabric.Data;
using Newtonsoft.Json;

class FabricOrchestrationService : IOrchestrationService
{
Expand Down Expand Up @@ -149,6 +150,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
newMessages = await this.orchestrationProvider.ReceiveSessionMessagesAsync(currentSession);

var currentRuntimeState = new OrchestrationRuntimeState(currentSession.SessionState);

var workItem = new TaskOrchestrationWorkItem()
{
NewMessages = newMessages.Select(m => m.Value.TaskMessage).ToList(),
Expand All @@ -167,7 +169,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
bool isComplete = this.IsOrchestrationComplete(currentRuntimeState.OrchestrationStatus);
if (isComplete)
{
await this.HandleCompletedOrchestration(workItem);
await this.HandleCompletedOrchestrationAsync(workItem);
}

this.orchestrationProvider.TryUnlockSession(currentSession.SessionId, isComplete: isComplete);
Expand Down Expand Up @@ -209,10 +211,27 @@ public async Task CompleteTaskOrchestrationWorkItemAsync(
OrchestrationState orchestrationState)
{
SessionInformation sessionInfo = GetSessionInfo(workItem.InstanceId);
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
bool isComplete = false;

try
{
var orchestrationStatus = workItem.OrchestrationRuntimeState.OrchestrationStatus;
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
$"Current orchestration status: {workItem.OrchestrationRuntimeState.OrchestrationStatus}");
bool isComplete = this.IsOrchestrationComplete(workItem.OrchestrationRuntimeState.OrchestrationStatus);
$"Current orchestration status: {orchestrationStatus}");
isComplete = this.IsOrchestrationComplete(orchestrationStatus);
}
catch (InvalidOperationException ex)
{
// OrchestrationRuntimeState.OrchestrationStatus throws 'InvalidOperationException' if 'ExecutionStartedEvent' is missing.
// Do not process the orchestration workitem if 'ExecutionStartedEvent' is missing.
// This can happen when an orchestration message like ExecutionTerminatedEvent is sent to an already finished orchestration
if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"InstanceId: '{workItem.InstanceId}', exception: {ex}. Dropping the bad orchestration to avoid noise.");
await this.DropOrchestrationAsync(workItem);
}
}

IList<OrchestrationInstance> sessionsToEnqueue = null;
List<Message<Guid, TaskMessageItem>> scheduledMessages = null;
Expand Down Expand Up @@ -271,7 +290,7 @@ await RetryHelper.ExecuteWithRetryOnTransient(async () =>
if (workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
{
await HandleCompletedOrchestration(workItem);
await HandleCompletedOrchestrationAsync(workItem);
}
// When an orchestration is completed, we need to drop the session which involves 2 steps (1) Removing the row from sessions
Expand Down Expand Up @@ -342,12 +361,37 @@ await RetryHelper.ExecuteWithRetryOnTransient(async () =>

if (isComplete)
{
await HandleCompletedOrchestration(workItem);
await HandleCompletedOrchestrationAsync(workItem);
}
}

async Task DropOrchestrationAsync(TaskOrchestrationWorkItem workItem)
{
await CompleteOrchestrationAsync(workItem);

string message = $"{nameof(DropOrchestrationAsync)}: Dropped. Orchestration history: {JsonConvert.SerializeObject(workItem.OrchestrationRuntimeState.Events)}";
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
message);
}

// Caller should ensure the workItem has reached terminal state.
private async Task HandleCompletedOrchestration(TaskOrchestrationWorkItem workItem)
async Task HandleCompletedOrchestrationAsync(TaskOrchestrationWorkItem workItem)
{
await CompleteOrchestrationAsync(workItem);

string message = string.Format("Orchestration with instanceId : '{0}' and executionId : '{1}' Finished with the status {2} and result {3} in {4} seconds.",
workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId,
workItem.OrchestrationRuntimeState.OrchestrationStatus.ToString(),
workItem.OrchestrationRuntimeState.Output,
(workItem.OrchestrationRuntimeState.CompletedTime - workItem.OrchestrationRuntimeState.CreatedTime).TotalSeconds);
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId,
message);
}

async Task CompleteOrchestrationAsync(TaskOrchestrationWorkItem workItem)
{
await RetryHelper.ExecuteWithRetryOnTransient(async () =>
{
Expand All @@ -357,35 +401,34 @@ await RetryHelper.ExecuteWithRetryOnTransient(async () =>
{
new OrchestrationStateInstanceEntity()
{
State = Utils.BuildOrchestrationState(workItem.OrchestrationRuntimeState)
State = Utils.BuildOrchestrationState(workItem)
}
});
var instance = workItem.OrchestrationRuntimeState.OrchestrationInstance;
if (instance == null)
{
// This condition happens when an orchestration message like ExecutionTerminatedEvent enqueued for an already completed orchestration
SessionInformation sessionInfo = this.GetSessionInfo(workItem.InstanceId);
instance = sessionInfo.Instance;
}
// DropSession does 2 things (like mentioned in the comments above) - remove the row from sessions dictionary
// and delete the session messages dictionary. The second step is in a background thread and not part of transaction.
// However even if this transaction failed but we ended up deleting session messages dictionary, that's ok - at
// that time, it should be an empty dictionary and we would have updated the runtime session state to full completed
// state in the transaction from Complete method. So the subsequent attempt would be able to complete the session.
await this.orchestrationProvider.DropSession(txn, workItem.OrchestrationRuntimeState.OrchestrationInstance);
await this.orchestrationProvider.DropSessionAsync(txn, instance);
await txn.CommitAsync();
}
}, uniqueActionIdentifier: $"OrchestrationId = '{workItem.InstanceId}', Action = '{nameof(HandleCompletedOrchestration)}'");
}, uniqueActionIdentifier: $"OrchestrationId = '{workItem.InstanceId}', Action = '{nameof(CompleteOrchestrationAsync)}'");

this.instanceStore.OnOrchestrationCompleted(workItem.OrchestrationRuntimeState.OrchestrationInstance);

string message = string.Format("Orchestration with instanceId : '{0}' and executionId : '{1}' Finished with the status {2} and result {3} in {4} seconds.",
workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId,
workItem.OrchestrationRuntimeState.OrchestrationStatus.ToString(),
workItem.OrchestrationRuntimeState.Output,
(workItem.OrchestrationRuntimeState.CompletedTime - workItem.OrchestrationRuntimeState.CreatedTime).TotalSeconds);
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId,
message);
}

public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
SessionInformation sessionInfo = TryRemoveSessionInfo(workItem.InstanceId);
SessionInformation sessionInfo = this.TryRemoveSessionInfo(workItem.InstanceId);
if (sessionInfo == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"{nameof(AbandonTaskOrchestrationWorkItemAsync)} : Could not get a session info object while trying to abandon session {workItem.InstanceId}");
Expand All @@ -399,9 +442,23 @@ public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work

public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
bool isComplete = this.IsOrchestrationComplete(workItem.OrchestrationRuntimeState.OrchestrationStatus);
bool isComplete = false;
try
{
isComplete = this.IsOrchestrationComplete(workItem.OrchestrationRuntimeState.OrchestrationStatus);
}
catch (InvalidOperationException ex)
{
// OrchestrationRuntimeState.OrchestrationStatus throws 'InvalidOperationException' if 'ExecutionStartedEvent' is missing.
// This can happen when an orchestration message like ExecutionTerminatedEvent is sent to an already finished orchestration
if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null)
{
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition($"InstanceId: '{workItem.InstanceId}', exception: {ex}. Dropping/Unlocking the session as completed.");
isComplete = true;
}
}

SessionInformation sessionInfo = TryRemoveSessionInfo(workItem.InstanceId);
SessionInformation sessionInfo = this.TryRemoveSessionInfo(workItem.InstanceId);
if (sessionInfo != null)
{
this.orchestrationProvider.TryUnlockSession(sessionInfo.Instance, isComplete: isComplete);
Expand Down Expand Up @@ -472,10 +529,28 @@ await RetryHelper.ExecuteWithRetryOnTransient(async () =>
}
}

public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
{
this.activitiesProvider.Abandon(workItem.Id);
return Task.CompletedTask;
bool removed = false;
using (var txn = this.stateManager.CreateTransaction())
{
// Remove task activity if orchestration was already terminated or cleaned up
if (!await this.orchestrationProvider.ContainsSessionAsync(txn, workItem.TaskMessage.OrchestrationInstance))
{
var errorMessage = $"Session doesn't exist. Dropping TaskActivity for Orchestration = '{workItem.TaskMessage.OrchestrationInstance}', ActivityId = '{workItem.Id}', Action = '{nameof(AbandonTaskActivityWorkItemAsync)}'";
ServiceFabricProviderEventSource.Tracing.UnexpectedCodeCondition(errorMessage);
await this.activitiesProvider.CompleteAsync(txn, workItem.Id);
removed = true;
}

await txn.CommitAsync();
}

if (!removed)
{
// Re-Enqueue task activity
this.activitiesProvider.Abandon(workItem.Id);
}
}

public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@ namespace DurableTask.AzureServiceFabric
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using DurableTask.AzureServiceFabric.Stores;
using DurableTask.AzureServiceFabric.TaskHelpers;
using DurableTask.AzureServiceFabric.Tracing;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Serializing;
using DurableTask.Core.Tracking;
using DurableTask.AzureServiceFabric.Stores;
using DurableTask.AzureServiceFabric.TaskHelpers;
using DurableTask.AzureServiceFabric.Tracing;

using Microsoft.ServiceFabric.Data;

using Newtonsoft.Json;
using DurableTask.Core.Serializing;

class FabricOrchestrationServiceClient : IOrchestrationServiceClient, IFabricProviderClient
{
Expand Down Expand Up @@ -124,28 +121,56 @@ public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string

if (latestExecutionId == null)
{
throw new ArgumentException($"No execution id found for given instanceId {instanceId}, can only terminate the latest execution of a given orchestration");
throw new InvalidOperationException($"No execution id found for given instanceId {instanceId}, can only terminate the latest execution of a given orchestration");
}

var orchestrationInstance = new OrchestrationInstance { InstanceId = instanceId, ExecutionId = latestExecutionId };
if (reason?.Trim().StartsWith("CleanupStore", StringComparison.OrdinalIgnoreCase) == true)
{
using (var txn = this.stateManager.CreateTransaction())
{
// DropSession does 2 things (like mentioned in the comments above) - remove the row from sessions dictionary
// and delete the session messages dictionary. The second step is in a background thread and not part of transaction.
var stateInstance = await this.instanceStore.GetOrchestrationStateAsync(instanceId, latestExecutionId);
var state = stateInstance?.State;
if (state == null)
{
state = new OrchestrationState()
{
OrchestrationInstance = orchestrationInstance,
LastUpdatedTime = DateTime.UtcNow,
};
}

state.OrchestrationStatus = OrchestrationStatus.Terminated;
state.Output = $"Orchestration dropped with reason '{reason}'";

await this.instanceStore.WriteEntitiesAsync(txn, new InstanceEntityBase[]
{
new OrchestrationStateInstanceEntity()
{
State = state
}
}); ;
// DropSession does 2 things : removes the row from sessions dictionary and delete the session messages dictionary.
// The second step is in a background thread and not part of transaction.
// However even if this transaction failed but we ended up deleting session messages dictionary, that's ok - at
// that time, it should be an empty dictionary and we would have updated the runtime session state to full completed
// state in the transaction from Complete method. So the subsequent attempt would be able to complete the session.
var instance = new OrchestrationInstance { InstanceId = instanceId, ExecutionId = latestExecutionId };
await this.orchestrationProvider.DropSession(txn, instance);
await this.orchestrationProvider.DropSessionAsync(txn, orchestrationInstance);
await txn.CommitAsync();

// TODO: Renmove from FabricOrchestrationService.SessionInfo dictionary and SessionProvider.lockedSessions
}

this.instanceStore.OnOrchestrationCompleted(orchestrationInstance);

string message = $"{nameof(ForceTerminateTaskOrchestrationAsync)}: Terminated with reason '{reason}'";
ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(instanceId, latestExecutionId, message);
}
else
{
var taskMessage = new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId, ExecutionId = latestExecutionId },
OrchestrationInstance = orchestrationInstance,
Event = new ExecutionTerminatedEvent(-1, reason)
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace DurableTask.AzureServiceFabric.Remote

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Web.Http.Results;

/// <summary>
/// Allows to interact with a remote IOrchestrationServiceClient
Expand Down Expand Up @@ -122,7 +123,8 @@ public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string
{
if (!response.IsSuccessStatusCode)
{
throw new RemoteServiceException("Unable to terminate task instance", response.StatusCode);
var message = await response.Content.ReadAsStringAsync();
throw new RemoteServiceException($"Unable to terminate task instance. Error: {response.StatusCode}:{message}", response.StatusCode);
}
}
}
Expand Down
Loading

0 comments on commit cd477c7

Please sign in to comment.