diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs
new file mode 100644
index 00000000000..976c894fee4
--- /dev/null
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs
@@ -0,0 +1,291 @@
+using System;
+using System.Threading.Tasks;
+using EventStore.Core.Data;
+using EventStore.Core.Tests;
+using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
+using EventStore.Core.TransactionLog.Scavenging;
+using EventStore.Core.XUnit.Tests.Scavenge.Sqlite;
+using Xunit;
+using static EventStore.Core.XUnit.Tests.Scavenge.StreamMetadatas;
+
+namespace EventStore.Core.XUnit.Tests.Scavenge;
+
+// the old scavenger can leave behind empty, scavenged chunks when
+// all events from a chunk are deleted. if the scavenger is interrupted
+// before the index scavenge phase, entries pointing to these
+// empty chunks may be present in the index.
+//
+// note: it may also happen that the merge phase has completed and the empty chunks
+// have been merged, but this is similar to the before-merge case since the
+// new scavenger works on logical chunk numbers.
+
+public class EmptyScavengedChunksTests : SqliteDbPerTest<EmptyScavengedChunksTests> {
+	// let's assume we have an index entry X pointing to an empty, scavenged chunk.
+	// call the stream for that index entry S.
+	//
+	// there are the following cases:
+	//
+	// Case A. no scavenge data (metadata, tombstone) for S exists in the log:
+	// as usual:
+	// - no events of S will be deleted
+	// - X will stay in the index
+	//
+	// Case B. scavenge data (metadata, tombstone) for S exists in the log:
+	// - as usual, scavenge data will be accumulated and discard points calculated
+	// - X may or may not be deleted from the index, depending on the cases below:
+	//
+	// B.1: X is the first index entry for the stream
+	//      delete X since all events prior to X are deleted and X doesn't exist in the log
+	//
+	// B.2: X is not the first index entry for the stream and the previous index entry is not another "X"
+	//
+	// B.2.1: based on scavenge data, the index entry just before X will be discarded for sure (i.e. not 'maybe' discardable)
+	//        delete X since all events prior to X will be deleted and X doesn't exist in the log
+	//
+	// B.2.2: based on scavenge data, the index entry just before X is 'maybe' discardable or will be kept for sure
+	//        keep X since not all events prior to X are guaranteed to be deleted from the log
+	//
+	// B.3: X is not the first index entry for the stream and the previous index entry is another "X": call it X'
+	//
+	// B.3.1: X' has been or will be discarded from the index (due to rule B.1 or B.2.1)
+	//        delete X since all events prior to X are/will be deleted and X doesn't exist in the log
+	//
+	// B.3.2: X' will not be discarded from the index (due to rule B.2.2)
+	//        keep X since not all events prior to X are guaranteed to be deleted from the log
+	//
+	// assumptions:
+	// i) 'discardable for sure' events cannot occur after 'maybe discardable' events
+	// ii) 'discardable for sure' events cannot occur after 'kept for sure' events
+	// iii) 'maybe discardable' events cannot occur after 'kept for sure' events
+
+	[Fact]
+	public async Task case_a() {
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"),
+					Rec.Write(t++, "ab-1"))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(0)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				Assert.False(x.TryGetOriginalStreamData("ab-1", out _)); // no scavenge data accumulated
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(), // emptied prior to scavenge
+					x.Recs[1].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepAll(),
+					x.Recs[1].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b() {
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.1: delete
+					Rec.Write(t++, "ab-1")) // B.3.1: delete
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // delete (before truncate before)
+					Rec.Write(t++, "ab-1")) // delete (before truncate before)
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.1: delete
+					Rec.Write(t++, "ab-1")) // B.3.1: delete <-- discard point must move here, to event 5
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Cutoff - TimeSpan.FromSeconds(1)), // maybe deleted
+					Rec.Write(t++, "ab-1", timestamp: Cutoff), // maybe deleted
+					Rec.Write(t++, "ab-1", timestamp: Cutoff + TimeSpan.FromSeconds(1))) // maybe deleted <-- maybe discard point must move here, to event 8
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.2: keep
+					Rec.Write(t++, "ab-1")) // B.3.2: keep
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active)) // keep (not expired)
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.2: keep
+					Rec.Write(t++, "ab-1")) // B.3.2: keep
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // added just to create a valid last event that's readable from the log
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: new StreamMetadata(
+						maxAge: MaxAgeTimeSpan,
+						truncateBefore: 4
+					)))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(0)
+			.EmptyChunk(2)
+			.EmptyChunk(4)
+			.EmptyChunk(6)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				if (!x.TryGetOriginalStreamData("ab-1", out var data))
+					Assert.Fail("Failed to get original stream data");
+
+				Assert.Equal(DiscardPoint.DiscardIncluding(5), data.DiscardPoint);
+				Assert.Equal(DiscardPoint.DiscardIncluding(8), data.MaybeDiscardPoint);
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(), // emptied prior to scavenge
+					x.Recs[1].KeepNone(),
+					x.Recs[2].KeepNone(), // emptied prior to scavenge
+					x.Recs[3].KeepIndexes(1, 2),
+					x.Recs[4].KeepNone(), // emptied prior to scavenge
+					x.Recs[5].KeepAll(),
+					x.Recs[6].KeepNone(), // emptied prior to scavenge
+					x.Recs[7].KeepAll(),
+					x.Recs[8].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(),
+					x.Recs[2].KeepNone(),
+					x.Recs[3].KeepAll(), // all kept since 'maybe' discardable
+					x.Recs[4].KeepAll(),
+					x.Recs[5].KeepAll(),
+					x.Recs[6].KeepAll(),
+					x.Recs[7].KeepAll(),
+					x.Recs[8].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b_with_no_events_in_the_log() {
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.1: delete
+					Rec.Write(t++, "ab-1"), // B.3.1: delete <-- discard point must move here, to event 1
+					Rec.Write(t++, "ab-1")) // B.3.1: normally deleted, but kept due to the overriding rule to keep the last index entry
+				.Chunk(
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: MaxAgeMetadata))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(0)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				if (!x.TryGetOriginalStreamData("ab-1", out var data))
+					Assert.Fail("Failed to get original stream data");
+
+				Assert.Equal(DiscardPoint.DiscardIncluding(1), data.DiscardPoint);
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(), // emptied prior to scavenge
+					x.Recs[1].KeepAll(),
+					x.Recs[2].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepIndexes(2),
+					x.Recs[1].KeepAll(),
+					x.Recs[2].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b_with_modified_tb_metadata() {
+		// in a real-life scenario, $tb would normally be 4, but in this test we change it to 2 to illustrate
+		// that the $tb metadata takes precedence. given that this stream doesn't have maxage metadata, for
+		// performance reasons we don't consult the chunk's time stamp range and thus won't know whether these
+		// index entries have already been discarded.
+
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // delete (before truncate before)
+					Rec.Write(t++, "ab-1")) // delete (before truncate before) <-- discard point must move here, to event 1
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // B.2.1: normally deleted, but kept since truncate before takes precedence when there is no maxage metadata
+					Rec.Write(t++, "ab-1")) // B.3.1: normally deleted, but kept since truncate before takes precedence when there is no maxage metadata
+				.Chunk(
+					Rec.Write(t++, "ab-1"), // keep (after truncate before)
+					Rec.Write(t++, "ab-1"), // keep (after truncate before)
+					Rec.Write(t++, "ab-1")) // keep (after truncate before)
+				.Chunk(
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: TruncateBefore2))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(1)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				Assert.False(x.TryGetOriginalStreamData("ab-1", out _)); // calculation status is 'spent'
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(), // emptied prior to scavenge
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepAll(), // all index entries kept
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				}
+			);
+	}
+
+	[Fact]
+	public async Task case_b_with_modified_maxage_metadata() {
+		// in a real-life scenario, the timestamp of events in the emptied chunk would normally be 'Expired',
+		// but in this test we change it to 'Active' to illustrate that the stale index entries will still be deleted.
+
+		var t = 0;
+		await new Scenario<LogFormat.V2, string>()
+			.WithDbPath(Fixture.Directory)
+			.WithDb(x => x
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Expired), // delete (expired)
+					Rec.Write(t++, "ab-1", timestamp: Expired)) // delete (expired)
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // B.2.1: delete, although the original event wasn't expired
+					Rec.Write(t++, "ab-1", timestamp: Active)) // B.3.1: delete, although the original event wasn't expired <-- discard point must move here, to event 3
+				.Chunk(
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
+					Rec.Write(t++, "ab-1", timestamp: Active)) // keep (not expired)
+				.Chunk(
+					Rec.Write(t++, "$$ab-1", "$metadata", metadata: MaxAgeMetadata))
+				.Chunk(ScavengePointRec(t++)))
+			.EmptyChunk(1)
+			.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
+			.AssertState(x => {
+				if (!x.TryGetOriginalStreamData("ab-1", out var data))
+					Assert.Fail("Failed to get original stream data");
+
+				Assert.Equal(DiscardPoint.DiscardIncluding(3), data.DiscardPoint);
+			})
+			.RunAsync(
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(), // emptied prior to scavenge
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				},
+				x => new[] {
+					x.Recs[0].KeepNone(),
+					x.Recs[1].KeepNone(), // all index entries deleted
+					x.Recs[2].KeepAll(),
+					x.Recs[3].KeepAll(),
+					x.Recs[4].KeepAll()
+				}
+			);
+	}
+}
diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs
index e7d2fcf3f83..b2ed3bc0d8a 100644
--- a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs
@@ -19,6 +19,7 @@
 using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
 using EventStore.Core.TransactionLog;
 using EventStore.Core.TransactionLog.Chunks;
+using EventStore.Core.TransactionLog.Chunks.TFChunk;
 using EventStore.Core.TransactionLog.LogRecords;
 using EventStore.Core.TransactionLog.Scavenging;
 using EventStore.Core.Util;
@@ -54,6 +55,7 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 		private Type _cancelWhenCheckpointingType;
 		private (string Message, int Line)[] _expectedTrace;
 		private bool _unsafeIgnoreHardDeletes;
+		private readonly HashSet<int> _chunkNumsToEmpty = new();
 
 		protected Tracer Tracer { get; set; }
 
@@ -179,6 +181,11 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 			return this;
 		}
 
+		public Scenario<TLogFormat, TStreamId> EmptyChunk(int chunkNumber) {
+			_chunkNumsToEmpty.Add(chunkNumber);
+			return this;
+		}
+
 		public async Task<DbResult> RunAsync(
 			Func<DbResult, LogRecord[][]> getExpectedKeptRecords = null,
 			Func<DbResult, LogRecord[][]> getExpectedKeptIndexEntries = null) {
@@ -288,6 +295,8 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 				}
 			}
 
+			EmptyRequestedChunks(dbResult.Db);
+
 			Scavenger<TStreamId> sut = null;
 			try {
 				var cancellationTokenSource = new CancellationTokenSource();
@@ -678,5 +687,35 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
 				});
 			}
 		}
+
+		private void EmptyRequestedChunks(TFChunkDb db) {
+			foreach (var chunkNum in _chunkNumsToEmpty) {
+				var chunk = db.Manager.GetChunk(chunkNum);
+				var header = chunk.ChunkHeader;
+
+				var newChunkHeader = new ChunkHeader(
+					version: header.Version,
+					chunkSize: header.ChunkSize,
+					chunkStartNumber: header.ChunkStartNumber,
+					chunkEndNumber: header.ChunkEndNumber,
+					isScavenged: true,
+					chunkId: Guid.NewGuid());
+
+				var newChunk = TFChunk.CreateWithHeader(
+					filename: $"{chunk.FileName}.tmp",
+					header: newChunkHeader,
+					fileSize: ChunkHeader.Size,
+					inMem: false,
+					unbuffered: false,
+					writethrough: false,
+					initialReaderCount: 1,
+					maxReaderCount: 1,
+					reduceFileCachePressure: false);
+
+				newChunk.CompleteScavenge(null);
+
+				db.Manager.SwitchChunk(newChunk, false, false);
+			}
+		}
 	}
 }
diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs
index 888fada420c..6405f289a85 100644
--- a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/StreamMetadatas.cs
@@ -26,6 +26,7 @@ public class StreamMetadatas {
 	public static DateTime EffectiveNow { get; } = new DateTime(2022, 1, 5, 00, 00, 00);
 	public static DateTime Expired { get; } = EffectiveNow - TimeSpan.FromDays(3);
+	public static DateTime Cutoff { get; } = EffectiveNow - MaxAgeTimeSpan;
 	public static DateTime Active { get; } = EffectiveNow - TimeSpan.FromDays(1);
 
 	public static Rec ScavengePointRec(int transaction, int threshold = 0, DateTime? timeStamp = null) => Rec.Write(
diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs
index a74cf63cbdb..a31a8dee082 100644
--- a/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs
+++ b/src/EventStore.Core.XUnit.Tests/Scavenge/SubsequentScavengeTests.cs
@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
 using System.Threading.Tasks;
 using EventStore.Core.Tests;
 using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
@@ -128,6 +129,8 @@ public class SubsequentScavengeTests : SqliteDbPerTest<SubsequentScavengeTests> {
 				x.SetCheckpoint(new ScavengeCheckpoint.Done(ScavengePoint(
 					chunk: 1,
 					eventNumber: 0)));
+				for (int i = 0; i < 6; i++)
+					x.SetChunkTimeStampRange(i, new ChunkTimeStampRange(DateTime.UtcNow, DateTime.UtcNow));
 			})
 			.AssertTrace(
 				Tracer.Line("Accumulating from SP-0 to SP-2"),
@@ -518,6 +521,9 @@ public class SubsequentScavengeTests : SqliteDbPerTest<SubsequentScavengeTests> {
 				x.SetCheckpoint(new ScavengeCheckpoint.Done(ScavengePoint(
 					chunk: 1,
 					eventNumber: 0)));
+
+				for (int i = 0; i < 4; i++)
+					x.SetChunkTimeStampRange(i, new ChunkTimeStampRange(DateTime.UtcNow, DateTime.UtcNow));
 			})
 			.AssertState(state => {
 				// we changed the maxcount to 4, but we expect the discard points to remain
diff --git a/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs b/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs
index 6b414e41596..215737c9000 100644
--- a/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs
+++ b/src/EventStore.Core/TransactionLog/Scavenging/Data/DiscardDecision.cs
@@ -7,5 +7,6 @@ public enum DiscardDecision {
 		// index will just keep such indexentries.
 		MaybeDiscard,
 		Keep,
+		AlreadyDiscarded,
 	}
 }
diff --git a/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs b/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs
index d026cd74908..2db0ad5c084 100644
--- a/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs
+++ b/src/EventStore.Core/TransactionLog/Scavenging/Stages/Calculator.cs
@@ -208,6 +208,7 @@ public class Buffer {
 		const int maxCount = 8192;
 
 		var first = true;
+		var allDiscardedSoFar = true;
 		while (true) {
 			// read in slices because the stream might be huge.
@@ -246,6 +247,7 @@ public class Buffer {
 			case DiscardDecision.Discard:
 				weights.OnDiscard(eventCalc.LogicalChunkNumber);
 				discardPoint = DiscardPoint.DiscardIncluding(eventInfo.EventNumber);
+				allDiscardedSoFar = true;
 				break;
 
 			case DiscardDecision.MaybeDiscard:
@@ -255,6 +257,21 @@ public class Buffer {
 				// by the previous scavenge
 				weights.OnMaybeDiscard(eventCalc.LogicalChunkNumber);
 				maybeDiscardPoint = DiscardPoint.DiscardIncluding(eventInfo.EventNumber);
+				allDiscardedSoFar = false;
+				break;
+
+			case DiscardDecision.AlreadyDiscarded:
+				// this event has already been deleted from the chunks but not from the index.
+				// we move the discard point forward only if all events before it have been or will be discarded, for two reasons:
+				//
+				// i) usually there are no gaps between the event numbers of a stream, but the old
+				// scavenger does not guarantee this. if this event sat in a gap and we always moved
+				// the discard point forward, we would also delete the events still present before the gap.
+				//
+				// ii) when it is safe to do so, moving the discard point deletes stale entries from the index.
+
+				if (allDiscardedSoFar)
+					discardPoint = DiscardPoint.DiscardIncluding(eventInfo.EventNumber);
 				break;
 
 			case DiscardDecision.Keep:
diff --git a/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs b/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs
index ddcfc5bef91..d6dd34b9e04 100644
--- a/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs
+++ b/src/EventStore.Core/TransactionLog/Scavenging/Stages/EventCalculator.cs
@@ -95,7 +95,10 @@ public class EventCalculator {
 	private DiscardDecision ShouldDiscardForMaxAge(DateTime cutoffTime) {
 		// establish a range that the event was definitely created between.
 		if (!State.TryGetChunkTimeStampRange(LogicalChunkNumber, out var createdAtRange)) {
-			throw new Exception($"Could not get TimeStamp range for chunk {LogicalChunkNumber}");
+			// we don't have a time stamp range for this chunk, which implies it was empty during
+			// accumulation. however, while reading event infos from the index we encountered an event
+			// from that chunk: the old scavenger must have deleted it from the chunk but not from the index.
+			return DiscardDecision.AlreadyDiscarded;
 		}
 
 		// range is guaranteed to be non-empty
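
The decision table in the EmptyScavengedChunksTests comment block and the allDiscardedSoFar guard added to Calculator.cs express one invariant: a stale index entry ("X") may be dropped only while everything before it in the stream is also being discarded. The following is a minimal, self-contained C# sketch of that rule, for illustration only: the Decision enum mirrors DiscardDecision, but StaleIndexEntrySketch and CalculateDiscardPoint are hypothetical names, not EventStore APIs, and the production code additionally tracks chunk weights and a separate maybe-discard point.

// sketch only: Decision mirrors DiscardDecision; the type and helper below
// are hypothetical illustrations, not EventStore code.
using System;
using System.Collections.Generic;

enum Decision { Discard, MaybeDiscard, Keep, AlreadyDiscarded }

static class StaleIndexEntrySketch {
	// takes a stream's per-event decisions in event-number order and returns the
	// highest event number that is safe to discard from the index, or null if none.
	// an AlreadyDiscarded entry (its chunk was emptied by the old scavenger) advances
	// the discard point only while every earlier entry is itself discarded
	// (rules B.1, B.2.1, B.3.1); a preceding MaybeDiscard or Keep blocks the move
	// (rules B.2.2, B.3.2).
	public static long? CalculateDiscardPoint(
		IReadOnlyList<(long EventNumber, Decision Decision)> entries) {

		long? discardPoint = null;
		var allDiscardedSoFar = true;

		foreach (var (eventNumber, decision) in entries) {
			switch (decision) {
				case Decision.Discard:
					// the event is still in the log but is discardable for sure
					discardPoint = eventNumber;
					break;
				case Decision.AlreadyDiscarded:
					// the event is gone from the chunks; drop the stale index entry
					// only if everything before it is discarded too
					if (allDiscardedSoFar)
						discardPoint = eventNumber;
					break;
				case Decision.MaybeDiscard:
				case Decision.Keep:
					// events up to here may survive, so later stale entries must stay
					allDiscardedSoFar = false;
					break;
			}
		}

		return discardPoint;
	}
}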
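
A usage sketch under the same assumptions, reusing the declarations above. It replays case_b's index layout and the stream-gap hazard described in the Calculator.cs comment; the demo class and its expected output are illustrative, not part of the change.

// usage of the sketch above; StaleIndexEntrySketchDemo is a hypothetical driver.
static class StaleIndexEntrySketchDemo {
	static void Main() {
		// case_b: events 0-1 and 4-5 sit in emptied chunks, 2-3 fall before $tb,
		// 6-8 are only 'maybe' expired, and 9-10 sit in an emptied chunk after them.
		var caseB = new (long, Decision)[] {
			(0, Decision.AlreadyDiscarded), (1, Decision.AlreadyDiscarded),
			(2, Decision.Discard), (3, Decision.Discard),
			(4, Decision.AlreadyDiscarded), (5, Decision.AlreadyDiscarded),
			(6, Decision.MaybeDiscard), (7, Decision.MaybeDiscard), (8, Decision.MaybeDiscard),
			(9, Decision.AlreadyDiscarded), (10, Decision.AlreadyDiscarded),
		};
		// prints 5, matching the asserted DiscardPoint.DiscardIncluding(5)
		Console.WriteLine(StaleIndexEntrySketch.CalculateDiscardPoint(caseB));

		// the stream-gap hazard: a stale entry after a gap must not drag the live
		// events 0 and 1 out of the index, so nothing is discarded.
		var gap = new (long, Decision)[] {
			(0, Decision.Keep), (1, Decision.Keep),
			(7, Decision.AlreadyDiscarded),
		};
		// prints True (no discard point)
		Console.WriteLine(StaleIndexEntrySketch.CalculateDiscardPoint(gap) is null);
	}
}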