Skip to content

Commit

Permalink
Save dropped events (#578)
Browse files Browse the repository at this point in the history
  • Loading branch information
glennamanns authored and cgillum committed Jan 22, 2019
1 parent f6f1dce commit 11463dd
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 13 deletions.
44 changes: 36 additions & 8 deletions src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public sealed class DurableOrchestrationContext : DurableOrchestrationContextBas
private readonly Dictionary<string, Stack> pendingExternalEvents =
new Dictionary<string, Stack>(StringComparer.OrdinalIgnoreCase);

private readonly Dictionary<string, Queue> bufferedExternalEvents =
new Dictionary<string, Queue>(StringComparer.OrdinalIgnoreCase);

private readonly DurableTaskExtension config;
private readonly string orchestrationName;
private readonly List<Func<Task>> deferredTasks;
Expand Down Expand Up @@ -276,6 +279,7 @@ public override Task<T> WaitForExternalEvent<T>(string name)
Stack taskCompletionSources;
TaskCompletionSource<T> tcs;

// Set up the stack for listening to external events
if (!this.pendingExternalEvents.TryGetValue(name, out taskCompletionSources))
{
tcs = new TaskCompletionSource<T>();
Expand All @@ -297,12 +301,28 @@ public override Task<T> WaitForExternalEvent<T>(string name)
}
}

this.config.TraceHelper.FunctionListening(
this.config.Options.HubName,
this.orchestrationName,
this.InstanceId,
reason: $"WaitForExternalEvent:{name}",
isReplay: this.innerContext.IsReplaying);
// Check the queue to see if any events came in before the orchestrator was listening
if (this.bufferedExternalEvents.TryGetValue(name, out Queue queue))
{
object input = queue.Dequeue();

if (queue.Count == 0)
{
this.bufferedExternalEvents.Remove(name);
}

// We can call raise event right away, since we already have an event's input
this.RaiseEvent(name, input.ToString());
}
else
{
this.config.TraceHelper.FunctionListening(
this.config.Options.HubName,
this.orchestrationName,
this.InstanceId,
reason: $"WaitForExternalEvent:{name}",
isReplay: this.innerContext.IsReplaying);
}

return tcs.Task;
}
Expand Down Expand Up @@ -527,8 +547,16 @@ internal void RaiseEvent(string name, string input)
}
else
{
// The orchestrator was not waiting for any event by this name, so the event will be dropped.
this.config.TraceHelper.ExternalEventDropped(
// Add the event to an (in-memory) queue, so we don't drop or lose it
if (!this.bufferedExternalEvents.TryGetValue(name, out Queue bufferedEvents))
{
bufferedEvents = new Queue();
this.bufferedExternalEvents[name] = bufferedEvents;
}

bufferedEvents.Enqueue(input);

this.config.TraceHelper.ExternalEventSaved(
this.HubName,
this.Name,
this.InstanceId,
Expand Down
6 changes: 3 additions & 3 deletions src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public void ExtensionWarningEvent(string hubName, string functionName, string in
}
}

public void ExternalEventDropped(
public void ExternalEventSaved(
string hubName,
string functionName,
string instanceId,
Expand All @@ -351,7 +351,7 @@ public void ExtensionWarningEvent(string hubName, string functionName, string in
{
FunctionType functionType = FunctionType.Orchestrator;

EtwEventSource.Instance.ExternalEventDropped(
EtwEventSource.Instance.ExternalEventSaved(
hubName,
LocalAppName,
LocalSlotName,
Expand All @@ -365,7 +365,7 @@ public void ExtensionWarningEvent(string hubName, string functionName, string in
if (this.ShouldLogEvent(isReplay: false))
{
this.logger.LogInformation(
"{instanceId}: Function '{functionName} ({functionType})' dropped a '{eventName}' event. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.",
"{instanceId}: Function '{functionName} ({functionType})' saved a '{eventName}' event to an in-memory queue. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.",
instanceId, functionName, functionType, eventName, FunctionState.ExternalEventDropped, hubName,
LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++);
}
Expand Down
4 changes: 2 additions & 2 deletions src/WebJobs.Extensions.DurableTask/EtwEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ private EtwEventSource()
this.WriteEvent(214, TaskHub, AppName, SlotName, FunctionName ?? string.Empty, InstanceId ?? string.Empty, Details, ExtensionVersion);
}

[Event(215, Level = EventLevel.Warning)]
public void ExternalEventDropped(
[Event(215, Level = EventLevel.Informational, Version = 2)]
public void ExternalEventSaved(
string TaskHub,
string AppName,
string SlotName,
Expand Down
35 changes: 35 additions & 0 deletions test/Common/DurableTaskEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,41 @@ public async Task ActivityWithRetry_NullRetryOptions()
}
}

/// <summary>
/// End-to-end test which validates that waiting for an external event and calling
/// an activity multiple times in a row does not lead to dropped events.
/// </summary>
[Fact]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
public async Task WaitForEventAndCallActivity_DroppedEventsTest()
{
using (JobHost host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.WaitForEventAndCallActivity_DroppedEventsTest),
enableExtendedSessions: false))
{
await host.StartAsync();

string orchestratorFunctionName = nameof(TestOrchestrations.WaitForEventAndCallActivity);
var client = await host.StartOrchestratorAsync(orchestratorFunctionName, null, this.output);

for (int i = 0; i < 5; i++)
{
await client.RaiseEventAsync("add", i, this.output);
}

var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60), this.output);

Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);

int output = (int)status?.Output;
this.output.WriteLine($"Orchestration output string: {output}");
Assert.Equal(26, output);

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which runs a orchestrator function that calls a non-existent orchestrator function.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions test/Common/TestActivities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public static long Multiply([ActivityTrigger] DurableActivityContext ctx)
return a * b;
}

public static long Add([ActivityTrigger] DurableActivityContext ctx)
{
(long a, long b) = ctx.GetInput<(long, long)>();
return a + b;
}

public static string[] GetFileList([ActivityTrigger] DurableActivityContext ctx)
{
string directory = ctx.GetInput<string>();
Expand Down
21 changes: 21 additions & 0 deletions test/Common/TestOrchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -473,5 +473,26 @@ public static async Task<int> Counter2([OrchestrationTrigger] DurableOrchestrati

return "Done";
}

public static async Task<int> WaitForEventAndCallActivity(
[OrchestrationTrigger] DurableOrchestrationContext context)
{
int sum = 0;

// Sums all of the inputs and intermediate sums
// If the 4 inputs are 0-4, then the calls are as following:
// 0 + (0 + 0) = 0
// 0 + (0 + 1) = 1
// 1 + (1 + 2) = 4
// 4 + (4 + 3) = 11
// 11 + (11 + 4) = 26
for (int i = 0; i < 5; i++)
{
int number = await context.WaitForExternalEvent<int>("add");
sum += await context.CallActivityAsync<int>("Add", (sum, number));
}

return sum;
}
}
}

0 comments on commit 11463dd

Please sign in to comment.