Skip to content

Commit

Permalink
Clean up SafeTimer usage, replace with PeriodicTimer where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Apr 21, 2024
1 parent 2b50ea1 commit 79be326
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 449 deletions.
5 changes: 2 additions & 3 deletions src/Orleans.Core/Core/ClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static void ValidateSystemConfiguration(IServiceProvider serviceProvider)
/// <inheritdoc />
public async Task StartAsync(CancellationToken cancellationToken)
{
await _runtimeClient.Start(cancellationToken).ConfigureAwait(false);
await _runtimeClient.StartAsync(cancellationToken).ConfigureAwait(false);
await _clusterClientLifecycle.OnStart(cancellationToken).ConfigureAwait(false);
}

Expand All @@ -71,8 +71,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
_logger.LogInformation("Client shutting down");

await _clusterClientLifecycle.OnStop(cancellationToken).ConfigureAwait(false);

_runtimeClient?.Reset();
await _runtimeClient.StopAsync(cancellationToken).WaitAsync(cancellationToken);
}
finally
{
Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public static void AddDefaultServices(IClientBuilder builder)

services.Add(ServiceDescriptor);

// Common services
services.AddLogging();
services.AddOptions();
services.TryAddSingleton<TimeProvider>(TimeProvider.System);

// Options logging
services.TryAddSingleton(typeof(IOptionFormatter<>), typeof(DefaultOptionsFormatter<>));
services.TryAddSingleton(typeof(IOptionFormatterResolver<>), typeof(DefaultOptionsFormatterResolver<>));
Expand All @@ -64,7 +69,6 @@ public static void AddDefaultServices(IClientBuilder builder)
services.AddFromExisting<IHostEnvironmentStatistics, OldEnvironmentStatistics>();
#pragma warning restore 618

services.AddLogging();
services.TryAddSingleton<GrainBindingsResolver>();
services.TryAddSingleton<LocalClientDetails>();
services.TryAddSingleton<OutsideRuntimeClient>();
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ private async Task EstablishInitialConnection(CancellationToken cancellationToke
}
}

public void Stop()
public async Task StopAsync(CancellationToken cancellationToken)
{
Running = false;
gatewayManager.Stop();
await gatewayManager.StopAsync(cancellationToken);
}

public void DispatchLocalMessage(Message message)
Expand Down
79 changes: 50 additions & 29 deletions src/Orleans.Core/Messaging/GatewayManager.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -27,28 +28,29 @@ internal class GatewayManager : IDisposable
private readonly ILogger logger;
private readonly ConnectionManager connectionManager;
private readonly GatewayOptions gatewayOptions;
private AsyncTaskSafeTimer gatewayRefreshTimer;
private List<SiloAddress> cachedLiveGateways;
private HashSet<SiloAddress> cachedLiveGatewaysSet;
private List<SiloAddress> knownGateways;
private readonly PeriodicTimer gatewayRefreshTimer;
private List<SiloAddress> cachedLiveGateways = [];
private HashSet<SiloAddress> cachedLiveGatewaysSet = [];
private List<SiloAddress> knownGateways = [];
private DateTime lastRefreshTime;
private int roundRobinCounter;
private bool gatewayRefreshCallInitiated;
private bool gatewayListProviderInitialized;

private readonly ILogger<SafeTimer> timerLogger;
private Task? gatewayRefreshTimerTask;

public GatewayManager(
IOptions<GatewayOptions> gatewayOptions,
IGatewayListProvider gatewayListProvider,
ILoggerFactory loggerFactory,
ConnectionManager connectionManager)
ConnectionManager connectionManager,
TimeProvider timeProvider)
{
this.gatewayOptions = gatewayOptions.Value;
this.logger = loggerFactory.CreateLogger<GatewayManager>();
this.connectionManager = connectionManager;
this.gatewayListProvider = gatewayListProvider;
this.timerLogger = loggerFactory.CreateLogger<SafeTimer>();

this.gatewayRefreshTimer = new PeriodicTimer(this.gatewayOptions.GatewayListRefreshPeriod, timeProvider);
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -59,13 +61,6 @@ public async Task StartAsync(CancellationToken cancellationToken)
gatewayListProviderInitialized = true;
}

this.gatewayRefreshTimer = new AsyncTaskSafeTimer(
this.timerLogger,
RefreshSnapshotLiveGateways_TimerCallback,
null,
this.gatewayOptions.GatewayListRefreshPeriod,
this.gatewayOptions.GatewayListRefreshPeriod);

var knownGateways = await this.gatewayListProvider.GetGateways();
if (knownGateways.Count == 0)
{
Expand All @@ -81,19 +76,28 @@ public async Task StartAsync(CancellationToken cancellationToken)
Utils.EnumerableToString(knownGateways));

this.roundRobinCounter = this.gatewayOptions.PreferredGatewayIndex >= 0 ? this.gatewayOptions.PreferredGatewayIndex : Random.Shared.Next(knownGateways.Count);
this.knownGateways = this.cachedLiveGateways = knownGateways.Select(gw => gw.ToGatewayAddress()).ToList();
var newGateways = new List<SiloAddress>();
foreach (var gatewayUri in knownGateways)
{
if (gatewayUri?.ToGatewayAddress() is { } gatewayAddress)
{
newGateways.Add(gatewayAddress);
}
}

this.knownGateways = this.cachedLiveGateways = newGateways;
this.cachedLiveGatewaysSet = new HashSet<SiloAddress>(cachedLiveGateways);
this.lastRefreshTime = DateTime.UtcNow;
this.gatewayRefreshTimerTask ??= PeriodicallyRefreshGatewaySnapshot();
}

public void Stop()
public async Task StopAsync(CancellationToken cancellationToken)
{
if (gatewayRefreshTimer != null)
gatewayRefreshTimer.Dispose();
if (gatewayRefreshTimerTask is { } task)
{
Utils.SafeExecute(gatewayRefreshTimer.Dispose, logger);
await task.WaitAsync(cancellationToken);
}

gatewayRefreshTimer = null;
}

public void MarkAsDead(SiloAddress gateway)
Expand Down Expand Up @@ -217,22 +221,39 @@ internal void ExpediteUpdateLiveGatewaysSnapshot()
{
try
{
await RefreshSnapshotLiveGateways_TimerCallback(null);
gatewayRefreshCallInitiated = false;
await RefreshGatewaySnapshot();
}
catch
finally
{
// Intentionally ignore any exceptions here.
gatewayRefreshCallInitiated = false;
}
});
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
internal async Task PeriodicallyRefreshGatewaySnapshot()
{
await Task.Yield();

if (gatewayListProvider is null)
{
return;
}

while (await gatewayRefreshTimer.WaitForNextTickAsync())
{
await RefreshGatewaySnapshot();
}
}

private async Task RefreshGatewaySnapshot()
{
try
{
if (gatewayListProvider is null) return;
if (gatewayListProvider is null)
{
return;
}

// the listProvider.GetGateways() is not under lock.
var allGateways = await gatewayListProvider.GetGateways();
Expand All @@ -242,7 +263,7 @@ internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
}
catch (Exception exc)
{
logger.LogError((int)ErrorCode.ProxyClient_GetGateways, exc, "Exception occurred during RefreshSnapshotLiveGateways_TimerCallback -> listProvider.GetGateways()");
logger.LogError((int)ErrorCode.ProxyClient_GetGateways, exc, "Error refreshing gateways.");
}
}

Expand Down Expand Up @@ -367,7 +388,7 @@ private async Task CloseEvictedGatewayConnections(List<SiloAddress> liveGateways

public void Dispose()
{
this.gatewayRefreshTimer?.Dispose();
this.gatewayRefreshTimer.Dispose();
}
}
}
77 changes: 50 additions & 27 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne
private readonly ILoggerFactory loggerFactory;

private readonly SharedCallbackData sharedCallbackData;
private SafeTimer callbackTimer;
private readonly PeriodicTimer callbackTimer;
private Task callbackTimerTask;

public GrainAddress CurrentActivationAddress
{
get;
Expand All @@ -60,16 +62,18 @@ public string CurrentActivationIdentity
ILoggerFactory loggerFactory,
IOptions<ClientMessagingOptions> clientMessagingOptions,
MessagingTrace messagingTrace,
IServiceProvider serviceProvider)
IServiceProvider serviceProvider,
TimeProvider timeProvider)
{
TimeProvider = timeProvider;
this.ServiceProvider = serviceProvider;
_localClientDetails = localClientDetails;
this.loggerFactory = loggerFactory;
this.messagingTrace = messagingTrace;
this.logger = loggerFactory.CreateLogger<OutsideRuntimeClient>();
callbacks = new ConcurrentDictionary<CorrelationId, CallbackData>();
this.clientMessagingOptions = clientMessagingOptions.Value;

this.callbackTimer = new PeriodicTimer(TimeSpan.FromTicks(Math.Min(this.clientMessagingOptions.ResponseTimeout.Ticks, TimeSpan.FromSeconds(1).Ticks)), timeProvider);
this.sharedCallbackData = new SharedCallbackData(
msg => this.UnregisterCallback(msg.Id),
this.loggerFactory.CreateLogger<CallbackData>(),
Expand Down Expand Up @@ -104,10 +108,7 @@ internal void ConsumeServices()
this.messagingTrace,
this.loggerFactory.CreateLogger<ClientGrainContext>());

var timerLogger = this.loggerFactory.CreateLogger<SafeTimer>();
var minTicks = Math.Min(this.clientMessagingOptions.ResponseTimeout.Ticks, TimeSpan.FromSeconds(1).Ticks);
var period = TimeSpan.FromTicks(minTicks);
this.callbackTimer = new SafeTimer(timerLogger, this.OnCallbackExpiryTick, null, period, period);
this.callbackTimerTask = Task.Run(MonitorCallbackExpiry);

this.GrainReferenceRuntime = this.ServiceProvider.GetRequiredService<IGrainReferenceRuntime>();

Expand All @@ -129,7 +130,9 @@ internal void ConsumeServices()

public IServiceProvider ServiceProvider { get; private set; }

public async Task Start(CancellationToken cancellationToken)
public TimeProvider TimeProvider { get; }

public async Task StartAsync(CancellationToken cancellationToken)
{
ConsumeServices();

Expand All @@ -140,6 +143,22 @@ public async Task Start(CancellationToken cancellationToken)
logger.LogInformation((int)ErrorCode.ProxyClient_StartDone, "Started client with address {ActivationAddress} and id {ClientId}", CurrentActivationAddress.ToString(), _localClientDetails.ClientId);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
this.callbackTimer.Dispose();
if (this.callbackTimerTask is { } task)
{
await task.WaitAsync(cancellationToken);
}

if (MessageCenter is { } messageCenter)
{
await messageCenter.StopAsync(cancellationToken);
}

ConstructorReset();
}

// used for testing to (carefully!) allow two clients in the same process
private async Task StartInternal(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -313,18 +332,6 @@ private void UnregisterCallback(CorrelationId id)
callbacks.TryRemove(id, out _);
}

public void Reset()
{
Utils.SafeExecute(() =>
{
if (MessageCenter != null)
{
MessageCenter.Stop();
}
}, logger, "Client.Stop-Transport");
ConstructorReset();
}

private void ConstructorReset()
{
Utils.SafeExecute(() => this.Dispose());
Expand Down Expand Up @@ -380,7 +387,7 @@ public void Dispose()
if (this.disposing) return;
this.disposing = true;

Utils.SafeExecute(() => this.callbackTimer?.Dispose());
Utils.SafeExecute(() => this.callbackTimer.Dispose());

Utils.SafeExecute(() => MessageCenter?.Dispose());

Expand Down Expand Up @@ -434,14 +441,30 @@ public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousN
}
}

private void OnCallbackExpiryTick(object state)
private async Task MonitorCallbackExpiry()
{
var currentStopwatchTicks = ValueStopwatch.GetTimestamp();
foreach (var pair in callbacks)
while (await callbackTimer.WaitForNextTickAsync())
{
var callback = pair.Value;
if (callback.IsCompleted) continue;
if (callback.IsExpired(currentStopwatchTicks)) callback.OnTimeout();
try
{
var currentStopwatchTicks = ValueStopwatch.GetTimestamp();
foreach (var (_, callback) in callbacks)
{
if (callback.IsCompleted)
{
continue;
}

if (callback.IsExpired(currentStopwatchTicks))
{
callback.OnTimeout();
}
}
}
catch (Exception ex)
{
logger.LogWarning(ex, "Error while processing callback expiry.");
}
}
}

Expand Down
Loading

0 comments on commit 79be326

Please sign in to comment.