From f257bcda15122b16327787b79911bd8ee22c907c Mon Sep 17 00:00:00 2001
From: shaan1337
Date: Wed, 12 Apr 2023 12:56:38 +0400
Subject: [PATCH] Handle events that have been deleted from (now empty) chunks
 but not from the index

Previously this would result in the scavenge stopping with an error like:
"Could not get TimeStamp range for chunk "

But this is a possible state if the old scavenger had scavenged the chunks
but not the index. This fix correctly handles the scenario so that the
scavenge process can continue.
---
 .../Scavenge/EmptyScavengedChunksTests.cs     | 291 ++++++++++++++++++
 .../Scavenge/Infrastructure/Scenario.cs       |  39 +++
 .../Infrastructure/StreamMetadatas.cs         |   1 +
 .../Scavenge/SubsequentScavengeTests.cs       |   8 +-
 .../Scavenging/Data/DiscardDecision.cs        |   1 +
 .../Scavenging/Stages/Calculator.cs           |  17 +
 .../Scavenging/Stages/EventCalculator.cs      |   5 +-
 7 files changed, 360 insertions(+), 2 deletions(-)
 create mode 100644 src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs

diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs
new file mode 100644
index 00000000000..976c894fee4
--- /dev/null
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs
@@ -0,0 +1,291 @@
+using System;
+using System.Threading.Tasks;
+using EventStore.Core.Data;
+using EventStore.Core.Tests;
+using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
+using EventStore.Core.TransactionLog.Scavenging;
+using EventStore.Core.XUnit.Tests.Scavenge.Sqlite;
+using Xunit;
+using static EventStore.Core.XUnit.Tests.Scavenge.StreamMetadatas;
+
+namespace EventStore.Core.XUnit.Tests.Scavenge;
+
+// the old scavenger can leave behind empty, scavenged chunks when
+// all events from a chunk are deleted. if the scavenger is interrupted
+// before the index scavenge phase, entries pointing to these
+// empty chunks may be present in the index.
+//
+// note: it may also happen that the merge phase has completed and the empty chunks
+// have been merged, but this is similar to the before-merge case since the
+// new scavenger works on logical chunk numbers.
+
+public class EmptyScavengedChunksTests : SqliteDbPerTest<EmptyScavengedChunksTests> {
+	// let's assume we have an index entry X pointing to an empty, scavenged chunk.
+	// call the stream for that index entry S.
+	//
+	// there are the following cases:
+	//
+	// Case A. no scavenge data (metadata, tombstone) for S exists in the log:
+	//   as usual:
+	//   - no events of S will be deleted
+	//   - X will stay in the index
+	//
+	// Case B. scavenge data (metadata, tombstone) for S exists in the log:
+	//   - as usual, scavenge data will be accumulated and discard points calculated
+	//   - X must either be deleted from the index or kept, depending on the cases below:
+	//
+	//   B.1: X is the first index entry for the stream
+	//        delete X, since all events prior to X are deleted and X doesn't exist in the log
+	//
+	//   B.2: X is not the first index entry for the stream and the previous index entry is not another "X"
+	//
+	//        B.2.1: based on scavenge data, the index entry just before X will be discarded for sure (i.e. not 'maybe' discardable)
+	//               delete X, since all events prior to X will be deleted and X doesn't exist in the log
+	//
+	//        B.2.2: based on scavenge data, the index entry just before X is 'maybe' discardable or will be kept for sure
+	//               keep X, since not all events prior to X may be deleted from the log
+	//
+	//   B.3: X is not the first index entry for the stream and the previous index entry is another "X": call it X'
+	//
+	//        B.3.1: X' has been / will be discarded from the index (due to rule B.1 or B.2.1)
+	//               delete X, since all events prior to X are/will be deleted and X doesn't exist in the log
+	//
+	//        B.3.2: X' will not be discarded from the index (due to rule B.2.2)
+	//               keep X, since not all events prior to X may be deleted from the log
+	//
+	// assumptions:
+	// i) 'discardable for sure' events cannot occur after 'maybe discardable' events
+	// ii) 'discardable for sure' events cannot occur after 'kept for sure' events
+	// iii) 'maybe discardable' events cannot occur after 'kept for sure' events
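+
+	// worked example for the rules above (hypothetical event numbers, for illustration):
+	// suppose S has index entries for events 0, 1, 2 and 3, where events 0, 1 and 3 sit in
+	// emptied, scavenged chunks and event 2 is still present in the log. entry 0 is deleted
+	// by rule B.1 and entry 1 by rule B.3.1. if event 2 is 'maybe' discardable (e.g. close
+	// to a maxage cutoff), entry 3 must be kept by rule B.2.2, since event 2 may survive.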
+
+	[Fact]
+	public async Task case_a() {
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"),
+					Rec.Write(t++, "ab-1"))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(0)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				Assert.False(x.TryGetOriginalStreamData("ab-1", out _)); // no scavenge data accumulated
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(), // emptied prior to scavenge
+					x.Recs[1].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepAll(),
+					x.Recs[1].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b() {
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.1: delete
+					Rec.Write(t++, "ab-1")) // B.3.1: delete
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // delete (before truncate before)
+					Rec.Write(t++, "ab-1")) // delete (before truncate before)
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.1: delete
+					Rec.Write(t++, "ab-1")) // B.3.1: delete <-- discard point must move here at event 5
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Cutoff - TimeSpan.FromSeconds(1)), // maybe deleted
+					Rec.Write(t++, "ab-1", timestamp: Cutoff), // maybe deleted
+					Rec.Write(t++, "ab-1", timestamp: Cutoff + TimeSpan.FromSeconds(1))) // maybe deleted <-- maybe discard point must move here at event 8
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.2: keep
+					Rec.Write(t++, "ab-1")) // B.3.2: keep
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active)) // keep (not expired)
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.2: keep
+					Rec.Write(t++, "ab-1")) // B.3.2: keep
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // added just to create a valid last event that's readable from the log
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: new StreamMetadata(
+						maxAge: MaxAgeTimeSpan,
+						truncateBefore: 4
+					)))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(0)
+			.EmptyChunk(2)
+			.EmptyChunk(4)
+			.EmptyChunk(6)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				if (!x.TryGetOriginalStreamData("ab-1", out var data))
+					Assert.Fail("Failed to get original stream data");
+
+				Assert.Equal(DiscardPoint.DiscardIncluding(5), data.DiscardPoint);
+				Assert.Equal(DiscardPoint.DiscardIncluding(8), data.MaybeDiscardPoint);
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(), // emptied prior to scavenge
+					x.Recs[1].KeepNone(),
+					x.Recs[2].KeepNone(), // emptied prior to scavenge
+					x.Recs[3].KeepIndexes(1, 2),
+					x.Recs[4].KeepNone(), // emptied prior to scavenge
+					x.Recs[5].KeepAll(),
+					x.Recs[6].KeepNone(), // emptied prior to scavenge
+					x.Recs[7].KeepAll(),
+					x.Recs[8].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(),
+					x.Recs[2].KeepNone(),
+					x.Recs[3].KeepAll(), // all kept since 'maybe' discardable
+					x.Recs[4].KeepAll(),
+					x.Recs[5].KeepAll(),
+					x.Recs[6].KeepAll(),
+					x.Recs[7].KeepAll(),
+					x.Recs[8].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b_with_no_events_in_the_log() {
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.1: delete
+					Rec.Write(t++, "ab-1"), // B.3.1: delete <-- discard point must move here
+					Rec.Write(t++, "ab-1")) // B.3.1: normally deleted, but kept due to the overriding rule to keep the last index entry
+				.Chunk(
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: MaxAgeMetadata))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(0)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				if (!x.TryGetOriginalStreamData("ab-1", out var data))
+					Assert.Fail("Failed to get original stream data");
+
+				Assert.Equal(DiscardPoint.DiscardIncluding(1), data.DiscardPoint);
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(), // emptied prior to scavenge
+					x.Recs[1].KeepAll(),
+					x.Recs[2].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepIndexes(2),
+					x.Recs[1].KeepAll(),
+					x.Recs[2].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b_with_modified_tb_metadata() {
+		// in a real-life scenario, $tb would normally be 4, but in this test we change it to 2 to illustrate that
+		// the $tb metadata takes precedence. given that this stream doesn't have maxage metadata, for performance
+		// reasons we don't consult the chunk's time stamp range and thus won't know if these index entries have
+		// already been discarded.
+
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // delete (before truncate before)
+					Rec.Write(t++, "ab-1")) // delete (before truncate before) <-- discard point must move here at event 1
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.1: normally deleted, but kept since truncate before takes precedence where there is no maxage metadata
+					Rec.Write(t++, "ab-1")) // B.3.1: normally deleted, but kept since truncate before takes precedence where there is no maxage metadata
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // keep (after truncate before)
+					Rec.Write(t++, "ab-1"), // keep (after truncate before)
+					Rec.Write(t++, "ab-1")) // keep (after truncate before)
+				.Chunk(
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: TruncateBefore2))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(1)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				Assert.False(x.TryGetOriginalStreamData("ab-1", out _)); // calculation status is 'spent'
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(), // emptied prior to scavenge
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepAll(), // all index entries kept
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b_with_modified_maxage_metadata() {
+		// in a real-life scenario, the timestamp of events in the emptied chunk would normally be 'Expired',
+		// but in this test we change it to 'Active' to illustrate that the stale index entries will still be deleted.
+
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Expired), // delete (expired)
+					Rec.Write(t++, "ab-1", timestamp: Expired)) // delete (expired)
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // B.2.1: delete, although original event wasn't expired
+					Rec.Write(t++, "ab-1", timestamp: Active)) // B.3.1: delete, although original event wasn't expired <-- discard point must move here at event 3
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active)) // keep (not expired)
+				.Chunk(
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: MaxAgeMetadata))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(1)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				if (!x.TryGetOriginalStreamData("ab-1", out var data))
+					Assert.Fail("Failed to get original stream data");
+
+				Assert.Equal(DiscardPoint.DiscardIncluding(3), data.DiscardPoint);
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(), // emptied prior to scavenge
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(), // all index entries deleted
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				}
+			);
+	}
+}
diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs
index e7d2fcf3f83..b2ed3bc0d8a 100644
--- a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs
@@ -19,6 +19,7 @@
 using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
 using EventStore.Core.TransactionLog;
 using EventStore.Core.TransactionLog.Chunks;
+using EventStore.Core.TransactionLog.Chunks.TFChunk;
 using EventStore.Core.TransactionLog.LogRecords;
 using EventStore.Core.TransactionLog.Scavenging;
 using EventStore.Core.Util;
@@ -54,6 +55,7 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 	private Type _cancelWhenCheckpointingType;
 	private (string Message, int Line)[] _expectedTrace;
 	private bool _unsafeIgnoreHardDeletes;
+	private readonly HashSet<int> _chunkNumsToEmpty = new();
 
 	protected Tracer Tracer { get; set; }
 
@@ -179,6 +181,11 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 		return this;
 	}
 
+	public Scenario<TLogFormat, TStreamId> EmptyChunk(int chunkNumber) {
+		_chunkNumsToEmpty.Add(chunkNumber);
+		return this;
+	}
+
 	public async Task RunAsync(
 		Func<DbResult, ILogRecord[][]> getExpectedKeptRecords = null,
 		Func<DbResult, ILogRecord[][]> getExpectedKeptIndexEntries = null) {
@@ -288,6 +295,8 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 			}
 		}
 
+		EmptyRequestedChunks(dbResult.Db);
+
 		Scavenger sut = null;
 		try {
 			var cancellationTokenSource = new CancellationTokenSource();
@@ -678,5 +687,35 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 			});
 		}
 	}
+
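+	// replaces each requested chunk with an empty, scavenged chunk, as the old scavenger
+	// would after deleting all of its events, while leaving any index entries that point
+	// into the chunk in place.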
+	private void EmptyRequestedChunks(TFChunkDb db) {
+		foreach (var chunkNum in _chunkNumsToEmpty) {
+			var chunk = db.Manager.GetChunk(chunkNum);
+			var header = chunk.ChunkHeader;
+
+			var newChunkHeader = new ChunkHeader(
+				version: header.Version,
+				chunkSize: header.ChunkSize,
+				chunkStartNumber: header.ChunkStartNumber,
+				chunkEndNumber: header.ChunkEndNumber,
+				isScavenged: true,
+				chunkId: Guid.NewGuid());
+
+			var newChunk = TFChunk.CreateWithHeader(
+				filename: $"{chunk.FileName}.tmp",
+				header: newChunkHeader,
+				fileSize: ChunkHeader.Size,
+				inMem: false,
+				unbuffered: false,
+				writethrough: false,
+				initialReaderCount: 1,
+				maxReaderCount: 1,
+				reduceFileCachePressure: false);
+
+			newChunk.CompleteScavenge(null);
+
+			db.Manager.SwitchChunk(newChunk, false, false);
+		}
+	}
 }
}
diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs
index 888fada420c..6405f289a85 100644
--- a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs
@@ -26,6 +26,7 @@ public class StreamMetadatas {
 	public static DateTime EffectiveNow { get; } = new DateTime(2022, 1, 5, 00, 00, 00);
 
 	public static DateTime Expired { get; } = EffectiveNow - TimeSpan.FromDays(3);
+	public static DateTime Cutoff { get; } = EffectiveNow - MaxAgeTimeSpan;
 	public static DateTime Active { get; } = EffectiveNow - TimeSpan.FromDays(1);
 
 	public static Rec ScavengePointRec(int transaction, int threshold = 0, DateTime? timeStamp = null) => Rec.Write(
diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs
index a74cf63cbdb..a31a8dee082 100644
--- a/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs
@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
 using System.Threading.Tasks;
 using EventStore.Core.Tests;
 using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
@@ -128,6 +129,8 @@ public class SubsequentScavengeTests : SqliteDbPerTest<SubsequentScavengeTests>
 				x.SetCheckpoint(new ScavengeCheckpoint.Done(ScavengePoint(
 					chunk: 1,
 					eventNumber: 0)));
+				for (int i = 0; i < 6; i++)
+					x.SetChunkTimeStampRange(i, new ChunkTimeStampRange(DateTime.UtcNow, DateTime.UtcNow));
 			})
 			.AssertTrace(
 				Tracer.Line("Accumulating from SP-0 to SP-2"),
@@ -518,6 +521,9 @@ public class SubsequentScavengeTests : SqliteDbPerTest<SubsequentScavengeTests>
 				x.SetCheckpoint(new ScavengeCheckpoint.Done(ScavengePoint(
 					chunk: 1,
 					eventNumber: 0)));
+
+				for (int i = 0; i < 4; i++)
+					x.SetChunkTimeStampRange(i, new ChunkTimeStampRange(DateTime.UtcNow, DateTime.UtcNow));
 			})
 			.AssertState(state => {
 				// we changed the maxcount to 4, but we expect the discard points to remain
diff --git a/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs b/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs
index 6b414e41596..215737c9000 100644
--- a/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs
+++ b/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs
@@ -7,5 +7,6 @@ public enum DiscardDecision {
 	// index will just keep such index entries.
 	MaybeDiscard,
 	Keep,
+	AlreadyDiscarded,
 }
}
diff --git a/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs b/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs
index d026cd74908..2db0ad5c084 100644
--- a/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs
+++ b/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs
@@ -208,6 +208,7 @@ public class Buffer {
 			const int maxCount = 8192;
 
 			var first = true;
+			var allDiscardedSoFar = true;
 
 			while (true) {
 				// read in slices because the stream might be huge.
@@ -246,6 +247,7 @@ public class Buffer {
 					case DiscardDecision.Discard:
 						weights.OnDiscard(eventCalc.LogicalChunkNumber);
 						discardPoint = DiscardPoint.DiscardIncluding(eventInfo.EventNumber);
+						allDiscardedSoFar = true;
 						break;
 
 					case DiscardDecision.MaybeDiscard:
@@ -255,6 +257,21 @@ public class Buffer {
 						// by the previous scavenge
 						weights.OnMaybeDiscard(eventCalc.LogicalChunkNumber);
 						maybeDiscardPoint = DiscardPoint.DiscardIncluding(eventInfo.EventNumber);
+						allDiscardedSoFar = false;
 						break;
 
+					case DiscardDecision.AlreadyDiscarded:
+						// this event has already been deleted from the chunks but not from the index.
+						// we move the discard point forward only if all events before it have been or will be discarded, for the following reasons:
+						//
+						// i) usually, there should be no gaps between the event numbers of a stream, but this property is not guaranteed
+						// with the previous scavenger. if this event happened to be in a stream gap and we always moved the discard point
+						// forward, we would end up deleting all the events that were present before the gap.
+						//
+						// ii) we do our best to delete stale entries from the index
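+						//
+						// for illustration (hypothetical event numbers): if the index has entries 0
+						// ('maybe' discardable) and 3 (already discarded from an emptied chunk), moving
+						// the discard point unconditionally to 3 would also delete event 0, which may
+						// still be needed. since allDiscardedSoFar became false at entry 0, the discard
+						// point stays put and entry 0 survives.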
+
+						if (allDiscardedSoFar)
+							discardPoint = DiscardPoint.DiscardIncluding(eventInfo.EventNumber);
+						break;
+
 					case DiscardDecision.Keep:
diff --git a/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs b/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs
index ddcfc5bef91..d6dd34b9e04 100644
--- a/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs
+++ b/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs
@@ -95,7 +95,10 @@ public class EventCalculator {
 	private DiscardDecision ShouldDiscardForMaxAge(DateTime cutoffTime) {
 		// establish a range that the event was definitely created between.
 		if (!State.TryGetChunkTimeStampRange(LogicalChunkNumber, out var createdAtRange)) {
-			throw new Exception($"Could not get TimeStamp range for chunk {LogicalChunkNumber}");
+			// we don't have a time stamp range for this chunk, which implies that it was empty during accumulation.
+			// however, while reading event infos from the index, we encountered an event from that chunk.
+			// this indicates that the event was deleted from the chunk but not from the index by the old scavenger.
+			return DiscardDecision.AlreadyDiscarded;
 		}
 
 		// range is guaranteed to be non-empty