Skip to content

reset pending orchestrations when worker restart#1354

Closed
kaibocai wants to merge 7 commits into
mainfrom
kaibocai/reset-pending-orchitem-worker-drain
Closed

reset pending orchestrations when worker restart#1354
kaibocai wants to merge 7 commits into
mainfrom
kaibocai/reset-pending-orchitem-worker-drain

Conversation

@kaibocai
Copy link
Copy Markdown
Member

This PR improves partition drain behavior for Azure Storage control queues. When a partition is released, any control queue messages that were already dequeued but not yet dispatched to an active orchestration session are now abandoned with zero visibility timeout, making them immediately visible for the next partition owner.

The change prevents a throughput gap during lease transitions where pending in-memory messages could otherwise remain invisible until their original visibility timeout expired.

Related ICM: https://portal.microsofticm.com/imp/v5/incidents/details/21000001021644/summary

Copilot AI review requested due to automatic review settings May 15, 2026 21:48
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Improves partition drain behavior for Azure Storage control queues by immediately re-exposing dequeued-but-undispatched control messages when a partition is released, reducing throughput gaps during lease transitions.

Changes:

  • Abandon pending (in-memory) control queue messages for a drained partition with zero visibility timeout before removing the partition.
  • Guard dispatch logic to skip “ready” nodes that were drained/removed from the pending list.
  • Add a unit test covering the drained-ready-node scenario.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
src/DurableTask.AzureStorage/OrchestrationSessionManager.cs Abandons pending batches during drain and skips drained nodes during dispatch.
src/DurableTask.AzureStorage/Messaging/ControlQueue.cs Adds a drain-specific abandon path that immediately re-queues messages (visibility timeout = 0).
Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs Adds test ensuring drained nodes in the ready queue are ignored.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Comment thread src/DurableTask.AzureStorage/Messaging/ControlQueue.cs Outdated
Comment thread Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs Outdated
Comment thread src/DurableTask.AzureStorage/Messaging/ControlQueue.cs Fixed
Comment thread test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs Fixed
PR feedback 01

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Copilot AI review requested due to automatic review settings May 15, 2026 22:02
kaibocai and others added 2 commits May 15, 2026 17:02
…tch block'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

Comment thread src/DurableTask.AzureStorage/OrchestrationSessionManager.cs Outdated
this.settings.WorkerId,
this.Name,
$"Failed to abandon message {queueMessage.MessageId} during drain: {e}");
}
Comment on lines +247 to +255
catch (RequestFailedException e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
this.Name,
$"Failed to abandon message {queueMessage.MessageId} during drain: {e}");
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in dd23148. AbandonMessageForDrainAsync now catches broader Exception so drain-abandon stays best-effort and won’t bubble failures that can interfere with partition cleanup; warning logging continues to include full exception details via {e}.

Comment thread Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs Outdated
Comment thread Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs Outdated
Comment on lines +265 to +296
static object CreatePendingBatch(ControlQueue controlQueue)
{
Type pendingBatchType = typeof(OrchestrationSessionManager)
.GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);

return Activator.CreateInstance(
pendingBatchType,
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
binder: null,
args: new object[] { controlQueue, "instance1", "execution1" },
culture: null);
}

static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
{
object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });
return addLast.Invoke(pendingBatches, new[] { pendingBatch });
}

static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
{
object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });
remove.Invoke(pendingBatches, new[] { node });
}

static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
{
object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");
enqueue.Invoke(readyQueue, new[] { node });
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in dd23148. I added explicit Assert.IsNotNull(...) checks for reflection targets used by the helpers (PendingMessageBatch nested type and the AddLast/Remove/Enqueue methods), plus null-checks on constructed/invoked reflection results.

Copilot AI review requested due to automatic review settings May 15, 2026 22:23
@kaibocai kaibocai review requested due to automatic review settings May 15, 2026 22:23
Comment on lines +233 to +241
catch (Exception e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Failed to abandon pending messages during drain: {e}");
}
Copilot AI review requested due to automatic review settings May 15, 2026 22:38
@kaibocai kaibocai review requested due to automatic review settings May 15, 2026 22:38
Comment on lines +248 to +256
catch (Exception e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
this.Name,
$"Failed to abandon message {queueMessage.MessageId} during drain: {e}");
}
@kaibocai kaibocai closed this May 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants