Skip to content

Commit

Permalink
Make timers typed, fix NonReentrantGrainTimer change logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Apr 30, 2024
1 parent 2767c63 commit 47bfa8b
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 96 deletions.
32 changes: 24 additions & 8 deletions src/Orleans.Core.Abstractions/Core/Grain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Orleans.Core;
using Orleans.Runtime;
using Orleans.Serialization.TypeSystem;
using Orleans.Timers;

namespace Orleans
{
Expand Down Expand Up @@ -89,22 +90,37 @@ protected Grain(IGrainContext grainContext, IGrainRuntime? grainRuntime = null)
/// will be logged, but will not prevent the next timer tick from being queued.
/// </para>
/// </remarks>
/// <param name="asyncCallback">Callback function to be invoked when timer ticks.</param>
/// <param name="callback">Callback function to be invoked when timer ticks.</param>
/// <param name="state">State object that will be passed as argument when calling the asyncCallback.</param>
/// <param name="dueTime">Due time for first timer tick.</param>
/// <param name="period">Period of subsequent timer ticks.</param>
/// <param name="reentrant">
/// Whether the timer callback can be interleaved with grain calls. Defaults to <see langword="true"/>.
/// </param>
/// <returns>Handle for this Timer.</returns>
/// <seealso cref="IDisposable"/>
protected IGrainTimer RegisterTimer(Func<object?, Task> asyncCallback, object? state, TimeSpan dueTime, TimeSpan period, bool reentrant = true)
protected IGrainTimer RegisterTimer(Func<object?, Task> callback, object? state, TimeSpan dueTime, TimeSpan period)
{
ArgumentNullException.ThrowIfNull(callback);

EnsureRuntime();
return Runtime.TimerRegistry.RegisterTimer(GrainContext ?? RuntimeContext.Current, callback, state, dueTime, period);
}

/// <summary>
/// Creates a grain timer.
/// </summary>
/// <param name="callback">The timer callback, which will fire whenever the timer becomes due.</param>
/// <param name="state">The state object passed to the callback.</param>
/// <param name="options">
/// The options for creating the timer.
/// </param>
/// <returns>
/// An <see cref="IGrainTimer"/> instance which represents the timer.
/// </returns>
protected IGrainTimer RegisterTimer<T>(Func<T, Task> callback, T state, TimerCreationOptions options)
{
if (asyncCallback == null)
throw new ArgumentNullException(nameof(asyncCallback));
ArgumentNullException.ThrowIfNull(callback);

EnsureRuntime();
return Runtime.TimerRegistry.RegisterTimer(GrainContext ?? RuntimeContext.Current, asyncCallback, state, dueTime, period, reentrant);
return Runtime.TimerRegistry.RegisterTimer(GrainContext ?? RuntimeContext.Current, callback, state, options);
}

/// <summary>
Expand Down
22 changes: 18 additions & 4 deletions src/Orleans.Core.Abstractions/Timers/ITimerRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,25 @@ public interface ITimerRegistry
/// The time interval between invocations of <paramref name="callback"/>.
/// Specify <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> to disable periodic signaling.
/// </param>
/// <param name="reentrant">
/// Whether the timer callback can be interleaved with grain calls. Defaults to <see langword="true"/>.
/// <returns>
/// An <see cref="IGrainTimer"/> instance which represents the timer.
/// </returns>
IGrainTimer RegisterTimer(IGrainContext grainContext, Func<object?, Task> callback, object? state, TimeSpan dueTime, TimeSpan period);

/// <summary>
/// Creates a grain timer.
/// </summary>
/// <param name="grainContext">The grain which the timer is associated with.</param>
/// <param name="callback">The timer callback, which will fire whenever the timer becomes due.</param>
/// <param name="state">The state object passed to the callback.</param>
/// <param name="options">
/// The options for creating the timer.
/// </param>
/// <typeparam name="T">
/// The type of <paramref name="state"/>.
/// </typeparam>
/// <returns>
/// An <see cref="IGrainTimer"/> instance which represents the timer.
/// </returns>
IGrainTimer RegisterTimer(IGrainContext grainContext, Func<object?, Task> callback, object? state, TimeSpan dueTime, TimeSpan period, bool reentrant = true);
}
IGrainTimer RegisterTimer<T>(IGrainContext grainContext, Func<T, Task> callback, T state, TimerCreationOptions options);
}
36 changes: 36 additions & 0 deletions src/Orleans.Core.Abstractions/Timers/TimerCreationOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#nullable enable
using System;
using System.Threading;

namespace Orleans.Runtime;

/// <summary>
/// Options for registering grain timers.
/// </summary>
public readonly struct TimerCreationOptions()
{
/// <summary>
/// The amount of time to delay before the timer callback is invoked.
/// Specify <see cref="Timeout.InfiniteTimeSpan"/> to prevent the timer from starting.
/// Specify <see cref="TimeSpan.Zero"/> to invoke the callback promptly.
/// </summary>
public required TimeSpan DueTime { get; init; }

/// <summary>
/// The time interval between invocations of callback.
/// Specify <see cref="Timeout.InfiniteTimeSpan"/> to disable periodic signaling.
/// </summary>
public TimeSpan Period { get; init; } = Timeout.InfiniteTimeSpan;

/// <summary>
/// Gets a value indicating whether callbacks scheduled by this timer are allowed to interleave execution with other timers and grain calls.
/// Defaults to <see langword="false"/>.
/// </summary>
/// <remarks>
/// If this value is <see langword="false"/>, the timer callback will be treated akin to a grain call. If the grain scheduling this timer is reentrant
/// (i.e., it has the <see cref="Concurrency.ReentrantAttribute"/> attributed applied to its implementation class), the timer callback will be allowed
/// to interleave with other grain calls and timers regardless of the value of this property.
/// If this value is <see langword="true"/>, the timer callback will be allowed to interleave with other timers and grain calls.
/// </remarks>
public bool Reentrant { get; init; }
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Core/SystemTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ internal IGrainTimer RegisterGrainTimer(Func<object, Task> asyncCallback, object
{
var ctxt = RuntimeContext.Current;

var timer = new GrainTimer(this, this.timerLogger, asyncCallback, state, RuntimeClient.TimeProvider);
var timer = new GrainTimer<object>(this, this.timerLogger, asyncCallback, state, RuntimeClient.TimeProvider);
timer.Change(dueTime, period);
return timer;
}
Expand Down
17 changes: 11 additions & 6 deletions src/Orleans.Runtime/Timers/GrainTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,26 @@

namespace Orleans.Runtime;

internal sealed class GrainTimer : IGrainTimer
internal abstract class GrainTimer
{
protected static readonly TimeSpan MinimumPeriod = TimeSpan.FromMilliseconds(1);
}

internal sealed class GrainTimer<T> : GrainTimer, IGrainTimer
{
private static readonly TimeSpan MinimumPeriod = TimeSpan.FromMilliseconds(1);
private readonly PeriodicTimer _timer;
private readonly Func<object?, Task> _callback;
private readonly Func<T, Task> _callback;
private readonly ILogger _logger;
private readonly IGrainContext _grainContext;
private readonly Task _processingTask;
private readonly object? _state;
private readonly T _state;
private readonly TimeProvider _timeProvider;
private TimeSpan _dueTime;
private TimeSpan _period;
private DateTime _previousTickCompletedTime;
private int _tickCount;

public GrainTimer(IGrainContext grainContext, ILogger logger, Func<object?, Task> callback, object? state, TimeProvider timeProvider)
public GrainTimer(IGrainContext grainContext, ILogger logger, Func<T, Task> callback, T state, TimeProvider timeProvider)
{
ArgumentNullException.ThrowIfNull(grainContext);
ArgumentNullException.ThrowIfNull(logger);
Expand All @@ -47,6 +51,7 @@ public GrainTimer(IGrainContext grainContext, ILogger logger, Func<object?, Task
_dueTime = Timeout.InfiniteTimeSpan;
_period = Timeout.InfiniteTimeSpan;
_previousTickCompletedTime = timeProvider.GetUtcNow().UtcDateTime;

// Avoid capturing async locals.
using (new ExecutionContextSuppressor())
{
Expand Down Expand Up @@ -150,7 +155,7 @@ public bool CheckTimerFreeze(DateTime lastCheckTime, Func<string> callerName)
_dueTime,
_period,
_logger,
() => $"{nameof(GrainTimer)}.{callerName()}",
() => $"{nameof(GrainTimer<T>)}.{callerName()}",
ErrorCode.Timer_SafeTimerIsNotTicking,
true);
}
Expand Down
118 changes: 46 additions & 72 deletions src/Orleans.Runtime/Timers/NonReentrantGrainTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,28 @@
using Orleans.Timers;

namespace Orleans.Runtime;

internal sealed class NonReentrantGrainTimer : IGrainTimer, IInvokable
internal abstract class NonReentrantGrainTimer
{
private static readonly TimerCallback TimerCallback = (state) => ((NonReentrantGrainTimer)state!).ScheduleTickOnActivation();
private static readonly MethodInfo InvokeTickAsyncMethod = typeof(NonReentrantGrainTimer).GetMethod(nameof(InvokeTickAsync), BindingFlags.Instance | BindingFlags.NonPublic)!;
private static readonly GrainInterfaceType TimerInterfaceType = GrainInterfaceType.Create("Orleans.Runtime.IGrainTimer");
protected static readonly GrainInterfaceType TimerInterfaceType = GrainInterfaceType.Create("Orleans.Runtime.IGrainTimer");
protected static readonly TimerCallback TimerCallback = (state) => ((NonReentrantGrainTimer)state!).ScheduleTickOnActivation();
protected static readonly MethodInfo InvokeTickAsyncMethod = typeof(NonReentrantGrainTimer).GetMethod(nameof(InvokeTickAsync), BindingFlags.Instance | BindingFlags.NonPublic)!;
protected abstract void ScheduleTickOnActivation();
protected abstract ValueTask<Response> InvokeTickAsync(CancellationToken cancellationToken);
}

internal sealed class NonReentrantGrainTimer<T> : NonReentrantGrainTimer, IGrainTimer, IInvokable
{
private readonly ITimer _timer;
private readonly Func<object?, Task> _callback;
private readonly Func<T, Task> _callback;
private readonly IGrainContext _grainContext;
private readonly object? _state;
private readonly T _state;
private readonly TimerRegistry _shared;
private bool _changed;
private bool _firing;
private TimeSpan _dueTime;
private TimeSpan _period;
private DateTime _previousTickCompletedTime;
private int _tickCount;

public NonReentrantGrainTimer(TimerRegistry shared, IGrainContext grainContext, Func<object?, Task> callback, object? state)
public NonReentrantGrainTimer(TimerRegistry shared, IGrainContext grainContext, Func<T, Task> callback, T state)
{
ArgumentNullException.ThrowIfNull(shared);
ArgumentNullException.ThrowIfNull(grainContext);
Expand All @@ -50,7 +54,6 @@ public NonReentrantGrainTimer(TimerRegistry shared, IGrainContext grainContext,
_state = state;
_dueTime = Timeout.InfiniteTimeSpan;
_period = Timeout.InfiniteTimeSpan;
_previousTickCompletedTime = _shared.TimeProvider.GetUtcNow().UtcDateTime;

// Avoid capturing async locals.
using (new ExecutionContextSuppressor())
Expand All @@ -73,8 +76,11 @@ private static void ThrowInvalidSchedulingContext()
+ "which will be the case if you create it inside Task.Run.");
}

private void ScheduleTickOnActivation()
protected override void ScheduleTickOnActivation()
{
// Indicate that the timer is firing so that the effect of the next change call is deferred until after the tick completes.
_firing = true;

// Note: this does not execute on the activation's execution context.
var msg = _shared.MessageFactory.CreateMessage(this, InvokeMethodOptions.OneWay);
msg.SetInfiniteTimeToLive();
Expand All @@ -90,7 +96,7 @@ private void ScheduleTickOnActivation()
_grainContext.ReceiveMessage(msg);
}

public ValueTask<Response> InvokeTickAsync(CancellationToken cancellationToken)
protected override ValueTask<Response> InvokeTickAsync(CancellationToken cancellationToken)
{
try
{
Expand All @@ -99,7 +105,7 @@ public ValueTask<Response> InvokeTickAsync(CancellationToken cancellationToken)
Logger.LogTrace((int)ErrorCode.TimerBeforeCallback, "About to invoke callback for timer {TimerName}", GetDiagnosticName());
}

++_tickCount;
_changed = false;
var task = _callback(_state);

// If the task is not completed, we need to await the tick asynchronously.
Expand All @@ -113,19 +119,31 @@ public ValueTask<Response> InvokeTickAsync(CancellationToken cancellationToken)
Logger.LogTrace((int)ErrorCode.TimerAfterCallback, "Completed timer callback for timer {TimerName}", GetDiagnosticName());
}

OnTickCompleted();
return new(Response.Completed);
}
catch (Exception exc)
{
OnTickCompleted();
return new(OnCallbackException(exc));
}
finally
{
_previousTickCompletedTime = _shared.TimeProvider.GetUtcNow().UtcDateTime;
}

// Schedule the next tick.
private void OnTickCompleted()
{
// Schedule the next tick.
if (!_changed)
{
// If the timer was not modified during the tick, schedule the next tick based on the period.
_timer.Change(_period, Timeout.InfiniteTimeSpan);
}
else
{
// If the timer was modified during the tick, schedule the next tick based on the new due time.
_timer.Change(_dueTime, Timeout.InfiniteTimeSpan);
}

_firing = false;
}

private Response OnCallbackException(Exception exc)
Expand Down Expand Up @@ -157,23 +175,26 @@ private async ValueTask<Response> AwaitCallbackTask(Task task, CancellationToken
}
finally
{
_previousTickCompletedTime = _shared.TimeProvider.GetUtcNow().UtcDateTime;

// Schedule the next tick.
_timer.Change(_period, Timeout.InfiniteTimeSpan);
OnTickCompleted();
}
}

public void Change(TimeSpan dueTime, TimeSpan period)
{
ValidateArguments(dueTime, period);

_changed = true;
_dueTime = dueTime;
_period = period;

// The callback resets the timer, so the next tick will fire at the 'dueTime' and
// subsequent ticks will be scheduled by the logic in this class.
_timer.Change(dueTime, Timeout.InfiniteTimeSpan);
// If the timer is currently firing, the change will be deferred until after the tick completes.
// Otherwise, perform the change now.
if (!_firing)
{
// This method resets the timer, so the next tick will be scheduled at the new due time and subsequent
// ticks will be scheduled after the specified period.
_timer.Change(dueTime, Timeout.InfiniteTimeSpan);
}
}

private static void ValidateArguments(TimeSpan dueTime, TimeSpan period)
Expand All @@ -191,53 +212,6 @@ private static void ValidateArguments(TimeSpan dueTime, TimeSpan period)
ArgumentOutOfRangeException.ThrowIfGreaterThan(periodTm, MaxSupportedTimeout, nameof(period));
}

public bool CheckTimerFreeze(DateTime lastCheckTime, Func<string> callerName)
{
return CheckTimerDelay(
_previousTickCompletedTime,
_tickCount,
_dueTime,
_period,
Logger,
() => $"{nameof(GrainTimer)}.{callerName()}",
ErrorCode.Timer_SafeTimerIsNotTicking,
true);
}

public static bool CheckTimerDelay(DateTime previousTickTime, int totalNumTicks,
TimeSpan dueTime, TimeSpan timerFrequency, ILogger logger, Func<string> getName, ErrorCode errorCode, bool freezeCheck)
{
TimeSpan timeSinceLastTick = DateTime.UtcNow - previousTickTime;
TimeSpan expectedTimeToNextTick = totalNumTicks == 0 ? dueTime : timerFrequency;
TimeSpan exceptedTimeWithSlack;
if (expectedTimeToNextTick >= TimeSpan.FromSeconds(6))
{
exceptedTimeWithSlack = expectedTimeToNextTick + TimeSpan.FromSeconds(3);
}
else
{
exceptedTimeWithSlack = expectedTimeToNextTick.Multiply(1.5);
}
if (timeSinceLastTick <= exceptedTimeWithSlack) return true;

// did not tick in the last period.
var logLevel = freezeCheck ? LogLevel.Error : LogLevel.Warning;
if (logger.IsEnabled(logLevel))
{
var Title = freezeCheck ? "Watchdog Freeze Alert: " : "";
logger.Log(
logLevel,
(int)errorCode, "{Title}{Name} did not fire on time. Last fired at {LastFired}, {TimeSinceLastTick} since previous fire, should have fired after {ExpectedTimeToNextTick}.",
Title,
getName?.Invoke() ?? "",
LogFormatter.PrintDate(previousTickTime),
timeSinceLastTick,
expectedTimeToNextTick);
}

return false;
}

private string GetDiagnosticName() => $"GrainTimer TimerCallbackHandler:{_callback?.Target}->{_callback?.Method}";

public void Dispose()
Expand Down
Loading

0 comments on commit 47bfa8b

Please sign in to comment.