diff --git a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj index 7afc366799e..b0a47ed4d82 100644 --- a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj +++ b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj @@ -256,6 +256,7 @@ + diff --git a/src/NServiceBus.Core.Tests/Timeout/RavenTimeoutPersisterTests.cs b/src/NServiceBus.Core.Tests/Timeout/RavenTimeoutPersisterTests.cs new file mode 100644 index 00000000000..d92d5238706 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Timeout/RavenTimeoutPersisterTests.cs @@ -0,0 +1,260 @@ +namespace NServiceBus.Core.Tests.Timeout +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using NServiceBus.Persistence.Raven; + using NServiceBus.Persistence.Raven.TimeoutPersister; + using NServiceBus.Timeout.Core; + using NUnit.Framework; + using Raven.Client; + using Raven.Client.Document; + + [TestFixture] + public class RavenTimeoutPersisterTests + { + [TestCase, Repeat(200)] + public void Should_not_skip_timeouts() + { + var db = Guid.NewGuid().ToString(); + documentStore = new DocumentStore + { + Url = "http://localhost:8080", + DefaultDatabase = db, + }.Initialize(); + persister = new RavenTimeoutPersistence(new StoreAccessor(documentStore)) + { + TriggerCleanupEvery = TimeSpan.FromHours(1), // Make sure cleanup doesn't run automatically + }; + + var startSlice = DateTime.UtcNow.AddYears(-10); + // avoid cleanup from running during the test by making it register as being run + Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count()); + + var expected = new List>(); + var lastExpectedTimeout = DateTime.UtcNow; + var finishedAdding = false; + + new Thread(() => + { + var sagaId = Guid.NewGuid(); + for (var i = 0; i < 10000; i++) + { + var td = new TimeoutData + { + SagaId = sagaId, + Destination = new Address("queue", "machine"), + Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(5, 20)), + OwningTimeoutManager = string.Empty, + }; + persister.Add(td); + expected.Add(new Tuple(td.Id, td.Time)); + lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout; + } + finishedAdding = true; + Console.WriteLine("*** Finished adding ***"); + }).Start(); + + // Mimic the behavior of the TimeoutPersister coordinator + var found = 0; + TimeoutData tempTd; + while (!finishedAdding || startSlice < lastExpectedTimeout) + { + DateTime nextRetrieval; + var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval); + foreach (var timeoutData in timeoutDatas) + { + if (startSlice < timeoutData.Item2) + { + startSlice = timeoutData.Item2; + } + + Assert.IsTrue(persister.TryRemove(timeoutData.Item1, out tempTd)); + found++; + } + } + + WaitForIndexing(documentStore); + + // If the persister reports stale results have been seen at one point during its normal operation, + // we need to perform manual cleaup. + while (true) + { + var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray(); + Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length); + if (chunkToCleanup.Length == 0) break; + + found += chunkToCleanup.Length; + foreach (var tuple in chunkToCleanup) + { + Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd)); + } + + WaitForIndexing(documentStore); + } + + using (var session = documentStore.OpenSession()) + { + var results = session.Query().ToList(); + Assert.AreEqual(0, results.Count); + } + + Assert.AreEqual(expected.Count, found); + } + + [TestCase, Repeat(200)] + public void Should_not_skip_timeouts_also_with_multiple_clients_adding_timeouts() + { + var db = Guid.NewGuid().ToString(); + documentStore = new DocumentStore + { + Url = "http://localhost:8080", + DefaultDatabase = db, + }.Initialize(); + persister = new RavenTimeoutPersistence(new StoreAccessor(documentStore)) + { + TriggerCleanupEvery = TimeSpan.FromDays(1), // Make sure cleanup doesn't run automatically + }; + + var startSlice = DateTime.UtcNow.AddYears(-10); + // avoid cleanup from running during the test by making it register as being run + Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count()); + + const int insertsPerThread = 10000; + var expected1 = new List>(); + var expected2 = new List>(); + var lastExpectedTimeout = DateTime.UtcNow; + var finishedAdding1 = false; + var finishedAdding2 = false; + + new Thread(() => + { + var sagaId = Guid.NewGuid(); + for (var i = 0; i < insertsPerThread; i++) + { + var td = new TimeoutData + { + SagaId = sagaId, + Destination = new Address("queue", "machine"), + Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)), + OwningTimeoutManager = string.Empty, + }; + persister.Add(td); + expected1.Add(new Tuple(td.Id, td.Time)); + lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout; + } + finishedAdding1 = true; + Console.WriteLine("*** Finished adding ***"); + }).Start(); + + new Thread(() => + { + using (var store = new DocumentStore + { + Url = "http://localhost:8080", + DefaultDatabase = db, + }.Initialize()) + { + var persister2 = new RavenTimeoutPersistence(new StoreAccessor(store)); + + var sagaId = Guid.NewGuid(); + for (var i = 0; i < insertsPerThread; i++) + { + var td = new TimeoutData + { + SagaId = sagaId, + Destination = new Address("queue", "machine"), + Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)), + OwningTimeoutManager = string.Empty, + }; + persister2.Add(td); + expected2.Add(new Tuple(td.Id, td.Time)); + lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout; + } + } + finishedAdding2 = true; + Console.WriteLine("*** Finished adding via a second client connection ***"); + }).Start(); + + // Mimic the behavior of the TimeoutPersister coordinator + var found = 0; + TimeoutData tempTd; + while (!finishedAdding1 || !finishedAdding2 || startSlice < lastExpectedTimeout) + { + DateTime nextRetrieval; + var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval); + foreach (var timeoutData in timeoutDatas) + { + if (startSlice < timeoutData.Item2) + { + startSlice = timeoutData.Item2; + } + + Assert.IsTrue(persister.TryRemove(timeoutData.Item1, out tempTd)); // Raven returns duplicates, so we can't assert on this here + found++; + } + } + + WaitForIndexing(documentStore); + + // If the persister reports stale results have been seen at one point during its normal operation, + // we need to perform manual cleaup. + while (true) + { + var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray(); + Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length); + if (chunkToCleanup.Length == 0) break; + + found += chunkToCleanup.Length; + foreach (var tuple in chunkToCleanup) + { + Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd)); + } + + WaitForIndexing(documentStore); + } + + using (var session = documentStore.OpenSession()) + { + var results = session.Query().ToList(); + Assert.AreEqual(0, results.Count); + } + + Assert.AreEqual(expected1.Count + expected2.Count, found); + } + + IDocumentStore documentStore; + RavenTimeoutPersistence persister; + + [TearDown] + public void TearDown() + { + if (documentStore != null) + documentStore.Dispose(); + } + + static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null) + { + var databaseCommands = store.DatabaseCommands; + if (db != null) + databaseCommands = databaseCommands.ForDatabase(db); + var spinUntil = SpinWait.SpinUntil(() => databaseCommands.GetStatistics().StaleIndexes.Length == 0, timeout ?? TimeSpan.FromSeconds(20)); + Assert.True(spinUntil); + } + + static class RandomProvider + { + private static int seed = Environment.TickCount; + + private static ThreadLocal randomWrapper = new ThreadLocal(() => + new Random(Interlocked.Increment(ref seed)) + ); + + public static Random GetThreadRandom() + { + return randomWrapper.Value; + } + } + } +} diff --git a/src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs b/src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs index 3cdb3b47cc2..5375c456b9d 100644 --- a/src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs +++ b/src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs @@ -16,7 +16,6 @@ namespace NServiceBus.Core.Tests.Timeout using Raven.Client.Embedded; [TestFixture] - [Explicit] public class When_pooling_timeouts_with_raven : When_pooling_timeouts { private IDocumentStore store; @@ -37,10 +36,21 @@ public void Cleanup() { store.Dispose(); } + + [Test] + public void Should_retrieve_all_timeout_messages_that_expired_even_if_it_needs_to_page() + { + expected = 1024 + 5; + + Enumerable.Range(1, expected).ToList().ForEach(i => persister.Add(CreateData(DateTime.UtcNow.AddSeconds(-5)))); + + StartAndStopReceiver(5); + + WaitForMessagesThenAssert(5); + } } [TestFixture] - [Explicit] public class When_pooling_timeouts_with_inMemory : When_pooling_timeouts { protected override IPersistTimeouts CreateTimeoutPersister() @@ -54,7 +64,7 @@ public abstract class When_pooling_timeouts private IManageTimeouts manager; private FakeMessageSender messageSender; readonly Random rand = new Random(); - private int expected; + protected int expected; protected IPersistTimeouts persister; protected TimeoutPersisterReceiver receiver; @@ -187,14 +197,14 @@ private void Push(int total, DateTime time) Enumerable.Range(1, total).ToList().ForEach(i => manager.PushTimeout(CreateData(time))); } - private void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1) + protected void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1) { receiver.Start(); Thread.Sleep(TimeSpan.FromSeconds(secondsToWaitBeforeCallingStop)); receiver.Stop(); } - private static TimeoutData CreateData(DateTime time) + protected static TimeoutData CreateData(DateTime time) { return new TimeoutData { @@ -204,7 +214,7 @@ private static TimeoutData CreateData(DateTime time) }; } - private void WaitForMessagesThenAssert(int maxSecondsToWait) + protected void WaitForMessagesThenAssert(int maxSecondsToWait) { var maxTime = DateTime.Now.AddSeconds(maxSecondsToWait); @@ -216,4 +226,4 @@ private void WaitForMessagesThenAssert(int maxSecondsToWait) Assert.AreEqual(expected, messageSender.MessagesSent); } } -} \ No newline at end of file +} diff --git a/src/NServiceBus.Core/Persistence/Raven/TimeoutPersister/RavenTimeoutPersistence.cs b/src/NServiceBus.Core/Persistence/Raven/TimeoutPersister/RavenTimeoutPersistence.cs index 87afe2a787e..a4a59fa43ea 100644 --- a/src/NServiceBus.Core/Persistence/Raven/TimeoutPersister/RavenTimeoutPersistence.cs +++ b/src/NServiceBus.Core/Persistence/Raven/TimeoutPersister/RavenTimeoutPersistence.cs @@ -14,9 +14,47 @@ public class RavenTimeoutPersistence : IPersistTimeouts { readonly IDocumentStore store; - public RavenTimeoutPersistence(StoreAccessor storeAccessor) + public TimeSpan CleanupGapFromTimeslice { get; set; } + public TimeSpan TriggerCleanupEvery { get; set; } + DateTime lastCleanupTime = DateTime.MinValue; + + public RavenTimeoutPersistence(StoreAccessor storeAccessor) { store = storeAccessor.Store; + TriggerCleanupEvery = TimeSpan.FromMinutes(2); + CleanupGapFromTimeslice = TimeSpan.FromMinutes(1); + } + + private static IRavenQueryable GetChunkQuery(IDocumentSession session) + { + session.Advanced.AllowNonAuthoritativeInformation = true; + return session.Query() + .OrderBy(t => t.Time) + .Where( + t => + t.OwningTimeoutManager == String.Empty || + t.OwningTimeoutManager == Configure.EndpointName); + } + + public IEnumerable> GetCleanupChunk(DateTime startSlice) + { + using (var session = OpenSession()) + { + var chunk = GetChunkQuery(session) + .Where(t => t.Time <= startSlice.Subtract(CleanupGapFromTimeslice)) + .Select(t => new + { + t.Id, + t.Time + }) + .Take(1024) + .ToList() + .Select(arg => new Tuple(arg.Id, arg.Time)); + + lastCleanupTime = DateTime.UtcNow; + + return chunk; + } } public List> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery) @@ -24,28 +62,31 @@ public RavenTimeoutPersistence(StoreAccessor storeAccessor) try { var now = DateTime.UtcNow; - var skip = 0; var results = new List>(); + + // Allow for occasionally cleaning up old timeouts for edge cases where timeouts have been + // added after startSlice have been set to a later timout and we might have missed them + // because of stale indexes. + if (lastCleanupTime.Add(TriggerCleanupEvery) > now || lastCleanupTime == DateTime.MinValue) + { + results.AddRange(GetCleanupChunk(startSlice)); + } + + var skip = 0; var numberOfRequestsExecutedSoFar = 0; RavenQueryStatistics stats; - do { using (var session = OpenSession()) { session.Advanced.AllowNonAuthoritativeInformation = true; - var query = session.Query() + var query = GetChunkQuery(session) .Where( t => - t.OwningTimeoutManager == String.Empty || - t.OwningTimeoutManager == Configure.EndpointName) - .Where( - t => - t.Time > startSlice && + t.Time > startSlice && t.Time <= now) - .OrderBy(t => t.Time) - .Select(t => new {t.Id, t.Time}) + .Select(t => new { t.Id, t.Time }) .Statistics(out stats); do { @@ -61,33 +102,29 @@ public RavenTimeoutPersistence(StoreAccessor storeAccessor) } } while (skip < stats.TotalResults); - using (var session = OpenSession()) + // Set next execution to be now if we received stale results. + // Delay the next execution a bit if we results weren't stale and we got the full chunk. + if (stats.IsStale) { - session.Advanced.AllowNonAuthoritativeInformation = true; - - //Retrieve next time we need to run query - var startOfNextChunk = - session.Query() - .Where( - t => - t.OwningTimeoutManager == String.Empty || - t.OwningTimeoutManager == Configure.EndpointName) - .Where(t => t.Time > now) - .OrderBy(t => t.Time) - .Select(t => new {t.Id, t.Time}) - .FirstOrDefault(); - - if (startOfNextChunk != null) - { - nextTimeToRunQuery = startOfNextChunk.Time; - } - else + nextTimeToRunQuery = now; + } + else + { + using (var session = OpenSession()) { - nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10); + var beginningOfNextChunk = GetChunkQuery(session) + .Where(t => t.Time > now) + .Take(1) + .Select(t => t.Time) + .FirstOrDefault(); + + nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime)) + ? DateTime.UtcNow.AddMinutes(10) + : beginningOfNextChunk.ToUniversalTime(); } - - return results; } + + return results; } catch (WebException ex) { @@ -114,9 +151,6 @@ public bool TryRemove(string timeoutId, out TimeoutData timeoutData) if (timeoutData == null) return false; - timeoutData.Time = DateTime.UtcNow.AddYears(-1); - session.SaveChanges(); - session.Delete(timeoutData); session.SaveChanges();