Skip to content

Commit

Permalink
Allow GatewayManager initialization to be retried (#6459)
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
  • Loading branch information
ReubenBond committed Apr 7, 2020
1 parent a8227b5 commit 82456b9
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 90 deletions.
91 changes: 47 additions & 44 deletions src/Orleans.Core/Messaging/GatewayManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -20,21 +21,20 @@ namespace Orleans.Messaging
/// </summary>
internal class GatewayManager : IGatewayListListener, IDisposable
{
internal readonly IGatewayListProvider ListProvider;
private readonly object lockable = new object();
private readonly SafeRandom rand = new SafeRandom();
private readonly Dictionary<SiloAddress, DateTime> knownDead = new Dictionary<SiloAddress, DateTime>();
private readonly IGatewayListProvider gatewayListProvider;
private readonly ILogger logger;
private readonly ConnectionManager connectionManager;
private readonly GatewayOptions gatewayOptions;
private AsyncTaskSafeTimer gatewayRefreshTimer;
private readonly Dictionary<SiloAddress, DateTime> knownDead;
private List<SiloAddress> cachedLiveGateways;
private List<SiloAddress> knownGateways;
private DateTime lastRefreshTime;
private int roundRobinCounter;
private readonly SafeRandom rand;
private readonly ILogger logger;
private readonly ILoggerFactory loggerFactory;
private readonly ConnectionManager connectionManager;
private readonly object lockable;

private readonly GatewayOptions gatewayOptions;
private bool gatewayRefreshCallInitiated;
private List<SiloAddress> knownGateways;
private bool gatewayListProviderInitialized;

public GatewayManager(
IOptions<GatewayOptions> gatewayOptions,
Expand All @@ -43,44 +43,47 @@ internal class GatewayManager : IGatewayListListener, IDisposable
ConnectionManager connectionManager)
{
this.gatewayOptions = gatewayOptions.Value;
knownDead = new Dictionary<SiloAddress, DateTime>();
rand = new SafeRandom();
logger = loggerFactory.CreateLogger<GatewayManager>();
this.loggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<GatewayManager>();
this.connectionManager = connectionManager;
lockable = new object();
gatewayRefreshCallInitiated = false;

ListProvider = gatewayListProvider;
this.gatewayListProvider = gatewayListProvider;
this.gatewayRefreshTimer = new AsyncTaskSafeTimer(
loggerFactory.CreateLogger<SafeTimer>(),
RefreshSnapshotLiveGateways_TimerCallback,
null,
this.gatewayOptions.GatewayListRefreshPeriod,
this.gatewayOptions.GatewayListRefreshPeriod);
}

var knownGateways = ListProvider.GetGateways().GetAwaiter().GetResult();
public async Task StartAsync(CancellationToken cancellationToken)
{
if (!gatewayListProviderInitialized)
{
await this.gatewayListProvider.InitializeGatewayListProvider();
gatewayListProviderInitialized = true;
}

var knownGateways = await this.gatewayListProvider.GetGateways();
if (knownGateways.Count == 0)
{
string gatewayProviderType = gatewayListProvider.GetType().FullName;
string err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
logger.Error(ErrorCode.GatewayManager_NoGateways, err);
throw new OrleansException(err);
var err = $"Could not find any gateway in {this.gatewayListProvider.GetType().FullName}. Orleans client cannot initialize.";
this.logger.LogError((int)ErrorCode.GatewayManager_NoGateways, err);
throw new SiloUnavailableException(err);
}

logger.Info(ErrorCode.GatewayManager_FoundKnownGateways, "Found {0} knownGateways from Gateway listProvider {1}", knownGateways.Count, Utils.EnumerableToString(knownGateways));
this.logger.LogInformation(
(int)ErrorCode.GatewayManager_FoundKnownGateways,
"Found {GatewayCount} gateways: {Gateways}",
knownGateways.Count,
Utils.EnumerableToString(knownGateways));

if (ListProvider is IGatewayListObservable)
if (this.gatewayListProvider is IGatewayListObservable observable)
{
((IGatewayListObservable)ListProvider).SubscribeToGatewayNotificationEvents(this);
observable.SubscribeToGatewayNotificationEvents(this);
}

roundRobinCounter = this.gatewayOptions.PreferedGatewayIndex >= 0 ? this.gatewayOptions.PreferedGatewayIndex : rand.Next(knownGateways.Count);

this.knownGateways = cachedLiveGateways = knownGateways.Select(gw => gw.ToGatewayAddress()).ToList();

lastRefreshTime = DateTime.UtcNow;
gatewayRefreshTimer = new AsyncTaskSafeTimer(
this.loggerFactory.CreateLogger<SafeTimer>(),
RefreshSnapshotLiveGateways_TimerCallback,
null,
this.gatewayOptions.GatewayListRefreshPeriod,
this.gatewayOptions.GatewayListRefreshPeriod);
this.roundRobinCounter = this.gatewayOptions.PreferedGatewayIndex >= 0 ? this.gatewayOptions.PreferedGatewayIndex : this.rand.Next(knownGateways.Count);
this.knownGateways = this.cachedLiveGateways = knownGateways.Select(gw => gw.ToGatewayAddress()).ToList();
this.lastRefreshTime = DateTime.UtcNow;
}

public void Stop()
Expand All @@ -91,10 +94,10 @@ public void Stop()
}
gatewayRefreshTimer = null;

if (ListProvider != null && ListProvider is IGatewayListObservable)
if (gatewayListProvider is IGatewayListObservable observable)
{
Utils.SafeExecute(
() => ((IGatewayListObservable)ListProvider).UnSubscribeFromGatewayNotificationEvents(this),
() => observable.UnSubscribeFromGatewayNotificationEvents(this),
logger);
}
}
Expand Down Expand Up @@ -187,7 +190,7 @@ internal void ExpediteUpdateLiveGatewaysSnapshot()
{
// If there is already an expedited refresh call in place, don't call again, until the previous one is finished.
// We don't want to issue too many Gateway refresh calls.
if (ListProvider == null || gatewayRefreshCallInitiated) return;
if (gatewayListProvider == null || gatewayRefreshCallInitiated) return;

// Initiate gateway list refresh asynchronously. The Refresh timer will keep ticking regardless.
// We don't want to block the client with synchronously Refresh call.
Expand All @@ -212,7 +215,7 @@ public void GatewayListNotification(IEnumerable<Uri> gateways)
{
try
{
UpdateLiveGatewaysSnapshot(gateways.Select(gw => gw.ToGatewayAddress()), ListProvider.MaxStaleness);
UpdateLiveGatewaysSnapshot(gateways.Select(gw => gw.ToGatewayAddress()), gatewayListProvider.MaxStaleness);
}
catch (Exception exc)
{
Expand All @@ -225,18 +228,18 @@ internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
{
try
{
if (ListProvider is null) return;
if (gatewayListProvider is null) return;

// the listProvider.GetGateways() is not under lock.
var allGateways = await ListProvider.GetGateways();
var allGateways = await gatewayListProvider.GetGateways();
var refreshedGateways = allGateways.Select(gw => gw.ToGatewayAddress()).ToList();
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("Discovered {GatewayCount} gateways: {Gateways}", refreshedGateways.Count, Utils.EnumerableToString(refreshedGateways));
}

// the next one will grab the lock.
UpdateLiveGatewaysSnapshot(refreshedGateways, ListProvider.MaxStaleness);
UpdateLiveGatewaysSnapshot(refreshedGateways, gatewayListProvider.MaxStaleness);
}
catch (Exception exc)
{
Expand Down
62 changes: 19 additions & 43 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne

private readonly ConcurrentDictionary<CorrelationId, CallbackData> callbacks;
private InvokableObjectManager localObjects;

private ClientMessageCenter transport;
private bool firstMessageReceived;
private bool disposing;

Expand All @@ -53,7 +51,6 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne

private MessageFactory messageFactory;
private IPAddress localAddress;
private IGatewayListProvider gatewayListProvider;
private readonly ILoggerFactory loggerFactory;
private readonly IOptions<StatisticsOptions> statisticsOptions;
private readonly ApplicationRequestsStatisticsGroup appRequestStatistics;
Expand All @@ -79,7 +76,7 @@ public IStreamProviderRuntime CurrentStreamProviderRuntime

public IGrainReferenceRuntime GrainReferenceRuntime { get; private set; }

internal ClientMessageCenter MessageCenter => this.transport;
internal ClientMessageCenter MessageCenter { get; private set; }

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope",
Justification = "MessageCenter is IDisposable but cannot call Dispose yet as it lives past the end of this method call.")]
Expand Down Expand Up @@ -175,8 +172,6 @@ internal void ConsumeServices(IServiceProvider services)
throw new InvalidOperationException("TestOnlyThrowExceptionDuringInit");
}

this.gatewayListProvider = this.ServiceProvider.GetRequiredService<IGatewayListProvider>();

var statisticsLevel = statisticsOptions.Value.CollectionLevel;
if (statisticsLevel.CollectThreadTimeTrackingStats())
{
Expand All @@ -195,7 +190,7 @@ internal void ConsumeServices(IServiceProvider services)

private async Task StreamingInitialize()
{
var implicitSubscriberTable = await transport.GetImplicitStreamSubscriberTable(this.InternalGrainFactory);
var implicitSubscriberTable = await MessageCenter.GetImplicitStreamSubscriberTable(this.InternalGrainFactory);
clientProviderRuntime.StreamingInitialize(implicitSubscriberTable);
}

Expand All @@ -211,39 +206,20 @@ public async Task Start(Func<Exception, Task<bool>> retryFilter = null)
// used for testing to (carefully!) allow two clients in the same process
private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
{
// Initialize the gateway list provider, since information from the cluster is required to successfully
// initialize subsequent services.
var initializedGatewayProvider = new[] {false};
await ExecuteWithRetries(async () =>
{
if (!initializedGatewayProvider[0])
{
await this.gatewayListProvider.InitializeGatewayListProvider();
initializedGatewayProvider[0] = true;
}
var gateways = await this.gatewayListProvider.GetGateways();
if (gateways.Count == 0)
{
var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName();
var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
logger.Error(ErrorCode.GatewayManager_NoGateways, err);
throw new SiloUnavailableException(err);
}
},
retryFilter);
var gatewayManager = this.ServiceProvider.GetRequiredService<GatewayManager>();
await ExecuteWithRetries(async () => await gatewayManager.StartAsync(CancellationToken.None), retryFilter);

var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative
transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
transport.RegisterLocalMessageHandler(Message.Categories.Application, this.HandleMessage);
transport.Start();
CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId);
MessageCenter = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
MessageCenter.RegisterLocalMessageHandler(Message.Categories.Application, this.HandleMessage);
MessageCenter.Start();
CurrentActivationAddress = ActivationAddress.NewActivationAddress(MessageCenter.MyAddress, handshakeClientId);

// Keeping this thread handling it very simple for now. Just queue task on thread pool.
Task.Run(this.RunClientMessagePump).Ignore();

await ExecuteWithRetries(
async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory),
async () => this.GrainTypeResolver = await MessageCenter.GetGrainTypeResolver(this.InternalGrainFactory),
retryFilter);

this.typeMapRefreshTimer = new AsyncTaskSafeTimer(
Expand All @@ -253,7 +229,7 @@ private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
this.typeMapRefreshInterval,
this.typeMapRefreshInterval);

ClientStatistics.Start(transport, clientId);
ClientStatistics.Start(MessageCenter, clientId);

await ExecuteWithRetries(StreamingInitialize, retryFilter);

Expand All @@ -279,7 +255,7 @@ private async Task RefreshGrainTypeResolver(object _)
{
try
{
GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory);
GrainTypeResolver = await MessageCenter.GetGrainTypeResolver(this.InternalGrainFactory);
}
catch(Exception ex)
{
Expand All @@ -291,7 +267,7 @@ private async Task RunClientMessagePump()
{
incomingMessagesThreadTimeTracking?.OnStartExecution();

var reader = transport.GetReader(Message.Categories.Application);
var reader = MessageCenter.GetReader(Message.Categories.Application);

while (true)
{
Expand Down Expand Up @@ -330,8 +306,8 @@ private void HandleMessage(Message message)
if (!handshakeClientId.Equals(message.TargetGrain))
{
clientId = message.TargetGrain;
transport.UpdateClientId(clientId);
CurrentActivationAddress = ActivationAddress.GetAddress(transport.MyAddress, clientId, CurrentActivationAddress.Activation);
MessageCenter.UpdateClientId(clientId);
CurrentActivationAddress = ActivationAddress.GetAddress(MessageCenter.MyAddress, clientId, CurrentActivationAddress.Activation);
}
else
{
Expand Down Expand Up @@ -364,7 +340,7 @@ public void SendResponse(Message request, Response response)
OrleansOutsideRuntimeClientEvent.Log.SendResponse(message);
message.BodyObject = response;

transport.SendMessage(message);
MessageCenter.SendMessage(message);
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope",
Expand Down Expand Up @@ -418,7 +394,7 @@ private void SendRequestMessage(GrainReference target, Message message, TaskComp
}

if (logger.IsEnabled(LogLevel.Trace)) logger.Trace("Send {0}", message);
transport.SendMessage(message);
MessageCenter.SendMessage(message);
}

public void ReceiveResponse(Message response)
Expand Down Expand Up @@ -488,9 +464,9 @@ public void Reset(bool cleanup)

Utils.SafeExecute(() =>
{
if (transport != null)
if (MessageCenter != null)
{
transport.Stop();
MessageCenter.Stop();
}
}, logger, "Client.Stop-Transport");
Utils.SafeExecute(() =>
Expand Down Expand Up @@ -595,7 +571,7 @@ public void Dispose()
}
});

Utils.SafeExecute(() => transport?.Dispose());
Utils.SafeExecute(() => MessageCenter?.Dispose());
if (ClientStatistics != null)
{
Utils.SafeExecute(() => ClientStatistics.Dispose());
Expand Down
8 changes: 5 additions & 3 deletions test/TesterInternal/GatewaySelectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Orleans.Internal;
using Xunit;
using Xunit.Abstractions;
using System.Threading;

namespace UnitTests.MessageCenterTests
{
Expand All @@ -34,13 +35,13 @@ public GatewaySelectionTest(ITestOutputHelper output)
}

[Fact, TestCategory("BVT"), TestCategory("Gateway")]
public void GatewaySelection()
public async Task GatewaySelection()
{
var listProvider = new TestListProvider(gatewayAddressUris);
Test_GatewaySelection(listProvider);
await Test_GatewaySelection(listProvider);
}

protected void Test_GatewaySelection(IGatewayListProvider listProvider)
protected async Task Test_GatewaySelection(IGatewayListProvider listProvider)
{
IList<Uri> gatewayUris = listProvider.GetGateways().GetResult();
Assert.True(gatewayUris.Count > 0, $"Found some gateways. Data = {Utils.EnumerableToString(gatewayUris)}");
Expand All @@ -51,6 +52,7 @@ protected void Test_GatewaySelection(IGatewayListProvider listProvider)
}).ToList();

var gatewayManager = new GatewayManager(Options.Create(new GatewayOptions()), listProvider, NullLoggerFactory.Instance, null);
await gatewayManager.StartAsync(CancellationToken.None);

var counts = new int[4];

Expand Down

0 comments on commit 82456b9

Please sign in to comment.