Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose client closed flag publicly #7365

Merged
merged 12 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected override async Task OnSendAsync(IEnumerable<EventData> eventDatas, str
{
// Evaluate retry condition?
TimeSpan? retryInterval = this.RetryPolicy.GetNextRetryInterval(ex, timeoutHelper.RemainingTime(), ++retryCount);
if (retryInterval != null && !this.EventHubClient.CloseCalled)
if (retryInterval != null && !this.EventHubClient.IsClosed)
{
await Task.Delay(retryInterval.Value).ConfigureAwait(false);
shouldRetry = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected override async Task<IList<EventData>> OnReceiveAsync(int maxMessageCou
{
// Evaluate retry condition?
TimeSpan? retryInterval = this.RetryPolicy.GetNextRetryInterval(ex, timeoutHelper.RemainingTime(), ++retryCount);
if (retryInterval != null && !this.EventHubClient.CloseCalled)
if (retryInterval != null && !this.EventHubClient.IsClosed)
{
await Task.Delay(retryInterval.Value).ConfigureAwait(false);
shouldRetry = true;
Expand Down
3 changes: 0 additions & 3 deletions sdk/eventhub/Microsoft.Azure.EventHubs/src/EventHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Microsoft.Azure.EventHubs
public abstract class EventHubClient : ClientEntity
{
readonly Lazy<EventDataSender> innerSender;
bool closeCalled = false;

internal EventHubClient(EventHubsConnectionStringBuilder csb)
: base($"{nameof(EventHubClient)}{ClientEntity.GetNextId()}({csb.EntityPath})")
Expand Down Expand Up @@ -448,8 +447,6 @@ public EventDataBatch CreateBatch(BatchOptions options)
/// </summary>
public IWebProxy WebProxy { get; set; }

internal bool CloseCalled => this.closeCalled;

internal EventDataSender CreateEventSender(string partitionId = null)
{
return this.OnCreateEventSender(partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler, bool invo
/// <returns>An asynchronous operation</returns>
public sealed override Task CloseAsync()
{
this.closeCalled = true;

EventHubsEventSource.Log.ClientCloseStart(this.ClientId);
try
{
Expand Down
2 changes: 2 additions & 0 deletions sdk/eventhub/Microsoft.Azure.EventHubs/src/PartitionSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ public async Task SendAsync(EventDataBatch eventDataBatch)
/// <returns>An asynchronous operation</returns>
public override async Task CloseAsync()
{
this.closeCalled = true;

EventHubsEventSource.Log.ClientCloseStart(this.ClientId);
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public abstract class ClientEntity
static int nextId;
RetryPolicy retryPolicy;

/// <summary />
protected bool closeCalled = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this field protected and named this way?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, missed to notice your comment. I see your point. I will remove it and set the property setter as protected instead in another PR.


/// <summary></summary>
/// <param name="clientId"></param>
protected ClientEntity(string clientId)
Expand Down Expand Up @@ -61,6 +64,12 @@ public RetryPolicy RetryPolicy
/// <returns>The asynchronous operation</returns>
public abstract Task CloseAsync();

/// <summary>
/// Returns a boolean representing whether client object is closed or not.
/// </summary>
/// <value>Returns <see cref="System.Boolean" />.</value>
public bool IsClosed => this.closeCalled;

/// <summary>
/// Registers a <see cref="EventHubsPlugin"/> to be used with this client.
/// </summary>
Expand Down
70 changes: 70 additions & 0 deletions sdk/eventhub/Microsoft.Azure.EventHubs/tests/Client/MiscTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,75 @@ await using (var scope = await EventHubScope.CreateAsync(1))
}
}
}

[Fact]
[LiveTest]
[DisplayTestMethodName]
public async Task ClosingSenderEntity()
{
await using (var scope = await EventHubScope.CreateAsync(1))
{
var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName);
var ehClient = EventHubClient.CreateFromConnectionString(connectionString);
var ehSender = ehClient.CreatePartitionSender("0");

await ehSender.CloseAsync();
Assert.True(ehSender.IsClosed, "ehSender.IsClosed is not true.");
Assert.True(!ehClient.IsClosed, "ehClient.IsClosed is not false.");
}
}

[Fact]
[LiveTest]
[DisplayTestMethodName]
public async Task ClosingReceiverEntity()
{
await using (var scope = await EventHubScope.CreateAsync(1))
{
var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName);
var ehClient = EventHubClient.CreateFromConnectionString(connectionString);
var ehReceiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", EventPosition.FromStart());

await ehReceiver.CloseAsync();
Assert.True(ehReceiver.IsClosed, "ehReceiver.IsClosed is not true.");
Assert.True(!ehClient.IsClosed, "ehClient.IsClosed is not false.");
}
}

[Fact]
[LiveTest]
[DisplayTestMethodName]
public async Task ClosingEventHubClientClosesSenderEntities()
{
await using (var scope = await EventHubScope.CreateAsync(1))
{
var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName);
var ehClient = EventHubClient.CreateFromConnectionString(connectionString);
var ehSender0 = ehClient.CreatePartitionSender("0");
var ehSender1 = ehClient.CreatePartitionSender("1");

await ehClient.CloseAsync();
Assert.True(ehSender0.IsClosed, "ehSender0.IsClosed is not true.");
Assert.True(ehSender1.IsClosed, "ehSender1.IsClosed is not true.");
}
}

[Fact]
[LiveTest]
[DisplayTestMethodName]
public async Task ClosingEventHubClientClosesReceiverEntities()
{
await using (var scope = await EventHubScope.CreateAsync(1))
{
var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName);
var ehClient = EventHubClient.CreateFromConnectionString(connectionString);
var ehReceiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", EventPosition.FromStart());
var ehReceiverEpoch = ehClient.CreateEpochReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", EventPosition.FromEnd(), 0);

await ehClient.CloseAsync();
Assert.True(ehReceiver.IsClosed, "ehReceiver.IsClosed is not true.");
Assert.True(ehReceiverEpoch.IsClosed, "ehReceiverEpoch.IsClosed is not true.");
}
}
}
}