Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace DurableTask.AzureStorage.Messaging
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using DurableTask.Core;
using DurableTask.Core.History;
using Newtonsoft.Json;
Expand Down Expand Up @@ -226,5 +225,11 @@ bool IsNonexistantInstance()
{
return this.RuntimeState.Events.Count == 0 || this.RuntimeState.ExecutionStartedEvent == null;
}

public Task EndSessionAsync()
{
// No-op
return Task.CompletedTask;
}
}
}
9 changes: 8 additions & 1 deletion src/DurableTask.Core/IOrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.Core
{
using System.Collections.Generic;
Expand All @@ -29,6 +30,12 @@ public interface IOrchestrationSession
/// or until an internal wait period has expired. In either case, <c>null</c> can be returned
/// and the dispatcher will shut down the session.
/// </remarks>
Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem);
Task<IList<TaskMessage>?> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem);

/// <summary>
/// Ends the session.
/// </summary>
/// <returns>A task that completes when the session has been ended.</returns>
Task EndSessionAsync();
Comment thread
sophiatev marked this conversation as resolved.
}
}
1 change: 1 addition & 0 deletions src/DurableTask.Core/TaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
if (concurrencyLockAcquired)
{
this.concurrentSessionLock.Release();
await workItem.Session.EndSessionAsync();
Comment thread
sophiatev marked this conversation as resolved.
}
Comment thread
sophiatev marked this conversation as resolved.
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
"OnProcessWorkItemSession-Release",
$"Releasing extended session after {processCount} batch(es).");
this.concurrentSessionLock.Release();
await workItem.Session.EndSessionAsync();
}
Comment thread
sophiatev marked this conversation as resolved.
}
}
Expand Down Expand Up @@ -552,10 +553,6 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
orchestratorMessages.AddRange(subOrchestrationRewindMessages);
workItem.OrchestrationRuntimeState = newRuntimeState;
runtimeState = newRuntimeState;
Comment thread
sophiatev marked this conversation as resolved.
// Setting this to true here will end an extended session if it is in progress.
// We don't want to save the state across executions, since we essentially manually modify
// the orchestration history here and so that stored by the extended session is incorrect.
isRewinding = true;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this assignment no longer needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was leftover from a previous PR where I added rewind support. I realized that I had moved where this was set to here but forgot to remove this redundant setting later on.

break;
default:
throw TraceHelper.TraceExceptionInstance(
Expand Down
Loading