
"Sticky sessions" support in DurableTask.Core and DurableTask.AzureStorage #171

Merged
cgillum merged 3 commits into azure-functions from azure-functions-sticky-sessions on Apr 30, 2018

Conversation

@cgillum (Collaborator) commented Apr 20, 2018

This PR contains the same changes as #170 but also includes the changes to DurableTask.AzureStorage to support it.

cgillum changed the title from "Sticky sessions" support in DurableTask.AzureStorage to "Sticky sessions" support in DurableTask.Core and DurableTask.AzureStorage on Apr 21, 2018
@cgillum (Collaborator, Author) left a comment

Added some notes about the implementation which may be non-obvious when reading the code.

/// The primary use case is for detecting illegal async usage in orchestration code.
/// </summary>
[ThreadStatic]
public static bool IsOrchestratorThread;
@cgillum (Collaborator, Author)
FYI regarding this: in Durable Functions I added some code to my shim layer that uses thread IDs to detect, on a best-effort basis, illegal task usage in orchestration functions. It's been extremely helpful in deterring CRIs (customer-reported incidents) from customers who don't read the documentation. However, that best-effort logic is broken by sticky sessions, because it's now possible for multiple threads to access the same orchestration instance over its lifetime. To account for this, I've created this public [ThreadStatic] variable. Instead of comparing thread IDs, I can simply check whether OrchestrationContext.IsOrchestratorThread is false to detect illegal task continuations. It's simpler and much more reliable than my previous method.
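For illustration, here is a minimal sketch of what such a shim-layer guard might look like. The class and method names are hypothetical and this is not the actual Durable Functions code; only `OrchestrationContext.IsOrchestratorThread` comes from this PR.

```csharp
using System;
using DurableTask.Core;

internal static class OrchestrationThreadGuard
{
    // Hypothetical shim-layer guard: call this at the start of any context API that
    // must only run on the orchestrator's dispatch thread.
    public static void ThrowIfNotOrchestratorThread()
    {
        if (!OrchestrationContext.IsOrchestratorThread)
        {
            throw new InvalidOperationException(
                "Multithreaded execution was detected. This can happen when orchestrator " +
                "code awaits a task that was not created by the orchestration context.");
        }
    }
}
```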

(Contributor)

…you should add this explanation to the comment in the code as well.

Also, can you point me to example usage in the Durable Functions code?

@cgillum (Collaborator, Author)
I don't have a PR for it yet since this sticky-sessions PR has yet to be approved. However, you can see where I'm adding it here: https://github.com/Azure/azure-functions-durable-extension/blob/36e1388c02b1c34d986a3876eef080f79d0c00c1/src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs#L453



if (newMessages == null)
{
    break;
}
@cgillum (Collaborator, Author), Apr 21, 2018
This logic is unfortunately a little complex. It's basically trying to handle three different conditions:

  1. Run normally if the provider returns a work item with new events (legacy behavior)
  2. Run normally if the provider is using sticky sessions but only reads a single batch of messages
  3. Invoke throttling behavior if a session wants to read a subsequent batch of messages (to avoid starvation) #Resolved
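A rough sketch of how these three conditions might fit together in the dispatcher loop is shown below. The loop structure and the `OnProcessWorkItemAsync` call are approximations for illustration, not the actual dispatcher code; `Session`, `NewMessages`, and `FetchNewOrchestrationMessagesAsync` are the members that appear elsewhere in this PR.

```csharp
while (true)
{
    // Cases 1 & 2: process the batch we already have, whether it came from the legacy
    // dequeue path or from a sticky session's first fetch.
    await this.OnProcessWorkItemAsync(workItem);

    if (workItem.Session == null)
    {
        break;   // provider doesn't support sticky sessions: legacy single-batch behavior
    }

    // Case 3: the session wants a subsequent batch. The extended-session throttle shown
    // elsewhere in this PR applies here so these sessions can't starve regular work items.
    var newMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
    if (newMessages == null)
    {
        break;   // provider-specific timeout expired with no new messages
    }

    workItem.NewMessages = newMessages;
}
```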

@cgillum (Collaborator, Author) commented Apr 21, 2018

@affandar, @simonporter, let's use this as the main PR. If you think this is good then we can merge it into azure-functions and subsequently merge azure-functions into master. I'll let you decide if you care to review the DurableTask.AzureStorage code. The most important thing to review IMO is the DurableTask.Core changes.

FYI @adarsh1

    instanceState);
if (isExtendedSession)
{
    this.concurrentSessionLock.Release();
@adarsh1 (Contributor), Apr 22, 2018
Should this be in a finally block? #Resolved

@cgillum (Collaborator, Author), Apr 23, 2018
Yep, this is fixed in the latest push (which was a force update to make it easier for me to revert some of the unnecessary changes). #Resolved
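For reference, the shape of that fix would presumably be something like the following sketch; the surrounding structure is assumed, with `isExtendedSession` and `concurrentSessionLock` taken from the snippet above.

```csharp
try
{
    // ... process the work item and complete it via the orchestration service ...
}
finally
{
    // Release the extended-session slot even if processing throws.
    if (isExtendedSession)
    {
        this.concurrentSessionLock.Release();
    }
}
```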

cgillum force-pushed the azure-functions-sticky-sessions branch from 652e4a1 to 4f2a10c on April 23, 2018 00:13
@cgillum (Collaborator, Author) commented Apr 23, 2018

I've re-pushed the changes to this PR. The overall diff should be reduced now, making it a simpler review for the parts in DurableTask.Core. Please have a look.

@@ -282,16 +338,18 @@ public class TaskOrchestrationDispatcher
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);

@affandar (Contributor), Apr 23, 2018
The this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync call above will release the session lock for session-based providers. So we need to add a flag that specifies "update only, but don't release". In fact, in addition to this flag, we'd need to move this into the wrapping OnProcessWorkItemSession method so that it can periodically complete work items (without releasing the lock). Otherwise the uncommitted "transaction log" is going to keep growing, which has two problems:

  i) much, much heavier replay on a reload
  ii) running into transaction limits (e.g. Service Bus can't save more than 100 message sends per transaction commit)

#Resolved

@affandar (Contributor), Apr 23, 2018
…iii) without this, the orchestration may stop moving forward, because CompleteTaskOrchestrationWorkItemAsync() is where activities are actually triggered (and where they respond). #Resolved

@cgillum (Collaborator, Author), Apr 23, 2018
My understanding is that each call to CompleteTaskOrchestrationWorkItemAsync() will commit all the changes in Service Bus (sending messages, etc.), but that the session doesn't get released until ReleaseTaskOrchestrationWorkItemAsync() is called. Is that not the case? If it is, then I think the current implementation will work just fine for Service Bus. #Resolved

@affandar (Contributor), Apr 23, 2018
Yeah, never mind, I had forgotten my own code. :)

But please do think through the max message count issue mentioned below. #Resolved

@@ -209,16 +265,16 @@ public class TaskOrchestrationDispatcher
if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
@affandar (Contributor), Apr 23, 2018
@cgillum have you thought through how this max message count limit plays out with sticky sessions?

When this limit hits, we need to call CompleteTaskOrchestrationWorkItemAsync (without releasing the lock; see the comment below) so the messages getting queued up to be sent are actually sent. #Resolved

@cgillum (Collaborator, Author), Apr 23, 2018
We are already calling CompleteTaskOrchestrationWorkItem for every batch (every loop iteration), so there is no real behavior change here. The only real change is that we're doing multiple CompleteTaskOrchestrationWorkItem calls before we release the work item with ReleaseTaskOrchestrationWorkItemAsync. #Resolved
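For context, the call pattern being described might look roughly like the sketch below. The loop shape and the `TryFetchNextBatchAsync` helper are assumptions for illustration, and the local variable names follow the diff context above; the split between CompleteTaskOrchestrationWorkItemAsync and ReleaseTaskOrchestrationWorkItemAsync is the real IOrchestrationService contract.

```csharp
do
{
    // Commits this batch's state update and outbound messages; for session-based
    // providers the session lock is kept.
    await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
        workItem,
        runtimeState,
        outboundMessages,
        orchestratorMessages,
        continuedAsNew ? null : timerMessages,
        continuedAsNewMessage,
        instanceState);
}
while (await TryFetchNextBatchAsync(workItem));  // hypothetical: fetch the next batch, if any

// Only now is the session lock actually given up.
await this.orchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
```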

@affandar (Contributor), Apr 23, 2018
Yep, but the history events that were chopped off in iteration 1 would not be replayed in iteration 2, because we are resuming the orchestration now. Which means all the messages that needed to be sent because of those history events would be omitted… right? #Resolved

@cgillum (Collaborator, Author), Apr 23, 2018
Oh! I see what you're getting at; sorry, I completely misunderstood the concern. DurableTask.AzureStorage is not impacted by this since we don't have a max message count, so I don't think this necessarily needs to be a blocker for this PR; i.e. it could be worked on once you guys on-board DurableTask.ServiceBus.

If I understand correctly, what you're doing is check-pointing the progress that was made thus far (scheduling 100 messages and updating the history accordingly) and then scheduling a no-op timer to run through the history a second time to make sure any unprocessed tasks are re-executed. The next iteration will know which tasks were scheduled and which ones weren't because that will be reflected in the updated history.

With my current implementation we won't replay the history, and thus won't re-run those "aborted" tasks. I can see this would be a problem. In that case, I think it's best for us to break out of the sticky-session loop and re-execute from scratch on the next work item. I'll add that change. #Resolved
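A sketch of that change, with the surrounding structure and the `isInterrupted` flag assumed for illustration (only the IsMaxMessageCountExceeded check comes from this diff):

```csharp
// When the provider's max message count is exceeded, checkpoint what we have but force
// the sticky-session loop to exit so the next work item replays from the updated history
// and re-issues anything that was dropped.
bool isInterrupted = false;
if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
{
    isInterrupted = true;
}

// ... CompleteTaskOrchestrationWorkItemAsync commits this batch as usual ...

return isCompleted || isInterrupted;   // true tells the caller to release the session
```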

@@ -613,13 +616,38 @@ async delegate (CloudQueue controlQueue)

this.controlQueueBackoff.Reset();

ReceivedMessageContext messageContext =
ReceivedMessageContext.CreateFromReceivedMessageBatch(
OrchestrationInstance instance = nextBatch.Messages[0].TaskMessage.OrchestrationInstance;
@simonporter (Contributor), Apr 24, 2018
> Messages[0]

Is this guaranteed to exist? #Resolved

@cgillum (Collaborator, Author), Apr 24, 2018
Yes, a batch is only ever created if we have a message to put into it. #Resolved

@@ -159,7 +185,7 @@ public class AzureStorageScenarioTests
// The end message will cause the actor to complete itself.
await client.RaiseEventAsync("operation", "end");

status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
status = await client.WaitForCompletionAsync(Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(10));
@simonporter (Contributor), Apr 24, 2018
> Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(10)

nit: move this to a helper; it's duplicated 5 times in here already. #Pending
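A possible shape for the suggested helper (the class and property names are hypothetical; Debugger comes from System.Diagnostics):

```csharp
using System;
using System.Diagnostics;

static class TestTimeouts
{
    // Long timeout while a debugger is attached, short timeout otherwise.
    public static TimeSpan Default =>
        Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(10);
}

// Usage in the tests:
// status = await client.WaitForCompletionAsync(TestTimeouts.Default);
```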

@@ -282,16 +343,18 @@ public class TaskOrchestrationDispatcher
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);

return isCompleted || isInterrupted;
@cgillum (Collaborator, Author), Apr 24, 2018

> return isCompleted || isInterrupted;

More testing revealed that we also need to return true here if continuedAsNew is true. #Pending
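Presumably the amended return would look like this (a sketch based only on the comment above):

```csharp
return isCompleted || isInterrupted || continuedAsNew;
```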

@cgillum (Collaborator, Author)

Turns out the continueAsNew case completely derailed my session implementation in Azure Storage due to race conditions. I've had to refactor it.




// To avoid starvation, we only allow half of all concurrently executing orchestrations to
// leverage extended sessions.
int maxConcurrentSessions = (int)Math.Ceiling(this.dispatcher.MaxConcurrentWorkItems / 2.0);
@adarsh1 (Contributor), Apr 24, 2018
Should this be configurable? #WontFix

@cgillum (Collaborator, Author)

We can make it configurable later. For now I think half is a good default.
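If it were made configurable later, one sketch of the follow-up might be the following; the MaxConcurrentExtendedSessions setting is hypothetical and not part of this PR.

```csharp
// Hypothetical setting: fall back to today's behavior (half of MaxConcurrentWorkItems)
// when the setting isn't specified.
int maxConcurrentSessions = this.settings.MaxConcurrentExtendedSessions > 0
    ? this.settings.MaxConcurrentExtendedSessions
    : (int)Math.Ceiling(this.dispatcher.MaxConcurrentWorkItems / 2.0);
```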


return this.dispatchPipeline.RunAsync(dispatchContext, _ =>
{
    cursor.LatestDecisions = cursor.OrchestrationExecutor.ExecuteNewEvents();
(Contributor)
nit: should we be exposing the OrchestrationExecutor from the cursor here? Shouldn't the cursor update itself rather than having its latest decisions set by the dispatcher?
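One possible shape of that suggestion, as a hypothetical method on the cursor type (not code from this PR):

```csharp
// The cursor runs its own executor and updates LatestDecisions itself, so the dispatcher
// never reaches into cursor.OrchestrationExecutor.
public void ExecuteNewEvents()
{
    this.LatestDecisions = this.OrchestrationExecutor.ExecuteNewEvents();
}
```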

// Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
// until either new messages are available or until a provider-specific timeout has expired.
workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
if (workItem.NewMessages == null)
@affandar (Contributor), Apr 25, 2018
Non-blocking nit: please add tracing to this method. When things go wrong, there should be a log we can look at to see how long each fetch took and how many messages it actually fetched, similar to the logging we have for the master fetch. #Resolved
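A sketch of the kind of tracing being requested; TraceFetch is a hypothetical placeholder for the provider's real logging, and Stopwatch comes from System.Diagnostics.

```csharp
var stopwatch = Stopwatch.StartNew();
workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
stopwatch.Stop();

// Hypothetical trace call: record the instance, how many messages arrived, and how long
// the fetch took so slow or empty fetches are visible in the logs.
TraceFetch(
    workItem.InstanceId,
    messageCount: workItem.NewMessages?.Count ?? 0,
    latencyMs: stopwatch.ElapsedMilliseconds);
```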


public override string ToString()
{
    return $"Signaled: {this.isSignaled}, Waiters: {this.waiters.Count}";
@simonporter (Contributor), Apr 27, 2018
> Count

nit: use ToString() to avoid boxing. #Pending
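The change the nit asks for would look roughly like this: calling ToString() on the int so string interpolation doesn't box it.

```csharp
return $"Signaled: {this.isSignaled}, Waiters: {this.waiters.Count.ToString()}";
```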

using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>TODO</summary>
(Contributor)

> TODO

It would be nice if we could populate this.

@cgillum (Collaborator, Author)
Sorry, I totally spaced on that. I'll fill these in.



@simonporter (Contributor) left a review comment

:shipit:

cgillum force-pushed the azure-functions-sticky-sessions branch from 545d08b to 008c353 on April 30, 2018 06:03
cgillum merged commit 534e357 into azure-functions on Apr 30, 2018
cgillum deleted the azure-functions-sticky-sessions branch on April 30, 2018 06:06