Skip to content

Commit

Permalink
Try to limit forwarding when a grain activation throws an exception i…
Browse files Browse the repository at this point in the history
…n OnActivateAsync() (dotnet#6891)
  • Loading branch information
benjaminpetit committed Jan 15, 2021
1 parent 404edb8 commit c7618f3
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 7 deletions.
25 changes: 25 additions & 0 deletions src/Orleans.Core/Utils/LRU.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,31 @@ public TValue Get(TKey key)
return value;
}

/// <summary>
/// Remove all expired value from the LRU instance.
/// </summary>
public void RemoveExpired()
{
var now = DateTime.UtcNow;
var toRemove = new List<TKey>();
foreach (var entry in this.cache)
{
var age = DateTime.UtcNow.Subtract(entry.Value.WhenLoaded);
if (age > requiredFreshness)
{
toRemove.Add(entry.Key);
}
}
foreach (var key in toRemove)
{
if (cache.TryRemove(key, out var result) && RaiseFlushEvent != null)
{
var args = new FlushEventArgs(key, result.Value);
RaiseFlushEvent(this, args);
}
}
}

private void AdjustSize()
{
while (cache.Count >= MaximumSize)
Expand Down
10 changes: 10 additions & 0 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont
private readonly ILocalGrainDirectory directory;
private readonly OrleansTaskScheduler scheduler;
private readonly ActivationDirectory activations;
private readonly LRU<ActivationAddress, Exception> failedActivations = new LRU<ActivationAddress, Exception>(1000, TimeSpan.FromSeconds(5), null);
private IStreamProviderRuntime providerRuntime;
private IServiceProvider serviceProvider;
private readonly ILogger logger;
Expand Down Expand Up @@ -267,6 +268,8 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de
var number = Interlocked.Increment(ref collectionNumber);
long memBefore = GC.GetTotalMemory(false) / (1024 * 1024);

failedActivations.RemoveExpired();

if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug(
Expand Down Expand Up @@ -558,6 +561,12 @@ public void GetGrainTypeInfo(int typeCode, out string grainClass, out PlacementS
// Did not find and did not start placing new
if (result == null)
{
if (failedActivations.TryGetValue(address, out var ex))
{
logger.Warn(ErrorCode.Catalog_ActivationException, "Call to an activation that failed during OnActivateAsync()");
throw ex;
}

var msg = String.Format("Non-existent activation: {0}, grain type: {1}.",
address.ToFullString(), grainType);
if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.CatalogNonExistingActivation2, msg);
Expand Down Expand Up @@ -703,6 +712,7 @@ private async Task InitActivation(ActivationData activation, Dictionary<string,
UnregisterMessageTarget(activation);
if (initStage == ActivationInitializationStage.InvokeActivate)
{
failedActivations.Add(activation.Address, exception);
activation.SetState(ActivationState.FailedToActivate);
logger.Warn(ErrorCode.Catalog_Failed_InvokeActivate, string.Format("Failed to InvokeActivate for {0}.", activation), exception);
// Reject all of the messages queued for this activation.
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Core/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void ReceiveMessage(Message message)
{
var str = $"Error creating activation for {message.NewGrainType}. Message {message}";
logger.Error(ErrorCode.Dispatcher_ErrorCreatingActivation, str, ex);
throw new OrleansException(str, ex);
throw;
}

if (nea.IsStatelessWorker)
Expand Down
29 changes: 29 additions & 0 deletions test/DefaultCluster.Tests/BasicActivationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,35 @@ public async Task BasicActivation_Fail()
if (!failed) Assert.True(false, "Should have failed, but instead returned " + key);
}

[Fact, TestCategory("BVT"), TestCategory("ActivateDeactivate"), TestCategory("ErrorHandling"), TestCategory("GetGrain")]
public async Task BasicActivation_BurstFail()
{
bool failed;
long key = 0;
var tasks = new List<Task>();
try
{
// Key values of -2 are not allowed in this case
var fail = this.GrainFactory.GetGrain<ITestGrainLongOnActivateAsync>(-2);
for (int i = 0; i < 10000; i++)
{
tasks.Add(fail.GetKey());
}
failed = false;
await Task.WhenAll(tasks);
}
catch (Exception)
{
failed = true;
foreach (var t in tasks)
{
Assert.Equal(typeof(ArgumentException), t.Exception.InnerException.GetType());
}
}

if (!failed) Assert.True(false, "Should have failed, but instead returned " + key);
}

[Fact, TestCategory("BVT"), TestCategory("ActivateDeactivate"), TestCategory("GetGrain")]
public void BasicActivation_ULong_MaxValue()
{
Expand Down
4 changes: 2 additions & 2 deletions test/DefaultCluster.Tests/GenericGrainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public async Task Generic_1Argument_NonGenericCallFirst()

var id = Guid.NewGuid();
var nonGenericFacet = this.GrainFactory.GetGrain<INonGenericBase>(id, "UnitTests.Grains.Generic1ArgumentGrain");
await Assert.ThrowsAsync<OrleansException>(async () =>
await Assert.ThrowsAsync<ArgumentException>(async () =>
{
try
{
Expand All @@ -590,7 +590,7 @@ public async Task Generic_1Argument_GenericCallFirst()
var s2 = await grain.Ping(s1);
Assert.Equal(s1, s2);
var nonGenericFacet = this.GrainFactory.GetGrain<INonGenericBase>(id, "UnitTests.Grains.Generic1ArgumentGrain");
await Assert.ThrowsAsync<OrleansException>(async () =>
await Assert.ThrowsAsync<ArgumentException>(async () =>
{
try
{
Expand Down
3 changes: 2 additions & 1 deletion test/DefaultCluster.Tests/StatelessWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public async Task StatelessWorkerThrowExceptionConstructor()

for (int i=0; i<100; i++)
{
await Assert.ThrowsAsync<OrleansException>(() => grain.Ping());
var ex = await Assert.ThrowsAsync<Exception>(() => grain.Ping());
Assert.Equal("oops", ex.Message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Xunit;
using System.Linq;
using Orleans.Hosting;
using System;

namespace DependencyInjection.Tests
{
Expand Down Expand Up @@ -156,9 +157,8 @@ public async Task CanResolveSingletonGrainFactory()
public async Task CannotGetExplictlyRegisteredGrain()
{
ISimpleDIGrain grain = this.fixture.GrainFactory.GetGrain<ISimpleDIGrain>(GetRandomGrainId(), grainClassNamePrefix: "UnitTests.Grains.ExplicitlyRegistered");
var exception = await Assert.ThrowsAsync<OrleansException>(() => grain.GetLongValue());
Assert.Contains("Error creating activation for", exception.Message);
Assert.Contains(nameof(ExplicitlyRegisteredSimpleDIGrain), exception.Message);
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => grain.GetLongValue());
Assert.Contains("Unable to resolve service for type 'System.String' while attempting to activate 'UnitTests.Grains.ExplicitlyRegisteredSimpleDIGrain'", exception.Message);
}

[Fact]
Expand Down
5 changes: 5 additions & 0 deletions test/Grains/TestGrainInterfaces/ITestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public interface ITestGrain : IGrainWithIntegerKey
Task DoLongAction(TimeSpan timespan, string str);
}

public interface ITestGrainLongOnActivateAsync : IGrainWithIntegerKey
{
Task<long> GetKey();
}

public interface IGuidTestGrain : IGrainWithGuidKey
{
// duplicate to verify identity
Expand Down
22 changes: 22 additions & 0 deletions test/Grains/TestInternalGrains/TestGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,28 @@ public Task<List<IGrain>> GetMultipleGrainInterfaces_List()
}
}

public class TestGrainLongActivateAsync : Grain, ITestGrainLongOnActivateAsync
{
public TestGrainLongActivateAsync()
{
}

public override async Task OnActivateAsync()
{
await Task.Delay(TimeSpan.FromSeconds(3));

if (this.GetPrimaryKeyLong() == -2)
throw new ArgumentException("Primary key cannot be -2 for this test case");

await base.OnActivateAsync();
}

public Task<long> GetKey()
{
return Task.FromResult(this.GetPrimaryKeyLong());
}
}

internal class GuidTestGrain : Grain, IGuidTestGrain
{
private string label;
Expand Down
34 changes: 34 additions & 0 deletions test/NonSilo.Tests/General/LruTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Runtime;
using Xunit;

Expand Down Expand Up @@ -89,5 +90,38 @@ public void LruUsageTest()
Assert.True(target.ContainsKey(s), "Recently used item " + s + " was incorrectly expelled");
}
}

[Fact, TestCategory("BVT"), TestCategory("LRU")]
public async Task LruRemoveExpired()
{
const int n = 10;
const int maxSize = n*2;
var maxAge = TimeSpan.FromMilliseconds(500);
LRU<string, string>.FetchValueDelegate f = null;
var flushCounter = 0;

var target = new LRU<string, string>(maxSize, maxAge, f);
target.RaiseFlushEvent += (object o, LRU<string, string>.FlushEventArgs args) => flushCounter++;

for (int i = 0; i < n; i++)
{
var s = i.ToString();
target.Add(s, $"item {s}");
}

target.RemoveExpired();
Assert.Equal(0, flushCounter);
Assert.Equal(n, target.Count);

await Task.Delay(maxAge.Add(maxAge));

target.Add("expected", "value");
target.RemoveExpired();

Assert.Equal(n, flushCounter);
Assert.Equal(1, target.Count);
Assert.True(target.TryGetValue("expected", out var value));
Assert.Equal("value", value);
}
}
}

0 comments on commit c7618f3

Please sign in to comment.