Skip to content

Commit

Permalink
Improve shutdown order
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed May 14, 2024
1 parent e3dc117 commit 186d654
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions src/Orleans.Streaming/QueueBalancer/LeaseBasedQueueBalancer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,21 @@ void StartMaintenanceTasks()
public override async Task Shutdown()
{
if (Cancellation.IsCancellationRequested) return;
_myQueues.Clear();
_responsibility = 0;

// Stop acquiring and renewing leases.
_leaseMaintenanceTimer.Dispose();
_leaseAcquisitionTimer.Dispose();
await Task.WhenAll(_leaseMaintenanceTimerTask, _leaseAcquisitionTimerTask);

// Release all owned leases.
var shutdownTask = _executor.AddNext(async () =>
{
_responsibility = 0;
await ReleaseLeasesToMeetResponsibility();
});

// release all owned leases
var releaseTask = _executor.AddNext(ReleaseLeasesToMeetResponsibility);
await Task.WhenAll(base.Shutdown(), releaseTask, _leaseMaintenanceTimerTask, _leaseAcquisitionTimerTask);
// Signal shutdown.
await base.Shutdown();
}

/// <inheritdoc/>
Expand All @@ -121,12 +128,11 @@ public override IEnumerable<QueueId> GetMyQueues()
private async Task PeriodicallyMaintainLeases()
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
Func<Task> maintainLeases = MaintainLeases;
while (await _leaseMaintenanceTimer.WaitForNextTickAsync())
{
try
{
await _executor.AddNext(maintainLeases);
await _executor.AddNext(MaintainLeases);
}
catch (Exception ex)
{
Expand All @@ -142,8 +148,8 @@ async Task MaintainLeases()
{
bool allLeasesRenewed = await RenewLeases();

// If we lost some leases during renew after leaseAcquisitionTimer stopped, restart it
if (!allLeasesRenewed && !Cancellation.IsCancellationRequested)
// If we lost some leases during renew after leaseAcquisitionTimer stopped, restart it.
if (!allLeasesRenewed)
{
// Make the acquisition timer fire immediately.
_leaseAcquisitionTimer.Period = TimeSpan.Zero;
Expand All @@ -162,7 +168,7 @@ private async Task PeriodicallyAcquireLeasesToMeetResponsibility()
while (await _leaseAcquisitionTimer.WaitForNextTickAsync())
{
// Set the period for the next round.
// It may be mutated by another method.
// It may be mutated by another method, but not concurrently.
_leaseAcquisitionTimer.Period = _options.LeaseAcquisitionPeriod;

try
Expand Down Expand Up @@ -216,15 +222,14 @@ private async Task ReleaseLeasesToMeetResponsibility()
return;
}

// Remove oldest acquired queues first, this provides max recovery time for the queues
// being moved.
// TODO: Consider making this behavior configurable/pluggable - jbragg
// Remove oldest acquired queues first, this provides max recovery time for the queues being moved.
AcquiredLease[] queuesToGiveUp = _myQueues
.OrderBy(queue => queue.LeaseOrder)
.Take(queueCountToRelease)
.Select(queue => queue.AcquiredLease)
.ToArray();
// Remove queues from list even if release fails, since we can let the lease expire

// Remove queues from list even if release fails, since we can let the lease expire.
// TODO: mark for removal instead so we don't renew, and only remove leases that have not expired. - jbragg
for (int index = _myQueues.Count - 1; index >= 0; index--)
{
Expand All @@ -236,7 +241,7 @@ private async Task ReleaseLeasesToMeetResponsibility()

await _leaseProvider.Release(_options.LeaseCategory, queuesToGiveUp);

// remove queuesToGiveUp from myQueue list after the balancer released the leases on them
// Remove queuesToGiveUp from myQueue list after the balancer released the leases on them.
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Released leases for {QueueCount} queues. Holding leases for {QueueCount} of an expected {MinQueueCount} queues.", queueCountToRelease, _myQueues.Count, _responsibility);
Expand All @@ -261,7 +266,12 @@ private async Task AcquireLeasesToMeetExpectation(int expectedTotalLeaseCount, T
var possibleLeaseCount = _queueSelector.Count - _myQueues.Count;
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Holding leased for {QueueCount} queues. Trying to acquire {acquireQueueCount} queues to reach {TargetQueueCount} of a possible {PossibleLeaseCount}", _myQueues.Count, leasesToAcquire, expectedTotalLeaseCount, possibleLeaseCount);
Logger.LogDebug(
"Holding leased for {QueueCount} queues. Trying to acquire {acquireQueueCount} queues to reach {TargetQueueCount} of a possible {PossibleLeaseCount}",
_myQueues.Count,
leasesToAcquire,
expectedTotalLeaseCount,
possibleLeaseCount);
}

// Try to acquire leases until we have no more to acquire or no more possible
Expand Down Expand Up @@ -320,7 +330,12 @@ private async Task AcquireLeasesToMeetExpectation(int expectedTotalLeaseCount, T
leasesToAcquire = expectedTotalLeaseCount - _myQueues.Count;
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Holding leased for {QueueCount} queues. Trying to acquire {acquireQueueCount} queues to reach {TargetQueueCount} of a possible {PossibleLeaseCount} lease", _myQueues.Count, leasesToAcquire, expectedTotalLeaseCount, possibleLeaseCount);
Logger.LogDebug(
"Holding leased for {QueueCount} queues. Trying to acquire {acquireQueueCount} queues to reach {TargetQueueCount} of a possible {PossibleLeaseCount} lease",
_myQueues.Count,
leasesToAcquire,
expectedTotalLeaseCount,
possibleLeaseCount);
}

if (sw.Elapsed > timeout)
Expand Down Expand Up @@ -405,7 +420,8 @@ private Task NotifyOnChange(HashSet<QueueId> oldQueues)
{
if (Cancellation.IsCancellationRequested) return Task.CompletedTask;
var newQueues = new HashSet<QueueId>(_myQueues.Select(queue => queue.QueueId));
//if queue changed, notify listeners

// If queue changed, notify listeners.
return !oldQueues.SetEquals(newQueues)
? NotifyListeners()
: Task.CompletedTask;
Expand Down Expand Up @@ -440,12 +456,10 @@ private async Task ScheduleUpdateResponsibilities(HashSet<SiloAddress> activeSil
/// <param name="activeSilos">number of active silos hosting queues</param>
/// <returns>bool - true indicates that the balancer should try to acquire one
/// more queue than the non-greedy balancers</returns>
private bool AmGreedy(int overflow, HashSet<SiloAddress> activeSilos)
private bool ShouldBeGreedy(int overflow, HashSet<SiloAddress> activeSilos)
{
// If using multiple stream providers, this will select the same silos to be greedy for
// all providers, aggravating imbalance as stream provider count increases.
// TODO: consider making this behavior configurable/pluggable - jbragg
// TODO: use heap? - jbragg
return activeSilos.OrderBy(silo => silo)
.Take(overflow)
.Contains(SiloAddress);
Expand All @@ -457,19 +471,24 @@ private async Task UpdateResponsibilities(HashSet<SiloAddress> activeSilos)
var activeSiloCount = Math.Max(1, activeSilos.Count);
_responsibility = _allQueuesCount / activeSiloCount;
var overflow = _allQueuesCount % activeSiloCount;
if (overflow != 0 && AmGreedy(overflow, activeSilos))
if (overflow != 0 && ShouldBeGreedy(overflow, activeSilos))
{
_responsibility++;
}

if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Updating Responsibilities for {QueueCount} queue over {SiloCount} silos. Need {MinQueueCount} queues, have {MyQueueCount}",
_allQueuesCount, activeSiloCount, _responsibility, _myQueues.Count);
Logger.LogDebug(
"Updating Responsibilities for {QueueCount} queue over {SiloCount} silos. Need {MinQueueCount} queues, have {MyQueueCount}",
_allQueuesCount,
activeSiloCount,
_responsibility,
_myQueues.Count);
}

if (_myQueues.Count < _responsibility && _leaseAcquisitionTimer.Period == Timeout.InfiniteTimeSpan)
{
// Ensure the acquisition timer is running.
_leaseAcquisitionTimer.Period = _options.LeaseAcquisitionPeriod;
}

Expand Down

0 comments on commit 186d654

Please sign in to comment.