Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed May 21, 2024
1 parent c5cc8e9 commit 169db61
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,7 @@ public sealed class ActiveRebalancingOptions
/// In order to preserve memory, the most heaviest links are recorded in a probabilistic way, so there is an inherent error associated with that.
/// That error is inversely proportional to this value, so values under 100 are not recommended. If you notice that the system is not converging fast enough, do consider increasing this number.
/// </remarks>
public uint MaxEdgeCount { get; set; } =



10 *



DEFAULT_MAX_EDGE_COUNT;
public uint MaxEdgeCount { get; set; } = DEFAULT_MAX_EDGE_COUNT;

/// <summary>
/// The default value of <see cref="MaxEdgeCount"/>.
Expand Down
20 changes: 11 additions & 9 deletions src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public async ValueTask<AcceptExchangeResponse> AcceptExchangeRequest(AcceptExcha
var remoteActivations = request.ActivationCountSnapshot;
var localActivations = GetLocalActivationCount();

var imbalance = CalculateImbalance(remoteActivations, localActivations);
var initialImbalance = CalculateImbalance(remoteActivations, localActivations);
int imbalance = initialImbalance;
_logger.LogInformation("Imbalance is {Imbalance} (remote: {RemoteCount} vs local {LocalCount})", imbalance, remoteActivations, localActivations);

var (localHeap, remoteHeap) = CreateCandidateHeaps(localSet, remoteSet);
Expand Down Expand Up @@ -318,22 +319,23 @@ bool TryMigrateRemoteToLocal()

bool TryMigrateCore(MaxHeap<CandidateVertexHeapElement> sourceHeap, int localDelta, int remoteDelta, [NotNullWhen(true)] out CandidateVertexHeapElement? chosenVertex)
{
chosenVertex = null;
if (sourceHeap.Count == 0)
var anticipatedImbalance = CalculateImbalance(localActivations + localDelta, remoteActivations + remoteDelta);
if (anticipatedImbalance >= initialImbalance && !_toleranceRule.IsSatisfiedBy((uint)anticipatedImbalance))
{
// Taking from this heap would not improve imbalance.
chosenVertex = null;
return false;
}

var anticipatedImbalance = CalculateImbalance(localActivations + localDelta, remoteActivations + remoteDelta);
if (anticipatedImbalance > imbalance && !_toleranceRule.IsSatisfiedBy((uint)anticipatedImbalance))
if (!sourceHeap.TryPop(out chosenVertex))
{
// Heap is empty.
return false;
}

chosenVertex = sourceHeap.Pop();
if (chosenVertex.AccumulatedTransferScore <= 0)
if (chosenVertex.AccumulatedTransferScore < 0)
{
// If it got affected by a previous run, and the score is not positive, simply pop and ignore it.
// If it got affected by a previous run, and the score is negative, simply pop and ignore it.
return false;
}

Expand All @@ -355,7 +357,7 @@ bool TryMigrateCore(MaxHeap<CandidateVertexHeapElement> sourceHeap, int localDel
}
}

private static int CalculateImbalance(int left, int right) => Math.Abs(Math.Abs(left) - Math.Abs(right));
private static int CalculateImbalance(int left, int right) => (int)Math.Abs(Math.Abs((long)left) - Math.Abs((long)right));
private static (MaxHeap<CandidateVertexHeapElement> Local, MaxHeap<CandidateVertexHeapElement> Remote) CreateCandidateHeaps(List<CandidateVertex> local, ImmutableArray<CandidateVertex> remote)
{
Dictionary<GrainId, CandidateVertex> sourceIndex = new(local.Count + remote.Length);
Expand Down
12 changes: 12 additions & 0 deletions src/Orleans.Runtime/Placement/Rebalancing/MaxHeap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ public TElement Peek()
return _nodes[0]!;
}

public bool TryPop([NotNullWhen(true)] out TElement value)
{
if (_size > 0)
{
value = Pop();
return true;
}

value = default!;
return false;
}

/// <summary>
/// Removes and returns the maximal element from the <see cref="MaxHeap{TElement}"/>.
/// </summary>
Expand Down

0 comments on commit 169db61

Please sign in to comment.