Skip to content

Commit

Permalink
WIP - consolidate Reentrant and Non-Reentrant timers
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed May 6, 2024
1 parent d43d20e commit bcfaa40
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 382 deletions.
11 changes: 9 additions & 2 deletions src/Orleans.Core.Abstractions/Timers/TimerCreationOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#nullable enable
using System;
using System.Threading;
using Orleans.Concurrency;

namespace Orleans.Runtime;

Expand Down Expand Up @@ -28,9 +29,15 @@ namespace Orleans.Runtime;
/// </summary>
/// <remarks>
/// If this value is <see langword="false"/>, the timer callback will be treated akin to a grain call. If the grain scheduling this timer is reentrant
/// (i.e., it has the <see cref="Concurrency.ReentrantAttribute"/> attributed applied to its implementation class), the timer callback will be allowed
/// (i.e., it has the <see cref="ReentrantAttribute"/> attributed applied to its implementation class), the timer callback will be allowed
/// to interleave with other grain calls and timers regardless of the value of this property.
/// If this value is <see langword="true"/>, the timer callback will be allowed to interleave with other timers and grain calls.
/// </remarks>
public bool Reentrant { get; init; }
public bool Interleave { get; init; }

/// <summary>
/// Gets a value indicating whether callbacks scheduled by this timer should keep the grain activation active. Defaults to <see langword="false"/>.
/// </summary>
public bool KeepAlive { get; init; }

}
6 changes: 2 additions & 4 deletions src/Orleans.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,8 @@ public GrainInterfaceType InterfaceType
}
}

public bool IsExpirableMessage(bool dropExpiredMessages)
public bool IsExpirableMessage()
{
if (!dropExpiredMessages) return false;

GrainId id = TargetGrain;
if (id.IsDefault) return false;

Expand Down Expand Up @@ -393,7 +391,7 @@ public ResponseTypes ResponseType
public void SetFlag(MessageFlags flag, bool value) => _fields = value switch
{
true => _fields | (uint)flag,
_ => _fields & ~(uint)flag,
false => _fields & ~(uint)flag,
};
}
}
Expand Down
40 changes: 27 additions & 13 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,19 @@ internal List<Message> DequeueAllWaitingRequests()
{
lock (this)
{
var tmp = _waitingRequests.Select(m => m.Item1).ToList();
var result = new List<Message>(_waitingRequests.Count);
foreach (var (message, _) in _waitingRequests)
{
if (message.IsLocalOnly)
{
continue;
}

result.Add(message);
}

_waitingRequests.Clear();
return tmp;
return result;
}
}

Expand Down Expand Up @@ -597,17 +607,17 @@ private Task WaitForAllTimersToFinish(CancellationToken cancellationToken)
var timerCopy = Timers.ToList(); // need to copy since OnTimerDisposed will change the timers set.
foreach (var timer in timerCopy)
{
if (timer is IAsyncDisposable asyncDisposable)
if (timer is IAsyncDisposable asyncDisposable)
{
var task = asyncDisposable.DisposeAsync();
if (!task.IsCompletedSuccessfully)
{
var task = asyncDisposable.DisposeAsync();
if (!task.IsCompletedSuccessfully)
{
tasks.Add(task.AsTask());
}
tasks.Add(task.AsTask());
}
else
{
timer.Dispose();
}
else
{
timer.Dispose();
}
}

Expand Down Expand Up @@ -936,7 +946,11 @@ void ProcessPendingRequests()
}
catch (Exception exception)
{
_shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Transient, exception);
if (!message.IsLocalOnly)
{
_shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Transient, exception);
}

_waitingRequests.RemoveAt(i);
continue;
}
Expand Down Expand Up @@ -1299,7 +1313,7 @@ private void ReceiveResponse(Message message)
private void ReceiveRequest(Message message)
{
var overloadException = CheckOverloaded();
if (overloadException != null)
if (overloadException != null && !message.IsLocalOnly)
{
MessagingProcessingInstruments.OnDispatcherMessageProcessedError(message);
_shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Overloaded, overloadException, "Target activation is overloaded " + this);
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private List<IIncomingGrainCallFilter> GrainCallFilters
sharedData = this.sharedCallbackData;
}

if (message.IsExpirableMessage(this.messagingOptions.DropExpiredMessages))
if (this.messagingOptions.DropExpiredMessages && message.IsExpirableMessage())
{
message.TimeToLive = request.GetDefaultResponseTimeout() ?? sharedData.ResponseTimeout;
}
Expand Down
18 changes: 5 additions & 13 deletions src/Orleans.Runtime/Core/SystemTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,32 +160,24 @@ internal void HandleResponse(Message response)
/// Registers a timer to send regular callbacks to this grain.
/// This timer will keep the current grain from being deactivated.
/// </summary>
/// <param name="asyncCallback">The timer callback, which will fire whenever the timer becomes due.</param>
/// <param name="callback">The timer callback, which will fire whenever the timer becomes due.</param>
/// <param name="state">The state object passed to the callback.</param>
/// <param name="dueTime">
/// The amount of time to delay before the <paramref name="asyncCallback"/> is invoked.
/// The amount of time to delay before the <paramref name="callback"/> is invoked.
/// Specify <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> to prevent the timer from starting.
/// Specify <see cref="TimeSpan.Zero"/> to invoke the callback promptly.
/// </param>
/// <param name="period">
/// The time interval between invocations of <paramref name="asyncCallback"/>.
/// The time interval between invocations of <paramref name="callback"/>.
/// Specify <see cref="System.Threading.Timeout.InfiniteTimeSpan"/> to disable periodic signaling.
/// </param>
/// <returns>
/// An <see cref="IDisposable"/> object which will cancel the timer upon disposal.
/// </returns>
public IGrainTimer RegisterTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)
=> RegisterGrainTimer(asyncCallback, state, dueTime, period);

/// <summary>
/// Internal version of <see cref="RegisterTimer(Func{object, Task}, object?, TimeSpan, TimeSpan)"/> that returns the inner IGrainTimer
/// </summary>
internal IGrainTimer RegisterGrainTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)
public IGrainTimer RegisterTimer(Func<object, Task> callback, object state, TimeSpan dueTime, TimeSpan period)
{
var ctxt = RuntimeContext.Current;

var timer = new GrainTimer<object>(this, this.timerLogger, asyncCallback, state, RuntimeClient.TimeProvider);
timer.Change(dueTime, period);
var timer = this.ActivationServices.GetRequiredService<ITimerRegistry>().RegisterTimer(this, callback, state, dueTime, period);
return timer;
}

Expand Down
22 changes: 13 additions & 9 deletions src/Orleans.Runtime/Messaging/MessageCenter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -131,6 +132,8 @@ public Action<Message> SniffIncomingMessage

public void SendMessage(Message msg)
{
Debug.Assert(!msg.IsLocalOnly);

// Note that if we identify or add other grains that are required for proper stopping, we will need to treat them as we do the membership table grain here.
if (IsBlockingApplicationMessages && !msg.IsSystemMessage && msg.Result is not Message.ResponseTypes.Rejection && !Constants.SystemMembershipTableType.Equals(msg.TargetGrain))
{
Expand Down Expand Up @@ -277,6 +280,8 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask<Connection> c

foreach (var message in messages)
{
Debug.Assert(!message.IsLocalOnly);

if (oldAddress != null)
{
message.AddToCacheInvalidationHeader(oldAddress, validAddress: validAddress);
Expand Down Expand Up @@ -305,14 +310,16 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask<Connection> c
}
}

internal void ProcessRequestToInvalidActivation(
private void ProcessRequestToInvalidActivation(
Message message,
GrainAddress oldAddress,
SiloAddress forwardingAddress,
string failedOperation,
Exception exc = null,
bool rejectMessages = false)
{
Debug.Assert(!message.IsLocalOnly);

// Just use this opportunity to invalidate local Cache Entry as well.
if (oldAddress != null)
{
Expand All @@ -339,13 +346,9 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask<Connection> c
}
}

internal void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null)
private void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null)
{
if (message.IsLocalOnly)
{
// Do nothing: the message cannot be forwarded since it is a local-only message.
return;
}
Debug.Assert(!message.IsLocalOnly);

bool forwardingSucceeded = false;
var forwardingAddress = destination?.SiloAddress;
Expand Down Expand Up @@ -401,7 +404,7 @@ internal void RerouteMessage(Message message)
ResendMessageImpl(message);
}

internal bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
{
if (!MayForward(message, this.messagingOptions)) return false;

Expand Down Expand Up @@ -436,7 +439,7 @@ private void ResendMessageImpl(Message message, SiloAddress forwardingAddress =
// (got here due to duplicate activation, outdated cache, silo is shutting down/overloaded, ...).
private static bool MayForward(Message message, SiloMessagingOptions messagingOptions)
{
return message.ForwardCount < messagingOptions.MaxForwardCount && !message.IsLocalOnly;
return message.ForwardCount < messagingOptions.MaxForwardCount;
}

/// <summary>
Expand Down Expand Up @@ -503,6 +506,7 @@ internal void SendResponse(Message request, Response response)

public void ReceiveMessage(Message msg)
{
Debug.Assert(!msg.IsLocalOnly);
try
{
this.messagingTrace.OnIncomingMessageAgentReceiveMessage(msg);
Expand Down
Loading

0 comments on commit bcfaa40

Please sign in to comment.