Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit max number of expired leases acquired #11062

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -166,12 +166,12 @@ public Task DeleteCheckpointAsync(string partitionId)

public Task<bool> LeaseStoreExistsAsync()
{
return this.eventHubContainer.ExistsAsync(this.defaultRequestOptions, this.operationContext);
return this.eventHubContainer.ExistsAsync(null, this.operationContext);
}

public Task<bool> CreateLeaseStoreIfNotExistsAsync()
{
return this.eventHubContainer.CreateIfNotExistsAsync(this.defaultRequestOptions, this.operationContext);
return this.eventHubContainer.CreateIfNotExistsAsync(null, this.operationContext);
}

public async Task<bool> DeleteLeaseStoreAsync()
Expand Down
Expand Up @@ -90,6 +90,10 @@ async Task RunAsync()
{
await this.RunLoopAsync(this.cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException) when (this.cancellationTokenSource.IsCancellationRequested)
{
// Expected during host shutdown.
}
catch (Exception e)
{
// Ideally RunLoop should never throw.
Expand Down Expand Up @@ -249,7 +253,7 @@ async Task RunAsync()
try
{
allLeases[subjectLease.PartitionId] = subjectLease;
if (subjectLease.Owner == this.host.HostName)
if (subjectLease.Owner == this.host.HostName && !(await subjectLease.IsExpired().ConfigureAwait(false)))
{
ourLeaseCount++;

Expand Down Expand Up @@ -307,54 +311,7 @@ async Task RunAsync()
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished.");

// Check any expired leases that we can grab here.
var expiredLeaseTasks = new List<Task>();
foreach (var possibleLease in allLeases.Values)
{
var subjectLease = possibleLease;

if (await subjectLease.IsExpired().ConfigureAwait(false))
{
expiredLeaseTasks.Add(Task.Run(async () =>
{
try
{
// Get fresh content of lease subject to acquire.
var downloadedLease = await leaseManager.GetLeaseAsync(subjectLease.PartitionId).ConfigureAwait(false);
allLeases[subjectLease.PartitionId] = downloadedLease;

// Check expired once more here incase another host have already leased this since we populated the list.
if (await downloadedLease.IsExpired().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Trying to acquire lease.");
if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Acquired lease.");
leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease);
Interlocked.Increment(ref ourLeaseCount);
}
else
{
// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[subjectLease.PartitionId].Owner = null;

ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
"Failed to acquire lease for partition " + downloadedLease.PartitionId, null);
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "Failure during acquiring lease", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases);

// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[subjectLease.PartitionId].Owner = null;
}
}, cancellationToken));
}
}

await Task.WhenAll(expiredLeaseTasks).ConfigureAwait(false);
ourLeaseCount += await this.AcquireExpiredLeasesAsync(allLeases, leasesOwnedByOthers, ourLeaseCount, cancellationToken);
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished.");

// Grab more leases if available and needed for load balancing
Expand Down Expand Up @@ -476,6 +433,88 @@ async Task RunAsync()
}
}

async Task<int> AcquireExpiredLeasesAsync(
ConcurrentDictionary<string, Lease> allLeases,
ConcurrentDictionary<string, Lease> leasesOwnedByOthers,
int ownedCount,
CancellationToken cancellationToken)
{
var totalAcquired = 0;
var hosts = new List<string>();

// Find distinct hosts actively owning leases.
foreach (var lease in allLeases.Values)
{
if (lease.Owner != null && !hosts.Contains(lease.Owner) && !(await lease.IsExpired().ConfigureAwait(false)))
{
hosts.Add(lease.Owner);
}
}

// Calculate how many more leases we can own.
var hostCount = hosts.Count() == 0 ? 1 : hosts.Count();
var targetLeaseCount = (int)Math.Ceiling((double)allLeases.Count / (double)hostCount) - ownedCount;

// Attempt to acquire expired leases now up to allowed target lease count.
var tasks = new List<Task>();
foreach (var possibleLease in allLeases.Values)
{
// Break if we already acquired enough number of leases.
if (targetLeaseCount <= 0)
{
break;
}

var subjectLease = possibleLease;

if (await subjectLease.IsExpired().ConfigureAwait(false))
{
targetLeaseCount--;
tasks.Add(Task.Run(async () =>
{
try
{
// Get fresh content of lease subject to acquire.
var downloadedLease = await this.host.LeaseManager.GetLeaseAsync(subjectLease.PartitionId).ConfigureAwait(false);
allLeases[subjectLease.PartitionId] = downloadedLease;

// Check expired once more here incase another host have already leased this since we populated the list.
if (await downloadedLease.IsExpired().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Trying to acquire lease.");
if (await this.host.LeaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Acquired lease.");
leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease);
Interlocked.Increment(ref totalAcquired);
}
else
{
// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[subjectLease.PartitionId].Owner = null;

ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
"Failed to acquire lease for partition " + downloadedLease.PartitionId, null);
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "Failure during acquiring lease", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases);

// Acquisition failed. Make sure we don't leave the lease as owned.
allLeases[subjectLease.PartitionId].Owner = null;
}
}, cancellationToken));
}
}

await Task.WhenAll(tasks).ConfigureAwait(false);

return totalAcquired;
}

async Task CheckAndAddPumpAsync(string partitionId, Lease lease)
{
PartitionPump capturedPump;
Expand Down
Expand Up @@ -94,6 +94,23 @@ public async Task CloseAsync(CloseReason reason)
{
ProcessorEventSource.Log.PartitionPumpCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reason.ToString());
this.PumpStatus = PartitionPumpStatus.Closing;

// Release lease as the first thing since closing receiver can take up to operation timeout.
// This helps other available hosts discover lease available sooner.
if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
try
{
await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
}
catch (Exception e)
{
// Log and ignore any failure since expired lease will be picked by another host.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease);
}
}

try
{
this.cancellationTokenSource.Cancel();
Expand Down Expand Up @@ -121,20 +138,6 @@ public async Task CloseAsync(CloseReason reason)
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, "Closing Event Processor");
}

if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
try
{
await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
}
catch (Exception e)
{
// Log and ignore any failure since expired lease will be picked by another host.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease);
}
}

this.PumpStatus = PartitionPumpStatus.Closed;
ProcessorEventSource.Log.PartitionPumpCloseStop(this.Host.HostName, this.PartitionContext.PartitionId);
}
Expand Down
Expand Up @@ -549,19 +549,19 @@ await using (var scope = await EventHubScope.CreateAsync(2))
try
{
TestUtility.Log("Starting nonepoch receiver");
await nonEpochReceiver.ReceiveAsync(10);
await nonEpochReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10));

await Task.Delay(TimeSpan.FromSeconds(10));

TestUtility.Log("Starting epoch receiver");
await epochReceiver.ReceiveAsync(10);
await epochReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10));

await Task.Delay(TimeSpan.FromSeconds(10));

try
{
TestUtility.Log("Restarting nonepoch receiver, this should fail");
await nonEpochReceiver.ReceiveAsync(10);
await nonEpochReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10));
throw new InvalidOperationException("Non-Epoch receiver should have encountered an exception by now!");
}
catch (ReceiverDisconnectedException ex) when (ex.Message.Contains("non-epoch receiver is not allowed"))
Expand Down Expand Up @@ -640,15 +640,15 @@ await using (var scope = await EventHubScope.CreateAsync(2))
});

// Issue a receive call so link will become active.
await newReceiver.ReceiveAsync(10);
await newReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10));
receivers.Add(newReceiver);
}

try
{
// Attempt to create 6th receiver. This should fail.
var failReceiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "1", EventPosition.FromStart());
await failReceiver.ReceiveAsync(10);
await failReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(10));
throw new InvalidOperationException("6th receiver should have encountered QuotaExceededException.");
}
catch (QuotaExceededException ex)
Expand Down
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.EventHubs.Tests.Processor

public class NegativeCases : ProcessorTestBase
{

[Fact]
[LiveTest]
[DisplayTestMethodName]
Expand Down