Skip to content

Commit

Permalink
ReubenBond - refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Apr 9, 2024
1 parent f392763 commit 46cebe5
Show file tree
Hide file tree
Showing 36 changed files with 1,043 additions and 1,048 deletions.
2 changes: 1 addition & 1 deletion src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.AddSingleton<SharedMemoryPool>();

// Networking
services.TryAddSingleton<IActiveRebalancerGateway, NoOpActiveRebalancerGateway>();
services.TryAddSingleton<IMessageSink, NoOpActiveRebalancerGateway>();
services.TryAddSingleton<ConnectionCommon>();
services.TryAddSingleton<ConnectionManager>();
services.TryAddSingleton<ConnectionPreambleHelper>();
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Networking/ClientOutboundConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal sealed class ClientOutboundConnection : Connection
ConnectionCommon connectionShared,
ConnectionPreambleHelper connectionPreambleHelper,
ClusterOptions clusterOptions,
IActiveRebalancerGateway rebalancerGateway)
IMessageSink rebalancerGateway)
: base(rebalancerGateway, connection, middleware, connectionShared)
{
this.messageCenter = messageCenter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed class ClientOutboundConnectionFactory : ConnectionFactory
private readonly ClientConnectionOptions clientConnectionOptions;
private readonly ClusterOptions clusterOptions;
private readonly ConnectionPreambleHelper connectionPreambleHelper;
private readonly IActiveRebalancerGateway rebalancerGateway;
private readonly IMessageSink rebalancerGateway;
private readonly object initializationLock = new object();
private volatile bool isInitialized;
private ClientMessageCenter messageCenter;
Expand All @@ -24,7 +24,7 @@ internal sealed class ClientOutboundConnectionFactory : ConnectionFactory
IOptions<ConnectionOptions> connectionOptions,
IOptions<ClientConnectionOptions> clientConnectionOptions,
IOptions<ClusterOptions> clusterOptions,
IActiveRebalancerGateway rebalancerGateway,
IMessageSink rebalancerGateway,
ConnectionCommon connectionShared,
ConnectionPreambleHelper connectionPreambleHelper)
: base(connectionShared.ServiceProvider.GetRequiredKeyedService<IConnectionFactory>(ServicesKey), connectionShared.ServiceProvider, connectionOptions)
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal abstract class Connection

private static readonly ObjectPool<MessageHandler> MessageHandlerPool = ObjectPool.Create(new MessageHandlerPoolPolicy());
private readonly ConnectionCommon shared;
private readonly IActiveRebalancerGateway _rebalancerGateway;
private readonly IMessageSink _rebalancerGateway;
private readonly ConnectionDelegate middleware;
private readonly Channel<Message> outgoingMessages;
private readonly ChannelWriter<Message> outgoingMessageWriter;
Expand All @@ -43,7 +43,7 @@ internal abstract class Connection
private Task _closeTask;

protected Connection(
IActiveRebalancerGateway rebalancerGateway,
IMessageSink rebalancerGateway,
ConnectionContext connection,
ConnectionDelegate middleware,
ConnectionCommon shared)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
using System.Threading.Tasks;
using System.Collections.Immutable;
using Orleans.Runtime;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System;

namespace Orleans.Placement.Rebalancing;

[Alias("IActivationRebalancerSystemTarget")]
internal interface IActivationRebalancerSystemTarget : ISystemTarget
{
static IActivationRebalancerSystemTarget GetReference(IGrainFactory grainFactory, SiloAddress targetSilo)
=> grainFactory.GetGrain<IActivationRebalancerSystemTarget>(SystemTargetGrainId.Create(Constants.ActivationRebalancerType, targetSilo).GrainId);

ValueTask TriggerExchangeRequest();

ValueTask<AcceptExchangeResponse> AcceptExchangeRequest(AcceptExchangeRequest request);

/// <summary>
/// For use in testing only!
/// </summary>
ValueTask ResetCounters();
}

// We use a readonly struct so that we can fully decouple the message-passing and potentially modifications to the Silo fields.
/// <summary>
/// Data structure representing a 'communication edge' between a source and target.
/// </summary>
[GenerateSerializer, Immutable, DebuggerDisplay("Source: [{Source.Id} - {Source.Silo}] | Target: [{Target.Id} - {Target.Silo}]")]
internal readonly struct Edge(EdgeVertex source, EdgeVertex target) : IEquatable<Edge>
{
[Id(0)]
public EdgeVertex Source { get; } = source;

[Id(1)]
public EdgeVertex Target { get; } = target;

public static bool operator ==(Edge left, Edge right) => left.Equals(right);
public static bool operator !=(Edge left, Edge right) => !left.Equals(right);

public override bool Equals([NotNullWhen(true)] object obj) => obj is Edge other && Equals(other);
public bool Equals(Edge other) => Source == other.Source && Target == other.Target;

public override int GetHashCode() => HashCode.Combine(Source, Target);
}

/// <summary>
/// Data structure representing one side of a <see cref="Edge"/>.
/// </summary>
[GenerateSerializer, Immutable]
public readonly struct EdgeVertex(
GrainId id,
SiloAddress silo,
bool isMigratable) : IEquatable<EdgeVertex>
{
[Id(0)]
public readonly GrainId Id = id;

[Id(1)]
public readonly SiloAddress Silo = silo;

[Id(2)]
public readonly bool IsMigratable = isMigratable;

public static bool operator ==(EdgeVertex left, EdgeVertex right) => left.Equals(right);
public static bool operator !=(EdgeVertex left, EdgeVertex right) => !left.Equals(right);

public override bool Equals([NotNullWhen(true)] object obj) => obj is EdgeVertex other && Equals(other);
public bool Equals(EdgeVertex other) => Id == other.Id && Silo == other.Silo && IsMigratable == other.IsMigratable;

public override int GetHashCode() => HashCode.Combine(Id, Silo, IsMigratable);
}

/// <summary>
/// A candidate vertex to be transferred to another silo.
/// </summary>
[GenerateSerializer, DebuggerDisplay("Id = {Id} | Accumulated = {AccumulatedTransferScore}")]
internal sealed class CandidateVertex
{
/// <summary>
/// The id of the candidate grain.
/// </summary>
[Id(0), Immutable]
public GrainId Id { get; init; }

/// <summary>
/// The cost reduction expected from migrating the vertex with <see cref="Id"/> to another silo.
/// </summary>
[Id(1)]
public long AccumulatedTransferScore { get; set; }

/// <summary>
/// These are all the vertices connected to the vertex with <see cref="Id"/>.
/// </summary>
/// <remarks>These will be important when this vertex is removed from the max-sorted heap on the receiver silo.</remarks>
[Id(2), Immutable]
public ImmutableArray<CandidateConnectedVertex> ConnectedVertices { get; init; } = [];
}

[GenerateSerializer, Immutable]
public readonly struct CandidateConnectedVertex(GrainId id, long transferScore)
{
public readonly GrainId Id { get; } = id;
public readonly long TransferScore { get; } = transferScore;

public static bool operator ==(CandidateConnectedVertex left, CandidateConnectedVertex right) => left.Equals(right);
public static bool operator !=(CandidateConnectedVertex left, CandidateConnectedVertex right) => !left.Equals(right);

public override bool Equals([NotNullWhen(true)] object obj) => obj is CandidateConnectedVertex other && Equals(other);
public bool Equals(CandidateConnectedVertex other) => Id == other.Id && TransferScore == other.TransferScore;

public override int GetHashCode() => HashCode.Combine(Id, TransferScore);
}

[GenerateSerializer, Immutable]
internal sealed class AcceptExchangeRequest(SiloAddress sendingSilo, ImmutableArray<CandidateVertex> exchangeSet, int activationCountSnapshot)
{
[Id(0)]
public SiloAddress SendingSilo { get; } = sendingSilo;

[Id(1)]
public ImmutableArray<CandidateVertex> ExchangeSet { get; } = exchangeSet;

[Id(2)]
public int ActivationCountSnapshot { get; } = activationCountSnapshot;
}

[GenerateSerializer, Immutable]
internal sealed class AcceptExchangeResponse(AcceptExchangeResponse.ResponseType type, ImmutableArray<GrainId> exchangeSet)
{
public static readonly AcceptExchangeResponse CachedExchangedRecently = new(ResponseType.ExchangedRecently, []);
public static readonly AcceptExchangeResponse CachedMutualExchangeAttempt = new(ResponseType.MutualExchangeAttempt, []);

[Id(0)]
public ResponseType Type { get; } = type;

[Id(1)]
public ImmutableArray<GrainId> ExchangeSet { get; } = exchangeSet;

[GenerateSerializer]
public enum ResponseType
{
/// <summary>
/// The exchange was accepted and an exchange set is returned.
/// </summary>
Success = 0,

/// <summary>
/// The other silo has been recently involved in another exchange.
/// </summary>
ExchangedRecently = 1,

/// <summary>
/// An attempt to do an exchange between this and the other silo was about to happen at the same time.
/// </summary>
MutualExchangeAttempt = 2
}
}
91 changes: 0 additions & 91 deletions src/Orleans.Core/Placement/Rebalancing/IActiveRebalancerGrain.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ public interface IImbalanceToleranceRule
/// Checks if this rule is statisfied by <paramref name="imbalance"/>.
/// </summary>
/// <param name="imbalance">The imbalance between the exchanging silo pair that will be, if this method were to return <see langword="true"/></param>
bool IsStatisfiedBy(uint imbalance);
bool IsSatisfiedBy(uint imbalance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

namespace Orleans.Placement.Rebalancing;

internal interface IActiveRebalancerGateway
internal interface IMessageSink
{
void RecordMessage(Message message);
}

internal sealed class NoOpActiveRebalancerGateway : IActiveRebalancerGateway
internal sealed class NoOpActiveRebalancerGateway : IMessageSink
{
public void RecordMessage(Message message) { }
}
1 change: 1 addition & 0 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ internal static class Constants
public static readonly GrainType StreamPullingAgentType = SystemTargetGrainId.CreateGrainType("stream.agent");
public static readonly GrainType ManifestProviderType = SystemTargetGrainId.CreateGrainType("manifest");
public static readonly GrainType ActivationMigratorType = SystemTargetGrainId.CreateGrainType("migrator");
public static readonly GrainType ActivationRebalancerType = SystemTargetGrainId.CreateGrainType("rebalancer");

public static readonly GrainId SiloDirectConnectionId = GrainId.Create(
GrainType.Create(GrainTypePrefix.SystemPrefix + "silo"),
Expand Down
Loading

0 comments on commit 46cebe5

Please sign in to comment.