Skip to content

Commit

Permalink
Implement TimerCreationOptions.KeepAlive
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed May 12, 2024
1 parent b6a45d5 commit 0ec6d0a
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 74 deletions.
81 changes: 37 additions & 44 deletions src/Orleans.Core/Async/MultiTaskCompletionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,53 @@
using System.Threading;
using System.Threading.Tasks;

namespace Orleans
namespace Orleans;

/// <summary>
/// An alternative to <see cref="TaskCompletionSource{TResult}"/> which completes only once a specified number of signals have been received.
/// </summary>
internal sealed class MultiTaskCompletionSource
{
private readonly TaskCompletionSource _tcs;
private int _count;

/// <summary>
/// An alternative to <see cref="TaskCompletionSource{TResult}"/> which completes only once a specified number of signals have been received.
/// Initializes a new instance of the <see cref="MultiTaskCompletionSource"/> class.
/// </summary>
internal class MultiTaskCompletionSource
/// <param name="count">
/// The number of signals which must occur before this completion source completes.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">
/// The count value is less than or equal to zero.
/// </exception>
public MultiTaskCompletionSource(int count)
{
private readonly TaskCompletionSource<bool> tcs;
private int count;
ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(count, 0);
_tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_count = count;
}

/// <summary>
/// Initializes a new instance of the <see cref="MultiTaskCompletionSource"/> class.
/// </summary>
/// <param name="count">
/// The number of signals which must occur before this completion source completes.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">
/// The count value is less than or equal to zero.
/// </exception>
public MultiTaskCompletionSource(int count)
{
if (count <= 0)
{
throw new ArgumentOutOfRangeException(nameof(count), "count has to be positive.");
}
tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
this.count = count;
}
/// <summary>
/// Gets the task which is completed when a sufficient number of signals are received.
/// </summary>
public Task Task => _tcs.Task;

/// <summary>
/// Gets the task which is completed when a sufficient number of signals are received.
/// </summary>
public Task Task
/// <summary>
/// Signals this instance.
/// </summary>
/// <exception cref="InvalidOperationException">This method was called more times than the initially specified count argument allows.</exception>
public void SetOneResult()
{
var current = Interlocked.Decrement(ref _count);
if (current < 0)
{
get { return tcs.Task; }
throw new InvalidOperationException(
"SetOneResult was called more times than initially specified by the count argument.");
}

/// <summary>
/// Signals this instance.
/// </summary>
/// <exception cref="InvalidOperationException">This method was called more times than the initially specified count argument allows.</exception>
public void SetOneResult()
if (current == 0)
{
int current = Interlocked.Decrement(ref count);
if (current < 0)
{
throw new InvalidOperationException(
"SetOneResult was called more times than initially specified by the count argument.");
}

if (current == 0)
{
tcs.SetResult(true);
}
_tcs.SetResult();
}
}
}
20 changes: 11 additions & 9 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,23 +1238,25 @@ static async ValueTask OnCompleteAsync(ActivationData activation, Message messag
/// <summary>
/// Invoked when an activation has finished a transaction and may be ready for additional transactions
/// </summary>
/// <param name="message">The message that has just completed processing.
/// This will be <c>null</c> for the case of completion of Activate/Deactivate calls.</param>
/// <param name="message">The message that has just completed processing.</param>
private void OnCompletedRequest(Message message)
{
lock (this)
{
_runningRequests.Remove(message);

if (_runningRequests.Count == 0)
if (message.IsKeepAlive)
{
_idleDuration = CoarseStopwatch.StartNew();
}
if (_runningRequests.Count == 0)
{
_idleDuration = CoarseStopwatch.StartNew();
}

if (!_isInWorkingSet)
{
_isInWorkingSet = true;
_shared.InternalRuntime.ActivationWorkingSet.OnActive(this);
if (!_isInWorkingSet)
{
_isInWorkingSet = true;
_shared.InternalRuntime.ActivationWorkingSet.OnActive(this);
}
}

// The below logic only works for non-reentrant activations
Expand Down
15 changes: 9 additions & 6 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,16 @@ public void UnregisterSystemTarget(ISystemTarget target)
public int ActivationCount { get { return activations.Count; } }

/// <summary>
/// If activation already exists, use it
/// Otherwise, create an activation of an existing grain by reading its state.
/// Return immediately using a dummy that will queue messages.
/// Concurrently start creating and initializing the real activation and replace it when it is ready.
/// If activation already exists, return it.
/// Otherwise, creates a new activation, begins rehydrating it and activating it, then returns it.
/// </summary>
/// <param name="grainId">The grain identity</param>
/// <param name="requestContextData">Request context data.</param>
/// <remarks>
/// There is no guarantee about the validity of the activation which is returned.
/// Activations are responsible for handling any messages which they receive.
/// </remarks>
/// <param name="grainId">The grain identity.</param>
/// <param name="requestContextData">Optional request context data.</param>
/// <param name="rehydrationContext">Optional rehydration context.</param>
/// <returns></returns>
public IGrainContext GetOrCreateActivation(
in GrainId grainId,
Expand Down
18 changes: 11 additions & 7 deletions src/Orleans.Runtime/Placement/PlacementService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -84,7 +85,7 @@ public Task AddressMessage(Message message)

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Placing grain {GrainId} for message {Message}", grainId, message);
_logger.LogDebug("Looking up address for grain {GrainId} for message {Message}", grainId, message);
}

var worker = _workers[grainId.GetUniformHashCode() % PlacementWorkerCount];
Expand Down Expand Up @@ -275,11 +276,7 @@ private async Task ProcessLoop()
foreach (var message in messages)
{
var target = message.Message.TargetGrain;
if (!_inProgress.TryGetValue(target, out var workItem))
{
_inProgress[target] = workItem = new();
}

var workItem = GetOrAddWorkItem(target);
workItem.Messages.Add(message);
if (workItem.Result is null)
{
Expand Down Expand Up @@ -308,11 +305,18 @@ private async Task ProcessLoop()
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Exception in placement worker");
_logger.LogWarning(exception, "Error in placement worker.");
}

await _workSignal.WaitAsync();
}

GrainPlacementWorkItem GetOrAddWorkItem(GrainId target)
{
ref var workItem = ref CollectionsMarshal.GetValueRefOrAddDefault(_inProgress, target, out _);
workItem ??= new();
return workItem;
}
}

private void AddressWaitingMessages(GrainPlacementWorkItem completedWorkItem)
Expand Down
6 changes: 2 additions & 4 deletions src/Orleans.Runtime/Timers/GrainTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,8 @@ protected void ScheduleTickOnActivation()
msg.SendingSilo = _shared.LocalSiloDetails.SiloAddress;
msg.TargetSilo = _shared.LocalSiloDetails.SiloAddress;
msg.InterfaceType = TimerInterfaceType;
if (_interleave)
{
msg.IsAlwaysInterleave = true;
}
msg.IsKeepAlive = _keepAlive;
msg.IsAlwaysInterleave = _interleave;

// Prevent the message from being forwarded in the case of deactivation.
msg.IsLocalOnly = true;
Expand Down
1 change: 0 additions & 1 deletion test/DefaultCluster.Tests/TimerOrleansTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics;
using ProtoBuf.WellKnownTypes;
using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;
Expand Down
2 changes: 1 addition & 1 deletion test/Grains/TestGrainInterfaces/ITimerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface INonReentrantTimerCallGrain : IGrainWithIntegerKey
Task<int> GetTickCount();
Task<Exception> GetException();

Task StartTimer(string name, TimeSpan delay);
Task StartTimer(string name, TimeSpan delay, bool keepAlive = true);
Task StopTimer(string name);
Task ExternalTick(string name);
}
Expand Down
4 changes: 2 additions & 2 deletions test/Grains/TestInternalGrains/TimerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public override Task OnActivateAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task StartTimer(string name, TimeSpan delay)
public Task StartTimer(string name, TimeSpan delay, bool keepAlive)
{
_logger.LogInformation("StartTimer Name={Name} Delay={Delay}", name, delay);
if (_timers.TryGetValue(name, out var timer))
Expand All @@ -364,7 +364,7 @@ public Task StartTimer(string name, TimeSpan delay)
}
else
{
_timers[name] = base.RegisterTimer(TimerTick, name, new() { DueTime = delay }); // One shot timer
_timers[name] = RegisterTimer(TimerTick, name, new() { DueTime = delay, KeepAlive = keepAlive }); // One shot timer
}

return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Orleans.TestingHost;
using Tester;
using TestExtensions;
using UnitTestGrains;
using UnitTests.GrainInterfaces;
using UnitTests.Grains;
using Xunit;
Expand Down Expand Up @@ -574,5 +575,25 @@ public async Task ActivationCollectorShouldCollectByCollectionSpecificAgeLimitFo
int activationsNotCollected = await TestUtils.GetActivationCount(this.testCluster.GrainFactory, fullGrainTypeName);
Assert.Equal(0, activationsNotCollected);
}

[Fact, TestCategory("SlowBVT"), TestCategory("Timers")]
public async Task NonReentrantGrainTimer_NoKeepAlive_Test()
{
await Initialize(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1));

const string testName = "NonReentrantGrainTimer_NoKeepAlive_Test";

var grain = this.testCluster.GrainFactory.GetGrain<INonReentrantTimerCallGrain>(GetRandomGrainId());

// Schedule a timer to fire at the 30s mark which will not extend the grain's lifetime.
await grain.StartTimer(testName, TimeSpan.FromSeconds(4), keepAlive: false);
await Task.Delay(TimeSpan.FromSeconds(7));

var tickCount = await grain.GetTickCount();

// The grain should have been deactivated.
Assert.Equal(0, tickCount);
}

}
}

0 comments on commit 0ec6d0a

Please sign in to comment.