Skip to content

Commit

Permalink
Merge pull request #1638 from lscpike/feature/scavenge_index
Browse files Browse the repository at this point in the history
Scavenge index
  • Loading branch information
riccardone committed Nov 27, 2018
2 parents 581b113 + 3e7f3ba commit ae4d359
Show file tree
Hide file tree
Showing 46 changed files with 2,295 additions and 494 deletions.
9 changes: 8 additions & 1 deletion src/EventStore.Core.Tests/Index/FakeIndexReader.cs
Expand Up @@ -6,6 +6,13 @@ namespace EventStore.Core.Tests.Fakes
{
public class FakeIndexReader : ITransactionFileReader
{
private readonly Func<long, bool> _existsAt;

public FakeIndexReader(Func<long,bool> existsAt = null)
{
_existsAt = existsAt ?? (l => true);
}

public void Reposition(long position)
{
throw new NotImplementedException();
Expand All @@ -29,7 +36,7 @@ public RecordReadResult TryReadAt(long position)

public bool ExistsAt(long position)
{
return true;
return _existsAt(position);
}
}
}
13 changes: 9 additions & 4 deletions src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables.cs
Expand Up @@ -277,9 +277,11 @@ public void merged_ptable_is_64bit()
}

[Test]
public void there_are_5_records_in_the_merged_index()
public void there_are_8_records_in_the_merged_index()
{
Assert.AreEqual(5, _newtable.Count);
// 5 from the 64 bit table (existsAt doesn't get used)
// 3 from the 32 bit table (3 even positions)
Assert.AreEqual(8, _newtable.Count);
}

[Test]
Expand Down Expand Up @@ -357,9 +359,12 @@ public void merged_ptable_is_64bit()
}

[Test]
public void there_are_7_records_in_the_merged_index()
public void there_are_10_records_in_the_merged_index()
{
Assert.AreEqual(7, _newtable.Count);
// 5 from 64 bit (existsAt not called)
// 2 from first table (2 even positions)
// 3 from last table (3 even positions)
Assert.AreEqual(10, _newtable.Count);
}

[Test]
Expand Down
Expand Up @@ -6,13 +6,11 @@

namespace EventStore.Core.Tests.Index.IndexV1
{
[TestFixture(PTableVersions.IndexV1,false)]
[TestFixture(PTableVersions.IndexV1,true)]
[TestFixture(PTableVersions.IndexV2,false)]
[TestFixture(PTableVersions.IndexV2,true)]
[TestFixture(PTableVersions.IndexV3,false)]
[TestFixture(PTableVersions.IndexV3,true)]
public class when_merging_ptables_with_entries_to_nonexisting_record: SpecificationWithDirectoryPerTestFixture
public class when_merging_ptables_with_entries_to_nonexisting_record_in_newer_index_versions: SpecificationWithDirectoryPerTestFixture
{
private readonly List<string> _files = new List<string>();
private readonly List<PTable> _tables = new List<PTable>();
Expand All @@ -21,7 +19,7 @@ public class when_merging_ptables_with_entries_to_nonexisting_record: Specificat

private bool _skipIndexVerify;

public when_merging_ptables_with_entries_to_nonexisting_record(byte version, bool skipIndexVerify){
public when_merging_ptables_with_entries_to_nonexisting_record_in_newer_index_versions(byte version, bool skipIndexVerify){
_ptableVersion = version;
_skipIndexVerify = skipIndexVerify;
}
Expand Down Expand Up @@ -57,9 +55,9 @@ public override void TestFixtureTearDown()
}

[Test]
public void there_are_only_twenty_entries_left()
public void all_entries_are_left()
{
Assert.AreEqual(20, _newtable.Count);
Assert.AreEqual(40, _newtable.Count);
}

[Test]
Expand All @@ -82,13 +80,8 @@ public void the_right_items_are_deleted()
for (int j = 0; j < 10; j++)
{
long position;
if ((i*10 + j)%2 == 0)
{
Assert.IsTrue(_newtable.TryGetOneValue((ulong)(0x010100000000 << i), j, out position));
Assert.AreEqual(i*10+j, position);
}
else
Assert.IsFalse(_newtable.TryGetOneValue((ulong)(0x010100000000 << i), j, out position));
Assert.IsTrue(_newtable.TryGetOneValue((ulong)(0x010100000000 << i), j, out position));
Assert.AreEqual(i*10+j, position);
}
}
}
Expand Down
Expand Up @@ -143,8 +143,118 @@ public void none_of_the_entries_have_upgraded_hashes()
}
}

[TestFixture(PTableVersions.IndexV1,false)]
[TestFixture(PTableVersions.IndexV1,true)]
[TestFixture(PTableVersions.IndexV1, false)]
[TestFixture(PTableVersions.IndexV1, true)]
public class when_merging_to_ptable_v4_with_deleted_entries_from_v1 : SpecificationWithDirectoryPerTestFixture
{
private readonly List<string> _files = new List<string>();
private readonly List<PTable> _tables = new List<PTable>();
private IHasher hasher;
private string _newtableFile;

private PTable _newtable;
private byte _fromVersion;
private bool _skipIndexVerify;

public when_merging_to_ptable_v4_with_deleted_entries_from_v1(byte fromVersion, bool skipIndexVerify)
{
_fromVersion = fromVersion;
_skipIndexVerify = skipIndexVerify;
}

[OneTimeSetUp]
public override void TestFixtureSetUp()
{
hasher = new Murmur3AUnsafe();
base.TestFixtureSetUp();
_files.Add(GetTempFilePath());
var table = new HashListMemTable(_fromVersion, maxSize: 20);
table.Add(0x010100000000, 0, 1);
table.Add(0x010200000000, 0, 2);
table.Add(0x010300000000, 0, 3);
table.Add(0x010300000000, 1, 4);
_tables.Add(PTable.FromMemtable(table, GetTempFilePath(), skipIndexVerify: _skipIndexVerify));
table = new HashListMemTable(_fromVersion, maxSize: 20);
table.Add(0x010100000000, 2, 5);
table.Add(0x010200000000, 1, 6);
table.Add(0x010200000000, 2, 7);
table.Add(0x010400000000, 0, 8);
table.Add(0x010400000000, 1, 9);
_tables.Add(PTable.FromMemtable(table, GetTempFilePath(), skipIndexVerify: _skipIndexVerify));
table = new HashListMemTable(_fromVersion, maxSize: 20);
table.Add(0x010100000000, 1, 10);
table.Add(0x010100000000, 2, 11);
table.Add(0x010500000000, 1, 12);
table.Add(0x010500000000, 2, 13);
table.Add(0x010500000000, 3, 14);
_tables.Add(PTable.FromMemtable(table, GetTempFilePath(), skipIndexVerify: _skipIndexVerify));
_newtableFile = GetTempFilePath();
_newtable = PTable.MergeTo(_tables, _newtableFile, (streamId, hash) => hash << 32 | hasher.Hash(streamId), x => x.Position % 2 == 0, x => new Tuple<string, bool>(x.Stream.ToString(), x.Position % 2 == 0), PTableVersions.IndexV4, skipIndexVerify: _skipIndexVerify);
}

[OneTimeTearDown]
public override void TestFixtureTearDown()
{
_newtable.Dispose();
foreach (var ssTable in _tables)
{
ssTable.Dispose();
}
base.TestFixtureTearDown();
}

[Test]
public void merged_ptable_is_64bit()
{
Assert.AreEqual(PTableVersions.IndexV4, _newtable.Version);
}

[Test]
public void there_are_7_records_in_the_merged_index()
{
Assert.AreEqual(7, _newtable.Count);
}

[Test]
public void midpoints_are_cached_in_ptable_footer()
{
var numIndexEntries = 7;
var requiredMidpoints = PTable.GetRequiredMidpointCountCached(numIndexEntries, PTableVersions.IndexV4);

var newTableFileCopy = GetTempFilePath();
File.Copy(_newtableFile, newTableFileCopy);
using (var filestream = File.Open(newTableFileCopy, FileMode.Open, FileAccess.Read))
{
var footerSize = PTableFooter.GetSize(PTableVersions.IndexV4);
Assert.AreEqual(filestream.Length, PTableHeader.Size + numIndexEntries * PTable.IndexEntryV4Size + requiredMidpoints * PTable.IndexEntryV4Size + footerSize + PTable.MD5Size);
filestream.Seek(PTableHeader.Size + numIndexEntries * PTable.IndexEntryV4Size + requiredMidpoints * PTable.IndexEntryV4Size, SeekOrigin.Begin);

var ptableFooter = PTableFooter.FromStream(filestream);
Assert.AreEqual(FileType.PTableFile, ptableFooter.FileType);
Assert.AreEqual(PTableVersions.IndexV4, ptableFooter.Version);
Assert.AreEqual(requiredMidpoints, ptableFooter.NumMidpointsCached);
}
}

[Test]
public void correct_number_of_midpoints_are_loaded()
{
Assert.AreEqual(_newtable.GetMidPoints().Length, PTable.GetRequiredMidpointCountCached(7, PTableVersions.IndexV4));
}

[Test]
public void the_items_are_sorted()
{
var last = new IndexEntry(ulong.MaxValue, 0, long.MaxValue);
foreach (var item in _newtable.IterateAllInOrder())
{
Assert.IsTrue((last.Stream == item.Stream ? last.Version > item.Version : last.Stream > item.Stream) ||
((last.Stream == item.Stream && last.Version == item.Version) && last.Position > item.Position));
last = item;
}
}
}

[TestFixture(PTableVersions.IndexV2,false)]
[TestFixture(PTableVersions.IndexV2,true)]
[TestFixture(PTableVersions.IndexV3,false)]
Expand Down Expand Up @@ -215,15 +325,15 @@ public void merged_ptable_is_64bit()
}

[Test]
public void there_are_7_records_in_the_merged_index()
public void there_are_14_records_in_the_merged_index()
{
Assert.AreEqual(7, _newtable.Count);
Assert.AreEqual(14, _newtable.Count);
}

[Test]
public void midpoints_are_cached_in_ptable_footer()
{
var numIndexEntries = 7;
var numIndexEntries = 14;
var requiredMidpoints = PTable.GetRequiredMidpointCountCached(numIndexEntries,PTableVersions.IndexV4);

var newTableFileCopy = GetTempFilePath();
Expand All @@ -244,7 +354,7 @@ public void midpoints_are_cached_in_ptable_footer()
[Test]
public void correct_number_of_midpoints_are_loaded()
{
Assert.AreEqual(_newtable.GetMidPoints().Length, PTable.GetRequiredMidpointCountCached(7,PTableVersions.IndexV4));
Assert.AreEqual(_newtable.GetMidPoints().Length, PTable.GetRequiredMidpointCountCached(14,PTableVersions.IndexV4));
}

[Test]
Expand Down
Expand Up @@ -5,7 +5,7 @@ namespace EventStore.Core.Tests.Index.IndexV4
{
[TestFixture(PTableVersions.IndexV4,false)]
[TestFixture(PTableVersions.IndexV4,true)]
public class when_merging_ptables_with_entries_to_nonexisting_record: IndexV1.when_merging_ptables_with_entries_to_nonexisting_record
public class when_merging_ptables_with_entries_to_nonexisting_record: IndexV1.when_merging_ptables_with_entries_to_nonexisting_record_in_newer_index_versions
{
public when_merging_ptables_with_entries_to_nonexisting_record(byte version, bool skipIndexVerify):base(version,skipIndexVerify)
{
Expand Down
@@ -0,0 +1,126 @@
using System.IO;
using System.Linq;
using System.Threading;
using EventStore.Core.Index;
using EventStore.Core.Index.Hashes;
using EventStore.Core.Tests.Fakes;
using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers;
using EventStore.Core.TransactionLog;
using NUnit.Framework;

namespace EventStore.Core.Tests.Index.Scavenge
{
[TestFixture(false)]
[TestFixture(true)]
class when_scavenging_a_table_index : SpecificationWithDirectoryPerTestFixture
{
private TableIndex _tableIndex;
private IHasher _lowHasher;
private IHasher _highHasher;
private string _indexDir;
private FakeTFScavengerLog _log;
private static readonly long[] Deleted = { 200, 300, 500 };
private bool _skipIndexVerify;

public when_scavenging_a_table_index(bool skipIndexVerify)
{
_skipIndexVerify = skipIndexVerify;
}

[OneTimeSetUp]
public override void TestFixtureSetUp()
{
base.TestFixtureSetUp();

_indexDir = PathName;

var fakeReader = new TFReaderLease(new FakeIndexReader(l => !Deleted.Contains(l)));

_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher,
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
PTableVersions.IndexV4,
maxSizeForMemory: 2,
maxTablesPerLevel: 5, skipIndexVerify: _skipIndexVerify);
_tableIndex.Initialize(long.MaxValue);


_tableIndex.Add(1, "testStream-1", 0, 0);
_tableIndex.Add(1, "testStream-1", 1, 100);
_tableIndex.Add(1, "testStream-1", 2, 200);
_tableIndex.Add(1, "testStream-1", 3, 300);
_tableIndex.Add(1, "testStream-1", 4, 400);
_tableIndex.Add(1, "testStream-1", 5, 500);

_log = new FakeTFScavengerLog();
_tableIndex.Scavenge(_log, CancellationToken.None);

// Check it's loadable.
_tableIndex.Close(false);

_tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher,
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
PTableVersions.IndexV4,
maxSizeForMemory: 2,
maxTablesPerLevel: 5);

_tableIndex.Initialize(long.MaxValue);

}

[OneTimeTearDown]
public override void TestFixtureTearDown()
{
_tableIndex.Close();

base.TestFixtureTearDown();
}

[Test]
public void should_have_logged_each_index_table()
{
Assert.That(_log.ScavengedIndices.Count, Is.EqualTo(3));
Assert.That(_log.ScavengedIndices[0].Scavenged, Is.True);
Assert.That(_log.ScavengedIndices[0].Error, Is.Null);
Assert.That(_log.ScavengedIndices[0].EntriesDeleted, Is.EqualTo(1));
Assert.That(_log.ScavengedIndices[1].Scavenged, Is.True);
Assert.That(_log.ScavengedIndices[1].Error, Is.Null);
Assert.That(_log.ScavengedIndices[1].EntriesDeleted, Is.EqualTo(2));
Assert.That(_log.ScavengedIndices[2].Scavenged, Is.False);
Assert.That(_log.ScavengedIndices[2].Error, Is.Empty);
Assert.That(_log.ScavengedIndices[2].EntriesDeleted, Is.EqualTo(0));
}

[Test]
public void should_have_entries_in_sorted_order()
{
var streamId = "testStream-1";
var result = _tableIndex.GetRange(streamId, 0, 5).ToArray();
var hash = (ulong)_lowHasher.Hash(streamId) << 32 | _highHasher.Hash(streamId);

Assert.That(result.Count(), Is.EqualTo(3));

Assert.That(result[0].Stream, Is.EqualTo(hash));
Assert.That(result[0].Version, Is.EqualTo(4));
Assert.That(result[0].Position, Is.EqualTo(400));

Assert.That(result[1].Stream, Is.EqualTo(hash));
Assert.That(result[1].Version, Is.EqualTo(1));
Assert.That(result[1].Position, Is.EqualTo(100));

Assert.That(result[2].Stream, Is.EqualTo(hash));
Assert.That(result[2].Version, Is.EqualTo(0));
Assert.That(result[2].Position, Is.EqualTo(0));
}


[Test]
public void old_index_tables_are_deleted()
{
Assert.That(Directory.EnumerateFiles(_indexDir).Count(), Is.EqualTo(4), "Expected IndexMap and 3 tables.");
}
}
}

0 comments on commit ae4d359

Please sign in to comment.