diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/AzureStorageCheckpointLeaseManager.cs b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/AzureStorageCheckpointLeaseManager.cs index 6ca1883ed1ab..d04a37e69d5e 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/AzureStorageCheckpointLeaseManager.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/AzureStorageCheckpointLeaseManager.cs @@ -166,12 +166,12 @@ public Task DeleteCheckpointAsync(string partitionId) public Task LeaseStoreExistsAsync() { - return this.eventHubContainer.ExistsAsync(this.defaultRequestOptions, this.operationContext); + return this.eventHubContainer.ExistsAsync(null, this.operationContext); } public Task CreateLeaseStoreIfNotExistsAsync() { - return this.eventHubContainer.CreateIfNotExistsAsync(this.defaultRequestOptions, this.operationContext); + return this.eventHubContainer.CreateIfNotExistsAsync(null, this.operationContext); } public async Task DeleteLeaseStoreAsync() diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionManager.cs b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionManager.cs index 6ee62dc51ad6..9713208585a1 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionManager.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionManager.cs @@ -90,6 +90,10 @@ async Task RunAsync() { await this.RunLoopAsync(this.cancellationTokenSource.Token).ConfigureAwait(false); } + catch (TaskCanceledException) when (this.cancellationTokenSource.IsCancellationRequested) + { + // Expected during host shutdown. + } catch (Exception e) { // Ideally RunLoop should never throw. @@ -249,7 +253,7 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception try { allLeases[subjectLease.PartitionId] = subjectLease; - if (subjectLease.Owner == this.host.HostName) + if (subjectLease.Owner == this.host.HostName && !(await subjectLease.IsExpired().ConfigureAwait(false))) { ourLeaseCount++; @@ -307,54 +311,7 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished."); // Check any expired leases that we can grab here. - var expiredLeaseTasks = new List(); - foreach (var possibleLease in allLeases.Values) - { - var subjectLease = possibleLease; - - if (await subjectLease.IsExpired().ConfigureAwait(false)) - { - expiredLeaseTasks.Add(Task.Run(async () => - { - try - { - // Get fresh content of lease subject to acquire. - var downloadedLease = await leaseManager.GetLeaseAsync(subjectLease.PartitionId).ConfigureAwait(false); - allLeases[subjectLease.PartitionId] = downloadedLease; - - // Check expired once more here incase another host have already leased this since we populated the list. - if (await downloadedLease.IsExpired().ConfigureAwait(false)) - { - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Trying to acquire lease."); - if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false)) - { - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Acquired lease."); - leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease); - Interlocked.Increment(ref ourLeaseCount); - } - else - { - // Acquisition failed. Make sure we don't leave the lease as owned. - allLeases[subjectLease.PartitionId].Owner = null; - - ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, - "Failed to acquire lease for partition " + downloadedLease.PartitionId, null); - } - } - } - catch (Exception e) - { - ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "Failure during acquiring lease", e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases); - - // Acquisition failed. Make sure we don't leave the lease as owned. - allLeases[subjectLease.PartitionId].Owner = null; - } - }, cancellationToken)); - } - } - - await Task.WhenAll(expiredLeaseTasks).ConfigureAwait(false); + ourLeaseCount += await this.AcquireExpiredLeasesAsync(allLeases, leasesOwnedByOthers, ourLeaseCount, cancellationToken); ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished."); // Grab more leases if available and needed for load balancing @@ -476,6 +433,88 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception } } + async Task AcquireExpiredLeasesAsync( + ConcurrentDictionary allLeases, + ConcurrentDictionary leasesOwnedByOthers, + int ownedCount, + CancellationToken cancellationToken) + { + var totalAcquired = 0; + var hosts = new List(); + + // Find distinct hosts actively owning leases. + foreach (var lease in allLeases.Values) + { + if (lease.Owner != null && !hosts.Contains(lease.Owner) && !(await lease.IsExpired().ConfigureAwait(false))) + { + hosts.Add(lease.Owner); + } + } + + // Calculate how many more leases we can own. + var hostCount = hosts.Count() == 0 ? 1 : hosts.Count(); + var targetLeaseCount = (int)Math.Ceiling((double)allLeases.Count / (double)hostCount) - ownedCount; + + // Attempt to acquire expired leases now up to allowed target lease count. + var tasks = new List(); + foreach (var possibleLease in allLeases.Values) + { + // Break if we already acquired enough number of leases. + if (targetLeaseCount <= 0) + { + break; + } + + var subjectLease = possibleLease; + + if (await subjectLease.IsExpired().ConfigureAwait(false)) + { + targetLeaseCount--; + tasks.Add(Task.Run(async () => + { + try + { + // Get fresh content of lease subject to acquire. + var downloadedLease = await this.host.LeaseManager.GetLeaseAsync(subjectLease.PartitionId).ConfigureAwait(false); + allLeases[subjectLease.PartitionId] = downloadedLease; + + // Check expired once more here incase another host have already leased this since we populated the list. + if (await downloadedLease.IsExpired().ConfigureAwait(false)) + { + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Trying to acquire lease."); + if (await this.host.LeaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false)) + { + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Acquired lease."); + leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease); + Interlocked.Increment(ref totalAcquired); + } + else + { + // Acquisition failed. Make sure we don't leave the lease as owned. + allLeases[subjectLease.PartitionId].Owner = null; + + ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, + "Failed to acquire lease for partition " + downloadedLease.PartitionId, null); + } + } + } + catch (Exception e) + { + ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "Failure during acquiring lease", e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases); + + // Acquisition failed. Make sure we don't leave the lease as owned. + allLeases[subjectLease.PartitionId].Owner = null; + } + }, cancellationToken)); + } + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + + return totalAcquired; + } + async Task CheckAndAddPumpAsync(string partitionId, Lease lease) { PartitionPump capturedPump; diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs index e72530493276..76072d9de8e8 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs @@ -94,6 +94,23 @@ public async Task CloseAsync(CloseReason reason) { ProcessorEventSource.Log.PartitionPumpCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reason.ToString()); this.PumpStatus = PartitionPumpStatus.Closing; + + // Release lease as the first thing since closing receiver can take up to operation timeout. + // This helps other available hosts discover lease available sooner. + if (reason != CloseReason.LeaseLost) + { + // Since this pump is dead, release the lease. + try + { + await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); + } + catch (Exception e) + { + // Log and ignore any failure since expired lease will be picked by another host. + this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease); + } + } + try { this.cancellationTokenSource.Cancel(); @@ -121,20 +138,6 @@ public async Task CloseAsync(CloseReason reason) this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, "Closing Event Processor"); } - if (reason != CloseReason.LeaseLost) - { - // Since this pump is dead, release the lease. - try - { - await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); - } - catch (Exception e) - { - // Log and ignore any failure since expired lease will be picked by another host. - this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease); - } - } - this.PumpStatus = PartitionPumpStatus.Closed; ProcessorEventSource.Log.PartitionPumpCloseStop(this.Host.HostName, this.PartitionContext.PartitionId); } diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/ReceiverTests.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/ReceiverTests.cs index da37d0921bc0..27ff0100a77f 100755 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/ReceiverTests.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/ReceiverTests.cs @@ -549,19 +549,19 @@ public async Task CreateEpochReceiverAfterNonEpochReceiver() try { TestUtility.Log("Starting nonepoch receiver"); - await nonEpochReceiver.ReceiveAsync(10); + await nonEpochReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10)); await Task.Delay(TimeSpan.FromSeconds(10)); TestUtility.Log("Starting epoch receiver"); - await epochReceiver.ReceiveAsync(10); + await epochReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10)); await Task.Delay(TimeSpan.FromSeconds(10)); try { TestUtility.Log("Restarting nonepoch receiver, this should fail"); - await nonEpochReceiver.ReceiveAsync(10); + await nonEpochReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10)); throw new InvalidOperationException("Non-Epoch receiver should have encountered an exception by now!"); } catch (ReceiverDisconnectedException ex) when (ex.Message.Contains("non-epoch receiver is not allowed")) @@ -640,7 +640,7 @@ public async Task ReceiverIdentifier() }); // Issue a receive call so link will become active. - await newReceiver.ReceiveAsync(10); + await newReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10)); receivers.Add(newReceiver); } @@ -648,7 +648,7 @@ public async Task ReceiverIdentifier() { // Attempt to create 6th receiver. This should fail. var failReceiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "1", EventPosition.FromStart()); - await failReceiver.ReceiveAsync(10); + await failReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10)); throw new InvalidOperationException("6th receiver should have encountered QuotaExceededException."); } catch (QuotaExceededException ex) diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorNegativeCases.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorNegativeCases.cs index 1bc037a0816c..710ea906c5b3 100755 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorNegativeCases.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorNegativeCases.cs @@ -10,7 +10,6 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor public class NegativeCases : ProcessorTestBase { - [Fact] [LiveTest] [DisplayTestMethodName] diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTestBase.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTestBase.cs index 74cd6e7b846f..3c08df6acafc 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTestBase.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTestBase.cs @@ -7,1124 +7,22 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; - using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.EventHubs.Primitives; using Microsoft.Azure.EventHubs.Processor; - using Microsoft.IdentityModel.Clients.ActiveDirectory; - using Microsoft.WindowsAzure.Storage; using Xunit; public class ProcessorTestBase { - - public string[] GetPartitionIds(string connectionString) + protected string[] GetPartitionIds(string connectionString) { var ehClient = EventHubClient.CreateFromConnectionString(connectionString); var eventHubInfo = ehClient.GetRuntimeInformationAsync().WaitAndUnwrapException(); return eventHubInfo.PartitionIds; } - /// - /// Validating cases where entity path is provided through eventHubPath and EH connection string parameters - /// on the EPH constructor. - /// - [Fact] - [LiveTest] - [DisplayTestMethodName] - public void ProcessorHostEntityPathSetting() - { - var connectionString = TestUtility.BuildEventHubsConnectionString("dimmyeventhubname"); - var csb = new EventHubsConnectionStringBuilder(connectionString) - { - EntityPath = "myeh" - }; - - // Entity path provided in the connection string. - TestUtility.Log("Testing condition: Entity path provided in the connection string only."); - var eventProcessorHost = new EventProcessorHost( - null, - PartitionReceiver.DefaultConsumerGroupName, - csb.ToString(), - TestUtility.StorageConnectionString, - "dimmyeventhubname".ToLower()); - Assert.Equal("myeh", eventProcessorHost.EventHubPath); - - // Entity path provided in the eventHubPath parameter. - TestUtility.Log("Testing condition: Entity path provided in the eventHubPath only."); - csb.EntityPath = null; - eventProcessorHost = new EventProcessorHost( - "myeh2", - PartitionReceiver.DefaultConsumerGroupName, - csb.ToString(), - TestUtility.StorageConnectionString, - "dimmyeventhubname".ToLower()); - Assert.Equal("myeh2", eventProcessorHost.EventHubPath); - - // The same entity path provided in both eventHubPath parameter and the connection string. - TestUtility.Log("Testing condition: The same entity path provided in the eventHubPath and connection string."); - csb.EntityPath = "mYeH"; - eventProcessorHost = new EventProcessorHost( - "myeh", - PartitionReceiver.DefaultConsumerGroupName, - csb.ToString(), - TestUtility.StorageConnectionString, - "dimmyeventhubname".ToLower()); - Assert.Equal("myeh", eventProcessorHost.EventHubPath); - - // Entity path not provided in both eventHubPath and the connection string. - TestUtility.Log("Testing condition: Entity path not provided in both eventHubPath and connection string."); - try - { - csb.EntityPath = null; - new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - csb.ToString(), - TestUtility.StorageConnectionString, - "dimmyeventhubname".ToLower()); - throw new Exception("Entity path wasn't provided and this new call was supposed to fail"); - } - catch (ArgumentException) - { - TestUtility.Log("Caught ArgumentException as expected."); - } - - // Entity path conflict. - TestUtility.Log("Testing condition: Entity path conflict."); - try - { - csb.EntityPath = "myeh"; - new EventProcessorHost( - "myeh2", - PartitionReceiver.DefaultConsumerGroupName, - csb.ToString(), - TestUtility.StorageConnectionString, - "dimmyeventhubname".ToLower()); - throw new Exception("Entity path values conflict and this new call was supposed to fail"); - } - catch (ArgumentException) - { - TestUtility.Log("Caught ArgumentException as expected."); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task SingleProcessorHost() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var eventProcessorHost = new EventProcessorHost( - null, // Entity path will be picked from connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var epo = await GetOptionsAsync(connectionString); - await RunGenericScenario(eventProcessorHost, epo); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task MultipleProcessorHosts() - { - await using (var scope = await EventHubScope.CreateAsync(3)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - string[] PartitionIds = GetPartitionIds(connectionString); - int hostCount = 3; - - TestUtility.Log($"Testing with {hostCount} EventProcessorHost instances"); - - // Prepare partition trackers. - var partitionReceiveEvents = new ConcurrentDictionary(); - foreach (var partitionId in PartitionIds) - { - partitionReceiveEvents[partitionId] = new AsyncAutoResetEvent(false); - } - - // Prepare host trackers. - var hostReceiveEvents = new ConcurrentDictionary(); - - var containerName = Guid.NewGuid().ToString(); - var hosts = new List(); - - try - { - for (int hostId = 0; hostId < hostCount; hostId++) - { - var thisHostName = $"host-{hostId}"; - hostReceiveEvents[thisHostName] = new AsyncAutoResetEvent(false); - - TestUtility.Log("Creating EventProcessorHost"); - var eventProcessorHost = new EventProcessorHost( - thisHostName, - string.Empty, // Passing empty as entity path here since path is already in EH connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - containerName); - hosts.Add(eventProcessorHost); - TestUtility.Log($"Calling RegisterEventProcessorAsync"); - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(10), - InvokeProcessorAfterReceiveTimeout = true, - MaxBatchSize = 100, - InitialOffsetProvider = pId => EventPosition.FromEnqueuedTime(DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(60))) - }; - - var processorFactory = new TestEventProcessorFactory(); - processorFactory.OnCreateProcessor += (f, createArgs) => - { - var processor = createArgs.Item2; - string partitionId = createArgs.Item1.PartitionId; - string hostName = createArgs.Item1.Owner; - processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); - processor.OnClose += (_, closeArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); - processor.OnProcessError += (_, errorArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); - processor.OnProcessEvents += (_, eventsArgs) => - { - int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; - if (eventCount > 0) - { - TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); - partitionReceiveEvents[partitionId].Set(); - hostReceiveEvents[hostName].Set(); - } - }; - }; - - await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); - } - - // Allow some time for each host to own at least 1 partition. - // Partition stealing logic balances partition ownership one at a time. - TestUtility.Log("Waiting for partition ownership to settle..."); - await Task.Delay(TimeSpan.FromSeconds(60)); - - TestUtility.Log("Sending an event to each partition"); - var ehClient = EventHubClient.CreateFromConnectionString(connectionString); - var sendTasks = new List(); - foreach (var partitionId in PartitionIds) - { - sendTasks.Add(TestUtility.SendToPartitionAsync(ehClient, partitionId, $"{partitionId} event.")); - } - await Task.WhenAll(sendTasks); - - TestUtility.Log("Verifying an event was received by each partition"); - foreach (var e in partitionReceiveEvents) - { - bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); - Assert.True(ret, $"Partition {e.Key} didn't receive any message!"); - } - - TestUtility.Log("Verifying at least an event was received by each host"); - foreach (var e in hostReceiveEvents) - { - bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); - Assert.True(ret, $"Host {e.Key} didn't receive any message!"); - } - } - finally - { - var shutdownTasks = new List(); - foreach (var host in hosts) - { - TestUtility.Log($"Host {host} Calling UnregisterEventProcessorAsync."); - shutdownTasks.Add(host.UnregisterEventProcessorAsync()); - } - - await Task.WhenAll(shutdownTasks); - } - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task WithBlobPrefix() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - string leaseContainerName = Guid.NewGuid().ToString(); - var epo = await GetOptionsAsync(connectionString); - - // Consume all messages with first host. - // Create host with 'firsthost' prefix. - var eventProcessorHostFirst = new EventProcessorHost( - "host1", - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName, - "firsthost"); - var runResult1 = await RunGenericScenario(eventProcessorHostFirst, epo); - - // Consume all messages with second host. - // Create host with 'secondhost' prefix. - // Although on the same lease container, this second host should receive exactly the same set of messages - // as the first host. - var eventProcessorHostSecond = new EventProcessorHost( - "host2", - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName, - "secondhost"); - var runResult2 = await RunGenericScenario(eventProcessorHostSecond, epo, numberOfEventsToSendPerPartition: 0); - - // Confirm that we are looking at 2 identical sets of messages in the end. - foreach (var kvp in runResult1.ReceivedEvents) - { - Assert.True(kvp.Value.Count() == runResult2.ReceivedEvents[kvp.Key].Count, - $"The sets of messages returned from first host and the second host are different for partition {kvp.Key}."); - } - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task InvokeAfterReceiveTimeoutTrue() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - string[] PartitionIds = GetPartitionIds(connectionString); - const int ReceiveTimeoutInSeconds = 15; - - TestUtility.Log("Testing EventProcessorHost with InvokeProcessorAfterReceiveTimeout=true"); - - var emptyBatchReceiveEvents = new ConcurrentDictionary(); - foreach (var partitionId in PartitionIds) - { - emptyBatchReceiveEvents[partitionId] = new AsyncAutoResetEvent(false); - } - - var eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(ReceiveTimeoutInSeconds), - InvokeProcessorAfterReceiveTimeout = true, - InitialOffsetProvider = pId => EventPosition.FromEnd() - }; - - var processorFactory = new TestEventProcessorFactory(); - processorFactory.OnCreateProcessor += (f, createArgs) => - { - var processor = createArgs.Item2; - string partitionId = createArgs.Item1.PartitionId; - processor.OnOpen += (_, partitionContext) => TestUtility.Log($"Partition {partitionId} TestEventProcessor opened"); - processor.OnProcessEvents += (_, eventsArgs) => - { - int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; - TestUtility.Log($"Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); - if (eventCount == 0) - { - var emptyBatchReceiveEvent = emptyBatchReceiveEvents[partitionId]; - emptyBatchReceiveEvent.Set(); - } - }; - }; - - await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); - try - { - TestUtility.Log("Waiting for each partition to receive an empty batch of events..."); - foreach (var partitionId in PartitionIds) - { - var emptyBatchReceiveEvent = emptyBatchReceiveEvents[partitionId]; - bool emptyBatchReceived = await emptyBatchReceiveEvent.WaitAsync(TimeSpan.FromSeconds(ReceiveTimeoutInSeconds * 2)); - Assert.True(emptyBatchReceived, $"Partition {partitionId} didn't receive an empty batch!"); - } - } - finally - { - TestUtility.Log("Calling UnregisterEventProcessorAsync"); - await eventProcessorHost.UnregisterEventProcessorAsync(); - } - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task InvokeAfterReceiveTimeoutFalse() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - const int ReceiveTimeoutInSeconds = 15; - - TestUtility.Log("Calling RegisterEventProcessorAsync with InvokeProcessorAfterReceiveTimeout=false"); - - var eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - scope.EventHubName.ToLower()); - - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(ReceiveTimeoutInSeconds), - InvokeProcessorAfterReceiveTimeout = false, - MaxBatchSize = 100 - }; - - var emptyBatchReceiveEvent = new AsyncAutoResetEvent(false); - var processorFactory = new TestEventProcessorFactory(); - processorFactory.OnCreateProcessor += (f, createArgs) => - { - var processor = createArgs.Item2; - string partitionId = createArgs.Item1.PartitionId; - processor.OnProcessEvents += (_, eventsArgs) => - { - int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; - TestUtility.Log($"Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); - if (eventCount == 0) - { - emptyBatchReceiveEvent.Set(); - } - }; - }; - - await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); - try - { - TestUtility.Log("Verifying no empty batches arrive..."); - bool waitSucceeded = await emptyBatchReceiveEvent.WaitAsync(TimeSpan.FromSeconds(ReceiveTimeoutInSeconds * 2)); - Assert.False(waitSucceeded, "No empty batch should have been received!"); - } - finally - { - TestUtility.Log("Calling UnregisterEventProcessorAsync"); - await eventProcessorHost.UnregisterEventProcessorAsync(); - } - } - } - - /// - /// This test requires a eventhub with consumer groups $Default and cgroup1. - /// - /// - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task NonDefaultConsumerGroup() - { - var consumerGroups = new[] - { - "notdefault" - }; - await using (var scope = await EventHubScope.CreateAsync(2, consumerGroups)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var epo = await GetOptionsAsync(connectionString); - var a = Guid.NewGuid().ToString(); - // Run on non-default consumer group - var eventProcessorHost = new EventProcessorHost( - null, // Entity path will be picked from connection string. - scope.ConsumerGroups[0], - connectionString, - TestUtility.StorageConnectionString, - a); - - await RunGenericScenario(eventProcessorHost, epo); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task InitialOffsetProviderWithDateTime() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Send and receive single message so we can find out enqueue date-time of the last message. - var partitions = await DiscoverEndOfStream(connectionString); - - // We will use last enqueued message's enqueue date-time so EPH will pick messages only after that point. - var lastEnqueueDateTime = partitions.Max(le => le.Value.Item2); - TestUtility.Log($"Last message enqueued at {lastEnqueueDateTime}"); - - // Use a randomly generated container name so that initial offset provider will be respected. - var eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(15), - InitialOffsetProvider = partitionId => EventPosition.FromEnqueuedTime(lastEnqueueDateTime), - MaxBatchSize = 100 - }; - - var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); - - // We should have received only 1 event from each partition. - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task InitialOffsetProviderWithOffset() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Send and receive single message so we can find out offset of the last message. - var partitions = await DiscoverEndOfStream(connectionString); - TestUtility.Log("Discovered last event offsets on each partition as below:"); - foreach (var p in partitions) - { - TestUtility.Log($"Partition {p.Key}: {p.Value.Item1}"); - } - - // Use a randomly generated container name so that initial offset provider will be respected. - var eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(15), - InitialOffsetProvider = partitionId => EventPosition.FromOffset(partitions[partitionId].Item1), - MaxBatchSize = 100 - }; - - var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); - - // We should have received only 1 event from each partition. - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task InitialOffsetProviderWithEndOfStream() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Use a randomly generated container name so that initial offset provider will be respected. - var eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(15), - InitialOffsetProvider = partitionId => EventPosition.FromEnd(), - MaxBatchSize = 100 - }; - - var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); - - // We should have received only 1 event from each partition. - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task InitialOffsetProviderOverrideBehavior() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Generate a new lease container name that will be used through out the test. - string leaseContainerName = Guid.NewGuid().ToString(); - TestUtility.Log($"Using lease container {leaseContainerName}"); - - var epo = await GetOptionsAsync(connectionString); - - // First host will send and receive as usual. - var eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - await this.RunGenericScenario(eventProcessorHost, epo); - - // Second host will use an initial offset provider. - // Since we are still on the same lease container, initial offset provider shouldn't rule. - // We should continue receiving where we left instead if start-of-stream where initial offset provider dictates. - eventProcessorHost = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - var processorOptions = new EventProcessorOptions - { - ReceiveTimeout = TimeSpan.FromSeconds(15), - InitialOffsetProvider = partitionId => EventPosition.FromStart(), - MaxBatchSize = 100 - }; - - var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkpointLastEvent: false); - - // We should have received only 1 event from each partition. - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task CheckpointEventDataShouldHold() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Generate a new lease container name that will use through out the test. - string leaseContainerName = Guid.NewGuid().ToString(); - - var epo = await GetOptionsAsync(connectionString); - - // Consume all messages with first host. - var eventProcessorHostFirst = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - await RunGenericScenario(eventProcessorHostFirst, epo); - - // For the second time we initiate a host and this time it should pick from where the previous host left. - // In other words, it shouldn't start receiving from start of the stream. - var eventProcessorHostSecond = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - var runResult = await RunGenericScenario(eventProcessorHostSecond); - - // We should have received only 1 event from each partition. - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task CheckpointBatchShouldHold() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Generate a new lease container name that will use through out the test. - string leaseContainerName = Guid.NewGuid().ToString(); - - var epo = await GetOptionsAsync(connectionString); - - // Consume all messages with first host. - var eventProcessorHostFirst = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - await RunGenericScenario(eventProcessorHostFirst, epo, checkpointLastEvent: false, checkpointBatch: true); - - // For the second time we initiate a host and this time it should pick from where the previous host left. - // In other words, it shouldn't start receiving from start of the stream. - var eventProcessorHostSecond = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - var runResult = await RunGenericScenario(eventProcessorHostSecond, epo); - - // We should have received only 1 event from each partition. - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task HostShouldRecoverAfterReceiverDisconnection() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - string[] PartitionIds = GetPartitionIds(connectionString); - // We will target one partition and do validation on it. - var targetPartition = PartitionIds.First(); - int targetPartitionOpens = 0; - int targetPartitionCloses = 0; - int targetPartitionErrors = 0; - PartitionReceiver externalReceiver = null; - - var eventProcessorHost = new EventProcessorHost( - "ephhost", - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - try - { - var processorFactory = new TestEventProcessorFactory(); - - processorFactory.OnCreateProcessor += (f, createArgs) => - { - var processor = createArgs.Item2; - string partitionId = createArgs.Item1.PartitionId; - string hostName = createArgs.Item1.Owner; - processor.OnOpen += (_, partitionContext) => - { - TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); - if (partitionId == targetPartition) - { - Interlocked.Increment(ref targetPartitionOpens); - } - }; - processor.OnClose += (_, closeArgs) => - { - TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); - if (partitionId == targetPartition && closeArgs.Item2 == CloseReason.Shutdown) - { - Interlocked.Increment(ref targetPartitionCloses); - } - }; - processor.OnProcessError += (_, errorArgs) => - { - TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); - if (partitionId == targetPartition && errorArgs.Item2 is ReceiverDisconnectedException) - { - Interlocked.Increment(ref targetPartitionErrors); - } - }; - }; - - var epo = EventProcessorOptions.DefaultOptions; - epo.ReceiveTimeout = TimeSpan.FromSeconds(10); - await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); - - // Wait 15 seconds then create a new epoch receiver. - // This will trigger ReceiverDisconnectedExcetion in the host. - await Task.Delay(15000); - - TestUtility.Log("Creating a new receiver with epoch 2. This will trigger ReceiverDisconnectedException in the host."); - var ehClient = EventHubClient.CreateFromConnectionString(connectionString); - externalReceiver = ehClient.CreateEpochReceiver(PartitionReceiver.DefaultConsumerGroupName, - targetPartition, EventPosition.FromStart(), 2); - await externalReceiver.ReceiveAsync(100, TimeSpan.FromSeconds(5)); - - // Give another 1 minute for host to recover then do the validations. - await Task.Delay(60000); - - TestUtility.Log("Verifying that host was able to receive ReceiverDisconnectedException"); - Assert.True(targetPartitionErrors == 1, $"Host received {targetPartitionErrors} ReceiverDisconnectedExceptions!"); - - TestUtility.Log("Verifying that host was able to reopen the partition"); - Assert.True(targetPartitionOpens == 2, $"Host opened target partition {targetPartitionOpens} times!"); - - TestUtility.Log("Verifying that host notified by close"); - Assert.True(targetPartitionCloses == 1, $"Host closed target partition {targetPartitionCloses} times!"); - } - finally - { - TestUtility.Log("Calling UnregisterEventProcessorAsync"); - await eventProcessorHost.UnregisterEventProcessorAsync(); - - if (externalReceiver != null) - { - await externalReceiver.CloseAsync(); - } - } - } - } - - /// - /// If a host doesn't checkpoint on the processed events and shuts down, new host should start processing from the beginning. - /// - /// - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task NoCheckpointThenNewHostReadsFromStart() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - // Generate a new lease container name that will be used through out the test. - string leaseContainerName = Guid.NewGuid().ToString(); - - var epo = await GetOptionsAsync(connectionString); - - // Consume all messages with first host. - var eventProcessorHostFirst = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - var runResult1 = await RunGenericScenario(eventProcessorHostFirst, epo, checkpointLastEvent: false); - var totalEventsFromFirstHost = runResult1.ReceivedEvents.Sum(part => part.Value.Count); - - // Second time we initiate a host, it should receive exactly the same number of evets. - var eventProcessorHostSecond = new EventProcessorHost( - string.Empty, - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - leaseContainerName); - var runResult2 = await RunGenericScenario(eventProcessorHostSecond, epo, 0); - var totalEventsFromSecondHost = runResult2.ReceivedEvents.Sum(part => part.Value.Count); - - // Second host should have received the same number of events as the first host. - Assert.True(totalEventsFromFirstHost == totalEventsFromSecondHost, - $"Second host received {totalEventsFromSecondHost} events where as first host receive {totalEventsFromFirstHost} events."); - } - } - - /// - /// Checkpointing every message received should be Ok. No failures expected. - /// - /// - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task CheckpointEveryMessageReceived() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var epo = await GetOptionsAsync(connectionString); - - var eventProcessorHost = new EventProcessorHost( - null, // Entity path will be picked from connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var runResult = await RunGenericScenario(eventProcessorHost, epo, numberOfEventsToSendPerPartition: 10, - checkpointLastEvent: false, checkpoingEveryEvent: true); - - // Validate there were not failures. - Assert.True(runResult.NumberOfFailures == 0, $"RunResult returned with {runResult.NumberOfFailures} failures!"); - } - } - - /// - /// While processing events one event causes a failure. Host should be able to recover any error. - /// - /// - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task HostShouldRecoverWhenProcessEventsAsyncThrows() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var client = EventHubClient.CreateFromConnectionString(connectionString); - var eventHubInfo = client.GetRuntimeInformationAsync().WaitAndUnwrapException(); - string[] PartitionIds = eventHubInfo.PartitionIds; - var lastReceivedAt = DateTime.Now; - var lastReceivedAtLock = new object(); - var poisonMessageReceived = false; - var poisonMessageProperty = "poison"; - var processorFactory = new TestEventProcessorFactory(); - var receivedEventCounts = new ConcurrentDictionary(); - - var eventProcessorHost = new EventProcessorHost( - null, // Entity path will be picked from connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - processorFactory.OnCreateProcessor += (f, createArgs) => - { - var processor = createArgs.Item2; - string partitionId = createArgs.Item1.PartitionId; - string hostName = createArgs.Item1.Owner; - string consumerGroupName = createArgs.Item1.ConsumerGroupName; - processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor opened"); - processor.OnClose += (_, closeArgs) => TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); - processor.OnProcessError += (_, errorArgs) => - { - TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); - - // Throw once more here depending on where we are at exception sequence. - if (errorArgs.Item2.Message.Contains("ExceptionSequence1")) - { - throw new Exception("ExceptionSequence2"); - } - }; - processor.OnProcessEvents += (_, eventsArgs) => - { - int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; - TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); - if (eventCount > 0) - { - lock (lastReceivedAtLock) - { - lastReceivedAt = DateTime.Now; - } - - foreach (var e in eventsArgs.Item2.events) - { - // If this is poisoned event then throw. - if (!poisonMessageReceived && e.Properties.ContainsKey(poisonMessageProperty)) - { - poisonMessageReceived = true; - TestUtility.Log($"Received poisoned message from partition {partitionId}"); - throw new Exception("ExceptionSequence1"); - } - - // Track received events so we can validate at the end. - if (!receivedEventCounts.ContainsKey(partitionId)) - { - receivedEventCounts[partitionId] = 0; - } - - receivedEventCounts[partitionId]++; - } - } - }; - }; - - try - { - TestUtility.Log("Registering processorFactory..."); - var epo = new EventProcessorOptions() - { - MaxBatchSize = 100, - InitialOffsetProvider = pId => EventPosition.FromEnqueuedTime(DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(60))) - }; - await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); - - TestUtility.Log("Waiting for partition ownership to settle..."); - await Task.Delay(TimeSpan.FromSeconds(5)); - - - - // Send first set of messages. - TestUtility.Log("Sending an event to each partition as the first set of messages."); - var sendTasks = new List(); - foreach (var partitionId in PartitionIds) - { - sendTasks.Add(TestUtility.SendToPartitionAsync(client, partitionId, $"{partitionId} event.")); - } - await Task.WhenAll(sendTasks); - - // Now send 1 poisoned message. This will fail one of the partition pumps. - TestUtility.Log($"Sending a poison event to partition {PartitionIds.First()}"); - var pSender = client.CreatePartitionSender(PartitionIds.First()); - var ed = new EventData(Encoding.UTF8.GetBytes("This is poison message")); - ed.Properties[poisonMessageProperty] = true; - await pSender.SendAsync(ed); - - // Wait sometime. The host should fail and then recever during this time. - await Task.Delay(30000); - - // Send second set of messages. - TestUtility.Log("Sending an event to each partition as the second set of messages."); - sendTasks.Clear(); - foreach (var partitionId in PartitionIds) - { - sendTasks.Add(TestUtility.SendToPartitionAsync(client, partitionId, $"{partitionId} event.")); - } - await Task.WhenAll(sendTasks); - - TestUtility.Log("Waiting until hosts are idle, i.e. no more messages to receive."); - while (lastReceivedAt > DateTime.Now.AddSeconds(-60)) - { - await Task.Delay(1000); - } - - TestUtility.Log("Verifying poison message was received"); - Assert.True(poisonMessageReceived, "Didn't receive poison message!"); - - TestUtility.Log("Verifying received events by each partition"); - foreach (var partitionId in PartitionIds) - { - if (!receivedEventCounts.ContainsKey(partitionId)) - { - throw new Exception($"Partition {partitionId} didn't receive any messages!"); - } - - var receivedEventCount = receivedEventCounts[partitionId]; - Assert.True(receivedEventCount >= 2, $"Partition {partitionId} received {receivedEventCount} where as at least 2 expected!"); - } - } - finally - { - TestUtility.Log("Calling UnregisterEventProcessorAsync."); - await eventProcessorHost.UnregisterEventProcessorAsync(); - } - } - } - - /// - /// This test is for manual only purpose. Fill in the tenant-id, app-id and app-secret before running. - /// - [Fact(Skip = "Manual run only")] - [LiveTest] - [DisplayTestMethodName] - public async Task SingleProcessorHostWithAadTokenProvider() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var appAuthority = ""; - var aadAppId = ""; - var aadAppSecret = ""; - - AzureActiveDirectoryTokenProvider.AuthenticationCallback authCallback = - async (audience, authority, state) => - { - var authContext = new AuthenticationContext(authority); - var cc = new ClientCredential(aadAppId, aadAppSecret); - var authResult = await authContext.AcquireTokenAsync(audience, cc); - return authResult.AccessToken; - }; - - var tokenProvider = TokenProvider.CreateAzureActiveDirectoryTokenProvider(authCallback, appAuthority); - var epo = await GetOptionsAsync(connectionString); - var csb = new EventHubsConnectionStringBuilder(connectionString); - - var eventProcessorHost = new EventProcessorHost( - csb.Endpoint, - csb.EntityPath, - PartitionReceiver.DefaultConsumerGroupName, - tokenProvider, - CloudStorageAccount.Parse(TestUtility.StorageConnectionString), - Guid.NewGuid().ToString()); - - await RunGenericScenario(eventProcessorHost, epo); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task ReRegister() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var eventProcessorHost = new EventProcessorHost( - null, // Entity path will be picked from connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - // Calling register for the first time should succeed. - TestUtility.Log("Registering EventProcessorHost for the first time."); - await eventProcessorHost.RegisterEventProcessorAsync(); - - // Unregister event processor should succed - TestUtility.Log("Registering EventProcessorHost for the first time."); - await eventProcessorHost.UnregisterEventProcessorAsync(); - - var epo = await GetOptionsAsync(connectionString); - - // Run a generic scenario with TestEventProcessor instead - await RunGenericScenario(eventProcessorHost, epo); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task ReRegisterAfterLeaseExpiry() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var hostName = Guid.NewGuid().ToString(); - - var processorOptions = new EventProcessorOptions - { - InitialOffsetProvider = pId => EventPosition.FromEnd() - }; - - var eventProcessorHost = new EventProcessorHost( - hostName, - null, // Entity path will be picked from connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var runResult = await RunGenericScenario(eventProcessorHost, processorOptions); - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "First host: One of the partitions didn't return exactly 1 event"); - - // Allow sometime so that leases can expire. - await Task.Delay(60); - - runResult = await RunGenericScenario(eventProcessorHost); - Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "Second host: One of the partitions didn't return exactly 1 event"); - } - } - - [Fact] - [LiveTest] - [DisplayTestMethodName] - public async Task LargeHostName() - { - await using (var scope = await EventHubScope.CreateAsync(2)) - { - var longHostname = StringUtility.GetRandomString(100); - var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); - var eventProcessorHost = new EventProcessorHost( - longHostname, - null, // Entity path will be picked from connection string. - PartitionReceiver.DefaultConsumerGroupName, - connectionString, - TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); - - var epo = await GetOptionsAsync(connectionString); - await RunGenericScenario(eventProcessorHost, epo); - } - } - - private async Task>> DiscoverEndOfStream(string connectionString) + protected async Task>> DiscoverEndOfStream(string connectionString) { string[] PartitionIds = GetPartitionIds(connectionString); var ehClient = EventHubClient.CreateFromConnectionString(connectionString); @@ -1139,7 +37,7 @@ private async Task>> DiscoverEndOfStr return partitions.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); } - private async Task RunGenericScenario(EventProcessorHost eventProcessorHost, + protected async Task RunGenericScenario(EventProcessorHost eventProcessorHost, EventProcessorOptions epo = null, int numberOfEventsToSendPerPartition = 1, bool checkpointLastEvent = true, bool checkpointBatch = false, bool checkpoingEveryEvent = false) { @@ -1150,7 +48,7 @@ private async Task RunGenericScenario(EventProcessorHost { epo = new EventProcessorOptions { - ReceiveTimeout = TimeSpan.FromSeconds(15), + ReceiveTimeout = TimeSpan.FromSeconds(10), MaxBatchSize = 100 }; } @@ -1241,11 +139,12 @@ private async Task RunGenericScenario(EventProcessorHost return runResult; } - private async Task GetOptionsAsync(string connectionString) + protected async Task GetOptionsAsync(string connectionString) { var partitions = await DiscoverEndOfStream(connectionString); return new EventProcessorOptions { + ReceiveTimeout = TimeSpan.FromSeconds(10), MaxBatchSize = 100, InitialOffsetProvider = pId => EventPosition.FromOffset(partitions[pId].Item1) }; diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs new file mode 100644 index 000000000000..261f13a43ac9 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs @@ -0,0 +1,1190 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.EventHubs.Tests.Processor +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.EventHubs.Primitives; + using Microsoft.Azure.EventHubs.Processor; + using Microsoft.IdentityModel.Clients.ActiveDirectory; + using Microsoft.WindowsAzure.Storage; + using Xunit; + + public class ProcessorTests : ProcessorTestBase + { + /// + /// Validating cases where entity path is provided through eventHubPath and EH connection string parameters + /// on the EPH constructor. + /// + [Fact] + [LiveTest] + [DisplayTestMethodName] + public void ProcessorHostEntityPathSetting() + { + var connectionString = TestUtility.BuildEventHubsConnectionString("dimmyeventhubname"); + var csb = new EventHubsConnectionStringBuilder(connectionString) + { + EntityPath = "myeh" + }; + + // Entity path provided in the connection string. + TestUtility.Log("Testing condition: Entity path provided in the connection string only."); + var eventProcessorHost = new EventProcessorHost( + null, + PartitionReceiver.DefaultConsumerGroupName, + csb.ToString(), + TestUtility.StorageConnectionString, + "dimmyeventhubname".ToLower()); + Assert.Equal("myeh", eventProcessorHost.EventHubPath); + + // Entity path provided in the eventHubPath parameter. + TestUtility.Log("Testing condition: Entity path provided in the eventHubPath only."); + csb.EntityPath = null; + eventProcessorHost = new EventProcessorHost( + "myeh2", + PartitionReceiver.DefaultConsumerGroupName, + csb.ToString(), + TestUtility.StorageConnectionString, + "dimmyeventhubname".ToLower()); + Assert.Equal("myeh2", eventProcessorHost.EventHubPath); + + // The same entity path provided in both eventHubPath parameter and the connection string. + TestUtility.Log("Testing condition: The same entity path provided in the eventHubPath and connection string."); + csb.EntityPath = "mYeH"; + eventProcessorHost = new EventProcessorHost( + "myeh", + PartitionReceiver.DefaultConsumerGroupName, + csb.ToString(), + TestUtility.StorageConnectionString, + "dimmyeventhubname".ToLower()); + Assert.Equal("myeh", eventProcessorHost.EventHubPath); + + // Entity path not provided in both eventHubPath and the connection string. + TestUtility.Log("Testing condition: Entity path not provided in both eventHubPath and connection string."); + try + { + csb.EntityPath = null; + new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + csb.ToString(), + TestUtility.StorageConnectionString, + "dimmyeventhubname".ToLower()); + throw new Exception("Entity path wasn't provided and this new call was supposed to fail"); + } + catch (ArgumentException) + { + TestUtility.Log("Caught ArgumentException as expected."); + } + + // Entity path conflict. + TestUtility.Log("Testing condition: Entity path conflict."); + try + { + csb.EntityPath = "myeh"; + new EventProcessorHost( + "myeh2", + PartitionReceiver.DefaultConsumerGroupName, + csb.ToString(), + TestUtility.StorageConnectionString, + "dimmyeventhubname".ToLower()); + throw new Exception("Entity path values conflict and this new call was supposed to fail"); + } + catch (ArgumentException) + { + TestUtility.Log("Caught ArgumentException as expected."); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task SingleProcessorHost() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var eventProcessorHost = new EventProcessorHost( + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var epo = await GetOptionsAsync(connectionString); + await RunGenericScenario(eventProcessorHost, epo); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task MultipleProcessorHosts() + { + await using (var scope = await EventHubScope.CreateAsync(3)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + string[] PartitionIds = GetPartitionIds(connectionString); + int hostCount = 3; + + TestUtility.Log($"Testing with {hostCount} EventProcessorHost instances"); + + // Prepare partition trackers. + var partitionReceiveEvents = new ConcurrentDictionary(); + foreach (var partitionId in PartitionIds) + { + partitionReceiveEvents[partitionId] = new AsyncAutoResetEvent(false); + } + + // Prepare host trackers. + var hostReceiveEvents = new ConcurrentDictionary(); + + var containerName = Guid.NewGuid().ToString(); + var hosts = new List(); + + try + { + for (int hostId = 0; hostId < hostCount; hostId++) + { + var thisHostName = $"host-{hostId}"; + hostReceiveEvents[thisHostName] = new AsyncAutoResetEvent(false); + + TestUtility.Log("Creating EventProcessorHost"); + var eventProcessorHost = new EventProcessorHost( + thisHostName, + string.Empty, // Passing empty as entity path here since path is already in EH connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + containerName); + hosts.Add(eventProcessorHost); + TestUtility.Log($"Calling RegisterEventProcessorAsync"); + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InvokeProcessorAfterReceiveTimeout = true, + MaxBatchSize = 100, + InitialOffsetProvider = pId => EventPosition.FromEnqueuedTime(DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(60))) + }; + + var processorFactory = new TestEventProcessorFactory(); + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + string hostName = createArgs.Item1.Owner; + processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); + processor.OnClose += (_, closeArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); + processor.OnProcessError += (_, errorArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; + if (eventCount > 0) + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + partitionReceiveEvents[partitionId].Set(); + hostReceiveEvents[hostName].Set(); + } + }; + }; + + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); + } + + // Allow some time for each host to own at least 1 partition. + // Partition stealing logic balances partition ownership one at a time. + TestUtility.Log("Waiting for partition ownership to settle..."); + await Task.Delay(TimeSpan.FromSeconds(60)); + + TestUtility.Log("Sending an event to each partition"); + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + var sendTasks = new List(); + foreach (var partitionId in PartitionIds) + { + sendTasks.Add(TestUtility.SendToPartitionAsync(ehClient, partitionId, $"{partitionId} event.")); + } + await Task.WhenAll(sendTasks); + + TestUtility.Log("Verifying an event was received by each partition"); + foreach (var e in partitionReceiveEvents) + { + bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(ret, $"Partition {e.Key} didn't receive any message!"); + } + + TestUtility.Log("Verifying at least an event was received by each host"); + foreach (var e in hostReceiveEvents) + { + bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(ret, $"Host {e.Key} didn't receive any message!"); + } + } + finally + { + var shutdownTasks = new List(); + foreach (var host in hosts) + { + TestUtility.Log($"Host {host} Calling UnregisterEventProcessorAsync."); + shutdownTasks.Add(host.UnregisterEventProcessorAsync()); + } + + await Task.WhenAll(shutdownTasks); + } + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task WithBlobPrefix() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + string leaseContainerName = Guid.NewGuid().ToString(); + var epo = await GetOptionsAsync(connectionString); + + // Consume all messages with first host. + // Create host with 'firsthost' prefix. + var eventProcessorHostFirst = new EventProcessorHost( + "host1", + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName, + "firsthost"); + var runResult1 = await RunGenericScenario(eventProcessorHostFirst, epo); + + // Consume all messages with second host. + // Create host with 'secondhost' prefix. + // Although on the same lease container, this second host should receive exactly the same set of messages + // as the first host. + var eventProcessorHostSecond = new EventProcessorHost( + "host2", + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName, + "secondhost"); + var runResult2 = await RunGenericScenario(eventProcessorHostSecond, epo, numberOfEventsToSendPerPartition: 0); + + // Confirm that we are looking at 2 identical sets of messages in the end. + foreach (var kvp in runResult1.ReceivedEvents) + { + Assert.True(kvp.Value.Count() == runResult2.ReceivedEvents[kvp.Key].Count, + $"The sets of messages returned from first host and the second host are different for partition {kvp.Key}."); + } + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task InvokeAfterReceiveTimeoutTrue() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + string[] PartitionIds = GetPartitionIds(connectionString); + const int ReceiveTimeoutInSeconds = 10; + + TestUtility.Log("Testing EventProcessorHost with InvokeProcessorAfterReceiveTimeout=true"); + + var emptyBatchReceiveEvents = new ConcurrentDictionary(); + foreach (var partitionId in PartitionIds) + { + emptyBatchReceiveEvents[partitionId] = new AsyncAutoResetEvent(false); + } + + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(ReceiveTimeoutInSeconds), + InvokeProcessorAfterReceiveTimeout = true, + InitialOffsetProvider = pId => EventPosition.FromEnd() + }; + + var processorFactory = new TestEventProcessorFactory(); + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + processor.OnOpen += (_, partitionContext) => TestUtility.Log($"Partition {partitionId} TestEventProcessor opened"); + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; + TestUtility.Log($"Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + if (eventCount == 0) + { + var emptyBatchReceiveEvent = emptyBatchReceiveEvents[partitionId]; + emptyBatchReceiveEvent.Set(); + } + }; + }; + + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); + try + { + TestUtility.Log("Waiting for each partition to receive an empty batch of events..."); + foreach (var partitionId in PartitionIds) + { + var emptyBatchReceiveEvent = emptyBatchReceiveEvents[partitionId]; + bool emptyBatchReceived = await emptyBatchReceiveEvent.WaitAsync(TimeSpan.FromSeconds(ReceiveTimeoutInSeconds * 2)); + Assert.True(emptyBatchReceived, $"Partition {partitionId} didn't receive an empty batch!"); + } + } + finally + { + TestUtility.Log("Calling UnregisterEventProcessorAsync"); + await eventProcessorHost.UnregisterEventProcessorAsync(); + } + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task InvokeAfterReceiveTimeoutFalse() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + const int ReceiveTimeoutInSeconds = 10; + + TestUtility.Log("Calling RegisterEventProcessorAsync with InvokeProcessorAfterReceiveTimeout=false"); + + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + scope.EventHubName.ToLower()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(ReceiveTimeoutInSeconds), + InvokeProcessorAfterReceiveTimeout = false, + MaxBatchSize = 100 + }; + + var emptyBatchReceiveEvent = new AsyncAutoResetEvent(false); + var processorFactory = new TestEventProcessorFactory(); + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; + TestUtility.Log($"Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + if (eventCount == 0) + { + emptyBatchReceiveEvent.Set(); + } + }; + }; + + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); + try + { + TestUtility.Log("Verifying no empty batches arrive..."); + bool waitSucceeded = await emptyBatchReceiveEvent.WaitAsync(TimeSpan.FromSeconds(ReceiveTimeoutInSeconds * 2)); + Assert.False(waitSucceeded, "No empty batch should have been received!"); + } + finally + { + TestUtility.Log("Calling UnregisterEventProcessorAsync"); + await eventProcessorHost.UnregisterEventProcessorAsync(); + } + } + } + + /// + /// This test requires a eventhub with consumer groups $Default and cgroup1. + /// + /// + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task NonDefaultConsumerGroup() + { + var consumerGroups = new[] + { + "notdefault" + }; + await using (var scope = await EventHubScope.CreateAsync(2, consumerGroups)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var epo = await GetOptionsAsync(connectionString); + var a = Guid.NewGuid().ToString(); + // Run on non-default consumer group + var eventProcessorHost = new EventProcessorHost( + null, // Entity path will be picked from connection string. + scope.ConsumerGroups[0], + connectionString, + TestUtility.StorageConnectionString, + a); + + await RunGenericScenario(eventProcessorHost, epo); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task InitialOffsetProviderWithDateTime() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Send and receive single message so we can find out enqueue date-time of the last message. + var partitions = await DiscoverEndOfStream(connectionString); + + // We will use last enqueued message's enqueue date-time so EPH will pick messages only after that point. + var lastEnqueueDateTime = partitions.Max(le => le.Value.Item2); + TestUtility.Log($"Last message enqueued at {lastEnqueueDateTime}"); + + // Use a randomly generated container name so that initial offset provider will be respected. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = partitionId => EventPosition.FromEnqueuedTime(lastEnqueueDateTime), + MaxBatchSize = 100 + }; + + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); + + // We should have received only 1 event from each partition. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task InitialOffsetProviderWithOffset() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Send and receive single message so we can find out offset of the last message. + var partitions = await DiscoverEndOfStream(connectionString); + TestUtility.Log("Discovered last event offsets on each partition as below:"); + foreach (var p in partitions) + { + TestUtility.Log($"Partition {p.Key}: {p.Value.Item1}"); + } + + // Use a randomly generated container name so that initial offset provider will be respected. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = partitionId => EventPosition.FromOffset(partitions[partitionId].Item1), + MaxBatchSize = 100 + }; + + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); + + // We should have received only 1 event from each partition. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task InitialOffsetProviderWithEndOfStream() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Use a randomly generated container name so that initial offset provider will be respected. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = partitionId => EventPosition.FromEnd(), + MaxBatchSize = 100 + }; + + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); + + // We should have received only 1 event from each partition. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task InitialOffsetProviderOverrideBehavior() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Generate a new lease container name that will be used through out the test. + string leaseContainerName = Guid.NewGuid().ToString(); + TestUtility.Log($"Using lease container {leaseContainerName}"); + + var epo = await GetOptionsAsync(connectionString); + + // First host will send and receive as usual. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + await this.RunGenericScenario(eventProcessorHost, epo); + + // Second host will use an initial offset provider. + // Since we are still on the same lease container, initial offset provider shouldn't rule. + // We should continue receiving where we left instead if start-of-stream where initial offset provider dictates. + eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = partitionId => EventPosition.FromStart(), + MaxBatchSize = 100 + }; + + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkpointLastEvent: false); + + // We should have received only 1 event from each partition. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task CheckpointEventDataShouldHold() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Generate a new lease container name that will use through out the test. + string leaseContainerName = Guid.NewGuid().ToString(); + + var epo = await GetOptionsAsync(connectionString); + + // Consume all messages with first host. + var eventProcessorHostFirst = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + await RunGenericScenario(eventProcessorHostFirst, epo); + + // For the second time we initiate a host and this time it should pick from where the previous host left. + // In other words, it shouldn't start receiving from start of the stream. + var eventProcessorHostSecond = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + var runResult = await RunGenericScenario(eventProcessorHostSecond); + + // We should have received only 1 event from each partition. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task CheckpointBatchShouldHold() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Generate a new lease container name that will use through out the test. + string leaseContainerName = Guid.NewGuid().ToString(); + + var epo = await GetOptionsAsync(connectionString); + + // Consume all messages with first host. + var eventProcessorHostFirst = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + await RunGenericScenario(eventProcessorHostFirst, epo, checkpointLastEvent: false, checkpointBatch: true); + + // For the second time we initiate a host and this time it should pick from where the previous host left. + // In other words, it shouldn't start receiving from start of the stream. + var eventProcessorHostSecond = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + var runResult = await RunGenericScenario(eventProcessorHostSecond, epo); + + // We should have received only 1 event from each partition. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task HostShouldRecoverAfterReceiverDisconnection() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + string[] PartitionIds = GetPartitionIds(connectionString); + // We will target one partition and do validation on it. + var targetPartition = PartitionIds.First(); + int targetPartitionOpens = 0; + int targetPartitionCloses = 0; + int targetPartitionErrors = 0; + PartitionReceiver externalReceiver = null; + + var eventProcessorHost = new EventProcessorHost( + "ephhost", + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + try + { + var processorFactory = new TestEventProcessorFactory(); + + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + string hostName = createArgs.Item1.Owner; + processor.OnOpen += (_, partitionContext) => + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); + if (partitionId == targetPartition) + { + Interlocked.Increment(ref targetPartitionOpens); + } + }; + processor.OnClose += (_, closeArgs) => + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); + if (partitionId == targetPartition && closeArgs.Item2 == CloseReason.Shutdown) + { + Interlocked.Increment(ref targetPartitionCloses); + } + }; + processor.OnProcessError += (_, errorArgs) => + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + if (partitionId == targetPartition && errorArgs.Item2 is ReceiverDisconnectedException) + { + Interlocked.Increment(ref targetPartitionErrors); + } + }; + }; + + var epo = EventProcessorOptions.DefaultOptions; + epo.ReceiveTimeout = TimeSpan.FromSeconds(10); + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); + + // Wait 15 seconds then create a new epoch receiver. + // This will trigger ReceiverDisconnectedExcetion in the host. + await Task.Delay(15000); + + TestUtility.Log("Creating a new receiver with epoch 2. This will trigger ReceiverDisconnectedException in the host."); + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + externalReceiver = ehClient.CreateEpochReceiver(PartitionReceiver.DefaultConsumerGroupName, + targetPartition, EventPosition.FromStart(), 2); + await externalReceiver.ReceiveAsync(100, TimeSpan.FromSeconds(5)); + + // Give another 1 minute for host to recover then do the validations. + await Task.Delay(60000); + + TestUtility.Log("Verifying that host was able to receive ReceiverDisconnectedException"); + Assert.True(targetPartitionErrors == 1, $"Host received {targetPartitionErrors} ReceiverDisconnectedExceptions!"); + + TestUtility.Log("Verifying that host was able to reopen the partition"); + Assert.True(targetPartitionOpens == 2, $"Host opened target partition {targetPartitionOpens} times!"); + + TestUtility.Log("Verifying that host notified by close"); + Assert.True(targetPartitionCloses == 1, $"Host closed target partition {targetPartitionCloses} times!"); + } + finally + { + TestUtility.Log("Calling UnregisterEventProcessorAsync"); + await eventProcessorHost.UnregisterEventProcessorAsync(); + + if (externalReceiver != null) + { + await externalReceiver.CloseAsync(); + } + } + } + } + + /// + /// If a host doesn't checkpoint on the processed events and shuts down, new host should start processing from the beginning. + /// + /// + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task NoCheckpointThenNewHostReadsFromStart() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + // Generate a new lease container name that will be used through out the test. + string leaseContainerName = Guid.NewGuid().ToString(); + + var epo = await GetOptionsAsync(connectionString); + + // Consume all messages with first host. + var eventProcessorHostFirst = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + var runResult1 = await RunGenericScenario(eventProcessorHostFirst, epo, checkpointLastEvent: false); + var totalEventsFromFirstHost = runResult1.ReceivedEvents.Sum(part => part.Value.Count); + + // Second time we initiate a host, it should receive exactly the same number of evets. + var eventProcessorHostSecond = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + leaseContainerName); + var runResult2 = await RunGenericScenario(eventProcessorHostSecond, epo, 0); + var totalEventsFromSecondHost = runResult2.ReceivedEvents.Sum(part => part.Value.Count); + + // Second host should have received the same number of events as the first host. + Assert.True(totalEventsFromFirstHost == totalEventsFromSecondHost, + $"Second host received {totalEventsFromSecondHost} events where as first host receive {totalEventsFromFirstHost} events."); + } + } + + /// + /// Checkpointing every message received should be Ok. No failures expected. + /// + /// + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task CheckpointEveryMessageReceived() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var epo = await GetOptionsAsync(connectionString); + + var eventProcessorHost = new EventProcessorHost( + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var runResult = await RunGenericScenario(eventProcessorHost, epo, numberOfEventsToSendPerPartition: 10, + checkpointLastEvent: false, checkpoingEveryEvent: true); + + // Validate there were not failures. + Assert.True(runResult.NumberOfFailures == 0, $"RunResult returned with {runResult.NumberOfFailures} failures!"); + } + } + + /// + /// While processing events one event causes a failure. Host should be able to recover any error. + /// + /// + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task HostShouldRecoverWhenProcessEventsAsyncThrows() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var client = EventHubClient.CreateFromConnectionString(connectionString); + var eventHubInfo = client.GetRuntimeInformationAsync().WaitAndUnwrapException(); + string[] PartitionIds = eventHubInfo.PartitionIds; + var lastReceivedAt = DateTime.Now; + var lastReceivedAtLock = new object(); + var poisonMessageReceived = false; + var poisonMessageProperty = "poison"; + var processorFactory = new TestEventProcessorFactory(); + var receivedEventCounts = new ConcurrentDictionary(); + + var eventProcessorHost = new EventProcessorHost( + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + string hostName = createArgs.Item1.Owner; + string consumerGroupName = createArgs.Item1.ConsumerGroupName; + processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor opened"); + processor.OnClose += (_, closeArgs) => TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); + processor.OnProcessError += (_, errorArgs) => + { + TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + + // Throw once more here depending on where we are at exception sequence. + if (errorArgs.Item2.Message.Contains("ExceptionSequence1")) + { + throw new Exception("ExceptionSequence2"); + } + }; + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; + TestUtility.Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + if (eventCount > 0) + { + lock (lastReceivedAtLock) + { + lastReceivedAt = DateTime.Now; + } + + foreach (var e in eventsArgs.Item2.events) + { + // If this is poisoned event then throw. + if (!poisonMessageReceived && e.Properties.ContainsKey(poisonMessageProperty)) + { + poisonMessageReceived = true; + TestUtility.Log($"Received poisoned message from partition {partitionId}"); + throw new Exception("ExceptionSequence1"); + } + + // Track received events so we can validate at the end. + if (!receivedEventCounts.ContainsKey(partitionId)) + { + receivedEventCounts[partitionId] = 0; + } + + receivedEventCounts[partitionId]++; + } + } + }; + }; + + try + { + TestUtility.Log("Registering processorFactory..."); + var epo = new EventProcessorOptions() + { + MaxBatchSize = 100, + InitialOffsetProvider = pId => EventPosition.FromEnqueuedTime(DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(60))) + }; + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); + + TestUtility.Log("Waiting for partition ownership to settle..."); + await Task.Delay(TimeSpan.FromSeconds(5)); + + + + // Send first set of messages. + TestUtility.Log("Sending an event to each partition as the first set of messages."); + var sendTasks = new List(); + foreach (var partitionId in PartitionIds) + { + sendTasks.Add(TestUtility.SendToPartitionAsync(client, partitionId, $"{partitionId} event.")); + } + await Task.WhenAll(sendTasks); + + // Now send 1 poisoned message. This will fail one of the partition pumps. + TestUtility.Log($"Sending a poison event to partition {PartitionIds.First()}"); + var pSender = client.CreatePartitionSender(PartitionIds.First()); + var ed = new EventData(Encoding.UTF8.GetBytes("This is poison message")); + ed.Properties[poisonMessageProperty] = true; + await pSender.SendAsync(ed); + + // Wait sometime. The host should fail and then recever during this time. + await Task.Delay(30000); + + // Send second set of messages. + TestUtility.Log("Sending an event to each partition as the second set of messages."); + sendTasks.Clear(); + foreach (var partitionId in PartitionIds) + { + sendTasks.Add(TestUtility.SendToPartitionAsync(client, partitionId, $"{partitionId} event.")); + } + await Task.WhenAll(sendTasks); + + TestUtility.Log("Waiting until hosts are idle, i.e. no more messages to receive."); + while (lastReceivedAt > DateTime.Now.AddSeconds(-60)) + { + await Task.Delay(1000); + } + + TestUtility.Log("Verifying poison message was received"); + Assert.True(poisonMessageReceived, "Didn't receive poison message!"); + + TestUtility.Log("Verifying received events by each partition"); + foreach (var partitionId in PartitionIds) + { + if (!receivedEventCounts.ContainsKey(partitionId)) + { + throw new Exception($"Partition {partitionId} didn't receive any messages!"); + } + + var receivedEventCount = receivedEventCounts[partitionId]; + Assert.True(receivedEventCount >= 2, $"Partition {partitionId} received {receivedEventCount} where as at least 2 expected!"); + } + } + finally + { + TestUtility.Log("Calling UnregisterEventProcessorAsync."); + await eventProcessorHost.UnregisterEventProcessorAsync(); + } + } + } + + /// + /// This test is for manual only purpose. Fill in the tenant-id, app-id and app-secret before running. + /// + [Fact(Skip = "Manual run only")] + [LiveTest] + [DisplayTestMethodName] + public async Task SingleProcessorHostWithAadTokenProvider() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var appAuthority = ""; + var aadAppId = ""; + var aadAppSecret = ""; + + AzureActiveDirectoryTokenProvider.AuthenticationCallback authCallback = + async (audience, authority, state) => + { + var authContext = new AuthenticationContext(authority); + var cc = new ClientCredential(aadAppId, aadAppSecret); + var authResult = await authContext.AcquireTokenAsync(audience, cc); + return authResult.AccessToken; + }; + + var tokenProvider = TokenProvider.CreateAzureActiveDirectoryTokenProvider(authCallback, appAuthority); + var epo = await GetOptionsAsync(connectionString); + var csb = new EventHubsConnectionStringBuilder(connectionString); + + var eventProcessorHost = new EventProcessorHost( + csb.Endpoint, + csb.EntityPath, + PartitionReceiver.DefaultConsumerGroupName, + tokenProvider, + CloudStorageAccount.Parse(TestUtility.StorageConnectionString), + Guid.NewGuid().ToString()); + + await RunGenericScenario(eventProcessorHost, epo); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task ReRegister() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var eventProcessorHost = new EventProcessorHost( + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + // Calling register for the first time should succeed. + TestUtility.Log("Registering EventProcessorHost for the first time."); + await eventProcessorHost.RegisterEventProcessorAsync(); + + // Unregister event processor should succed + TestUtility.Log("Registering EventProcessorHost for the first time."); + await eventProcessorHost.UnregisterEventProcessorAsync(); + + var epo = await GetOptionsAsync(connectionString); + + // Run a generic scenario with TestEventProcessor instead + await RunGenericScenario(eventProcessorHost, epo); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task ReRegisterAfterLeaseExpiry() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var hostName = Guid.NewGuid().ToString(); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = pId => EventPosition.FromEnd() + }; + + var eventProcessorHost = new EventProcessorHost( + hostName, + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var runResult = await RunGenericScenario(eventProcessorHost, processorOptions); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "First host: One of the partitions didn't return exactly 1 event"); + + // Allow sometime so that leases can expire. + await Task.Delay(60); + + runResult = await RunGenericScenario(eventProcessorHost); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "Second host: One of the partitions didn't return exactly 1 event"); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task LargeHostName() + { + await using (var scope = await EventHubScope.CreateAsync(2)) + { + var longHostname = StringUtility.GetRandomString(100); + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var eventProcessorHost = new EventProcessorHost( + longHostname, + null, // Entity path will be picked from connection string. + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var epo = await GetOptionsAsync(connectionString); + await RunGenericScenario(eventProcessorHost, epo); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task DontCheckpointStartOfStream() + { + await using (var scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + + // Use a randomly generated container name so that initial offset provider will be respected. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = partitionId => EventPosition.FromEnd(), + InvokeProcessorAfterReceiveTimeout = false + }; + + var processorFactory = new TestEventProcessorFactory(); + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + string hostName = createArgs.Item1.Owner; + processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); + processor.OnClose += (_, closeArgs) => + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); + + // Checkpoint at close. + closeArgs.Item1.CheckpointAsync().GetAwaiter().GetResult(); + }; + + processor.OnProcessError += (_, errorArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; + if (eventCount > 0) + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + } + }; + }; + + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); + + // Wait 30 seconds and then unregister. Host will checkpoint on close w/o receiving any events. + // This checkpoint attempt should be omitted. + await Task.Delay(TimeSpan.FromSeconds(30)); + await eventProcessorHost.UnregisterEventProcessorAsync(); + + // Now send a single message. This message won't be received by next host. + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + await ehClient.SendAsync(new EventData(Encoding.UTF8.GetBytes("Hello EventHub!"))); + await Task.Delay(TimeSpan.FromSeconds(10)); + + var runResult = await RunGenericScenario(eventProcessorHost, processorOptions, numberOfEventsToSendPerPartition: 1); + + // Confirm that we received just 1 event. + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "Didn't receive exactly 1 event."); + } + } + } +} +