From c7618f3db4beeb34137ee08ebebec19b43ac7460 Mon Sep 17 00:00:00 2001 From: Benjamin Petit Date: Fri, 15 Jan 2021 12:38:23 -0800 Subject: [PATCH] Try to limit forwarding when a grain activation throws an exception in OnActivateAsync() (#6891) --- src/Orleans.Core/Utils/LRU.cs | 25 ++++++++++++++ src/Orleans.Runtime/Catalog/Catalog.cs | 10 ++++++ src/Orleans.Runtime/Core/Dispatcher.cs | 2 +- .../BasicActivationTests.cs | 29 ++++++++++++++++ .../DefaultCluster.Tests/GenericGrainTests.cs | 4 +-- .../StatelessWorkerTests.cs | 3 +- .../DependencyInjectionGrainTestsRunner.cs | 6 ++-- test/Grains/TestGrainInterfaces/ITestGrain.cs | 5 +++ test/Grains/TestInternalGrains/TestGrain.cs | 22 ++++++++++++ test/NonSilo.Tests/General/LruTest.cs | 34 +++++++++++++++++++ 10 files changed, 133 insertions(+), 7 deletions(-) diff --git a/src/Orleans.Core/Utils/LRU.cs b/src/Orleans.Core/Utils/LRU.cs index 09c207459a..71e6c894a1 100644 --- a/src/Orleans.Core/Utils/LRU.cs +++ b/src/Orleans.Core/Utils/LRU.cs @@ -157,6 +157,31 @@ public TValue Get(TKey key) return value; } + /// + /// Remove all expired value from the LRU instance. + /// + public void RemoveExpired() + { + var now = DateTime.UtcNow; + var toRemove = new List(); + foreach (var entry in this.cache) + { + var age = DateTime.UtcNow.Subtract(entry.Value.WhenLoaded); + if (age > requiredFreshness) + { + toRemove.Add(entry.Key); + } + } + foreach (var key in toRemove) + { + if (cache.TryRemove(key, out var result) && RaiseFlushEvent != null) + { + var args = new FlushEventArgs(key, result.Value); + RaiseFlushEvent(this, args); + } + } + } + private void AdjustSize() { while (cache.Count >= MaximumSize) diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index d885fffb25..2296baded6 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -80,6 +80,7 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont private readonly ILocalGrainDirectory directory; private readonly OrleansTaskScheduler scheduler; private readonly ActivationDirectory activations; + private readonly LRU failedActivations = new LRU(1000, TimeSpan.FromSeconds(5), null); private IStreamProviderRuntime providerRuntime; private IServiceProvider serviceProvider; private readonly ILogger logger; @@ -267,6 +268,8 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de var number = Interlocked.Increment(ref collectionNumber); long memBefore = GC.GetTotalMemory(false) / (1024 * 1024); + failedActivations.RemoveExpired(); + if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug( @@ -558,6 +561,12 @@ public void GetGrainTypeInfo(int typeCode, out string grainClass, out PlacementS // Did not find and did not start placing new if (result == null) { + if (failedActivations.TryGetValue(address, out var ex)) + { + logger.Warn(ErrorCode.Catalog_ActivationException, "Call to an activation that failed during OnActivateAsync()"); + throw ex; + } + var msg = String.Format("Non-existent activation: {0}, grain type: {1}.", address.ToFullString(), grainType); if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.CatalogNonExistingActivation2, msg); @@ -703,6 +712,7 @@ private async Task InitActivation(ActivationData activation, Dictionary(); + try + { + // Key values of -2 are not allowed in this case + var fail = this.GrainFactory.GetGrain(-2); + for (int i = 0; i < 10000; i++) + { + tasks.Add(fail.GetKey()); + } + failed = false; + await Task.WhenAll(tasks); + } + catch (Exception) + { + failed = true; + foreach (var t in tasks) + { + Assert.Equal(typeof(ArgumentException), t.Exception.InnerException.GetType()); + } + } + + if (!failed) Assert.True(false, "Should have failed, but instead returned " + key); + } + [Fact, TestCategory("BVT"), TestCategory("ActivateDeactivate"), TestCategory("GetGrain")] public void BasicActivation_ULong_MaxValue() { diff --git a/test/DefaultCluster.Tests/GenericGrainTests.cs b/test/DefaultCluster.Tests/GenericGrainTests.cs index ee5197376b..566da0ee49 100644 --- a/test/DefaultCluster.Tests/GenericGrainTests.cs +++ b/test/DefaultCluster.Tests/GenericGrainTests.cs @@ -568,7 +568,7 @@ public async Task Generic_1Argument_NonGenericCallFirst() var id = Guid.NewGuid(); var nonGenericFacet = this.GrainFactory.GetGrain(id, "UnitTests.Grains.Generic1ArgumentGrain"); - await Assert.ThrowsAsync(async () => + await Assert.ThrowsAsync(async () => { try { @@ -590,7 +590,7 @@ public async Task Generic_1Argument_GenericCallFirst() var s2 = await grain.Ping(s1); Assert.Equal(s1, s2); var nonGenericFacet = this.GrainFactory.GetGrain(id, "UnitTests.Grains.Generic1ArgumentGrain"); - await Assert.ThrowsAsync(async () => + await Assert.ThrowsAsync(async () => { try { diff --git a/test/DefaultCluster.Tests/StatelessWorkerTests.cs b/test/DefaultCluster.Tests/StatelessWorkerTests.cs index 9ab4af3e0f..7bbfc65107 100644 --- a/test/DefaultCluster.Tests/StatelessWorkerTests.cs +++ b/test/DefaultCluster.Tests/StatelessWorkerTests.cs @@ -34,7 +34,8 @@ public async Task StatelessWorkerThrowExceptionConstructor() for (int i=0; i<100; i++) { - await Assert.ThrowsAsync(() => grain.Ping()); + var ex = await Assert.ThrowsAsync(() => grain.Ping()); + Assert.Equal("oops", ex.Message); } } diff --git a/test/DependencyInjection.Tests/DependencyInjectionGrainTestsRunner.cs b/test/DependencyInjection.Tests/DependencyInjectionGrainTestsRunner.cs index 4df2c9e7d7..a9596be4b6 100644 --- a/test/DependencyInjection.Tests/DependencyInjectionGrainTestsRunner.cs +++ b/test/DependencyInjection.Tests/DependencyInjectionGrainTestsRunner.cs @@ -8,6 +8,7 @@ using Xunit; using System.Linq; using Orleans.Hosting; +using System; namespace DependencyInjection.Tests { @@ -156,9 +157,8 @@ public async Task CanResolveSingletonGrainFactory() public async Task CannotGetExplictlyRegisteredGrain() { ISimpleDIGrain grain = this.fixture.GrainFactory.GetGrain(GetRandomGrainId(), grainClassNamePrefix: "UnitTests.Grains.ExplicitlyRegistered"); - var exception = await Assert.ThrowsAsync(() => grain.GetLongValue()); - Assert.Contains("Error creating activation for", exception.Message); - Assert.Contains(nameof(ExplicitlyRegisteredSimpleDIGrain), exception.Message); + var exception = await Assert.ThrowsAsync(() => grain.GetLongValue()); + Assert.Contains("Unable to resolve service for type 'System.String' while attempting to activate 'UnitTests.Grains.ExplicitlyRegisteredSimpleDIGrain'", exception.Message); } [Fact] diff --git a/test/Grains/TestGrainInterfaces/ITestGrain.cs b/test/Grains/TestGrainInterfaces/ITestGrain.cs index c138c00543..1b7ae52379 100644 --- a/test/Grains/TestGrainInterfaces/ITestGrain.cs +++ b/test/Grains/TestGrainInterfaces/ITestGrain.cs @@ -34,6 +34,11 @@ public interface ITestGrain : IGrainWithIntegerKey Task DoLongAction(TimeSpan timespan, string str); } + public interface ITestGrainLongOnActivateAsync : IGrainWithIntegerKey + { + Task GetKey(); + } + public interface IGuidTestGrain : IGrainWithGuidKey { // duplicate to verify identity diff --git a/test/Grains/TestInternalGrains/TestGrain.cs b/test/Grains/TestInternalGrains/TestGrain.cs index 0790691cf4..b8db330a67 100644 --- a/test/Grains/TestInternalGrains/TestGrain.cs +++ b/test/Grains/TestInternalGrains/TestGrain.cs @@ -134,6 +134,28 @@ public Task> GetMultipleGrainInterfaces_List() } } + public class TestGrainLongActivateAsync : Grain, ITestGrainLongOnActivateAsync + { + public TestGrainLongActivateAsync() + { + } + + public override async Task OnActivateAsync() + { + await Task.Delay(TimeSpan.FromSeconds(3)); + + if (this.GetPrimaryKeyLong() == -2) + throw new ArgumentException("Primary key cannot be -2 for this test case"); + + await base.OnActivateAsync(); + } + + public Task GetKey() + { + return Task.FromResult(this.GetPrimaryKeyLong()); + } + } + internal class GuidTestGrain : Grain, IGuidTestGrain { private string label; diff --git a/test/NonSilo.Tests/General/LruTest.cs b/test/NonSilo.Tests/General/LruTest.cs index 403bcdced6..9168fce547 100644 --- a/test/NonSilo.Tests/General/LruTest.cs +++ b/test/NonSilo.Tests/General/LruTest.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using Orleans.Runtime; using Xunit; @@ -89,5 +90,38 @@ public void LruUsageTest() Assert.True(target.ContainsKey(s), "Recently used item " + s + " was incorrectly expelled"); } } + + [Fact, TestCategory("BVT"), TestCategory("LRU")] + public async Task LruRemoveExpired() + { + const int n = 10; + const int maxSize = n*2; + var maxAge = TimeSpan.FromMilliseconds(500); + LRU.FetchValueDelegate f = null; + var flushCounter = 0; + + var target = new LRU(maxSize, maxAge, f); + target.RaiseFlushEvent += (object o, LRU.FlushEventArgs args) => flushCounter++; + + for (int i = 0; i < n; i++) + { + var s = i.ToString(); + target.Add(s, $"item {s}"); + } + + target.RemoveExpired(); + Assert.Equal(0, flushCounter); + Assert.Equal(n, target.Count); + + await Task.Delay(maxAge.Add(maxAge)); + + target.Add("expected", "value"); + target.RemoveExpired(); + + Assert.Equal(n, flushCounter); + Assert.Equal(1, target.Count); + Assert.True(target.TryGetValue("expected", out var value)); + Assert.Equal("value", value); + } } }