Skip to content

Commit

Permalink
Non-reentrant timers
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Apr 21, 2024
1 parent a9551f1 commit a7f2806
Show file tree
Hide file tree
Showing 12 changed files with 780 additions and 190 deletions.
7 changes: 5 additions & 2 deletions src/Orleans.Core.Abstractions/Core/Grain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,18 @@ protected Grain(IGrainContext grainContext, IGrainRuntime? grainRuntime = null)
/// <param name="state">State object that will be passed as argument when calling the asyncCallback.</param>
/// <param name="dueTime">Due time for first timer tick.</param>
/// <param name="period">Period of subsequent timer ticks.</param>
/// <param name="reentrant">
/// Whether the timer callback can be interleaved with grain calls. Defaults to <see langword="true"/>.
/// </param>
/// <returns>Handle for this Timer.</returns>
/// <seealso cref="IDisposable"/>
protected IGrainTimer RegisterTimer(Func<object?, Task> asyncCallback, object? state, TimeSpan dueTime, TimeSpan period)
protected IGrainTimer RegisterTimer(Func<object?, Task> asyncCallback, object? state, TimeSpan dueTime, TimeSpan period, bool reentrant = true)
{
if (asyncCallback == null)
throw new ArgumentNullException(nameof(asyncCallback));

EnsureRuntime();
return Runtime.TimerRegistry.RegisterTimer(GrainContext ?? RuntimeContext.Current, asyncCallback, state, dueTime, period);
return Runtime.TimerRegistry.RegisterTimer(GrainContext ?? RuntimeContext.Current, asyncCallback, state, dueTime, period, reentrant);
}

/// <summary>
Expand Down
22 changes: 0 additions & 22 deletions src/Orleans.Core.Abstractions/Core/IGrainContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,28 +179,6 @@ internal interface ICollectibleGrainContext : IGrainContext
void DelayDeactivation(TimeSpan timeSpan);
}

/// <summary>
/// Provides functionality to record the creation and deletion of grain timers.
/// </summary>
internal interface IGrainTimerRegistry
{
/// <summary>
/// Signals to the registry that a timer was created.
/// </summary>
/// <param name="timer">
/// The timer.
/// </param>
void OnTimerCreated(IGrainTimer timer);

/// <summary>
/// Signals to the registry that a timer was disposed.
/// </summary>
/// <param name="timer">
/// The timer.
/// </param>
void OnTimerDisposed(IGrainTimer timer);
}

/// <summary>
/// Functionality to schedule tasks on a grain.
/// </summary>
Expand Down
5 changes: 4 additions & 1 deletion src/Orleans.Core.Abstractions/Timers/ITimerRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ public interface ITimerRegistry
/// The time interval between invocations of <paramref name="callback"/>.
/// Specify <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> to disable periodic signaling.
/// </param>
/// <param name="reentrant">
/// Whether the timer callback can be interleaved with grain calls. Defaults to <see langword="true"/>.
/// </param>
/// <returns>
/// An <see cref="IGrainTimer"/> instance which represents the timer.
/// </returns>
IGrainTimer RegisterTimer(IGrainContext grainContext, Func<object?, Task> callback, object? state, TimeSpan dueTime, TimeSpan period);
IGrainTimer RegisterTimer(IGrainContext grainContext, Func<object?, Task> callback, object? state, TimeSpan dueTime, TimeSpan period, bool reentrant = true);
}
8 changes: 7 additions & 1 deletion src/Orleans.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@

namespace Orleans.Runtime
{
internal interface IMessage
{
bool IsReadOnly { get; }
bool IsAlwaysInterleave { get; }
}

[Id(101)]
internal sealed class Message : ISpanFormattable
internal sealed class Message : IMessage, ISpanFormattable
{
public const int LENGTH_HEADER_SIZE = 8;
public const int LENGTH_META_HEADER = 4;
Expand Down
126 changes: 126 additions & 0 deletions src/Orleans.Runtime/Catalog/ActivationData.Diagnostics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using System;
using System.Collections.Generic;
using Orleans.Configuration;

namespace Orleans.Runtime;

internal partial class ActivationData
{
public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageFactory messageFactory, SiloMessagingOptions options)
{
var slowRunningRequestDuration = options.RequestProcessingWarningTime;
var longQueueTimeDuration = options.RequestQueueDelayWarningTime;

List<string> diagnostics = null;
lock (this)
{
if (State != ActivationState.Valid)
{
return;
}

if (_blockingRequest is Message request)
{
TimeSpan? timeSinceQueued = default;
if (_runningRequests.TryGetValue(request, out var waitTime))
{
timeSinceQueued = waitTime.Elapsed;
}

var executionTime = _busyDuration.Elapsed;
if (executionTime >= slowRunningRequestDuration)
{
GetStatusList(ref diagnostics);
if (timeSinceQueued.HasValue)
{
diagnostics.Add($"Message {request} was enqueued {timeSinceQueued} ago and has now been executing for {executionTime}.");
}
else
{
diagnostics.Add($"Message {request} has been executing for {executionTime}.");
}

if (request is Message msg)
{
var response = messageFactory.CreateDiagnosticResponseMessage(msg, isExecuting: true, isWaiting: false, diagnostics);
messageCenter.SendMessage(response);
}
}
}
else if (_blockingRequest is GrainTimer timer)
{
/*
var executionTime = timer.GetElapsed();
if (executionTime >= slowRunningRequestDuration)
{
GetStatusList(ref diagnostics);
diagnostics.Add($"Timer {timer} has been executing for {executionTime}.");
var response = messageFactory.CreateDiagnosticResponseMessage(timer, isExecuting: true, isWaiting: false, diagnostics);
messageCenter.SendMessage(response);
}
*/
}

foreach (var running in _runningRequests)
{
var message = running.Key;
var runDuration = running.Value;
if (ReferenceEquals(message, _blockingRequest)) continue;

// Check how long they've been executing.
var executionTime = runDuration.Elapsed;
if (executionTime >= slowRunningRequestDuration)
{
// Interleaving message X has been executing for a long time
GetStatusList(ref diagnostics);
var messageDiagnostics = new List<string>(diagnostics)
{
$"Interleaving message {message} has been executing for {executionTime}."
};

if (message is Message msg)
{
var response = messageFactory.CreateDiagnosticResponseMessage(msg, isExecuting: true, isWaiting: false, messageDiagnostics);
messageCenter.SendMessage(response);
}
}
}

var queueLength = 1;
foreach (var pair in _waitingRequests)
{
var message = pair.Request;
var queuedTime = pair.QueuedTime.Elapsed;
if (queuedTime >= longQueueTimeDuration)
{
// Message X has been enqueued on the target grain for Y and is currently position QueueLength in queue for processing.
GetStatusList(ref diagnostics);
var messageDiagnostics = new List<string>(diagnostics)
{
$"Message {message} has been enqueued on the target grain for {queuedTime} and is currently position {queueLength} in queue for processing."
};

if (message is Message msg)
{
var response = messageFactory.CreateDiagnosticResponseMessage(msg, isExecuting: false, isWaiting: true, messageDiagnostics);
messageCenter.SendMessage(response);
}
}

queueLength++;
}
}

void GetStatusList(ref List<string> diagnostics)
{
if (diagnostics is not null) return;

diagnostics = new List<string>
{
ToDetailedString(),
$"TaskScheduler status: {_workItemGroup.DumpStatus()}"
};
}
}
}
Loading

0 comments on commit a7f2806

Please sign in to comment.