Enable suspend-resume of orchestrators. #764

Merged
merged 42 commits into from
Aug 18, 2022
Commits
c0c48cc
Finish first draft suspend-resume feature
julio-arroyo02 Jul 13, 2022
77f0815
Rename to convey behavior better, update inaccurate documentation.
julio-arroyo02 Jul 15, 2022
4d11050
Remove unnecessary logic.
julio-arroyo02 Jul 15, 2022
db822e7
Avoid setting completed time when suspending or resuming an orchestra…
julio-arroyo02 Jul 15, 2022
1140a17
Incorporate feedback and fixes from draft PR comments
julio-arroyo02 Jul 18, 2022
2091db5
Refactor logic for consistency and less repetition
julio-arroyo02 Jul 18, 2022
b94455a
Implement tests for suspend-resume feature
julio-arroyo02 Jul 20, 2022
fb6fc4b
Add support for suspended orchestrations when waiting for completion
julio-arroyo02 Jul 21, 2022
7ab6b76
Clean up comments
julio-arroyo02 Jul 21, 2022
206bf94
Modularize buffer implementation for suspended orchestration messages
julio-arroyo02 Jul 21, 2022
775d0bb
Add new tests for suspend-resume
julio-arroyo02 Jul 21, 2022
a2a128f
Modify underlying data structure, change way buffer is initialized
julio-arroyo02 Jul 21, 2022
37639ef
Derive status from runtime, remove setter
julio-arroyo02 Jul 21, 2022
b847747
Add new test
julio-arroyo02 Jul 21, 2022
4663fcb
Refactor resuming orchestration to avoid code duplication
julio-arroyo02 Jul 21, 2022
bfaa1e9
Rename for consistency with other parameters
julio-arroyo02 Jul 21, 2022
5842118
Fix minor typo
julio-arroyo02 Jul 21, 2022
a2a8063
Introduce new handler to deal with events while orchestrations are su…
julio-arroyo02 Jul 22, 2022
202f2b4
Delete unnecessary class
julio-arroyo02 Jul 22, 2022
878bf2f
Address new feedback in PR
julio-arroyo02 Jul 25, 2022
acad657
Add minor modifications
julio-arroyo02 Jul 25, 2022
0745b9f
Add minor fixes
julio-arroyo02 Jul 26, 2022
331012f
Consolidate multiple test cases into single end-to-end test
julio-arroyo02 Jul 27, 2022
6924f4e
Fix EventType enum order to avoid OOProc bugs.
julio-arroyo02 Jul 29, 2022
44cef71
Address feedback
julio-arroyo02 Aug 3, 2022
d941d6f
Change patch version
julio-arroyo02 Aug 8, 2022
f6ba832
Fix strings.
julio-arroyo02 Aug 8, 2022
f280b65
Implement local orchestration's suspend-resume.
julio-arroyo02 Aug 9, 2022
4234747
Fix redis version.
julio-arroyo02 Aug 9, 2022
5e2b45f
Correct redis version.
julio-arroyo02 Aug 9, 2022
a766701
Correct redis version.
julio-arroyo02 Aug 9, 2022
c329a99
Implement new design to avoid exposing new APIs to client.
julio-arroyo02 Aug 11, 2022
6525c06
Refactor tests to increase reliability.
julio-arroyo02 Aug 11, 2022
22f9391
Fix minor version since we introduce new feature.
julio-arroyo02 Aug 12, 2022
8332463
Refactor logic of marker events.
julio-arroyo02 Aug 12, 2022
59b1cea
Put back code that should not have been removed.
julio-arroyo02 Aug 12, 2022
2a5e8da
Fix bug related to completed events.
julio-arroyo02 Aug 12, 2022
ec2f7ed
Fix AzureStorage version.
julio-arroyo02 Aug 16, 2022
e098959
Merge branch 'main' into suspend-resume
Julio-Arroyo Aug 16, 2022
374fbf3
Fix bug: suspended orchestrations are still executable.
julio-arroyo02 Aug 17, 2022
f0e718d
Merge pull request #1 from Azure/main
Julio-Arroyo Aug 18, 2022
4dcef91
Merge branch 'main' of https://github.com/Julio-Arroyo/durabletask in…
julio-arroyo02 Aug 18, 2022
@@ -937,7 +937,8 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMess

if (runtimeState.ExecutionStartedEvent != null &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Running &&
- runtimeState.OrchestrationStatus != OrchestrationStatus.Pending)
+ runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
+ runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)

Collaborator

My understanding is that the latest design doesn't require any changes to the backend, so I'm not expecting this change to be necessary (in fact, I worry about potential problems it could cause). Is this a leftover from a previous design?

Contributor Author

Oh, great point. Thanks for bringing it up.

Contributor Author

@cgillum Actually, I want to argue that we do need this change: runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended must be included. (@davidmrdavid this is the bug we were unable to explain earlier today).

IsExecutableInstance has a single reference: AzureStorageOrchestrationService.cs uses it to check whether the batch of dequeued messages should be discarded or abandoned. If we do not include this extra check, there are many scenarios where this feature won't work. Consider the following two examples:

  1. Consider a suspended orchestration for which the client tries to raise an event. That message is enqueued and later dequeued. If we don't add runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended, IsExecutableInstance will return false, which means that the RaiseEvent message will be discarded and will not make it to the history.
  2. Similarly, if you have a suspended orchestration and try to terminate it, the TerminateEvent message will also be discarded, so the terminate operation wouldn't work on suspended orchestrations.

@cgillum I know you mentioned you worry about potential problems this change could introduce. Could you elaborate on that?

Collaborator

@Julio-Arroyo Ah, I see. I was reading the logic backwards and thought we'd discard the message if it was in a suspended state. You're right - we need the suspended check here to avoid unnecessarily discarding work items for suspended instances.

{
message = $"Instance is {runtimeState.OrchestrationStatus}";
return false;
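
To make the outcome of the review thread above concrete, here is a minimal, self-contained sketch of the check. `SketchStatus` and `ExecutableCheckSketch` are hypothetical stand-ins invented for illustration, not DurableTask types; the real IsExecutableInstance takes an OrchestrationRuntimeState and contains logic that this hunk does not show.

```csharp
using System;

// Hypothetical stand-in for OrchestrationStatus; only the members needed for the sketch.
public enum SketchStatus { Running, Completed, Terminated, Pending, Suspended }

public static class ExecutableCheckSketch
{
    // Same shape as the condition in the hunk: a started instance whose status is
    // not Running, Pending, or Suspended is reported as non-executable, which is
    // the case in which the caller drops the dequeued messages.
    public static bool IsExecutable(bool hasStarted, SketchStatus status, out string message)
    {
        if (hasStarted &&
            status != SketchStatus.Running &&
            status != SketchStatus.Pending &&
            status != SketchStatus.Suspended)
        {
            message = $"Instance is {status}";
            return false;
        }

        message = string.Empty;
        return true;
    }
}
```

With the `Suspended` clause in place, a RaiseEvent or TerminateEvent message aimed at a suspended instance passes the check; without that clause the same call would return false, which is the behavior described in the two examples in the thread.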
@@ -1847,6 +1848,7 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
OrchestrationState state = await this.GetOrchestrationStateAsync(instanceId, executionId);
if (state == null ||
state.OrchestrationStatus == OrchestrationStatus.Running ||
state.OrchestrationStatus == OrchestrationStatus.Suspended ||
state.OrchestrationStatus == OrchestrationStatus.Pending ||
state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
{
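
The condition in this hunk decides whether a waiter should keep polling. The sketch below isolates that predicate, assuming (as the added clause suggests) that Suspended is treated as a non-terminal status. `WaitSketchStatus` and `WaitSketch.ShouldKeepWaiting` are hypothetical names, and a null status stands in for the `state == null` case.

```csharp
using System;

// Hypothetical stand-in for OrchestrationStatus.
public enum WaitSketchStatus
{
    Running, Completed, ContinuedAsNew, Failed, Canceled, Terminated, Pending, Suspended
}

public static class WaitSketch
{
    // True while the instance has no recorded state yet or is in a status
    // that may still change; false once a terminal status is reached.
    public static bool ShouldKeepWaiting(WaitSketchStatus? status) =>
        status == null ||
        status == WaitSketchStatus.Running ||
        status == WaitSketchStatus.Suspended ||
        status == WaitSketchStatus.Pending ||
        status == WaitSketchStatus.ContinuedAsNew;
}
```

Under this predicate, a caller waiting on a suspended orchestration keeps waiting until the instance is resumed and finishes, is terminated, or the caller's own timeout expires.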
4 changes: 2 additions & 2 deletions src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
@@ -20,8 +20,8 @@
<!-- Version Info -->
<PropertyGroup>
<MajorVersion>1</MajorVersion>
- <MinorVersion>12</MinorVersion>
- <PatchVersion>1</PatchVersion>
+ <MinorVersion>13</MinorVersion>
+ <PatchVersion>0</PatchVersion>

<Version>$(MajorVersion).$(MinorVersion).$(PatchVersion)</Version>
<FileVersion>$(Version).0</FileVersion>
22 changes: 22 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
@@ -993,6 +993,28 @@ public override async Task<string> UpdateStateAsync(
instancePropertyName: OutputProperty,
data: executionTerminatedEvent.Input);
break;
case EventType.ExecutionSuspended:
runtimeStatus = OrchestrationStatus.Suspended;
ExecutionSuspendedEvent executionSuspendedEvent = (ExecutionSuspendedEvent)historyEvent;
instanceEntity.Properties["RuntimeStatus"] = new EntityProperty(OrchestrationStatus.Suspended.ToString());
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionSuspendedEvent.Reason),
instancePropertyName: OutputProperty,
data: executionSuspendedEvent.Reason);
break;
case EventType.ExecutionResumed:
runtimeStatus = OrchestrationStatus.Running;
ExecutionResumedEvent executionResumedEvent = (ExecutionResumedEvent)historyEvent;
instanceEntity.Properties["RuntimeStatus"] = new EntityProperty(OrchestrationStatus.Running.ToString());
this.SetInstancesTablePropertyFromHistoryProperty(
historyEntity,
instanceEntity,
historyPropertyName: nameof(executionResumedEvent.Reason),
instancePropertyName: OutputProperty,
data: executionResumedEvent.Reason);
break;
case EventType.ContinueAsNew:
runtimeStatus = OrchestrationStatus.ContinuedAsNew;
ExecutionCompletedEvent executionCompletedEvent = (ExecutionCompletedEvent)historyEvent;
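
The two new cases above map history events to the status written to the instances table: ExecutionSuspended yields Suspended and ExecutionResumed yields Running. A minimal sketch of that projection follows. All type names are hypothetical, other event types are deliberately left as no-ops because their handling is not fully visible in this hunk, and the sketch ignores the Reason/Output bookkeeping.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, reduced stand-ins for EventType and OrchestrationStatus.
public enum SketchEvent { ExecutionSuspended, ExecutionResumed, Other }
public enum SketchRuntimeStatus { Pending, Running, Suspended }

public static class StatusProjectionSketch
{
    // Walks the new history events in order and returns the status after the last one.
    public static SketchRuntimeStatus Project(
        SketchRuntimeStatus initial,
        IEnumerable<SketchEvent> newEvents)
    {
        SketchRuntimeStatus status = initial;
        foreach (SketchEvent e in newEvents)
        {
            switch (e)
            {
                case SketchEvent.ExecutionSuspended:
                    status = SketchRuntimeStatus.Suspended;
                    break;
                case SketchEvent.ExecutionResumed:
                    status = SketchRuntimeStatus.Running;
                    break;
                default:
                    // Not modeled here; the status is left unchanged.
                    break;
            }
        }

        return status;
    }
}
```

Because the last event wins, a suspend followed by a resume in the same batch leaves the instance in Running.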
2 changes: 1 addition & 1 deletion src/DurableTask.Core/DurableTask.Core.csproj
@@ -17,7 +17,7 @@
<!-- Version Info -->
<PropertyGroup>
<MajorVersion>2</MajorVersion>
- <MinorVersion>10</MinorVersion>
+ <MinorVersion>11</MinorVersion>
<PatchVersion>0</PatchVersion>

<Version>$(MajorVersion).$(MinorVersion).$(PatchVersion)</Version>
13 changes: 13 additions & 0 deletions src/DurableTask.Core/History/EventType.cs
@@ -11,6 +11,9 @@
// limitations under the License.
// ----------------------------------------------------------------------------------


// NOTE: Changing the order of variables may cause bugs in OOProc SDKs. When introducing
// new EventTypes, append them at the end.
namespace DurableTask.Core.History
{
/// <summary>
@@ -112,5 +115,15 @@ public enum EventType
/// Orchestration state history event
/// </summary>
HistoryState,

/// <summary>
/// Orchestration was suspended event
/// </summary>
ExecutionSuspended,

/// <summary>
/// Orchestration was resumed event
/// </summary>
ExecutionResumed,
}
}
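
The NOTE at the top of EventType.cs asks that new members be appended. One plausible reason, stated here as an assumption rather than something the PR spells out, is that consumers outside the .NET process see the members by their numeric value, and C# assigns implicit enum values in declaration order starting at 0. The sketch below uses three hypothetical enums to show that appending keeps existing values stable while inserting in the middle shifts them.

```csharp
using System;

// Hypothetical "before" enum: implicit values 0, 1, 2 in declaration order.
public enum BeforeSketch { Started, Completed, HistoryState }

// New members appended at the end: existing members keep their values.
public enum AppendedSketch { Started, Completed, HistoryState, Suspended, Resumed }

// New members inserted in the middle: every later member is renumbered.
public enum InsertedSketch { Started, Suspended, Resumed, Completed, HistoryState }

public static class EnumOrderSketch
{
    // True when the member called `name` has the same integer value in both enums.
    public static bool SameValue<TOld, TNew>(string name)
        where TOld : struct, Enum
        where TNew : struct, Enum
    {
        int oldValue = Convert.ToInt32(Enum.Parse(typeof(TOld), name));
        int newValue = Convert.ToInt32(Enum.Parse(typeof(TNew), name));
        return oldValue == newValue;
    }
}
```

For example, `EnumOrderSketch.SameValue<BeforeSketch, AppendedSketch>("HistoryState")` holds, whereas the same comparison against `InsertedSketch` does not, because `HistoryState` moves from 2 to 4.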
46 changes: 46 additions & 0 deletions src/DurableTask.Core/History/ExecutionResumedEvent.cs
@@ -0,0 +1,46 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.History
{
using System.Runtime.Serialization;

/// <summary>
/// A history event for orchestration resuming
/// </summary>
[DataContract]
public class ExecutionResumedEvent : HistoryEvent
{
/// <summary>
/// Creates a new ExecutionResumedEvent with the supplied params
/// </summary>
/// <param name="eventId">The event id of the history event</param>
/// <param name="reason">The serialized reason of the resuming event</param>
public ExecutionResumedEvent(int eventId, string reason)
: base(eventId)
{
Reason = reason;
}

/// <summary>
/// Gets the event type
/// </summary>
public override EventType EventType => EventType.ExecutionResumed;

/// <summary>
/// Gets or sets the reason for the resuming event
/// </summary>
[DataMember]
public string Reason { get; set; }
}
}
46 changes: 46 additions & 0 deletions src/DurableTask.Core/History/ExecutionSuspendedEvent.cs
@@ -0,0 +1,46 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core.History
{
using System.Runtime.Serialization;

/// <summary>
/// A history event for orchestration suspension.
/// </summary>
[DataContract]
public class ExecutionSuspendedEvent : HistoryEvent
{
/// <summary>
/// Creates a new ExecutionSuspendedEvent with the supplied params
/// </summary>
/// <param name="eventId">The event id of the history event</param>
/// <param name="reason">The serialized input of the suspension event</param>
public ExecutionSuspendedEvent(int eventId, string reason)
: base(eventId)
{
Reason = reason;
}

/// <summary>
/// Gets the event type
/// </summary>
public override EventType EventType => EventType.ExecutionSuspended;

/// <summary>
/// Gets or sets the serialized input for the suspension event
/// </summary>
[DataMember]
public string Reason { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/DurableTask.Core/Logging/EventIds.cs
@@ -56,5 +56,8 @@ static class EventIds
public const int RenewActivityMessageStarting = 65;
public const int RenewActivityMessageCompleted = 66;
public const int RenewActivityMessageFailed = 67;

public const int SuspendingInstance = 68;
public const int ResumingInstance = 69;
}
}
72 changes: 72 additions & 0 deletions src/DurableTask.Core/Logging/LogEvents.cs
@@ -635,6 +635,78 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}

internal class SuspendingInstance : StructuredLogEvent, IEventSourceEvent
{
public SuspendingInstance(OrchestrationInstance instance, string reason)
{
this.InstanceId = instance.InstanceId;
this.ExecutionId = instance.ExecutionId;
this.Details = reason;
}

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string ExecutionId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.SuspendingInstance,
nameof(EventIds.SuspendingInstance));

public override LogLevel Level => LogLevel.Information;

protected override string CreateLogMessage() =>
$"Suspending instance '{this.InstanceId}': {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.SuspendingInstance(
this.InstanceId,
this.ExecutionId,
this.Details,
Utils.AppName,
Utils.PackageVersion);
}

internal class ResumingInstance : StructuredLogEvent, IEventSourceEvent
{
public ResumingInstance(OrchestrationInstance instance, string reason)
{
this.InstanceId = instance.InstanceId;
this.ExecutionId = instance.ExecutionId;
this.Details = reason;
}

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string ExecutionId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.ResumingInstance,
nameof(EventIds.ResumingInstance));

public override LogLevel Level => LogLevel.Information;

protected override string CreateLogMessage() =>
$"Resuming instance '{this.InstanceId}': {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.ResumingInstance(
this.InstanceId,
this.ExecutionId,
this.Details,
Utils.AppName,
Utils.PackageVersion);
}

internal class WaitingForInstance : StructuredLogEvent, IEventSourceEvent
{
public WaitingForInstance(OrchestrationInstance instance, TimeSpan timeout)
26 changes: 26 additions & 0 deletions src/DurableTask.Core/Logging/LogHelper.cs
@@ -316,6 +316,32 @@ internal void TerminatingInstance(OrchestrationInstance instance, string reason)
}
}

/// <summary>
/// Logs that an instance is being scheduled to be suspended.
/// </summary>
/// <param name="instance">The instance to be suspended.</param>
/// <param name="reason">The user-specified reason for the suspension.</param>
internal void SuspendingInstance(OrchestrationInstance instance, string reason)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.SuspendingInstance(instance, reason));
}
}

/// <summary>
/// Logs that an instance is being scheduled to be resumed.
/// </summary>
/// <param name="instance">The instance to be resumed.</param>
/// <param name="reason">The user-specified reason for the resumption.</param>
internal void ResumingInstance(OrchestrationInstance instance, string reason)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.ResumingInstance(instance, reason));
}
}

/// <summary>
/// Logs that the client is waiting for an instance to reach a terminal state.
/// </summary>
40 changes: 40 additions & 0 deletions src/DurableTask.Core/Logging/StructuredEventSource.cs
@@ -342,6 +342,46 @@ internal void TerminatingInstance(
}
}

[Event(EventIds.SuspendingInstance, Level = EventLevel.Informational, Version = 1)]
internal void SuspendingInstance(
string InstanceId,
string ExecutionId,
string Details,
string AppName,
string ExtensionVersion)
{
if (this.IsEnabled(EventLevel.Informational))
{
this.WriteEvent(
EventIds.SuspendingInstance,
InstanceId,
ExecutionId ?? string.Empty,
Details ?? string.Empty,
AppName,
ExtensionVersion);
}
}

[Event(EventIds.ResumingInstance, Level = EventLevel.Informational, Version = 1)]
internal void ResumingInstance(
string InstanceId,
string ExecutionId,
string Details,
string AppName,
string ExtensionVersion)
{
if (this.IsEnabled(EventLevel.Informational))
{
this.WriteEvent(
EventIds.ResumingInstance,
InstanceId,
ExecutionId ?? string.Empty,
Details ?? string.Empty,
AppName,
ExtensionVersion);
}
}

[Event(EventIds.WaitingForInstance, Level = EventLevel.Informational, Version = 1)]
internal void WaitingForInstance(
string InstanceId,