diff --git a/src/Orleans.Core.Abstractions/Core/Grain.cs b/src/Orleans.Core.Abstractions/Core/Grain.cs index 7c2ad0a7d2..5d310bdfff 100644 --- a/src/Orleans.Core.Abstractions/Core/Grain.cs +++ b/src/Orleans.Core.Abstractions/Core/Grain.cs @@ -95,7 +95,7 @@ protected Grain(IGrainContext grainContext, IGrainRuntime? grainRuntime = null) /// Period of subsequent timer ticks. /// Handle for this Timer. /// - protected IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) + protected IGrainTimer RegisterTimer(Func asyncCallback, object? state, TimeSpan dueTime, TimeSpan period) { if (asyncCallback == null) throw new ArgumentNullException(nameof(asyncCallback)); diff --git a/src/Orleans.Core.Abstractions/Runtime/IGrainTimer.cs b/src/Orleans.Core.Abstractions/Runtime/IGrainTimer.cs index 7e33d77150..0b1db9ce00 100644 --- a/src/Orleans.Core.Abstractions/Runtime/IGrainTimer.cs +++ b/src/Orleans.Core.Abstractions/Runtime/IGrainTimer.cs @@ -1,27 +1,22 @@ using System; -using System.Threading.Tasks; +using System.Threading; -namespace Orleans.Runtime -{ - /// - /// Represents a grain timer and its functionality. - /// - internal interface IGrainTimer : IDisposable - { - /// - /// Starts the timer. - /// - void Start(); - - /// - /// Stops the timer. - /// - void Stop(); +namespace Orleans.Runtime; - /// - /// Gets the currently executing grain timer task. - /// - /// The currently executing grain timer task. - Task GetCurrentlyExecutingTickTask(); - } +/// +/// Represents a timer belonging to a grain. +/// +public interface IGrainTimer : IDisposable, IAsyncDisposable +{ + /// Changes the start time and the interval between method invocations for a timer, using values to measure time intervals. + /// + /// A representing the amount of time to delay before invoking the callback method specified when the was constructed. + /// Specify to prevent the timer from restarting. Specify to restart the timer immediately. + /// + /// + /// The time interval between invocations of the callback method specified when the timer was constructed. + /// Specify to disable periodic signaling. + /// + /// The or parameter, in milliseconds, is less than -1 or greater than 4294967294. + void Change(TimeSpan dueTime, TimeSpan period); } \ No newline at end of file diff --git a/src/Orleans.Core.Abstractions/Timers/ITimerRegistry.cs b/src/Orleans.Core.Abstractions/Timers/ITimerRegistry.cs index 55b83b08da..e1341f1e2b 100644 --- a/src/Orleans.Core.Abstractions/Timers/ITimerRegistry.cs +++ b/src/Orleans.Core.Abstractions/Timers/ITimerRegistry.cs @@ -1,32 +1,32 @@ +#nullable enable using System; using System.Threading.Tasks; using Orleans.Runtime; -namespace Orleans.Timers +namespace Orleans.Timers; + +/// +/// Functionality for managing grain timers. +/// +public interface ITimerRegistry { /// - /// Functionality for managing grain timers. + /// Creates a grain timer. /// - public interface ITimerRegistry - { - /// - /// Creates a grain timer. - /// - /// The grain which the timer is associated with. - /// The timer callback, which will fire whenever the timer becomes due. - /// The state object passed to the callback. - /// - /// The amount of time to delay before the is invoked. - /// Specify to prevent the timer from starting. - /// Specify to invoke the callback promptly. - /// - /// - /// The time interval between invocations of . - /// Specify to disable periodic signaling. - /// - /// - /// An object which will cancel the timer upon disposal. - /// - IDisposable RegisterTimer(IGrainContext grainContext, Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period); - } + /// The grain which the timer is associated with. + /// The timer callback, which will fire whenever the timer becomes due. + /// The state object passed to the callback. + /// + /// The amount of time to delay before the is invoked. + /// Specify to prevent the timer from starting. + /// Specify to invoke the callback promptly. + /// + /// + /// The time interval between invocations of . + /// Specify to disable periodic signaling. + /// + /// + /// An instance which represents the timer. + /// + IGrainTimer RegisterTimer(IGrainContext grainContext, Func callback, object? state, TimeSpan dueTime, TimeSpan period); } \ No newline at end of file diff --git a/src/Orleans.Core/Runtime/IRuntimeClient.cs b/src/Orleans.Core/Runtime/IRuntimeClient.cs index 389acea6bf..c2101bcca3 100644 --- a/src/Orleans.Core/Runtime/IRuntimeClient.cs +++ b/src/Orleans.Core/Runtime/IRuntimeClient.cs @@ -11,6 +11,11 @@ namespace Orleans.Runtime /// internal interface IRuntimeClient { + /// + /// Gets the time provider used by the system. + /// + TimeProvider TimeProvider { get; } + /// /// Grain Factory to get and cast grain references. /// diff --git a/src/Orleans.Core/Timers/AsyncTaskSafeTimer.cs b/src/Orleans.Core/Timers/AsyncTaskSafeTimer.cs deleted file mode 100644 index a9000fe9c2..0000000000 --- a/src/Orleans.Core/Timers/AsyncTaskSafeTimer.cs +++ /dev/null @@ -1,216 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; - -namespace Orleans.Runtime -{ - /// - /// An internal class for implementing async timers in Orleans. - /// - internal sealed class AsyncTaskSafeTimer : IDisposable - { - private const uint MaxSupportedTimeout = 0xfffffffe; - - private readonly Func _callback; - private readonly ILogger _logger; - private Timer _timer; - private TimeSpan _dueTime; - private TimeSpan _period; - private bool _timerStarted; - private DateTime _previousTickTime; - private int _totalNumTicks; - - internal AsyncTaskSafeTimer(ILogger logger, Func asyncTaskCallback, object state) - { - ArgumentNullException.ThrowIfNull(asyncTaskCallback); - - _callback = asyncTaskCallback; - _period = Constants.INFINITE_TIMESPAN; - _dueTime = Constants.INFINITE_TIMESPAN; - _totalNumTicks = 0; - _logger = logger; - - _timer = NonCapturingTimer.Create(HandleAsyncTaskTimerCallback, state, Constants.INFINITE_TIMESPAN, Constants.INFINITE_TIMESPAN); - } - - public bool IsStarted => _timerStarted; - - public void Start(TimeSpan due, TimeSpan period) - { - ObjectDisposedException.ThrowIf(_timer is null, this); - if (_timerStarted) throw new InvalidOperationException($"Calling start is not allowed, since it was already created in a started mode with specified due."); - if (period == TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period), period, "Cannot use TimeSpan.Zero for timer period"); - - long dueTm = (long)_dueTime.TotalMilliseconds; - if (dueTm < -1) throw new ArgumentOutOfRangeException(nameof(due), "The due time must not be less than -1."); - if (dueTm > MaxSupportedTimeout) throw new ArgumentOutOfRangeException(nameof(due), "The due time interval must be less than 2^32-2."); - - long periodTm = (long)period.TotalMilliseconds; - if (periodTm < -1) throw new ArgumentOutOfRangeException(nameof(period), "The period must not be less than -1."); - if (periodTm > MaxSupportedTimeout) throw new ArgumentOutOfRangeException(nameof(period), "The period interval must be less than 2^32-2."); - - _period = period; - _dueTime = due; - _timerStarted = true; - _previousTickTime = DateTime.UtcNow; - _timer.Change(due, Constants.INFINITE_TIMESPAN); - } - - public void Stop() - { - _period = Constants.INFINITE_TIMESPAN; - _dueTime = Constants.INFINITE_TIMESPAN; - _timerStarted = false; - _timer?.Change(Constants.INFINITE_TIMESPAN, Constants.INFINITE_TIMESPAN); - } - - public void Dispose() - { - DisposeTimer(); - } - - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - internal void DisposeTimer() - { - _timerStarted = false; - var result = Interlocked.CompareExchange(ref _timer, null, _timer); - result?.Dispose(); - } - - public bool CheckTimerFreeze(DateTime lastCheckTime, Func callerName) - { - return CheckTimerDelay(_previousTickTime, _totalNumTicks, - _dueTime, _period, _logger, () => $"{nameof(AsyncTaskSafeTimer)}.{callerName()}", ErrorCode.Timer_SafeTimerIsNotTicking, true); - } - - public static bool CheckTimerDelay(DateTime previousTickTime, int totalNumTicks, - TimeSpan dueTime, TimeSpan timerFrequency, ILogger logger, Func getName, ErrorCode errorCode, bool freezeCheck) - { - TimeSpan timeSinceLastTick = DateTime.UtcNow - previousTickTime; - TimeSpan expectedTimeToNextTick = totalNumTicks == 0 ? dueTime : timerFrequency; - TimeSpan exceptedTimeWithSlack; - if (expectedTimeToNextTick >= TimeSpan.FromSeconds(6)) - { - exceptedTimeWithSlack = expectedTimeToNextTick + TimeSpan.FromSeconds(3); - } - else - { - exceptedTimeWithSlack = expectedTimeToNextTick.Multiply(1.5); - } - if (timeSinceLastTick <= exceptedTimeWithSlack) return true; - - // did not tick in the last period. - var logLevel = freezeCheck ? LogLevel.Error : LogLevel.Warning; - if (logger.IsEnabled(logLevel)) - { - var Title = freezeCheck ? "Watchdog Freeze Alert: " : ""; - logger.Log( - logLevel, - (int)errorCode, "{Title}{Name} did not fire on time. Last fired at {LastFired}, {TimeSinceLastTick} since previous fire, should have fired after {ExpectedTimeToNextTick}.", - Title, - getName?.Invoke() ?? "", - LogFormatter.PrintDate(previousTickTime), - timeSinceLastTick, - expectedTimeToNextTick); - } - - return false; - } - - /// - /// Changes the start time and the interval between method invocations for a timer, using TimeSpan values to measure time intervals. - /// - /// A TimeSpan representing the amount of time to delay before invoking the callback method specified when the Timer was constructed. Specify negative one (-1) milliseconds to prevent the timer from restarting. Specify zero (0) to restart the timer immediately. - /// The time interval between invocations of the callback method specified when the Timer was constructed. Specify negative one (-1) milliseconds to disable periodic signaling. - /// true if the timer was successfully updated; otherwise, false. - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - private bool Change(TimeSpan newDueTime, TimeSpan period) - { - if (period == TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(period), period, "Cannot use TimeSpan.Zero for timer period."); - - if (_timer == null) return false; - - _period = period; - - if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug((int)ErrorCode.TimerChanging, "Changing timer to DueTime={DueTime} Period={Period}.", newDueTime, period); - - try - { - // Queue first new timer tick - return _timer.Change(newDueTime, Constants.INFINITE_TIMESPAN); - } - catch (Exception exc) - { - _logger.LogWarning((int)ErrorCode.TimerChangeError, exc, "Error changing timer period - timer not changed."); - return false; - } - } - - private async void HandleAsyncTaskTimerCallback(object state) - { - if (_timer == null) return; - - // There is a subtle race/issue here w.r.t unobserved promises. - // It may happen than the callback will resolve some promises on which the higher level application code is depends upon - // and this promise's await will fire before the below code (after await or Finally) even runs. - // In the unit test case this may lead to the situation where unit test has finished, but p1 or p2 or p3 have not been observed yet. - // To properly fix this we may use a mutex/monitor to delay execution of asyncCallbackFunc until all CWs and Finally in the code below were scheduled - // (not until CW lambda was run, but just until CW function itself executed). - // This however will relay on scheduler executing these in separate threads to prevent deadlock, so needs to be done carefully. - // In particular, need to make sure we execute asyncCallbackFunc in another thread (so use StartNew instead of ExecuteWithSafeTryCatch). - - try - { - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace((int)ErrorCode.TimerBeforeCallback, "About to make async task timer callback for timer."); - await _callback(state); - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace((int)ErrorCode.TimerAfterCallback, "Completed async task timer callback."); - } - catch (Exception exc) - { - _logger.LogWarning((int)ErrorCode.TimerCallbackError, exc, "Ignored exception during async task timer callback."); - } - finally - { - _previousTickTime = DateTime.UtcNow; - - // Queue next timer callback - QueueNextTimerTick(); - } - } - - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - private void QueueNextTimerTick() - { - try - { - if (_timer == null) return; - - _totalNumTicks++; - - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace((int)ErrorCode.TimerChanging, "About to queue next tick for timer."); - - if (_period == Constants.INFINITE_TIMESPAN) - { - DisposeTimer(); - - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace((int)ErrorCode.TimerStopped, "Timer is now stopped and disposed."); - } - else - { - _timer.Change(_period, Constants.INFINITE_TIMESPAN); - - if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace((int)ErrorCode.TimerNextTick, "Queued next tick for timer in {TimerFrequency}.", _period); - } - } - catch (ObjectDisposedException ode) - { - _logger.LogWarning((int)ErrorCode.TimerDisposeError, ode, "Timer already disposed - will not queue next timer tick."); - } - catch (Exception exc) - { - _logger.LogError((int)ErrorCode.TimerQueueTickError, exc, "Error queueing next timer tick - WARNING: timer is now stopped."); - } - } - } -} diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index d061d4a3a3..22496d4dab 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -577,7 +577,7 @@ private void StopAllTimers() foreach (var timer in Timers) { - timer.Stop(); + timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } } } @@ -595,9 +595,11 @@ private Task WaitForAllTimersToFinish(CancellationToken cancellationToken) var timerCopy = Timers.ToList(); // need to copy since OnTimerDisposed will change the timers set. foreach (var timer in timerCopy) { - // first call dispose, then wait to finish. - Utils.SafeExecute(timer.Dispose, _shared.Logger, "timer.Dispose has thrown"); - tasks.Add(timer.GetCurrentlyExecutingTickTask()); + var task = timer.DisposeAsync(); + if (!task.IsCompletedSuccessfully) + { + tasks.Add(task.AsTask()); + } } return Task.WhenAll(tasks).WithCancellation(cancellationToken); diff --git a/src/Orleans.Runtime/Core/SystemTarget.cs b/src/Orleans.Runtime/Core/SystemTarget.cs index 05d785768d..7b1d209758 100644 --- a/src/Orleans.Runtime/Core/SystemTarget.cs +++ b/src/Orleans.Runtime/Core/SystemTarget.cs @@ -7,6 +7,7 @@ using Orleans.GrainReferences; using Orleans.Runtime.Scheduler; using Orleans.Serialization.Invocation; +using Orleans.Timers; namespace Orleans.Runtime { @@ -168,25 +169,23 @@ internal void HandleResponse(Message response) /// /// /// The time interval between invocations of . - /// Specify to disable periodic signalling. + /// Specify to disable periodic signaling. /// - /// The timer name. /// /// An object which will cancel the timer upon disposal. /// - public IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period, string name = null) - => RegisterGrainTimer(asyncCallback, state, dueTime, period, name); + public IGrainTimer RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) + => RegisterGrainTimer(asyncCallback, state, dueTime, period); /// - /// Internal version of that returns the inner IGrainTimer + /// Internal version of that returns the inner IGrainTimer /// - internal IGrainTimer RegisterGrainTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period, string name = null) + internal IGrainTimer RegisterGrainTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) { var ctxt = RuntimeContext.Current; - name = name ?? ctxt.GrainId + "Timer"; - var timer = GrainTimer.FromTaskCallback(this.timerLogger, asyncCallback, state, dueTime, period, name, grainContext: this); - timer.Start(); + var timer = new GrainTimer(this, this.timerLogger, asyncCallback, state, RuntimeClient.TimeProvider); + timer.Change(dueTime, period); return timer; } diff --git a/src/Orleans.Runtime/Timers/GrainTimer.cs b/src/Orleans.Runtime/Timers/GrainTimer.cs index 4b66e8be93..c0e8255360 100644 --- a/src/Orleans.Runtime/Timers/GrainTimer.cs +++ b/src/Orleans.Runtime/Timers/GrainTimer.cs @@ -1,176 +1,205 @@ #nullable enable using System; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using Orleans.Runtime.Scheduler; +using Orleans.Runtime.Internal; -namespace Orleans.Runtime +namespace Orleans.Runtime; + +internal sealed class GrainTimer : IGrainTimer { - internal sealed class GrainTimer : IGrainTimer + private static readonly TimeSpan MinimumPeriod = TimeSpan.FromMilliseconds(1); + private readonly PeriodicTimer _timer; + private readonly Func _callback; + private readonly ILogger _logger; + private readonly IGrainContext _grainContext; + private readonly Task _processingTask; + private readonly object? _state; + private readonly TimeProvider _timeProvider; + private TimeSpan _dueTime; + private TimeSpan _period; + private DateTime _previousTickCompletedTime; + private int _tickCount; + + public GrainTimer(IGrainContext grainContext, ILogger logger, Func callback, object? state, TimeProvider timeProvider) { - private readonly Func asyncCallback; - private readonly TimeSpan dueTime; - private readonly TimeSpan timerFrequency; - private readonly ILogger logger; - private readonly IGrainContext grainContext; - private readonly string name; - private DateTime previousTickTime; - private int totalNumTicks; - private volatile AsyncTaskSafeTimer? timer; - private readonly object? state; - private volatile Task? currentlyExecutingTickTask; - - private GrainTimer(IGrainContext grainContext, ILogger logger, Func asyncCallback, object? state, TimeSpan dueTime, TimeSpan period, string? name) - { - if (RuntimeContext.Current is null) - { - throw new InvalidSchedulingContextException( - "Current grain context is null. " - + "Please make sure you are not trying to create a Timer from outside Orleans Task Scheduler, " - + "which will be the case if you create it inside Task.Run."); - } + ArgumentNullException.ThrowIfNull(grainContext); + ArgumentNullException.ThrowIfNull(logger); + ArgumentNullException.ThrowIfNull(callback); - this.grainContext = grainContext; - this.logger = logger; - this.name = name ?? string.Empty; - this.asyncCallback = asyncCallback; - timer = new AsyncTaskSafeTimer(logger, - static stateObj => ((GrainTimer)stateObj).TimerTick(), - this); - this.state = state; - this.dueTime = dueTime; - timerFrequency = period; - previousTickTime = DateTime.UtcNow; - totalNumTicks = 0; - } - - internal static IGrainTimer FromTaskCallback( - ILogger logger, - Func asyncCallback, - object state, - TimeSpan dueTime, - TimeSpan period, - string name, - IGrainContext grainContext) + if (RuntimeContext.Current is null) { - return new GrainTimer(grainContext, logger, asyncCallback, state, dueTime, period, name); + ThrowInvalidSchedulingContext(); } - public void Start() + if (!Equals(RuntimeContext.Current, grainContext)) { - if (timer is not { } asyncTimer) - { - throw new ObjectDisposedException(GetDiagnosticName(), "The timer was already disposed."); - } - - asyncTimer.Start(dueTime, timerFrequency); + ThrowIncorrectGrainContext(); } - public void Stop() + // Avoid capturing async locals. + using (new ExecutionContextSuppressor()) { - // Stop the timer from ticking, but don't dispose it yet since it might be mid-tick. - timer?.Stop(); + _timeProvider = timeProvider; + _grainContext = grainContext; + _logger = logger; + _callback = callback; + _timer = new PeriodicTimer(Timeout.InfiniteTimeSpan, timeProvider); + _state = state; + _dueTime = Timeout.InfiniteTimeSpan; + _period = Timeout.InfiniteTimeSpan; + _previousTickCompletedTime = DateTime.UtcNow; + _processingTask = ProcessTimerTicks(); } + } - private async Task TimerTick() - { - // Schedule call back to grain context - // AsyncSafeTimer ensures that calls to this method are serialized. - var workItem = new AsyncClosureWorkItem(ForwardToAsyncCallback, this.name, grainContext); - grainContext.Scheduler.QueueWorkItem(workItem); - await workItem.Task; - } + [DoesNotReturn] + private static void ThrowIncorrectGrainContext() => throw new InvalidOperationException("Current grain context differs from specified grain context."); - private async Task ForwardToAsyncCallback() + [DoesNotReturn] + private static void ThrowInvalidSchedulingContext() + { + throw new InvalidSchedulingContextException( + "Current grain context is null. " + + "Please make sure you are not trying to create a Timer from outside Orleans Task Scheduler, " + + "which will be the case if you create it inside Task.Run."); + } + + private async Task ProcessTimerTicks() + { + // Yield immediately to let the caller continue. + await Task.Yield(); + + while (await _timer.WaitForNextTickAsync()) { - // AsyncSafeTimer ensures that calls to this method are serialized. try { - var tickTask = currentlyExecutingTickTask = InvokeTimerCallback(); - await tickTask; + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace((int)ErrorCode.TimerBeforeCallback, "About to make timer callback for timer {TimerName}", GetDiagnosticName()); + } + + ++_tickCount; + await _callback(_state); + + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace((int)ErrorCode.TimerAfterCallback, "Completed timer callback for timer {TimerName}", GetDiagnosticName()); + } } catch (Exception exc) { - logger.LogError( + _logger.LogError( (int)ErrorCode.Timer_GrainTimerCallbackError, exc, "Caught and ignored exception thrown from timer callback for timer {TimerName}", GetDiagnosticName()); } - finally - { - previousTickTime = DateTime.UtcNow; - currentlyExecutingTickTask = null; - // If this is not a repeating timer, then we can dispose of the timer. - if (timerFrequency == Constants.INFINITE_TIMESPAN) + _previousTickCompletedTime = _timeProvider.GetUtcNow().UtcDateTime; + + // Resume regular ticking. + if (_timer.Period != _period) + { + try + { + _timer.Period = _period; + } + catch (ObjectDisposedException) { - DisposeTimer(); + return; } } } + } - private async Task InvokeTimerCallback() - { - // This is called under a lock, so ensure that the method yields before invoking a callback - // which could take a different lock and potentially cause a deadlock. - await Task.Yield(); + public void Change(TimeSpan dueTime, TimeSpan period) + { + ValidateArguments(dueTime, period); - // If the timer was stopped or disposed since this was scheduled, terminate without invoking the callback. - if (timer is not { IsStarted: true }) - { - return; - } + _dueTime = dueTime; + _period = period; - // Clear any previous RequestContext, so it does not leak into this call by mistake. - RequestContext.Clear(); - totalNumTicks++; + // Periodic timer supports periods greater or equal to 1ms. + // Timers are not high-resolution enough for this to be relevant. + _timer.Period = _dueTime > MinimumPeriod ? _dueTime : MinimumPeriod; + } - if (logger.IsEnabled(LogLevel.Trace)) - { - logger.LogTrace((int)ErrorCode.TimerBeforeCallback, "About to make timer callback for timer {TimerName}", GetDiagnosticName()); - } + private static void ValidateArguments(TimeSpan dueTime, TimeSpan period) + { + // See https://github.com/dotnet/runtime/blob/78b5f40a60d9e095abb2b0aabd8c062b171fb9ab/src/libraries/System.Private.CoreLib/src/System/Threading/Timer.cs#L824-L825 + const uint MaxSupportedTimeout = 0xfffffffe; - await asyncCallback(state); + // See https://github.com/dotnet/runtime/blob/78b5f40a60d9e095abb2b0aabd8c062b171fb9ab/src/libraries/System.Private.CoreLib/src/System/Threading/Timer.cs#L927-L930 + long dueTm = (long)dueTime.TotalMilliseconds; + ArgumentOutOfRangeException.ThrowIfLessThan(dueTm, -1, nameof(dueTime)); + ArgumentOutOfRangeException.ThrowIfGreaterThan(dueTm, MaxSupportedTimeout, nameof(dueTime)); - if (logger.IsEnabled(LogLevel.Trace)) - { - logger.LogTrace((int)ErrorCode.TimerAfterCallback, "Completed timer callback for timer {TimerName}", GetDiagnosticName()); - } - } + long periodTm = (long)period.TotalMilliseconds; + ArgumentOutOfRangeException.ThrowIfLessThan(periodTm, -1, nameof(period)); + ArgumentOutOfRangeException.ThrowIfGreaterThan(periodTm, MaxSupportedTimeout, nameof(period)); + } - public Task GetCurrentlyExecutingTickTask() => currentlyExecutingTickTask ?? Task.CompletedTask; + public bool CheckTimerFreeze(DateTime lastCheckTime, Func callerName) + { + return CheckTimerDelay( + _previousTickCompletedTime, + _tickCount, + _dueTime, + _period, + _logger, + () => $"{nameof(GrainTimer)}.{callerName()}", + ErrorCode.Timer_SafeTimerIsNotTicking, + true); + } - private string GetDiagnosticName() => name switch + public static bool CheckTimerDelay(DateTime previousTickTime, int totalNumTicks, + TimeSpan dueTime, TimeSpan timerFrequency, ILogger logger, Func getName, ErrorCode errorCode, bool freezeCheck) + { + TimeSpan timeSinceLastTick = DateTime.UtcNow - previousTickTime; + TimeSpan expectedTimeToNextTick = totalNumTicks == 0 ? dueTime : timerFrequency; + TimeSpan exceptedTimeWithSlack; + if (expectedTimeToNextTick >= TimeSpan.FromSeconds(6)) { - { Length: > 0 } => $"GrainTimer.{name} TimerCallbackHandler:{asyncCallback?.Target}->{asyncCallback?.Method}", - _ => $"GrainTimer TimerCallbackHandler:{asyncCallback?.Target}->{asyncCallback?.Method}" - }; - - public void Dispose() + exceptedTimeWithSlack = expectedTimeToNextTick + TimeSpan.FromSeconds(3); + } + else { - DisposeTimer(); + exceptedTimeWithSlack = expectedTimeToNextTick.Multiply(1.5); } + if (timeSinceLastTick <= exceptedTimeWithSlack) return true; - private void DisposeTimer() + // did not tick in the last period. + var logLevel = freezeCheck ? LogLevel.Error : LogLevel.Warning; + if (logger.IsEnabled(logLevel)) { - var asyncTimer = Interlocked.CompareExchange(ref timer, null, timer); - if (asyncTimer == null) - { - return; - } - - try - { - asyncTimer.Dispose(); - } - catch (Exception ex) - { - logger.LogError(ex, "Error disposing timer {TimerName}", GetDiagnosticName()); - } - - grainContext.GetComponent()?.OnTimerDisposed(this); + var Title = freezeCheck ? "Watchdog Freeze Alert: " : ""; + logger.Log( + logLevel, + (int)errorCode, "{Title}{Name} did not fire on time. Last fired at {LastFired}, {TimeSinceLastTick} since previous fire, should have fired after {ExpectedTimeToNextTick}.", + Title, + getName?.Invoke() ?? "", + LogFormatter.PrintDate(previousTickTime), + timeSinceLastTick, + expectedTimeToNextTick); } + + return false; + } + + private string GetDiagnosticName() => $"GrainTimer TimerCallbackHandler:{_callback?.Target}->{_callback?.Method}"; + + public void Dispose() + { + _timer.Dispose(); + _grainContext.GetComponent()?.OnTimerDisposed(this); + } + + public async ValueTask DisposeAsync() + { + Dispose(); + await _processingTask; } } diff --git a/src/Orleans.Runtime/Timers/TimerRegistry.cs b/src/Orleans.Runtime/Timers/TimerRegistry.cs index 20aa430f62..a15ddb5e40 100644 --- a/src/Orleans.Runtime/Timers/TimerRegistry.cs +++ b/src/Orleans.Runtime/Timers/TimerRegistry.cs @@ -3,22 +3,18 @@ using Microsoft.Extensions.Logging; using Orleans.Runtime; -namespace Orleans.Timers +namespace Orleans.Timers; + +internal class TimerRegistry(ILoggerFactory loggerFactory, TimeProvider timeProvider) : ITimerRegistry { - internal class TimerRegistry : ITimerRegistry - { - private readonly ILogger timerLogger; - public TimerRegistry(ILoggerFactory loggerFactory) - { - this.timerLogger = loggerFactory.CreateLogger(); - } + private readonly ILogger _timerLogger = loggerFactory.CreateLogger(); + private readonly TimeProvider _timeProvider = timeProvider; - public IDisposable RegisterTimer(IGrainContext grainContext, Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) - { - var timer = GrainTimer.FromTaskCallback(this.timerLogger, asyncCallback, state, dueTime, period, name: string.Empty, grainContext: grainContext); - grainContext?.GetComponent().OnTimerCreated(timer); - timer.Start(); - return timer; - } + public IGrainTimer RegisterTimer(IGrainContext grainContext, Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) + { + var timer = new GrainTimer(grainContext, _timerLogger, asyncCallback, state, _timeProvider); + grainContext?.GetComponent().OnTimerCreated(timer); + timer.Change(dueTime, period); + return timer; } } diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs index 011e9fe82d..989ea9d330 100644 --- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs +++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs @@ -2,12 +2,14 @@ using System.Collections.Generic; using System.Diagnostics.Metrics; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Orleans.Configuration; using Orleans.Internal; using Orleans.Runtime; using Orleans.Streams.Filtering; +using Orleans.Timers; namespace Orleans.Streams { @@ -140,7 +142,7 @@ private void InitializeInternal() } // Setup a reader for a new receiver. - // Even if the receiver failed to initialise, treat it as OK and start pumping it. It's receiver responsibility to retry initialization. + // Even if the receiver failed to initialize, treat it as OK and start pumping it. It's receiver responsibility to retry initialization. var randomTimerOffset = RandomTimeSpan.Next(this.options.GetQueueMsgsTimerPeriod); timer = RegisterGrainTimer(AsyncTimerCallback, QueueId, randomTimerOffset, this.options.GetQueueMsgsTimerPeriod); @@ -154,15 +156,17 @@ public async Task Shutdown() // Stop pulling from queues that are not in my range anymore. logger.LogInformation((int)ErrorCode.PersistentStreamPullingAgent_05, "Shutdown of {Name} responsible for queue: {Queue}", GetType().Name, QueueId.ToStringWithHashCode()); - if (timer != null) + var asyncTimer = timer; + timer = null; + if (asyncTimer != null) { - var tmp = timer; - timer = null; - Utils.SafeExecute(tmp.Dispose, this.logger); - try { - await tmp.GetCurrentlyExecutingTickTask().WithTimeout(TimeSpan.FromSeconds(5)); + var task = asyncTimer.DisposeAsync(); + if (!task.IsCompletedSuccessfully) + { + await task.AsTask().WithTimeout(TimeSpan.FromSeconds(5)); + } } catch (Exception ex) { diff --git a/test/DefaultCluster.Tests/TimerOrleansTest.cs b/test/DefaultCluster.Tests/TimerOrleansTest.cs index b4d2a9c50e..df4a8bed4d 100644 --- a/test/DefaultCluster.Tests/TimerOrleansTest.cs +++ b/test/DefaultCluster.Tests/TimerOrleansTest.cs @@ -120,7 +120,7 @@ public async Task TimerOrleansTest_Migration() last = await grain.GetCounter(); stopwatch.Stop(); - double maximalNumTicks = stopwatch.Elapsed.Divide(period); + int maximalNumTicks = (int)Math.Round(stopwatch.Elapsed.Divide(period), MidpointRounding.ToPositiveInfinity); Assert.True( last <= maximalNumTicks, $"Assert: last <= maximalNumTicks. Actual: last = {last}, maximalNumTicks = {maximalNumTicks}");