Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scavenge: Handle events that have been deleted from (now empty) chunks but not from the index #3813

Merged
merged 1 commit into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
291 changes: 291 additions & 0 deletions src/EventStore.Core.XUnit.Tests/Scavenge/EmptyScavengedChunksTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
using System;
using System.Threading.Tasks;
using EventStore.Core.Data;
using EventStore.Core.Tests;
using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
using EventStore.Core.TransactionLog.Scavenging;
using EventStore.Core.XUnit.Tests.Scavenge.Sqlite;
using Xunit;
using static EventStore.Core.XUnit.Tests.Scavenge.StreamMetadatas;

namespace EventStore.Core.XUnit.Tests.Scavenge;

// the old scavenger can leave behind empty, scavenged chunks when
// all events from a chunk are deleted. if the scavenger is interrupted
// before the index scavenge phase, entries pointing to these
// empty chunks may be present in the index.
//
// note: it may also happen that the merge phase has completed and the empty chunks
// have been merged but these will be similar to the before-merge case as the
// new scavenger works on logical chunk numbers.

public class EmptyScavengedChunksTests : SqliteDbPerTest<MaxAgeTests> {
// let's assume we have an index entry X pointing to an empty, scavenged chunk.
// call the stream for that index entry: S.
//
// there are the following cases:
//
// Case A. no scavenge data (metadata, tombstone) for S exists in the log:
// as usual:
// - no events of S will be deleted
// - X will stay in the index
//
// Case B. scavenge data (metadata, tombstone) for S exists in the log:
// - as usual, scavenge data will be accumulated and discard points calculated
// - X must or must not be deleted from the index, depending on the cases below:
//
// B.1: X is the first index entry for the stream
// delete X since all events prior to X are deleted and X doesn't exist in the log
//
// B.2: X is not the first index entry for the stream and the previous index entry is not another "X"
//
// B.2.1: based on scavenge data, the index entry just before X will be discarded for sure (i.e not 'maybe' discardable)
// delete X since all events prior to X will be deleted and X doesn't exist in the log
//
// B.2.2: based on scavenge data, the index entry just before X is 'maybe' discardable or will be kept for sure
// keep X since not all events prior to X may be deleted from the log
//
// B.3: X is not the first index entry for the stream and the previous index entry is another "X": call it X'
//
// B.3.1: X' has/will be discarded from the index (due to rules B.1 or B.2.1)
// delete X since all events prior to X are/will be deleted and X doesn't exist in the log
//
// B.3.2: X' will not be discarded from the index (due to rule B.2.2)
// keep X since not all events prior to X may be deleted from the log
//
// assumptions:
// i) 'discardable for sure' events cannot occur after 'maybe discardable' events
// ii) 'discardable for sure' events cannot occur after 'kept for sure' events
// iii) 'maybe discardable' events cannot occur after 'kept for sure' events

[Fact]
public async Task case_a() {
var t = 0;
await new Scenario<LogFormat.V2, string>()
.WithDbPath(Fixture.Directory)
.WithDb(x => x
.Chunk(
Rec.Write(t++, "ab-1"),
Rec.Write(t++, "ab-1"))
.Chunk(ScavengePointRec(t++)))
.EmptyChunk(0)
.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
.AssertState(x => {
Assert.False(x.TryGetOriginalStreamData("ab-1", out _)); // no scavenge data accumulated
})
.RunAsync(
x => new[] {
x.Recs[0].KeepNone(), // emptied prior to scavenge
x.Recs[1].KeepAll()
},
x => new[] {
x.Recs[0].KeepAll(),
x.Recs[1].KeepAll()
}
);
}

[Fact]
public async Task case_b() {
var t = 0;
await new Scenario<LogFormat.V2, string>()
.WithDbPath(Fixture.Directory)
.WithDb(x => x
.Chunk(
Rec.Write(t++, "ab-1"), // B.1: delete
Rec.Write(t++, "ab-1")) // B.3.1: delete
.Chunk(
Rec.Write(t++, "ab-1"), // delete (before truncate before)
Rec.Write(t++, "ab-1")) // delete (before truncate before)
.Chunk(
Rec.Write(t++, "ab-1"), // B.2.1: delete
Rec.Write(t++, "ab-1")) // B.3.1: delete <-- discard point must move here at event 5
.Chunk(
Rec.Write(t++, "ab-1", timestamp: Cutoff - TimeSpan.FromSeconds(1)), // maybe deleted
Rec.Write(t++, "ab-1", timestamp: Cutoff), // maybe deleted
Rec.Write(t++, "ab-1", timestamp: Cutoff + TimeSpan.FromSeconds(1))) // maybe deleted <-- maybe discard point must move here at event 8
.Chunk(
Rec.Write(t++, "ab-1"), // B.2.2: keep
Rec.Write(t++, "ab-1")) // B.3.2: keep
.Chunk(
Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
Rec.Write(t++, "ab-1", timestamp: Active)) // keep (not expired)
.Chunk(
Rec.Write(t++, "ab-1"), // B.2.2: keep
Rec.Write(t++, "ab-1")) // B.3.2: keep
.Chunk(
Rec.Write(t++, "ab-1", timestamp: Active), // added just to create a valid last event that's readable from the log
Rec.Write(t++, "$$ab-1", "$metadata", metadata: new StreamMetadata(
maxAge: MaxAgeTimeSpan,
truncateBefore: 4
)))
.Chunk(ScavengePointRec(t++)))
.EmptyChunk(0)
.EmptyChunk(2)
.EmptyChunk(4)
.EmptyChunk(6)
.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
.AssertState(x => {
if (!x.TryGetOriginalStreamData("ab-1", out var data))
Assert.Fail("Failed to get original stream data");

Assert.Equal(DiscardPoint.DiscardIncluding(5), data.DiscardPoint);
Assert.Equal(DiscardPoint.DiscardIncluding(8), data.MaybeDiscardPoint);
})
.RunAsync(
x => new[] {
x.Recs[0].KeepNone(), // emptied prior to scavenge
x.Recs[1].KeepNone(),
x.Recs[2].KeepNone(), // emptied prior to scavenge
x.Recs[3].KeepIndexes(1, 2),
x.Recs[4].KeepNone(), // emptied prior to scavenge
x.Recs[5].KeepAll(),
x.Recs[6].KeepNone(), // emptied prior to scavenge
x.Recs[7].KeepAll(),
x.Recs[8].KeepAll()
},
x => new[] {
x.Recs[0].KeepNone(),
x.Recs[1].KeepNone(),
x.Recs[2].KeepNone(),
x.Recs[3].KeepAll(), // all kept since 'maybe' discardable
x.Recs[4].KeepAll(),
x.Recs[5].KeepAll(),
x.Recs[6].KeepAll(),
x.Recs[7].KeepAll(),
x.Recs[8].KeepAll()
}
);
}

[Fact]
public async Task case_b_with_no_events_in_the_log() {
var t = 0;
await new Scenario<LogFormat.V2, string>()
.WithDbPath(Fixture.Directory)
.WithDb(x => x
.Chunk(
Rec.Write(t++, "ab-1"), // B.1: delete
Rec.Write(t++, "ab-1"), // B.3.1: delete <-- discard point must move here
Rec.Write(t++, "ab-1")) // B.3.1: normally deleted but kept due to overriding rule to keep last index entry
.Chunk(
Rec.Write(t++, "$$ab-1", "$metadata", metadata: MaxAgeMetadata))
.Chunk(ScavengePointRec(t++)))
.EmptyChunk(0)
.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
.AssertState(x => {
if (!x.TryGetOriginalStreamData("ab-1", out var data))
Assert.Fail("Failed to get original stream data");

Assert.Equal(DiscardPoint.DiscardIncluding(1), data.DiscardPoint);
})
.RunAsync(
x => new[] {
x.Recs[0].KeepNone(), // emptied prior to scavenge
x.Recs[1].KeepAll(),
x.Recs[2].KeepAll()
},
x => new[] {
x.Recs[0].KeepIndexes(2),
x.Recs[1].KeepAll(),
x.Recs[2].KeepAll()
}
);
}

[Fact]
public async Task case_b_with_modified_tb_metadata() {
// in a real-life scenario, $tb would normally be 4. but in this test, we change it to 2 to illustrate that
// the $tb metadata will take precedence. Given that this stream doesn't have maxage metadata, for performance
// reasons we don't consult the chunk's time stamp range and thus won't know if these index entries have already
// been discarded.

var t = 0;
await new Scenario<LogFormat.V2, string>()
.WithDbPath(Fixture.Directory)
.WithDb(x => x
.Chunk(
Rec.Write(t++, "ab-1"), // delete (before truncate before)
Rec.Write(t++, "ab-1")) // delete (before truncate before) <-- discard point must move here at event 1
.Chunk(
Rec.Write(t++, "ab-1"), // B.2.1: normally deleted, but kept since truncate before takes precedence where there is no maxage metadata
Rec.Write(t++, "ab-1")) // B.3.1: normally deleted, but kept since truncate before takes precedence where there is no maxage metadata
.Chunk(
Rec.Write(t++, "ab-1"), // keep (after truncate before)
Rec.Write(t++, "ab-1"), // keep (after truncate before)
Rec.Write(t++, "ab-1")) // keep (after truncate before)
.Chunk(
Rec.Write(t++, "$$ab-1", "$metadata", metadata: TruncateBefore2))
.Chunk(ScavengePointRec(t++)))
.EmptyChunk(1)
.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
.AssertState(x => {
Assert.False(x.TryGetOriginalStreamData("ab-1", out _)); // calculation status is 'spent'
})
.RunAsync(
x => new[] {
x.Recs[0].KeepNone(),
x.Recs[1].KeepNone(), // emptied prior to scavenge
x.Recs[2].KeepAll(),
x.Recs[3].KeepAll(),
x.Recs[4].KeepAll()
},
x => new[] {
x.Recs[0].KeepNone(),
x.Recs[1].KeepAll(), // all index entries kept
x.Recs[2].KeepAll(),
x.Recs[3].KeepAll(),
x.Recs[4].KeepAll()
}
);
}

[Fact]
public async Task case_b_with_modified_maxage_metadata() {
// in a real-life scenario, the timestamp of events in the emptied chunk would normally be 'Expired'.
// but in this test, we change it to 'Active' to illustrate that the stale index entries will still be deleted.

var t = 0;
await new Scenario<LogFormat.V2, string>()
.WithDbPath(Fixture.Directory)
.WithDb(x => x
.Chunk(
Rec.Write(t++, "ab-1", timestamp: Expired), // delete (expired)
Rec.Write(t++, "ab-1", timestamp: Expired)) // delete (expired)
.Chunk(
Rec.Write(t++, "ab-1", timestamp: Active), // B.2.1: delete, although original event wasn't expired
Rec.Write(t++, "ab-1", timestamp: Active)) // B.3.1: delete, although original event wasn't expired <-- discard point must move here at event 3
.Chunk(
Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
Rec.Write(t++, "ab-1", timestamp: Active), // keep (not expired)
Rec.Write(t++, "ab-1", timestamp: Active)) // keep (not expired)
.Chunk(
Rec.Write(t++, "$$ab-1", "$metadata", metadata: MaxAgeMetadata))
.Chunk(ScavengePointRec(t++)))
.EmptyChunk(1)
.WithState(x => x.WithConnectionPool(Fixture.DbConnectionPool))
.AssertState(x => {
if (!x.TryGetOriginalStreamData("ab-1", out var data))
Assert.Fail("Failed to get original stream data");

Assert.Equal(DiscardPoint.DiscardIncluding(3), data.DiscardPoint);
})
.RunAsync(
x => new[] {
x.Recs[0].KeepNone(),
x.Recs[1].KeepNone(), // emptied prior to scavenge
x.Recs[2].KeepAll(),
x.Recs[3].KeepAll(),
x.Recs[4].KeepAll()
},
x => new[] {
x.Recs[0].KeepNone(),
x.Recs[1].KeepNone(), // all index entries deleted
x.Recs[2].KeepAll(),
x.Recs[3].KeepAll(),
x.Recs[4].KeepAll()
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
using EventStore.Core.TransactionLog;
using EventStore.Core.TransactionLog.Chunks;
using EventStore.Core.TransactionLog.Chunks.TFChunk;
using EventStore.Core.TransactionLog.LogRecords;
using EventStore.Core.TransactionLog.Scavenging;
using EventStore.Core.Util;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
private Type _cancelWhenCheckpointingType;
private (string Message, int Line)[] _expectedTrace;
private bool _unsafeIgnoreHardDeletes;
private readonly HashSet<int> _chunkNumsToEmpty = new();

protected Tracer Tracer { get; set; }

Expand Down Expand Up @@ -179,6 +181,11 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
return this;
}

public Scenario<TLogFormat, TStreamId> EmptyChunk(int chunkNumber) {
_chunkNumsToEmpty.Add(chunkNumber);
return this;
}

public async Task<DbResult> RunAsync(
Func<DbResult, ILogRecord[][]> getExpectedKeptRecords = null,
Func<DbResult, ILogRecord[][]> getExpectedKeptIndexEntries = null) {
Expand Down Expand Up @@ -288,6 +295,8 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
}
}

EmptyRequestedChunks(dbResult.Db);

Scavenger<TStreamId> sut = null;
try {
var cancellationTokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -678,5 +687,35 @@ public class Scenario<TLogFormat, TStreamId> : Scenario {
});
}
}

private void EmptyRequestedChunks(TFChunkDb db) {
foreach (var chunkNum in _chunkNumsToEmpty) {
var chunk = db.Manager.GetChunk(chunkNum);
var header = chunk.ChunkHeader;

var newChunkHeader = new ChunkHeader(
version: header.Version,
chunkSize: header.ChunkSize,
chunkStartNumber: header.ChunkStartNumber,
chunkEndNumber: header.ChunkEndNumber,
isScavenged: true,
chunkId: Guid.NewGuid());

var newChunk = TFChunk.CreateWithHeader(
filename: $"{chunk.FileName}.tmp",
header: newChunkHeader,
fileSize: ChunkHeader.Size,
inMem: false,
unbuffered: false,
writethrough: false,
initialReaderCount: 1,
maxReaderCount: 1,
reduceFileCachePressure: false);

newChunk.CompleteScavenge(null);

db.Manager.SwitchChunk(newChunk, false, false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class StreamMetadatas {

public static DateTime EffectiveNow { get; } = new DateTime(2022, 1, 5, 00, 00, 00);
public static DateTime Expired { get; } = EffectiveNow - TimeSpan.FromDays(3);
public static DateTime Cutoff { get; } = EffectiveNow - MaxAgeTimeSpan;
public static DateTime Active { get; } = EffectiveNow - TimeSpan.FromDays(1);

public static Rec ScavengePointRec(int transaction, int threshold = 0, DateTime? timeStamp = null) => Rec.Write(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.Core.Tests;
using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
Expand Down Expand Up @@ -128,6 +129,8 @@ public class SubsequentScavengeTests : SqliteDbPerTest<SubsequentScavengeTests>
x.SetCheckpoint(new ScavengeCheckpoint.Done(ScavengePoint(
chunk: 1,
eventNumber: 0)));
for (int i = 0; i < 6; i++)
x.SetChunkTimeStampRange(i, new ChunkTimeStampRange(DateTime.UtcNow, DateTime.UtcNow));
})
.AssertTrace(
Tracer.Line("Accumulating from SP-0 to SP-2"),
Expand Down Expand Up @@ -518,6 +521,9 @@ public class SubsequentScavengeTests : SqliteDbPerTest<SubsequentScavengeTests>
x.SetCheckpoint(new ScavengeCheckpoint.Done(ScavengePoint(
chunk: 1,
eventNumber: 0)));

for (int i = 0; i < 4; i++)
x.SetChunkTimeStampRange(i, new ChunkTimeStampRange(DateTime.UtcNow, DateTime.UtcNow));
})
.AssertState(state => {
// we changed the maxcount to 4, but we expect the discard points to remain
Expand Down