From e50a28a5f59b148038dd740ff5776871b563e71c Mon Sep 17 00:00:00 2001 From: ReubenBond Date: Mon, 8 Apr 2024 19:40:23 -0700 Subject: [PATCH] Refactoring 2 --- .../Core/DefaultClientServices.cs | 2 +- .../Networking/ClientOutboundConnection.cs | 5 ++- .../ClientOutboundConnectionFactory.cs | 6 +--- src/Orleans.Core/Networking/Connection.cs | 6 ++-- .../Networking/ConnectionShared.cs | 31 ++++++++----------- .../Rebalancing/IImbalanceToleranceRule.cs | 2 +- ...ssageSink.cs => IMessageStatisticsSink.cs} | 4 +-- .../Hosting/ActiveRebalancingExtensions.cs | 2 +- .../Hosting/DefaultSiloServices.cs | 2 +- .../Messaging/MessageCenter.cs | 10 +++--- .../Networking/GatewayConnectionListener.cs | 7 +---- .../Networking/GatewayInboundConnection.cs | 5 ++- .../Networking/SiloConnection.cs | 5 ++- .../Networking/SiloConnectionFactory.cs | 7 +---- .../Networking/SiloConnectionListener.cs | 7 +---- .../ActivationRebalancer.MessageSink.cs | 2 +- 16 files changed, 37 insertions(+), 66 deletions(-) rename src/Orleans.Core/Placement/Rebalancing/{IMessageSink.cs => IMessageStatisticsSink.cs} (59%) diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 1c05084dfd..d0c995dd07 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -111,7 +111,7 @@ public static void AddDefaultServices(IClientBuilder builder) services.AddSingleton(); // Networking - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/Orleans.Core/Networking/ClientOutboundConnection.cs b/src/Orleans.Core/Networking/ClientOutboundConnection.cs index c04b60365e..d27555787a 100644 --- a/src/Orleans.Core/Networking/ClientOutboundConnection.cs +++ b/src/Orleans.Core/Networking/ClientOutboundConnection.cs @@ -27,9 +27,8 @@ internal sealed class ClientOutboundConnection : Connection ConnectionOptions connectionOptions, ConnectionCommon connectionShared, ConnectionPreambleHelper connectionPreambleHelper, - ClusterOptions clusterOptions, - IMessageSink rebalancerGateway) - : base(rebalancerGateway, connection, middleware, connectionShared) + ClusterOptions clusterOptions) + : base(connection, middleware, connectionShared) { this.messageCenter = messageCenter; this.connectionManager = connectionManager; diff --git a/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs b/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs index d6ecf1501c..038e75526a 100644 --- a/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs +++ b/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs @@ -14,7 +14,6 @@ internal sealed class ClientOutboundConnectionFactory : ConnectionFactory private readonly ClientConnectionOptions clientConnectionOptions; private readonly ClusterOptions clusterOptions; private readonly ConnectionPreambleHelper connectionPreambleHelper; - private readonly IMessageSink rebalancerGateway; private readonly object initializationLock = new object(); private volatile bool isInitialized; private ClientMessageCenter messageCenter; @@ -24,12 +23,10 @@ internal sealed class ClientOutboundConnectionFactory : ConnectionFactory IOptions connectionOptions, IOptions clientConnectionOptions, IOptions clusterOptions, - IMessageSink rebalancerGateway, ConnectionCommon connectionShared, ConnectionPreambleHelper connectionPreambleHelper) : base(connectionShared.ServiceProvider.GetRequiredKeyedService(ServicesKey), connectionShared.ServiceProvider, connectionOptions) { - this.rebalancerGateway = rebalancerGateway; this.connectionShared = connectionShared; this.clientConnectionOptions = clientConnectionOptions.Value; this.clusterOptions = clusterOptions.Value; @@ -49,8 +46,7 @@ protected override Connection CreateConnection(SiloAddress address, ConnectionCo this.ConnectionOptions, this.connectionShared, this.connectionPreambleHelper, - this.clusterOptions, - this.rebalancerGateway); + this.clusterOptions); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Core/Networking/Connection.cs b/src/Orleans.Core/Networking/Connection.cs index 9ad4a0e0a5..d28da7748e 100644 --- a/src/Orleans.Core/Networking/Connection.cs +++ b/src/Orleans.Core/Networking/Connection.cs @@ -30,7 +30,6 @@ internal abstract class Connection private static readonly ObjectPool MessageHandlerPool = ObjectPool.Create(new MessageHandlerPoolPolicy()); private readonly ConnectionCommon shared; - private readonly IMessageSink _rebalancerGateway; private readonly ConnectionDelegate middleware; private readonly Channel outgoingMessages; private readonly ChannelWriter outgoingMessageWriter; @@ -43,12 +42,10 @@ internal abstract class Connection private Task _closeTask; protected Connection( - IMessageSink rebalancerGateway, ConnectionContext connection, ConnectionDelegate middleware, ConnectionCommon shared) { - _rebalancerGateway = rebalancerGateway ?? throw new ArgumentNullException(nameof(rebalancerGateway)); this.Context = connection ?? throw new ArgumentNullException(nameof(connection)); this.middleware = middleware ?? throw new ArgumentNullException(nameof(middleware)); this.shared = shared; @@ -356,6 +353,7 @@ private async Task ProcessOutgoing() Exception error = default; var serializer = this.shared.ServiceProvider.GetRequiredService(); + var messageStatisticsSink = this.shared.MessageStatisticsSink; try { var output = this._transport.Output; @@ -377,7 +375,7 @@ private async Task ProcessOutgoing() inflight.Add(message); var (headerLength, bodyLength) = serializer.Write(output, message); RecordMessageSend(message, headerLength + bodyLength, headerLength); - _rebalancerGateway.RecordMessage(message); + messageStatisticsSink.RecordMessage(message); message = null; } } diff --git a/src/Orleans.Core/Networking/ConnectionShared.cs b/src/Orleans.Core/Networking/ConnectionShared.cs index a6b21f76cc..fde60cf471 100644 --- a/src/Orleans.Core/Networking/ConnectionShared.cs +++ b/src/Orleans.Core/Networking/ConnectionShared.cs @@ -1,24 +1,19 @@ -using System; +using System; +using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { - internal sealed class ConnectionCommon + internal sealed class ConnectionCommon( + IServiceProvider serviceProvider, + MessageFactory messageFactory, + MessagingTrace messagingTrace, + NetworkingTrace networkingTrace, + IMessageStatisticsSink messageStatisticsSink) { - public ConnectionCommon( - IServiceProvider serviceProvider, - MessageFactory messageFactory, - MessagingTrace messagingTrace, - NetworkingTrace networkingTrace) - { - this.ServiceProvider = serviceProvider; - this.MessageFactory = messageFactory; - this.MessagingTrace = messagingTrace; - this.NetworkingTrace = networkingTrace; - } - - public MessageFactory MessageFactory { get; } - public IServiceProvider ServiceProvider { get; } - public NetworkingTrace NetworkingTrace { get; } - public MessagingTrace MessagingTrace { get; } + public MessageFactory MessageFactory { get; } = messageFactory; + public IServiceProvider ServiceProvider { get; } = serviceProvider; + public NetworkingTrace NetworkingTrace { get; } = networkingTrace; + public IMessageStatisticsSink MessageStatisticsSink { get; } = messageStatisticsSink; + public MessagingTrace MessagingTrace { get; } = messagingTrace; } } diff --git a/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs b/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs index 645489e471..195d8e2821 100644 --- a/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs +++ b/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs @@ -6,7 +6,7 @@ namespace Orleans.Placement.Rebalancing; public interface IImbalanceToleranceRule { /// - /// Checks if this rule is statisfied by . + /// Checks if this rule is satisfied by . /// /// The imbalance between the exchanging silo pair that will be, if this method were to return bool IsSatisfiedBy(uint imbalance); diff --git a/src/Orleans.Core/Placement/Rebalancing/IMessageSink.cs b/src/Orleans.Core/Placement/Rebalancing/IMessageStatisticsSink.cs similarity index 59% rename from src/Orleans.Core/Placement/Rebalancing/IMessageSink.cs rename to src/Orleans.Core/Placement/Rebalancing/IMessageStatisticsSink.cs index ba7f2bc92f..277d1287dd 100644 --- a/src/Orleans.Core/Placement/Rebalancing/IMessageSink.cs +++ b/src/Orleans.Core/Placement/Rebalancing/IMessageStatisticsSink.cs @@ -2,12 +2,12 @@ namespace Orleans.Placement.Rebalancing; -internal interface IMessageSink +internal interface IMessageStatisticsSink { void RecordMessage(Message message); } -internal sealed class NoOpActiveRebalancerGateway : IMessageSink +internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink { public void RecordMessage(Message message) { } } \ No newline at end of file diff --git a/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs b/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs index 7faa0880dc..7cfd156976 100644 --- a/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs +++ b/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs @@ -40,7 +40,7 @@ public static ISiloBuilder AddActiveRebalancing(this ISiloBuilder builder) services.AddSingleton(); services.AddSingleton(); - services.AddFromExisting(); + services.AddFromExisting(); services.AddFromExisting, ActivationRebalancer>(); return services; diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index 3a0edb2883..a4ed94cf48 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -355,7 +355,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) (sp, _) => sp.GetRequiredService()); // Networking - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/Orleans.Runtime/Messaging/MessageCenter.cs b/src/Orleans.Runtime/Messaging/MessageCenter.cs index d2be403926..650cc0b04e 100644 --- a/src/Orleans.Runtime/Messaging/MessageCenter.cs +++ b/src/Orleans.Runtime/Messaging/MessageCenter.cs @@ -26,7 +26,7 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable private readonly SiloMessagingOptions messagingOptions; private readonly PlacementService placementService; private readonly GrainLocator _grainLocator; - private readonly IMessageSink _rebalancerGateway; + private readonly IMessageStatisticsSink _messageStatisticsSink; private readonly ILogger log; private readonly Catalog catalog; private bool stopped; @@ -45,7 +45,7 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable IOptions messagingOptions, PlacementService placementService, GrainLocator grainLocator, - IMessageSink rebalancerGateway) + IMessageStatisticsSink messageStatisticsSink) { this.catalog = catalog; this.messagingOptions = messagingOptions.Value; @@ -54,7 +54,7 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable this.messagingTrace = messagingTrace; this.placementService = placementService; _grainLocator = grainLocator; - _rebalancerGateway = rebalancerGateway; + _messageStatisticsSink = messageStatisticsSink; this.log = logger; this.messageFactory = messageFactory; this._siloAddress = siloDetails.SiloAddress; @@ -77,7 +77,7 @@ public bool TryDeliverToProxy(Message msg) if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg) || this.hostedClient is HostedClient client && client.TryDispatchToClient(msg)) { - _rebalancerGateway.RecordMessage(msg); + _messageStatisticsSink.RecordMessage(msg); return true; } @@ -538,7 +538,7 @@ public void ReceiveMessage(Message msg) } targetActivation.ReceiveMessage(msg); - _rebalancerGateway.RecordMessage(msg); + _messageStatisticsSink.RecordMessage(msg); } } catch (Exception ex) diff --git a/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs b/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs index 14409c36f8..b3482b1d56 100644 --- a/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs +++ b/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs @@ -7,7 +7,6 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { @@ -15,7 +14,6 @@ internal sealed class GatewayConnectionListener : ConnectionListener, ILifecycle { internal static readonly object ServicesKey = new object(); private readonly ILocalSiloDetails localSiloDetails; - private readonly IMessageSink rebalancerGateway; private readonly MessageCenter messageCenter; private readonly ConnectionCommon connectionShared; private readonly ConnectionPreambleHelper connectionPreambleHelper; @@ -31,7 +29,6 @@ internal sealed class GatewayConnectionListener : ConnectionListener, ILifecycle IOptions siloConnectionOptions, OverloadDetector overloadDetector, ILocalSiloDetails localSiloDetails, - IMessageSink rebalancerGateway, IOptions endpointOptions, MessageCenter messageCenter, ConnectionManager connectionManager, @@ -44,7 +41,6 @@ internal sealed class GatewayConnectionListener : ConnectionListener, ILifecycle this.overloadDetector = overloadDetector; this.gateway = messageCenter.Gateway; this.localSiloDetails = localSiloDetails; - this.rebalancerGateway = rebalancerGateway; this.messageCenter = messageCenter; this.connectionShared = connectionShared; this.connectionPreambleHelper = connectionPreambleHelper; @@ -65,8 +61,7 @@ protected override Connection CreateConnection(ConnectionContext context) this.ConnectionOptions, this.messageCenter, this.connectionShared, - this.connectionPreambleHelper, - this.rebalancerGateway); + this.connectionPreambleHelper); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs b/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs index 4245b1429c..a30572a2a1 100644 --- a/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs +++ b/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs @@ -28,9 +28,8 @@ internal sealed class GatewayInboundConnection : Connection ConnectionOptions connectionOptions, MessageCenter messageCenter, ConnectionCommon connectionShared, - ConnectionPreambleHelper connectionPreambleHelper, - IMessageSink rebalancerGateway) - : base(rebalancerGateway, connection, middleware, connectionShared) + ConnectionPreambleHelper connectionPreambleHelper) + : base(connection, middleware, connectionShared) { this.connectionOptions = connectionOptions; this.gateway = gateway; diff --git a/src/Orleans.Runtime/Networking/SiloConnection.cs b/src/Orleans.Runtime/Networking/SiloConnection.cs index 90fffe485a..9d6cd46ab8 100644 --- a/src/Orleans.Runtime/Networking/SiloConnection.cs +++ b/src/Orleans.Runtime/Networking/SiloConnection.cs @@ -33,9 +33,8 @@ internal sealed class SiloConnection : Connection ConnectionOptions connectionOptions, ConnectionCommon connectionShared, ProbeRequestMonitor probeMonitor, - ConnectionPreambleHelper connectionPreambleHelper, - IMessageSink rebalancerGateway) - : base(rebalancerGateway, connection, middleware, connectionShared) + ConnectionPreambleHelper connectionPreambleHelper) + : base(connection, middleware, connectionShared) { this.messageCenter = messageCenter; this.connectionManager = connectionManager; diff --git a/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs b/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs index c096f8ff11..456642894c 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs @@ -5,7 +5,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { @@ -13,7 +12,6 @@ internal sealed class SiloConnectionFactory : ConnectionFactory { internal static readonly object ServicesKey = new object(); private readonly ILocalSiloDetails localSiloDetails; - private readonly IMessageSink rebalancerGateway; private readonly ConnectionCommon connectionShared; private readonly ProbeRequestMonitor probeRequestMonitor; private readonly ConnectionPreambleHelper connectionPreambleHelper; @@ -30,7 +28,6 @@ internal sealed class SiloConnectionFactory : ConnectionFactory IOptions connectionOptions, IOptions siloConnectionOptions, ILocalSiloDetails localSiloDetails, - IMessageSink rebalancerGateway, ConnectionCommon connectionShared, ProbeRequestMonitor probeRequestMonitor, ConnectionPreambleHelper connectionPreambleHelper) @@ -39,7 +36,6 @@ internal sealed class SiloConnectionFactory : ConnectionFactory this.serviceProvider = serviceProvider; this.siloConnectionOptions = siloConnectionOptions.Value; this.localSiloDetails = localSiloDetails; - this.rebalancerGateway = rebalancerGateway; this.connectionShared = connectionShared; this.probeRequestMonitor = probeRequestMonitor; this.connectionPreambleHelper = connectionPreambleHelper; @@ -71,8 +67,7 @@ protected override Connection CreateConnection(SiloAddress address, ConnectionCo this.ConnectionOptions, this.connectionShared, this.probeRequestMonitor, - this.connectionPreambleHelper, - this.rebalancerGateway); + this.connectionPreambleHelper); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Runtime/Networking/SiloConnectionListener.cs b/src/Orleans.Runtime/Networking/SiloConnectionListener.cs index 6ebc207c6a..98ca50c3dc 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionListener.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionListener.cs @@ -6,7 +6,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { @@ -14,7 +13,6 @@ internal sealed class SiloConnectionListener : ConnectionListener, ILifecyclePar { internal static readonly object ServicesKey = new object(); private readonly ILocalSiloDetails localSiloDetails; - private readonly IMessageSink rebalancerGateway; private readonly SiloConnectionOptions siloConnectionOptions; private readonly MessageCenter messageCenter; private readonly EndpointOptions endpointOptions; @@ -30,7 +28,6 @@ internal sealed class SiloConnectionListener : ConnectionListener, ILifecyclePar MessageCenter messageCenter, IOptions endpointOptions, ILocalSiloDetails localSiloDetails, - IMessageSink rebalancerGateway, ConnectionManager connectionManager, ConnectionCommon connectionShared, ProbeRequestMonitor probeRequestMonitor, @@ -40,7 +37,6 @@ internal sealed class SiloConnectionListener : ConnectionListener, ILifecyclePar this.siloConnectionOptions = siloConnectionOptions.Value; this.messageCenter = messageCenter; this.localSiloDetails = localSiloDetails; - this.rebalancerGateway = rebalancerGateway; this.connectionManager = connectionManager; this.connectionShared = connectionShared; this.probeRequestMonitor = probeRequestMonitor; @@ -62,8 +58,7 @@ protected override Connection CreateConnection(ConnectionContext context) this.ConnectionOptions, this.connectionShared, this.probeRequestMonitor, - this.connectionPreambleHelper, - this.rebalancerGateway); + this.connectionPreambleHelper); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs index 04dc0b0831..fa30c4f136 100644 --- a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs +++ b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs @@ -7,7 +7,7 @@ namespace Orleans.Runtime.Placement.Rebalancing; -internal partial class ActivationRebalancer : IMessageSink +internal partial class ActivationRebalancer : IMessageStatisticsSink { private readonly CancellationTokenSource _shutdownCts = new(); private Task? _processPendingEdgesTask;