diff --git a/src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs b/src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs index c3ce494d1..5909fe471 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs @@ -30,6 +30,9 @@ public sealed class DurableOrchestrationContext : DurableOrchestrationContextBas private readonly Dictionary pendingExternalEvents = new Dictionary(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary bufferedExternalEvents = + new Dictionary(StringComparer.OrdinalIgnoreCase); + private readonly DurableTaskExtension config; private readonly string orchestrationName; private readonly List> deferredTasks; @@ -276,6 +279,7 @@ public override Task WaitForExternalEvent(string name) Stack taskCompletionSources; TaskCompletionSource tcs; + // Set up the stack for listening to external events if (!this.pendingExternalEvents.TryGetValue(name, out taskCompletionSources)) { tcs = new TaskCompletionSource(); @@ -297,12 +301,28 @@ public override Task WaitForExternalEvent(string name) } } - this.config.TraceHelper.FunctionListening( - this.config.Options.HubName, - this.orchestrationName, - this.InstanceId, - reason: $"WaitForExternalEvent:{name}", - isReplay: this.innerContext.IsReplaying); + // Check the queue to see if any events came in before the orchestrator was listening + if (this.bufferedExternalEvents.TryGetValue(name, out Queue queue)) + { + object input = queue.Dequeue(); + + if (queue.Count == 0) + { + this.bufferedExternalEvents.Remove(name); + } + + // We can call raise event right away, since we already have an event's input + this.RaiseEvent(name, input.ToString()); + } + else + { + this.config.TraceHelper.FunctionListening( + this.config.Options.HubName, + this.orchestrationName, + this.InstanceId, + reason: $"WaitForExternalEvent:{name}", + isReplay: this.innerContext.IsReplaying); + } return tcs.Task; } @@ -527,8 +547,16 @@ internal void RaiseEvent(string name, string input) } else { - // The orchestrator was not waiting for any event by this name, so the event will be dropped. - this.config.TraceHelper.ExternalEventDropped( + // Add the event to an (in-memory) queue, so we don't drop or lose it + if (!this.bufferedExternalEvents.TryGetValue(name, out Queue bufferedEvents)) + { + bufferedEvents = new Queue(); + this.bufferedExternalEvents[name] = bufferedEvents; + } + + bufferedEvents.Enqueue(input); + + this.config.TraceHelper.ExternalEventSaved( this.HubName, this.Name, this.InstanceId, diff --git a/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs b/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs index 0440ab8f5..386e2ec0b 100644 --- a/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs +++ b/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs @@ -342,7 +342,7 @@ public void ExtensionWarningEvent(string hubName, string functionName, string in } } - public void ExternalEventDropped( + public void ExternalEventSaved( string hubName, string functionName, string instanceId, @@ -351,7 +351,7 @@ public void ExtensionWarningEvent(string hubName, string functionName, string in { FunctionType functionType = FunctionType.Orchestrator; - EtwEventSource.Instance.ExternalEventDropped( + EtwEventSource.Instance.ExternalEventSaved( hubName, LocalAppName, LocalSlotName, @@ -365,7 +365,7 @@ public void ExtensionWarningEvent(string hubName, string functionName, string in if (this.ShouldLogEvent(isReplay: false)) { this.logger.LogInformation( - "{instanceId}: Function '{functionName} ({functionType})' dropped a '{eventName}' event. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + "{instanceId}: Function '{functionName} ({functionType})' saved a '{eventName}' event to an in-memory queue. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", instanceId, functionName, functionType, eventName, FunctionState.ExternalEventDropped, hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); } diff --git a/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs b/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs index fd5b25dad..0cd5473c3 100644 --- a/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs +++ b/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs @@ -239,8 +239,8 @@ private EtwEventSource() this.WriteEvent(214, TaskHub, AppName, SlotName, FunctionName ?? string.Empty, InstanceId ?? string.Empty, Details, ExtensionVersion); } - [Event(215, Level = EventLevel.Warning)] - public void ExternalEventDropped( + [Event(215, Level = EventLevel.Informational, Version = 2)] + public void ExternalEventSaved( string TaskHub, string AppName, string SlotName, diff --git a/test/Common/DurableTaskEndToEndTests.cs b/test/Common/DurableTaskEndToEndTests.cs index d8166aa02..c89213d73 100644 --- a/test/Common/DurableTaskEndToEndTests.cs +++ b/test/Common/DurableTaskEndToEndTests.cs @@ -1565,6 +1565,41 @@ public async Task ActivityWithRetry_NullRetryOptions() } } + /// + /// End-to-end test which validates that waiting for an external event and calling + /// an activity multiple times in a row does not lead to dropped events. + /// + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public async Task WaitForEventAndCallActivity_DroppedEventsTest() + { + using (JobHost host = TestHelpers.GetJobHost( + this.loggerProvider, + nameof(this.WaitForEventAndCallActivity_DroppedEventsTest), + enableExtendedSessions: false)) + { + await host.StartAsync(); + + string orchestratorFunctionName = nameof(TestOrchestrations.WaitForEventAndCallActivity); + var client = await host.StartOrchestratorAsync(orchestratorFunctionName, null, this.output); + + for (int i = 0; i < 5; i++) + { + await client.RaiseEventAsync("add", i, this.output); + } + + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60), this.output); + + Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus); + + int output = (int)status?.Output; + this.output.WriteLine($"Orchestration output string: {output}"); + Assert.Equal(26, output); + + await host.StopAsync(); + } + } + /// /// End-to-end test which runs a orchestrator function that calls a non-existent orchestrator function. /// diff --git a/test/Common/TestActivities.cs b/test/Common/TestActivities.cs index 656b5a519..7f6f332b9 100644 --- a/test/Common/TestActivities.cs +++ b/test/Common/TestActivities.cs @@ -30,6 +30,12 @@ public static long Multiply([ActivityTrigger] DurableActivityContext ctx) return a * b; } + public static long Add([ActivityTrigger] DurableActivityContext ctx) + { + (long a, long b) = ctx.GetInput<(long, long)>(); + return a + b; + } + public static string[] GetFileList([ActivityTrigger] DurableActivityContext ctx) { string directory = ctx.GetInput(); diff --git a/test/Common/TestOrchestrations.cs b/test/Common/TestOrchestrations.cs index e25a1d3e2..e3f7302b7 100644 --- a/test/Common/TestOrchestrations.cs +++ b/test/Common/TestOrchestrations.cs @@ -473,5 +473,26 @@ public static async Task Counter2([OrchestrationTrigger] DurableOrchestrati return "Done"; } + + public static async Task WaitForEventAndCallActivity( + [OrchestrationTrigger] DurableOrchestrationContext context) + { + int sum = 0; + + // Sums all of the inputs and intermediate sums + // If the 4 inputs are 0-4, then the calls are as following: + // 0 + (0 + 0) = 0 + // 0 + (0 + 1) = 1 + // 1 + (1 + 2) = 4 + // 4 + (4 + 3) = 11 + // 11 + (11 + 4) = 26 + for (int i = 0; i < 5; i++) + { + int number = await context.WaitForExternalEvent("add"); + sum += await context.CallActivityAsync("Add", (sum, number)); + } + + return sum; + } } }