diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpEventDataSender.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpEventDataSender.cs index ae4f26f76afdc..a352a86106720 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpEventDataSender.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpEventDataSender.cs @@ -86,7 +86,7 @@ protected override async Task OnSendAsync(IEnumerable eventDatas, str { // Evaluate retry condition? TimeSpan? retryInterval = this.RetryPolicy.GetNextRetryInterval(ex, timeoutHelper.RemainingTime(), ++retryCount); - if (retryInterval != null && !this.EventHubClient.CloseCalled) + if (retryInterval != null && !this.EventHubClient.IsClosed) { await Task.Delay(retryInterval.Value).ConfigureAwait(false); shouldRetry = true; diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpPartitionReceiver.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpPartitionReceiver.cs index 51d2c3a607d2e..7963d6795072f 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpPartitionReceiver.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/src/Amqp/AmqpPartitionReceiver.cs @@ -104,7 +104,7 @@ await this.ReceiveLinkManager.GetOrCreateAsync( { // Evaluate retry condition? TimeSpan? retryInterval = this.RetryPolicy.GetNextRetryInterval(ex, timeoutHelper.RemainingTime(), ++retryCount); - if (retryInterval != null && !this.EventHubClient.CloseCalled) + if (retryInterval != null && !this.EventHubClient.IsClosed) { await Task.Delay(retryInterval.Value).ConfigureAwait(false); shouldRetry = true; diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubClient.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubClient.cs index b8d5300bd73cf..ac9f0fa235125 100755 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubClient.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubClient.cs @@ -19,7 +19,6 @@ namespace Microsoft.Azure.EventHubs public abstract class EventHubClient : ClientEntity { readonly Lazy innerSender; - bool closeCalled = false; internal EventHubClient(EventHubsConnectionStringBuilder csb) : base($"{nameof(EventHubClient)}{ClientEntity.GetNextId()}({csb.EntityPath})") @@ -448,8 +447,6 @@ public EventDataBatch CreateBatch(BatchOptions options) /// public IWebProxy WebProxy { get; set; } - internal bool CloseCalled => this.closeCalled; - internal EventDataSender CreateEventSender(string partitionId = null) { return this.OnCreateEventSender(partitionId); diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionReceiver.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionReceiver.cs index 8e217b41441d9..186ef011014ab 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionReceiver.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionReceiver.cs @@ -224,6 +224,8 @@ public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invo /// An asynchronous operation public sealed override Task CloseAsync() { + this.closeCalled = true; + EventHubsEventSource.Log.ClientCloseStart(this.ClientId); try { diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionSender.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionSender.cs index ddc8944210c08..9698905e3901d 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionSender.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionSender.cs @@ -179,6 +179,8 @@ public async Task SendAsync(EventDataBatch eventDataBatch) /// An asynchronous operation public override async Task CloseAsync() { + this.closeCalled = true; + EventHubsEventSource.Log.ClientCloseStart(this.ClientId); try { diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/src/Primitives/ClientEntity.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/src/Primitives/ClientEntity.cs index 5ade18d018db5..679ce17c84219 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/src/Primitives/ClientEntity.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/src/Primitives/ClientEntity.cs @@ -20,6 +20,9 @@ public abstract class ClientEntity static int nextId; RetryPolicy retryPolicy; + /// + protected bool closeCalled = false; + /// /// protected ClientEntity(string clientId) @@ -61,6 +64,12 @@ public RetryPolicy RetryPolicy /// The asynchronous operation public abstract Task CloseAsync(); + /// + /// Returns a boolean representing whether client object is closed or not. + /// + /// Returns . + public bool IsClosed => this.closeCalled; + /// /// Registers a to be used with this client. /// diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/MiscTests.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/MiscTests.cs index 6991d4773338e..07a1d24c47029 100755 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/MiscTests.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/MiscTests.cs @@ -125,5 +125,75 @@ public async Task SendAndReceiveLargeMessage() } } } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task ClosingSenderEntity() + { + await using (var scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + var ehSender = ehClient.CreatePartitionSender("0"); + + await ehSender.CloseAsync(); + Assert.True(ehSender.IsClosed, "ehSender.IsClosed is not true."); + Assert.True(!ehClient.IsClosed, "ehClient.IsClosed is not false."); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task ClosingReceiverEntity() + { + await using (var scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + var ehReceiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", EventPosition.FromStart()); + + await ehReceiver.CloseAsync(); + Assert.True(ehReceiver.IsClosed, "ehReceiver.IsClosed is not true."); + Assert.True(!ehClient.IsClosed, "ehClient.IsClosed is not false."); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task ClosingEventHubClientClosesSenderEntities() + { + await using (var scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + var ehSender0 = ehClient.CreatePartitionSender("0"); + var ehSender1 = ehClient.CreatePartitionSender("1"); + + await ehClient.CloseAsync(); + Assert.True(ehSender0.IsClosed, "ehSender0.IsClosed is not true."); + Assert.True(ehSender1.IsClosed, "ehSender1.IsClosed is not true."); + } + } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task ClosingEventHubClientClosesReceiverEntities() + { + await using (var scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + var ehReceiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", EventPosition.FromStart()); + var ehReceiverEpoch = ehClient.CreateEpochReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", EventPosition.FromEnd(), 0); + + await ehClient.CloseAsync(); + Assert.True(ehReceiver.IsClosed, "ehReceiver.IsClosed is not true."); + Assert.True(ehReceiverEpoch.IsClosed, "ehReceiverEpoch.IsClosed is not true."); + } + } } } \ No newline at end of file