diff --git a/src/EventStore.Core.Tests/EventStore.Core.Tests.csproj b/src/EventStore.Core.Tests/EventStore.Core.Tests.csproj index ae5f6643270..10d7401b078 100644 --- a/src/EventStore.Core.Tests/EventStore.Core.Tests.csproj +++ b/src/EventStore.Core.Tests/EventStore.Core.Tests.csproj @@ -278,6 +278,7 @@ + @@ -314,6 +315,15 @@ + + + + + + + + + diff --git a/src/EventStore.Core.Tests/Index/FakeIndexReader.cs b/src/EventStore.Core.Tests/Index/FakeIndexReader.cs index 8531876a748..5344bce0690 100644 --- a/src/EventStore.Core.Tests/Index/FakeIndexReader.cs +++ b/src/EventStore.Core.Tests/Index/FakeIndexReader.cs @@ -6,6 +6,13 @@ namespace EventStore.Core.Tests.Fakes { public class FakeIndexReader : ITransactionFileReader { + private readonly Func _existsAt; + + public FakeIndexReader(Func existsAt = null) + { + _existsAt = existsAt ?? (l => true); + } + public void Reposition(long position) { throw new NotImplementedException(); @@ -29,7 +36,7 @@ public RecordReadResult TryReadAt(long position) public bool ExistsAt(long position) { - return true; + return _existsAt(position); } } } diff --git a/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables.cs b/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables.cs index 657395214e2..a306e3d3fde 100644 --- a/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables.cs +++ b/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables.cs @@ -277,9 +277,11 @@ public void merged_ptable_is_64bit() } [Test] - public void there_are_5_records_in_the_merged_index() + public void there_are_8_records_in_the_merged_index() { - Assert.AreEqual(5, _newtable.Count); + // 5 from the 64 bit table (existsAt doesn't get used) + // 3 from the 32 bit table (3 even positions) + Assert.AreEqual(8, _newtable.Count); } [Test] @@ -357,9 +359,12 @@ public void merged_ptable_is_64bit() } [Test] - public void there_are_7_records_in_the_merged_index() + public void there_are_10_records_in_the_merged_index() { - Assert.AreEqual(7, _newtable.Count); + // 5 from 64 bit (existsAt not called) + // 2 from first table (2 even positions) + // 3 from last table (3 even positions) + Assert.AreEqual(10, _newtable.Count); } [Test] diff --git a/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables_with_entries_to_nonexisting_record.cs b/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables_with_entries_to_nonexisting_record.cs index ecd87e344df..5c8d7809d73 100644 --- a/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables_with_entries_to_nonexisting_record.cs +++ b/src/EventStore.Core.Tests/Index/IndexV1/when_merging_ptables_with_entries_to_nonexisting_record.cs @@ -6,13 +6,11 @@ namespace EventStore.Core.Tests.Index.IndexV1 { - [TestFixture(PTableVersions.IndexV1,false)] - [TestFixture(PTableVersions.IndexV1,true)] [TestFixture(PTableVersions.IndexV2,false)] [TestFixture(PTableVersions.IndexV2,true)] [TestFixture(PTableVersions.IndexV3,false)] [TestFixture(PTableVersions.IndexV3,true)] - public class when_merging_ptables_with_entries_to_nonexisting_record: SpecificationWithDirectoryPerTestFixture + public class when_merging_ptables_with_entries_to_nonexisting_record_in_newer_index_versions: SpecificationWithDirectoryPerTestFixture { private readonly List _files = new List(); private readonly List _tables = new List(); @@ -21,7 +19,7 @@ public class when_merging_ptables_with_entries_to_nonexisting_record: Specificat private bool _skipIndexVerify; - public when_merging_ptables_with_entries_to_nonexisting_record(byte version, bool skipIndexVerify){ + public when_merging_ptables_with_entries_to_nonexisting_record_in_newer_index_versions(byte version, bool skipIndexVerify){ _ptableVersion = version; _skipIndexVerify = skipIndexVerify; } @@ -57,9 +55,9 @@ public override void TestFixtureTearDown() } [Test] - public void there_are_only_twenty_entries_left() + public void all_entries_are_left() { - Assert.AreEqual(20, _newtable.Count); + Assert.AreEqual(40, _newtable.Count); } [Test] @@ -82,13 +80,8 @@ public void the_right_items_are_deleted() for (int j = 0; j < 10; j++) { long position; - if ((i*10 + j)%2 == 0) - { - Assert.IsTrue(_newtable.TryGetOneValue((ulong)(0x010100000000 << i), j, out position)); - Assert.AreEqual(i*10+j, position); - } - else - Assert.IsFalse(_newtable.TryGetOneValue((ulong)(0x010100000000 << i), j, out position)); + Assert.IsTrue(_newtable.TryGetOneValue((ulong)(0x010100000000 << i), j, out position)); + Assert.AreEqual(i*10+j, position); } } } diff --git a/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_vx_to_v4.cs b/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_vx_to_v4.cs index 5e5ef729eda..85e8b1f69bc 100644 --- a/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_vx_to_v4.cs +++ b/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_vx_to_v4.cs @@ -143,8 +143,118 @@ public void none_of_the_entries_have_upgraded_hashes() } } - [TestFixture(PTableVersions.IndexV1,false)] - [TestFixture(PTableVersions.IndexV1,true)] + [TestFixture(PTableVersions.IndexV1, false)] + [TestFixture(PTableVersions.IndexV1, true)] + public class when_merging_to_ptable_v4_with_deleted_entries_from_v1 : SpecificationWithDirectoryPerTestFixture + { + private readonly List _files = new List(); + private readonly List _tables = new List(); + private IHasher hasher; + private string _newtableFile; + + private PTable _newtable; + private byte _fromVersion; + private bool _skipIndexVerify; + + public when_merging_to_ptable_v4_with_deleted_entries_from_v1(byte fromVersion, bool skipIndexVerify) + { + _fromVersion = fromVersion; + _skipIndexVerify = skipIndexVerify; + } + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + hasher = new Murmur3AUnsafe(); + base.TestFixtureSetUp(); + _files.Add(GetTempFilePath()); + var table = new HashListMemTable(_fromVersion, maxSize: 20); + table.Add(0x010100000000, 0, 1); + table.Add(0x010200000000, 0, 2); + table.Add(0x010300000000, 0, 3); + table.Add(0x010300000000, 1, 4); + _tables.Add(PTable.FromMemtable(table, GetTempFilePath(), skipIndexVerify: _skipIndexVerify)); + table = new HashListMemTable(_fromVersion, maxSize: 20); + table.Add(0x010100000000, 2, 5); + table.Add(0x010200000000, 1, 6); + table.Add(0x010200000000, 2, 7); + table.Add(0x010400000000, 0, 8); + table.Add(0x010400000000, 1, 9); + _tables.Add(PTable.FromMemtable(table, GetTempFilePath(), skipIndexVerify: _skipIndexVerify)); + table = new HashListMemTable(_fromVersion, maxSize: 20); + table.Add(0x010100000000, 1, 10); + table.Add(0x010100000000, 2, 11); + table.Add(0x010500000000, 1, 12); + table.Add(0x010500000000, 2, 13); + table.Add(0x010500000000, 3, 14); + _tables.Add(PTable.FromMemtable(table, GetTempFilePath(), skipIndexVerify: _skipIndexVerify)); + _newtableFile = GetTempFilePath(); + _newtable = PTable.MergeTo(_tables, _newtableFile, (streamId, hash) => hash << 32 | hasher.Hash(streamId), x => x.Position % 2 == 0, x => new Tuple(x.Stream.ToString(), x.Position % 2 == 0), PTableVersions.IndexV4, skipIndexVerify: _skipIndexVerify); + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _newtable.Dispose(); + foreach (var ssTable in _tables) + { + ssTable.Dispose(); + } + base.TestFixtureTearDown(); + } + + [Test] + public void merged_ptable_is_64bit() + { + Assert.AreEqual(PTableVersions.IndexV4, _newtable.Version); + } + + [Test] + public void there_are_7_records_in_the_merged_index() + { + Assert.AreEqual(7, _newtable.Count); + } + + [Test] + public void midpoints_are_cached_in_ptable_footer() + { + var numIndexEntries = 7; + var requiredMidpoints = PTable.GetRequiredMidpointCountCached(numIndexEntries, PTableVersions.IndexV4); + + var newTableFileCopy = GetTempFilePath(); + File.Copy(_newtableFile, newTableFileCopy); + using (var filestream = File.Open(newTableFileCopy, FileMode.Open, FileAccess.Read)) + { + var footerSize = PTableFooter.GetSize(PTableVersions.IndexV4); + Assert.AreEqual(filestream.Length, PTableHeader.Size + numIndexEntries * PTable.IndexEntryV4Size + requiredMidpoints * PTable.IndexEntryV4Size + footerSize + PTable.MD5Size); + filestream.Seek(PTableHeader.Size + numIndexEntries * PTable.IndexEntryV4Size + requiredMidpoints * PTable.IndexEntryV4Size, SeekOrigin.Begin); + + var ptableFooter = PTableFooter.FromStream(filestream); + Assert.AreEqual(FileType.PTableFile, ptableFooter.FileType); + Assert.AreEqual(PTableVersions.IndexV4, ptableFooter.Version); + Assert.AreEqual(requiredMidpoints, ptableFooter.NumMidpointsCached); + } + } + + [Test] + public void correct_number_of_midpoints_are_loaded() + { + Assert.AreEqual(_newtable.GetMidPoints().Length, PTable.GetRequiredMidpointCountCached(7, PTableVersions.IndexV4)); + } + + [Test] + public void the_items_are_sorted() + { + var last = new IndexEntry(ulong.MaxValue, 0, long.MaxValue); + foreach (var item in _newtable.IterateAllInOrder()) + { + Assert.IsTrue((last.Stream == item.Stream ? last.Version > item.Version : last.Stream > item.Stream) || + ((last.Stream == item.Stream && last.Version == item.Version) && last.Position > item.Position)); + last = item; + } + } + } + [TestFixture(PTableVersions.IndexV2,false)] [TestFixture(PTableVersions.IndexV2,true)] [TestFixture(PTableVersions.IndexV3,false)] @@ -215,15 +325,15 @@ public void merged_ptable_is_64bit() } [Test] - public void there_are_7_records_in_the_merged_index() + public void there_are_14_records_in_the_merged_index() { - Assert.AreEqual(7, _newtable.Count); + Assert.AreEqual(14, _newtable.Count); } [Test] public void midpoints_are_cached_in_ptable_footer() { - var numIndexEntries = 7; + var numIndexEntries = 14; var requiredMidpoints = PTable.GetRequiredMidpointCountCached(numIndexEntries,PTableVersions.IndexV4); var newTableFileCopy = GetTempFilePath(); @@ -244,7 +354,7 @@ public void midpoints_are_cached_in_ptable_footer() [Test] public void correct_number_of_midpoints_are_loaded() { - Assert.AreEqual(_newtable.GetMidPoints().Length, PTable.GetRequiredMidpointCountCached(7,PTableVersions.IndexV4)); + Assert.AreEqual(_newtable.GetMidPoints().Length, PTable.GetRequiredMidpointCountCached(14,PTableVersions.IndexV4)); } [Test] diff --git a/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_with_entries_to_nonexisting_record.cs b/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_with_entries_to_nonexisting_record.cs index 9ef48957188..9fea1a0b48f 100644 --- a/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_with_entries_to_nonexisting_record.cs +++ b/src/EventStore.Core.Tests/Index/IndexV4/when_merging_ptables_with_entries_to_nonexisting_record.cs @@ -5,7 +5,7 @@ namespace EventStore.Core.Tests.Index.IndexV4 { [TestFixture(PTableVersions.IndexV4,false)] [TestFixture(PTableVersions.IndexV4,true)] - public class when_merging_ptables_with_entries_to_nonexisting_record: IndexV1.when_merging_ptables_with_entries_to_nonexisting_record + public class when_merging_ptables_with_entries_to_nonexisting_record: IndexV1.when_merging_ptables_with_entries_to_nonexisting_record_in_newer_index_versions { public when_merging_ptables_with_entries_to_nonexisting_record(byte version, bool skipIndexVerify):base(version,skipIndexVerify) { diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index.cs new file mode 100644 index 00000000000..b0024283c7c --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index.cs @@ -0,0 +1,126 @@ +using System.IO; +using System.Linq; +using System.Threading; +using EventStore.Core.Index; +using EventStore.Core.Index.Hashes; +using EventStore.Core.Tests.Fakes; +using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers; +using EventStore.Core.TransactionLog; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture(false)] + [TestFixture(true)] + class when_scavenging_a_table_index : SpecificationWithDirectoryPerTestFixture + { + private TableIndex _tableIndex; + private IHasher _lowHasher; + private IHasher _highHasher; + private string _indexDir; + private FakeTFScavengerLog _log; + private static readonly long[] Deleted = { 200, 300, 500 }; + private bool _skipIndexVerify; + + public when_scavenging_a_table_index(bool skipIndexVerify) + { + _skipIndexVerify = skipIndexVerify; + } + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + _indexDir = PathName; + + var fakeReader = new TFReaderLease(new FakeIndexReader(l => !Deleted.Contains(l))); + + _lowHasher = new XXHashUnsafe(); + _highHasher = new Murmur3AUnsafe(); + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5, skipIndexVerify: _skipIndexVerify); + _tableIndex.Initialize(long.MaxValue); + + + _tableIndex.Add(1, "testStream-1", 0, 0); + _tableIndex.Add(1, "testStream-1", 1, 100); + _tableIndex.Add(1, "testStream-1", 2, 200); + _tableIndex.Add(1, "testStream-1", 3, 300); + _tableIndex.Add(1, "testStream-1", 4, 400); + _tableIndex.Add(1, "testStream-1", 5, 500); + + _log = new FakeTFScavengerLog(); + _tableIndex.Scavenge(_log, CancellationToken.None); + + // Check it's loadable. + _tableIndex.Close(false); + + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + + _tableIndex.Initialize(long.MaxValue); + + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _tableIndex.Close(); + + base.TestFixtureTearDown(); + } + + [Test] + public void should_have_logged_each_index_table() + { + Assert.That(_log.ScavengedIndices.Count, Is.EqualTo(3)); + Assert.That(_log.ScavengedIndices[0].Scavenged, Is.True); + Assert.That(_log.ScavengedIndices[0].Error, Is.Null); + Assert.That(_log.ScavengedIndices[0].EntriesDeleted, Is.EqualTo(1)); + Assert.That(_log.ScavengedIndices[1].Scavenged, Is.True); + Assert.That(_log.ScavengedIndices[1].Error, Is.Null); + Assert.That(_log.ScavengedIndices[1].EntriesDeleted, Is.EqualTo(2)); + Assert.That(_log.ScavengedIndices[2].Scavenged, Is.False); + Assert.That(_log.ScavengedIndices[2].Error, Is.Empty); + Assert.That(_log.ScavengedIndices[2].EntriesDeleted, Is.EqualTo(0)); + } + + [Test] + public void should_have_entries_in_sorted_order() + { + var streamId = "testStream-1"; + var result = _tableIndex.GetRange(streamId, 0, 5).ToArray(); + var hash = (ulong)_lowHasher.Hash(streamId) << 32 | _highHasher.Hash(streamId); + + Assert.That(result.Count(), Is.EqualTo(3)); + + Assert.That(result[0].Stream, Is.EqualTo(hash)); + Assert.That(result[0].Version, Is.EqualTo(4)); + Assert.That(result[0].Position, Is.EqualTo(400)); + + Assert.That(result[1].Stream, Is.EqualTo(hash)); + Assert.That(result[1].Version, Is.EqualTo(1)); + Assert.That(result[1].Position, Is.EqualTo(100)); + + Assert.That(result[2].Stream, Is.EqualTo(hash)); + Assert.That(result[2].Version, Is.EqualTo(0)); + Assert.That(result[2].Position, Is.EqualTo(0)); + } + + + [Test] + public void old_index_tables_are_deleted() + { + Assert.That(Directory.EnumerateFiles(_indexDir).Count(), Is.EqualTo(4), "Expected IndexMap and 3 tables."); + } + } +} diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_and_another_table_is_completed_during.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_and_another_table_is_completed_during.cs new file mode 100644 index 00000000000..123d18deb01 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_and_another_table_is_completed_during.cs @@ -0,0 +1,136 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Index; +using EventStore.Core.Index.Hashes; +using EventStore.Core.Tests.Fakes; +using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers; +using EventStore.Core.TransactionLog; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture] + class when_scavenging_a_table_index_and_another_table_is_completed_during : SpecificationWithDirectoryPerTestFixture + { + private TableIndex _tableIndex; + private IHasher _lowHasher; + private IHasher _highHasher; + private string _indexDir; + private FakeTFScavengerLog _log; + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + _indexDir = PathName; + + var scavengeBlocker = new ManualResetEventSlim(false); + var scavengeStarted = new ManualResetEventSlim(false); + + var fakeReader = new TFReaderLease(new FakeIndexReader(l => + { + scavengeStarted.Set(); + if(!scavengeBlocker.Wait(5000)) + throw new Exception("Failed to continue."); + return false; + })); + + _lowHasher = new XXHashUnsafe(); + _highHasher = new Murmur3AUnsafe(); + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + _tableIndex.Initialize(long.MaxValue); + + + _tableIndex.Add(1, "testStream-1", 0, 0); + _tableIndex.Add(1, "testStream-1", 1, 100); + _tableIndex.WaitForBackgroundTasks(); + + _log = new FakeTFScavengerLog(); + var task = Task.Run(() => _tableIndex.Scavenge(_log, CancellationToken.None)); + + Assert.That(scavengeStarted.Wait(5000)); + + // Add enough for 2 more tables + _tableIndex.Add(1, "testStream-1", 2, 200); + _tableIndex.Add(1, "testStream-1", 3, 300); + _tableIndex.Add(1, "testStream-1", 4, 400); + _tableIndex.Add(1, "testStream-1", 5, 500); + + // Release the scavenge process + scavengeBlocker.Set(); + task.Wait(); + + // Check it's loadable. + _tableIndex.Close(false); + + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + + _tableIndex.Initialize(long.MaxValue); + + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _tableIndex.Close(); + + base.TestFixtureTearDown(); + } + + [Test] + public void should_have_logged_each_index_table() + { + Assert.That(_log.ScavengedIndices.Count, Is.EqualTo(1)); + Assert.That(_log.ScavengedIndices[0].Scavenged, Is.True); + Assert.That(_log.ScavengedIndices[0].Error, Is.Null); + Assert.That(_log.ScavengedIndices[0].EntriesDeleted, Is.EqualTo(2)); + } + + [Test] + public void should_still_have_all_entries_in_sorted_order() + { + var streamId = "testStream-1"; + var result = _tableIndex.GetRange(streamId, 0, 5).ToArray(); + var hash = (ulong)_lowHasher.Hash(streamId) << 32 | _highHasher.Hash(streamId); + + Assert.That(result.Count(), Is.EqualTo(4)); + + Assert.That(result[0].Stream, Is.EqualTo(hash)); + Assert.That(result[0].Version, Is.EqualTo(5)); + Assert.That(result[0].Position, Is.EqualTo(500)); + + Assert.That(result[1].Stream, Is.EqualTo(hash)); + Assert.That(result[1].Version, Is.EqualTo(4)); + Assert.That(result[1].Position, Is.EqualTo(400)); + + Assert.That(result[2].Stream, Is.EqualTo(hash)); + Assert.That(result[2].Version, Is.EqualTo(3)); + Assert.That(result[2].Position, Is.EqualTo(300)); + + Assert.That(result[3].Stream, Is.EqualTo(hash)); + Assert.That(result[3].Version, Is.EqualTo(2)); + Assert.That(result[3].Position, Is.EqualTo(200)); + } + + + [Test] + public void all_tables_are_written_to_disk() + { + Assert.That(Directory.EnumerateFiles(_indexDir).Count(), Is.EqualTo(4), "Expected IndexMap and 3 tables."); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_cancelled_while_scavenging_table.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_cancelled_while_scavenging_table.cs new file mode 100644 index 00000000000..3da1e6705ef --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_cancelled_while_scavenging_table.cs @@ -0,0 +1,124 @@ +using System; +using System.Linq; +using System.Threading; +using EventStore.Core.Index; +using EventStore.Core.Index.Hashes; +using EventStore.Core.Tests.Fakes; +using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers; +using EventStore.Core.TransactionLog; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture] + class when_scavenging_a_table_index_cancelled_while_scavenging_table : SpecificationWithDirectoryPerTestFixture + { + private TableIndex _tableIndex; + private IHasher _lowHasher; + private IHasher _highHasher; + private string _indexDir; + private FakeTFScavengerLog _log; + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + _indexDir = PathName; + + var cancellationTokenSource = new CancellationTokenSource(); + + var fakeReader = new TFReaderLease(new FakeIndexReader(l => + { + cancellationTokenSource.Cancel(); + return true; + })); + + _lowHasher = new XXHashUnsafe(); + _highHasher = new Murmur3AUnsafe(); + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + _tableIndex.Initialize(long.MaxValue); + + _tableIndex.Add(1, "testStream-1", 0, 0); + _tableIndex.Add(1, "testStream-1", 1, 100); + _tableIndex.Add(1, "testStream-1", 2, 200); + _tableIndex.Add(1, "testStream-1", 3, 300); + _tableIndex.Add(1, "testStream-1", 4, 400); + _tableIndex.Add(1, "testStream-1", 5, 500); + + _log = new FakeTFScavengerLog(); + + + Assert.That(() => _tableIndex.Scavenge(_log, cancellationTokenSource.Token), Throws.InstanceOf()); + + // Check it's loadable still. + _tableIndex.Close(false); + + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + + _tableIndex.Initialize(long.MaxValue); + + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _tableIndex.Close(); + + base.TestFixtureTearDown(); + } + + [Test] + public void should_have_logged_a_failure() + { + Assert.That(_log.ScavengedIndices.Count, Is.EqualTo(1)); + Assert.That(_log.ScavengedIndices[0].Scavenged, Is.False); + Assert.That(_log.ScavengedIndices[0].Error, Is.EqualTo("Scavenge cancelled")); + Assert.That(_log.ScavengedIndices[0].EntriesDeleted, Is.EqualTo(0)); + } + + [Test] + public void should_still_have_all_entries_in_sorted_order() + { + var streamId = "testStream-1"; + var result = _tableIndex.GetRange(streamId, 0, 5).ToArray(); + var hash = (ulong)_lowHasher.Hash(streamId) << 32 | _highHasher.Hash(streamId); + + Assert.That(result.Count(), Is.EqualTo(6)); + + Assert.That(result[0].Stream, Is.EqualTo(hash)); + Assert.That(result[0].Version, Is.EqualTo(5)); + Assert.That(result[0].Position, Is.EqualTo(500)); + + Assert.That(result[1].Stream, Is.EqualTo(hash)); + Assert.That(result[1].Version, Is.EqualTo(4)); + Assert.That(result[1].Position, Is.EqualTo(400)); + + Assert.That(result[2].Stream, Is.EqualTo(hash)); + Assert.That(result[2].Version, Is.EqualTo(3)); + Assert.That(result[2].Position, Is.EqualTo(300)); + + Assert.That(result[3].Stream, Is.EqualTo(hash)); + Assert.That(result[3].Version, Is.EqualTo(2)); + Assert.That(result[3].Position, Is.EqualTo(200)); + + Assert.That(result[4].Stream, Is.EqualTo(hash)); + Assert.That(result[4].Version, Is.EqualTo(1)); + Assert.That(result[4].Position, Is.EqualTo(100)); + + Assert.That(result[5].Stream, Is.EqualTo(hash)); + Assert.That(result[5].Version, Is.EqualTo(0)); + Assert.That(result[5].Position, Is.EqualTo(0)); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_cancelled_while_waiting_for_lock.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_cancelled_while_waiting_for_lock.cs new file mode 100644 index 00000000000..0791a0a2e86 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_cancelled_while_waiting_for_lock.cs @@ -0,0 +1,110 @@ +using System; +using System.Linq; +using System.Threading; +using EventStore.Core.Index; +using EventStore.Core.Index.Hashes; +using EventStore.Core.Tests.Fakes; +using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers; +using EventStore.Core.TransactionLog; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture] + class when_scavenging_a_table_index_cancelled_while_waiting_for_lock : SpecificationWithDirectoryPerTestFixture + { + private TableIndex _tableIndex; + private IHasher _lowHasher; + private IHasher _highHasher; + private string _indexDir; + private FakeTFScavengerLog _log; + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + _indexDir = PathName; + + var fakeReader = new TFReaderLease(new FakeIndexReader()); + _lowHasher = new XXHashUnsafe(); + _highHasher = new Murmur3AUnsafe(); + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + _tableIndex.Initialize(long.MaxValue); + + _tableIndex.Add(1, "testStream-1", 0, 0); + _tableIndex.Add(1, "testStream-1", 1, 100); + _tableIndex.Add(1, "testStream-1", 2, 200); + _tableIndex.Add(1, "testStream-1", 3, 300); + _tableIndex.Add(1, "testStream-1", 4, 400); + _tableIndex.Add(1, "testStream-1", 5, 500); + + _log = new FakeTFScavengerLog(); + + var cancellationTokenSource = new CancellationTokenSource(); + cancellationTokenSource.Cancel(); + + Assert.That(() => _tableIndex.Scavenge(_log, cancellationTokenSource.Token), Throws.InstanceOf()); + + // Check it's loadable still. + _tableIndex.Close(false); + + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + + _tableIndex.Initialize(long.MaxValue); + + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _tableIndex.Close(); + + base.TestFixtureTearDown(); + } + + [Test] + public void should_still_have_all_entries_in_sorted_order() + { + var streamId = "testStream-1"; + var result = _tableIndex.GetRange(streamId, 0, 5).ToArray(); + var hash = (ulong)_lowHasher.Hash(streamId) << 32 | _highHasher.Hash(streamId); + + Assert.That(result.Count(), Is.EqualTo(6)); + + Assert.That(result[0].Stream, Is.EqualTo(hash)); + Assert.That(result[0].Version, Is.EqualTo(5)); + Assert.That(result[0].Position, Is.EqualTo(500)); + + Assert.That(result[1].Stream, Is.EqualTo(hash)); + Assert.That(result[1].Version, Is.EqualTo(4)); + Assert.That(result[1].Position, Is.EqualTo(400)); + + Assert.That(result[2].Stream, Is.EqualTo(hash)); + Assert.That(result[2].Version, Is.EqualTo(3)); + Assert.That(result[2].Position, Is.EqualTo(300)); + + Assert.That(result[3].Stream, Is.EqualTo(hash)); + Assert.That(result[3].Version, Is.EqualTo(2)); + Assert.That(result[3].Position, Is.EqualTo(200)); + + Assert.That(result[4].Stream, Is.EqualTo(hash)); + Assert.That(result[4].Version, Is.EqualTo(1)); + Assert.That(result[4].Position, Is.EqualTo(100)); + + Assert.That(result[5].Stream, Is.EqualTo(hash)); + Assert.That(result[5].Version, Is.EqualTo(0)); + Assert.That(result[5].Position, Is.EqualTo(0)); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_fails.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_fails.cs new file mode 100644 index 00000000000..0b8fc2ef676 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_table_index_fails.cs @@ -0,0 +1,132 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading; +using EventStore.Core.Index; +using EventStore.Core.Index.Hashes; +using EventStore.Core.Tests.Fakes; +using EventStore.Core.Tests.TransactionLog.Scavenging.Helpers; +using EventStore.Core.TransactionLog; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture] + class when_scavenging_a_table_index_fails : SpecificationWithDirectoryPerTestFixture + { + private TableIndex _tableIndex; + private IHasher _lowHasher; + private IHasher _highHasher; + private string _indexDir; + private FakeTFScavengerLog _log; + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + _indexDir = PathName; + + var fakeReader = new TFReaderLease(new FakeIndexReader()); + int readerCount = 0; + _lowHasher = new XXHashUnsafe(); + _highHasher = new Murmur3AUnsafe(); + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => + { + readerCount++; + if (readerCount < 4)// One for each table add. + { + return fakeReader; + } + + throw new Exception("Expected exception"); + }, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + _tableIndex.Initialize(long.MaxValue); + + _tableIndex.Add(1, "testStream-1", 0, 0); + _tableIndex.Add(1, "testStream-1", 1, 100); + _tableIndex.Add(1, "testStream-1", 2, 200); + _tableIndex.Add(1, "testStream-1", 3, 300); + _tableIndex.Add(1, "testStream-1", 4, 400); + _tableIndex.Add(1, "testStream-1", 5, 500); + + _log = new FakeTFScavengerLog(); + Assert.That(() => _tableIndex.Scavenge(_log, CancellationToken.None), Throws.Exception.With.Message.EqualTo("Expected exception")); + + // Check it's loadable still. + _tableIndex.Close(false); + + _tableIndex = new TableIndex(_indexDir, _lowHasher, _highHasher, + () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5), + () => fakeReader, + PTableVersions.IndexV4, + maxSizeForMemory: 2, + maxTablesPerLevel: 5); + + _tableIndex.Initialize(long.MaxValue); + + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _tableIndex.Close(); + + base.TestFixtureTearDown(); + } + + [Test] + public void should_have_logged_a_failure() + { + Assert.That(_log.ScavengedIndices.Count, Is.EqualTo(1)); + Assert.That(_log.ScavengedIndices[0].Scavenged, Is.False); + Assert.That(_log.ScavengedIndices[0].Error, Is.EqualTo("Expected exception")); + Assert.That(_log.ScavengedIndices[0].EntriesDeleted, Is.EqualTo(0)); + } + + [Test] + public void should_still_have_all_entries_in_sorted_order() + { + var streamId = "testStream-1"; + var result = _tableIndex.GetRange(streamId, 0, 5).ToArray(); + var hash = (ulong)_lowHasher.Hash(streamId) << 32 | _highHasher.Hash(streamId); + + Assert.That(result.Count(), Is.EqualTo(6)); + + Assert.That(result[0].Stream, Is.EqualTo(hash)); + Assert.That(result[0].Version, Is.EqualTo(5)); + Assert.That(result[0].Position, Is.EqualTo(500)); + + Assert.That(result[1].Stream, Is.EqualTo(hash)); + Assert.That(result[1].Version, Is.EqualTo(4)); + Assert.That(result[1].Position, Is.EqualTo(400)); + + Assert.That(result[2].Stream, Is.EqualTo(hash)); + Assert.That(result[2].Version, Is.EqualTo(3)); + Assert.That(result[2].Position, Is.EqualTo(300)); + + Assert.That(result[3].Stream, Is.EqualTo(hash)); + Assert.That(result[3].Version, Is.EqualTo(2)); + Assert.That(result[3].Position, Is.EqualTo(200)); + + Assert.That(result[4].Stream, Is.EqualTo(hash)); + Assert.That(result[4].Version, Is.EqualTo(1)); + Assert.That(result[4].Position, Is.EqualTo(100)); + + Assert.That(result[5].Stream, Is.EqualTo(hash)); + Assert.That(result[5].Version, Is.EqualTo(0)); + Assert.That(result[5].Position, Is.EqualTo(0)); + } + + [Test] + public void old_index_tables_are_deleted() + { + Assert.That(Directory.EnumerateFiles(_indexDir).Count(), Is.EqualTo(4), "Expected IndexMap and 3 tables."); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_v1_index.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_v1_index.cs new file mode 100644 index 00000000000..d9e89422744 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_a_v1_index.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using EventStore.Core.Index; +using EventStore.Core.Index.Hashes; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture(PTableVersions.IndexV2, false)] + [TestFixture(PTableVersions.IndexV2, true)] + [TestFixture(PTableVersions.IndexV3, false)] + [TestFixture(PTableVersions.IndexV3, true)] + [TestFixture(PTableVersions.IndexV4, false)] + [TestFixture(PTableVersions.IndexV4, true)] + public class when_scavenging_a_v1_index : SpecificationWithDirectoryPerTestFixture + { + private IHasher hasher; + + private PTable _newtable; + private readonly byte _newVersion; + private bool _skipIndexVerify; + private Func _upgradeHash; + private PTable _oldTable; + + public when_scavenging_a_v1_index(byte newVersion, bool skipIndexVerify) + { + _newVersion = newVersion; + _skipIndexVerify = skipIndexVerify; + } + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + hasher = new Murmur3AUnsafe(); + base.TestFixtureSetUp(); + + var table = new HashListMemTable(PTableVersions.IndexV1, maxSize: 20); + table.Add(0x010100000000, 0, 1); + table.Add(0x010200000000, 0, 2); + table.Add(0x010300000000, 0, 3); + table.Add(0x010300000000, 1, 4); + _oldTable = PTable.FromMemtable(table, GetTempFilePath()); + + long spaceSaved; + _upgradeHash = (streamId, hash) => hash << 32 | hasher.Hash(streamId); + Func existsAt = x => x.Position % 2 == 0; + Func> readRecord = x => new Tuple(x.Stream.ToString(), x.Position % 2 == 0); + _newtable = PTable.Scavenged(_oldTable, GetTempFilePath(), _upgradeHash, existsAt, readRecord, _newVersion, out spaceSaved, skipIndexVerify: _skipIndexVerify); + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _oldTable.Dispose(); + _newtable.Dispose(); + + base.TestFixtureTearDown(); + } + + [Test] + public void scavenged_ptable_is_new_version() + { + Assert.AreEqual(_newVersion, _newtable.Version); + } + + [Test] + public void there_are_2_records_in_the_merged_index() + { + Assert.AreEqual(2, _newtable.Count); + } + + [Test] + public void remaining_entries_should_have_been_upgraded_to_64bit_hash() + { + ulong entry1 = 0x0103; + ulong entry2 = 0x0102; + + using (var enumerator = _newtable.IterateAllInOrder().GetEnumerator()) + { + Assert.That(enumerator.MoveNext()); + Assert.That(enumerator.Current.Stream, Is.EqualTo(_upgradeHash(entry1.ToString(), entry1))); + + Assert.That(enumerator.MoveNext()); + Assert.That(enumerator.Current.Stream, Is.EqualTo(_upgradeHash(entry2.ToString(), entry2))); + } + } + + [Test] + public void the_items_are_sorted() + { + var last = new IndexEntry(ulong.MaxValue, 0, long.MaxValue); + foreach (var item in _newtable.IterateAllInOrder()) + { + Assert.IsTrue((last.Stream == item.Stream ? last.Version > item.Version : last.Stream > item.Stream) || + ((last.Stream == item.Stream && last.Version == item.Version) && last.Position > item.Position)); + last = item; + } + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index.cs new file mode 100644 index 00000000000..bfc82e37667 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index.cs @@ -0,0 +1,86 @@ +using System; +using EventStore.Core.Index; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture(PTableVersions.IndexV2, false)] + [TestFixture(PTableVersions.IndexV2, true)] + [TestFixture(PTableVersions.IndexV3, false)] + [TestFixture(PTableVersions.IndexV3, true)] + [TestFixture(PTableVersions.IndexV4, false)] + [TestFixture(PTableVersions.IndexV4, true)] + public class when_scavenging_an_index : SpecificationWithDirectoryPerTestFixture + { + private PTable _newtable; + private readonly byte _oldVersion; + private bool _skipIndexVerify; + private PTable _oldTable; + + public when_scavenging_an_index(byte oldVersion, bool skipIndexVerify) + { + _oldVersion = oldVersion; + _skipIndexVerify = skipIndexVerify; + } + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + var table = new HashListMemTable(_oldVersion, maxSize: 20); + table.Add(0x010100000000, 0, 1); + table.Add(0x010200000000, 0, 2); + table.Add(0x010300000000, 0, 3); + table.Add(0x010300000000, 1, 4); + _oldTable = PTable.FromMemtable(table, GetTempFilePath()); + + long spaceSaved; + Func existsAt = x => x.Position % 2 == 0; + Func> readRecord = x => + { + throw new Exception("Should not be called"); + }; + Func upgradeHash = (streamId, hash) => + { + throw new Exception("Should not be called"); + }; + + _newtable = PTable.Scavenged(_oldTable, GetTempFilePath(), upgradeHash, existsAt, readRecord, PTableVersions.IndexV4, out spaceSaved, skipIndexVerify: _skipIndexVerify); + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _oldTable.Dispose(); + _newtable.Dispose(); + + base.TestFixtureTearDown(); + } + + [Test] + public void scavenged_ptable_is_newest_version() + { + Assert.AreEqual(PTableVersions.IndexV4, _newtable.Version); + } + + [Test] + public void there_are_2_records_in_the_merged_index() + { + Assert.AreEqual(2, _newtable.Count); + } + + [Test] + public void the_items_are_sorted() + { + var last = new IndexEntry(ulong.MaxValue, 0, long.MaxValue); + foreach (var item in _newtable.IterateAllInOrder()) + { + Assert.IsTrue((last.Stream == item.Stream ? last.Version > item.Version : last.Stream > item.Stream) || + ((last.Stream == item.Stream && last.Version == item.Version) && last.Position > item.Position)); + last = item; + } + } + + } +} diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_fails.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_fails.cs new file mode 100644 index 00000000000..e1bab9d5143 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_fails.cs @@ -0,0 +1,58 @@ +using System; +using System.IO; +using EventStore.Core.Index; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture] + public class when_scavenging_an_index_fails : SpecificationWithDirectoryPerTestFixture + { + private PTable _oldTable; + private string _expectedOutputFile; + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + var table = new HashListMemTable(PTableVersions.IndexV4, maxSize: 20); + table.Add(0x010100000000, 0, 1); + table.Add(0x010200000000, 0, 2); + table.Add(0x010300000000, 0, 3); + table.Add(0x010300000000, 1, 4); + _oldTable = PTable.FromMemtable(table, GetTempFilePath()); + + long spaceSaved; + Func existsAt = x => + { + throw new Exception("Expected exception"); + }; + Func> readRecord = x => + { + throw new Exception("Should not be called"); + }; + Func upgradeHash = (streamId, hash) => + { + throw new Exception("Should not be called"); + }; + + _expectedOutputFile = GetTempFilePath(); + Assert.That(() => PTable.Scavenged(_oldTable, _expectedOutputFile, upgradeHash, existsAt, readRecord, PTableVersions.IndexV4, out spaceSaved), Throws.Exception.With.Message.EqualTo("Expected exception")); + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _oldTable.Dispose(); + + base.TestFixtureTearDown(); + } + + [Test] + public void the_output_file_is_deleted() + { + Assert.That(File.Exists(_expectedOutputFile), Is.False); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_is_cancelled.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_is_cancelled.cs new file mode 100644 index 00000000000..3c25dc90872 --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_is_cancelled.cs @@ -0,0 +1,61 @@ +using System; +using System.IO; +using System.Threading; +using EventStore.Core.Index; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture] + public class when_scavenging_an_index_is_cancelled : SpecificationWithDirectoryPerTestFixture + { + private PTable _oldTable; + private string _expectedOutputFile; + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + var table = new HashListMemTable(PTableVersions.IndexV4, maxSize: 20); + table.Add(0x010100000000, 0, 1); + table.Add(0x010200000000, 0, 2); + table.Add(0x010300000000, 0, 3); + table.Add(0x010300000000, 1, 4); + _oldTable = PTable.FromMemtable(table, GetTempFilePath()); + + var cancellationTokenSource = new CancellationTokenSource(); + long spaceSaved; + Func existsAt = x => + { + cancellationTokenSource.Cancel(); + return true; + }; + Func> readRecord = x => + { + throw new Exception("Should not be called"); + }; + Func upgradeHash = (streamId, hash) => + { + throw new Exception("Should not be called"); + }; + + _expectedOutputFile = GetTempFilePath(); + Assert.That(() => PTable.Scavenged(_oldTable, _expectedOutputFile, upgradeHash, existsAt, readRecord, PTableVersions.IndexV4, out spaceSaved, ct: cancellationTokenSource.Token), Throws.InstanceOf()); + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _oldTable.Dispose(); + + base.TestFixtureTearDown(); + } + + [Test] + public void the_output_file_is_deleted() + { + Assert.That(File.Exists(_expectedOutputFile), Is.False); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_removes_nothing.cs b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_removes_nothing.cs new file mode 100644 index 00000000000..37e794a081e --- /dev/null +++ b/src/EventStore.Core.Tests/Index/Scavenge/when_scavenging_an_index_removes_nothing.cs @@ -0,0 +1,93 @@ +using System; +using System.IO; +using EventStore.Core.Index; +using NUnit.Framework; + +namespace EventStore.Core.Tests.Index.Scavenge +{ + [TestFixture(PTableVersions.IndexV2, false)] + [TestFixture(PTableVersions.IndexV2, true)] + [TestFixture(PTableVersions.IndexV3, false)] + [TestFixture(PTableVersions.IndexV3, true)] + [TestFixture(PTableVersions.IndexV4, false)] + [TestFixture(PTableVersions.IndexV4, true)] + public class when_scavenging_an_index_removes_nothing : SpecificationWithDirectoryPerTestFixture + { + private PTable _newtable; + private readonly byte _oldVersion; + private bool _skipIndexVerify; + private PTable _oldTable; + private string _expectedOutputFile; + + public when_scavenging_an_index_removes_nothing(byte oldVersion, bool skipIndexVerify) + { + _oldVersion = oldVersion; + _skipIndexVerify = skipIndexVerify; + } + + [OneTimeSetUp] + public override void TestFixtureSetUp() + { + base.TestFixtureSetUp(); + + var table = new HashListMemTable(_oldVersion, maxSize: 20); + table.Add(0x010100000000, 0, 1); + table.Add(0x010200000000, 0, 2); + table.Add(0x010300000000, 0, 3); + table.Add(0x010300000000, 1, 4); + _oldTable = PTable.FromMemtable(table, GetTempFilePath()); + + long spaceSaved; + Func existsAt = x => true; + Func> readRecord = x => + { + throw new Exception("Should not be called"); + }; + Func upgradeHash = (streamId, hash) => + { + throw new Exception("Should not be called"); + }; + + _expectedOutputFile = GetTempFilePath(); + _newtable = PTable.Scavenged(_oldTable, _expectedOutputFile, upgradeHash, existsAt, readRecord, PTableVersions.IndexV4, out spaceSaved, skipIndexVerify: _skipIndexVerify); + } + + [OneTimeTearDown] + public override void TestFixtureTearDown() + { + _oldTable.Dispose(); + _newtable?.Dispose(); + + base.TestFixtureTearDown(); + } + + [Test] + public void a_null_object_is_returned_if_the_version_is_unchanged() + { + if (_oldVersion == PTableVersions.IndexV4) + { + Assert.IsNull(_newtable); + + } + } + + [Test] + public void the_output_file_is_deleted_if_version_is_unchanged() + { + if (_oldVersion == PTableVersions.IndexV4) + { + Assert.That(File.Exists(_expectedOutputFile), Is.False); + } + } + + [Test] + public void a_table_with_all_items_is_returned_with_a_newer_version() + { + if (_oldVersion != PTableVersions.IndexV4) + { + Assert.IsNotNull(_newtable); + Assert.That(_newtable.Count, Is.EqualTo(4)); + } + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core.Tests/Services/Storage/FakeTableIndex.cs b/src/EventStore.Core.Tests/Services/Storage/FakeTableIndex.cs index 716b313dd76..a46420f9de7 100644 --- a/src/EventStore.Core.Tests/Services/Storage/FakeTableIndex.cs +++ b/src/EventStore.Core.Tests/Services/Storage/FakeTableIndex.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using EventStore.Core.Index; namespace EventStore.Core.Tests.Services.Storage @@ -7,6 +8,7 @@ namespace EventStore.Core.Tests.Services.Storage public class FakeTableIndex: ITableIndex { internal static readonly IndexEntry InvalidIndexEntry = new IndexEntry(0, -1, -1); + public int ScavengeCount { get; private set; } public long PrepareCheckpoint { get { throw new NotImplementedException(); } } public long CommitCheckpoint { get { throw new NotImplementedException(); } } @@ -51,5 +53,10 @@ public IEnumerable GetRange(string streamId, long startVersion, long { yield break; } + + public void Scavenge(IIndexScavengerLog log, CancellationToken ct) + { + ScavengeCount++; + } } } \ No newline at end of file diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/FakeTFScavengerLog.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/FakeTFScavengerLog.cs index 4ff9c893874..d14c3c1192d 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/FakeTFScavengerLog.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/FakeTFScavengerLog.cs @@ -16,9 +16,11 @@ public class FakeTFScavengerLog : ITFChunkScavengerLog public event EventHandler StartedCallback; public event EventHandler ChunkScavenged; + public event EventHandler IndexScavenged; public event EventHandler CompletedCallback; public IList Scavenged { get; } = new List(); + public IList ScavengedIndices { get; } = new List(); public void ScavengeStarted() { @@ -56,6 +58,21 @@ public void ChunksNotMerged(int chunkStartNumber, int chunkEndNumber, TimeSpan e ChunkScavenged?.Invoke(this, scavengedLog); } + public void IndexTableScavenged(int level, int index, TimeSpan elapsed, long entriesDeleted, long entriesKept, + long spaceSaved) + { + var indexScavenged = new IndexScavengedLog(true, null, entriesDeleted); + ScavengedIndices.Add(indexScavenged); + IndexScavenged?.Invoke(this, indexScavenged); + } + + public void IndexTableNotScavenged(int level, int index, TimeSpan elapsed, long entriesKept, string errorMessage) + { + var indexScavenged = new IndexScavengedLog(false, errorMessage, 0); + ScavengedIndices.Add(indexScavenged); + IndexScavenged?.Invoke(this, indexScavenged); + } + public void ScavengeCompleted(ScavengeResult result, string error, TimeSpan elapsed) { Completed = true; @@ -78,6 +95,22 @@ public ScavengedLog(int chunkStart, int chunkEnd, bool scavenged, string error) Error = error; } } + + public class IndexScavengedLog + { + public bool Scavenged { get; } + public string Error { get; } + public long EntriesDeleted { get; } + + public IndexScavengedLog(bool scavenged, string error, long entriesDeleted) + { + Scavenged = scavenged; + Error = error; + EntriesDeleted = entriesDeleted; + } + } + + } diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/ScavengeLifeCycleScenario.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/ScavengeLifeCycleScenario.cs index 4719251fc00..5e4861ef7b7 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/ScavengeLifeCycleScenario.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/Helpers/ScavengeLifeCycleScenario.cs @@ -12,6 +12,7 @@ abstract class ScavengeLifeCycleScenario : SpecificationWithDirectoryPerTestFixt private DbResult _dbResult; protected TFChunkScavenger TfChunkScavenger; protected FakeTFScavengerLog Log; + protected FakeTableIndex FakeTableIndex; public override void TestFixtureSetUp() { @@ -31,7 +32,8 @@ public override void TestFixtureSetUp() _dbResult.Db.Config.ChaserCheckpoint.Flush(); Log = new FakeTFScavengerLog(); - TfChunkScavenger = new TFChunkScavenger(_dbResult.Db, Log, new FakeTableIndex(), new FakeReadIndex(_ => false)); + FakeTableIndex = new FakeTableIndex(); + TfChunkScavenger = new TFChunkScavenger(_dbResult.Db, Log, FakeTableIndex, new FakeReadIndex(_ => false)); When(); } diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_chunck_scavenged.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_chunck_scavenged.cs index a8bba07fd66..bfb0848cc79 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_chunck_scavenged.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_chunck_scavenged.cs @@ -30,5 +30,11 @@ public void scavenge_record_for_first_and_cancelled_chunk() Assert.That(Log.Scavenged[0].Scavenged, Is.True); } + + [Test] + public void doesnt_call_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(0)); + } } } \ No newline at end of file diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_completed.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_completed.cs index 1270845a69d..17641ac11d1 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_completed.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_completed.cs @@ -32,5 +32,11 @@ public void scavenge_record_for_all_completed_chunks_plus_merge() Assert.That(Log.Scavenged[2].Scavenged, Is.True); } + + [Test] + public void calls_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(1)); + } } } \ No newline at end of file diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_started.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_started.cs index b67dd5fd987..00a51525051 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_started.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_after_started.cs @@ -30,5 +30,10 @@ public void no_chunks_scavenged() Assert.That(Log.Scavenged, Is.Empty); } + [Test] + public void doesnt_call_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(0)); + } } } \ No newline at end of file diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_before_started.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_before_started.cs index da9152677a8..fd44d3d284b 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_before_started.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_cancelled_before_started.cs @@ -28,5 +28,11 @@ public void no_chunks_scavenged() Assert.That(Log.Scavenged, Is.Empty); } + [Test] + public void doesnt_call_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(0)); + } + } } \ No newline at end of file diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_from_chunk_number.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_from_chunk_number.cs index 0547df2bcf2..a4e1022dafd 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_from_chunk_number.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_from_chunk_number.cs @@ -35,5 +35,12 @@ public void scavenge_record_for_chunks_1_plus() Assert.That(Log.Scavenged[0].ChunkStart, Is.EqualTo(1)); Assert.That(Log.Scavenged[0].ChunkEnd, Is.EqualTo(1)); } + + + [Test] + public void calls_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(1)); + } } } \ No newline at end of file diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_succeeds_without_error.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_succeeds_without_error.cs index 9157fe91f3f..55be63181c7 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_succeeds_without_error.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_succeeds_without_error.cs @@ -41,5 +41,11 @@ public void scavenge_record_for_all_completed_chunks_plus_merge() Assert.That(Log.Scavenged[2].ChunkStart, Is.EqualTo(0)); Assert.That(Log.Scavenged[2].ChunkEnd, Is.EqualTo(1)); } + + [Test] + public void calls_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(1)); + } } } diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_throws_exception_processing_chunk.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_throws_exception_processing_chunk.cs index 412b1b2252e..87038a49ea8 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_throws_exception_processing_chunk.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/when_scavenge_throws_exception_processing_chunk.cs @@ -28,5 +28,11 @@ public void no_exception_is_thrown_to_caller() Assert.That(Log.Completed); Assert.That(Log.Result, Is.EqualTo(ScavengeResult.Failed)); } + + [Test] + public void doesnt_call_scavenge_on_the_table_index() + { + Assert.That(FakeTableIndex.ScavengeCount, Is.EqualTo(0)); + } } } \ No newline at end of file diff --git a/src/EventStore.Core/EventStore.Core.csproj b/src/EventStore.Core/EventStore.Core.csproj index d8c1842649c..400a72ed040 100644 --- a/src/EventStore.Core/EventStore.Core.csproj +++ b/src/EventStore.Core/EventStore.Core.csproj @@ -154,6 +154,7 @@ + @@ -167,6 +168,7 @@ + diff --git a/src/EventStore.Core/Index/IIndexScavengerLog.cs b/src/EventStore.Core/Index/IIndexScavengerLog.cs new file mode 100644 index 00000000000..e3bac9b99d9 --- /dev/null +++ b/src/EventStore.Core/Index/IIndexScavengerLog.cs @@ -0,0 +1,11 @@ +using System; + +namespace EventStore.Core.Index +{ + public interface IIndexScavengerLog + { + void IndexTableScavenged(int level, int index, TimeSpan elapsed, long entriesDeleted, long entriesKept, long spaceSaved); + + void IndexTableNotScavenged(int level, int index, TimeSpan elapsed, long entriesKept, string errorMessage); + } +} \ No newline at end of file diff --git a/src/EventStore.Core/Index/ITableIndex.cs b/src/EventStore.Core/Index/ITableIndex.cs index 8d7d574d77d..47953631a8f 100644 --- a/src/EventStore.Core/Index/ITableIndex.cs +++ b/src/EventStore.Core/Index/ITableIndex.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading; namespace EventStore.Core.Index { @@ -18,5 +19,7 @@ public interface ITableIndex bool TryGetOldestEntry(string streamId, out IndexEntry entry); IEnumerable GetRange(string streamId, long startVersion, long endVersion, int? limit = null); + + void Scavenge(IIndexScavengerLog log, CancellationToken ct); } } \ No newline at end of file diff --git a/src/EventStore.Core/Index/IndexMap.cs b/src/EventStore.Core/Index/IndexMap.cs index 641ef877d70..c676d21b140 100644 --- a/src/EventStore.Core/Index/IndexMap.cs +++ b/src/EventStore.Core/Index/IndexMap.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Threading; using EventStore.Common.Log; using EventStore.Common.Utils; using EventStore.Core.Data; @@ -359,6 +360,46 @@ public void SaveToFile(string filename) return new MergeResult(indexMap, toDelete); } + public ScavengeResult Scavenge(Guid toScavenge, CancellationToken ct, + Func upgradeHash, + Func existsAt, + Func> recordExistsAt, + IIndexFilenameProvider filenameProvider, + byte version, + int indexCacheDepth = 16, + bool skipIndexVerify = false) + { + + var scavengedMap = CopyFrom(_map); + for (int level = 0; level < scavengedMap.Count; level++) + { + for (int i = 0; i < scavengedMap[level].Count; i++) + { + if (scavengedMap[level][i].Id == toScavenge) + { + long spaceSaved; + var filename = filenameProvider.GetFilenameNewTable(); + var oldTable = scavengedMap[level][i]; + + PTable scavenged = PTable.Scavenged(oldTable, filename, upgradeHash, existsAt, recordExistsAt, version, out spaceSaved, indexCacheDepth, skipIndexVerify, ct); + + if (scavenged == null) + { + return ScavengeResult.Failed(oldTable, level, i); + } + + scavengedMap[level][i] = scavenged; + + var indexMap = new IndexMap(Version, scavengedMap, PrepareCheckpoint, CommitCheckpoint, _maxTablesPerLevel); + + return ScavengeResult.Success(indexMap, oldTable, scavenged, spaceSaved, level, i); + } + } + } + + throw new ArgumentException("Unable to find table in map.", nameof(toScavenge)); + } + public void Dispose(TimeSpan timeout) { foreach (var ptable in InOrder()) diff --git a/src/EventStore.Core/Index/PTableConstruction.cs b/src/EventStore.Core/Index/PTableConstruction.cs index 6b294c5eb01..861ce147bc0 100644 --- a/src/EventStore.Core/Index/PTableConstruction.cs +++ b/src/EventStore.Core/Index/PTableConstruction.cs @@ -6,7 +6,7 @@ using System.Linq; using System.Runtime.InteropServices; using System.Security.Cryptography; -using EventStore.Common.Options; +using System.Threading; using EventStore.Common.Utils; namespace EventStore.Core.Index @@ -107,83 +107,93 @@ public static PTable MergeTo(IList tables, string outputFile, Func new EnumerableTable(version, table, upgradeHash, existsAt, readRecord)).ToList(); - - for (int i = 0; i < enumerators.Count; i++) + try { - if (!enumerators[i].MoveNext()) + for (int i = 0; i < enumerators.Count; i++) { - enumerators[i].Dispose(); - enumerators.RemoveAt(i); - i--; + if (!enumerators[i].MoveNext()) + { + enumerators[i].Dispose(); + enumerators.RemoveAt(i); + i--; + } } - } - - long dumpedEntryCount = 0; - using (var f = new FileStream(outputFile, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, - DefaultSequentialBufferSize, FileOptions.SequentialScan)) - { - f.SetLength(fileSizeUpToIndexEntries); - f.Seek(0, SeekOrigin.Begin); - using (var md5 = MD5.Create()) - using (var cs = new CryptoStream(f, md5, CryptoStreamMode.Write)) - using (var bs = new BufferedStream(cs, DefaultSequentialBufferSize)) + long dumpedEntryCount = 0; + using (var f = new FileStream(outputFile, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, + DefaultSequentialBufferSize, FileOptions.SequentialScan)) { - // WRITE HEADER - var headerBytes = new PTableHeader(version).AsByteArray(); - cs.Write(headerBytes, 0, headerBytes.Length); + f.SetLength(fileSizeUpToIndexEntries); + f.Seek(0, SeekOrigin.Begin); - var buffer = new byte[indexEntrySize]; - long indexEntry = 0L; - List midpoints = new List(); - var requiredMidpointCount = GetRequiredMidpointCountCached(numIndexEntries,version,cacheDepth); - // WRITE INDEX ENTRIES - while (enumerators.Count > 0) + using (var md5 = MD5.Create()) + using (var cs = new CryptoStream(f, md5, CryptoStreamMode.Write)) + using (var bs = new BufferedStream(cs, DefaultSequentialBufferSize)) { - var idx = GetMaxOf(enumerators); - var current = enumerators[idx].Current; - if(existsAt(current)) + // WRITE HEADER + var headerBytes = new PTableHeader(version).AsByteArray(); + cs.Write(headerBytes, 0, headerBytes.Length); + + var buffer = new byte[indexEntrySize]; + long indexEntry = 0L; + List midpoints = new List(); + var requiredMidpointCount = GetRequiredMidpointCountCached(numIndexEntries, version, cacheDepth); + // WRITE INDEX ENTRIES + while (enumerators.Count > 0) { + var idx = GetMaxOf(enumerators); + var current = enumerators[idx].Current; AppendRecordTo(bs, buffer, version, current, indexEntrySize); - if(version >= PTableVersions.IndexV4 && IsMidpointIndex(indexEntry,numIndexEntries,requiredMidpointCount)){ - midpoints.Add(new Midpoint(new IndexEntryKey(current.Stream,current.Version),indexEntry)); + if (version >= PTableVersions.IndexV4 && IsMidpointIndex(indexEntry, numIndexEntries, requiredMidpointCount)) + { + midpoints.Add(new Midpoint(new IndexEntryKey(current.Stream, current.Version),indexEntry)); } indexEntry++; - dumpedEntryCount += 1; - } - if (!enumerators[idx].MoveNext()) - { - enumerators[idx].Dispose(); - enumerators.RemoveAt(idx); + dumpedEntryCount++; + + if (!enumerators[idx].MoveNext()) + { + enumerators[idx].Dispose(); + enumerators.RemoveAt(idx); + } } - } - //WRITE MIDPOINTS - if(version >= PTableVersions.IndexV4){ - if(dumpedEntryCount!=numIndexEntries){ - //if index entries have been removed, compute the midpoints again - numIndexEntries = dumpedEntryCount; - requiredMidpointCount = GetRequiredMidpointCount(numIndexEntries, version, cacheDepth); - midpoints = ComputeMidpoints(bs,f,version,indexEntrySize,numIndexEntries, requiredMidpointCount,midpoints); + //WRITE MIDPOINTS + if (version >= PTableVersions.IndexV4) + { + if (dumpedEntryCount != numIndexEntries) + { + //if index entries have been removed, compute the midpoints again + numIndexEntries = dumpedEntryCount; + requiredMidpointCount = GetRequiredMidpointCount(numIndexEntries, version, cacheDepth); + midpoints = ComputeMidpoints(bs, f, version, indexEntrySize, numIndexEntries, requiredMidpointCount, midpoints); + } + WriteMidpointsTo(bs, f, version, indexEntrySize, buffer, dumpedEntryCount, numIndexEntries, requiredMidpointCount, midpoints); } - WriteMidpointsTo(bs,f,version,indexEntrySize,buffer,dumpedEntryCount,numIndexEntries,requiredMidpointCount,midpoints); - } - bs.Flush(); - cs.FlushFinalBlock(); + bs.Flush(); + cs.FlushFinalBlock(); - f.FlushToDisk(); - f.SetLength(f.Position + MD5Size); + f.FlushToDisk(); + f.SetLength(f.Position + MD5Size); - // WRITE MD5 - var hash = md5.Hash; - f.Write(hash, 0, hash.Length); - f.FlushToDisk(); + // WRITE MD5 + var hash = md5.Hash; + f.Write(hash, 0, hash.Length); + f.FlushToDisk(); + } + } + Log.Trace("PTables merge finished in {0} ([{1}] entries merged into {2}).", + watch.Elapsed, string.Join(", ", tables.Select(x => x.Count)), dumpedEntryCount); + return new PTable(outputFile, Guid.NewGuid(), depth: cacheDepth, skipIndexVerify: skipIndexVerify); + } + finally + { + foreach (var enumerableTable in enumerators) + { + enumerableTable.Dispose(); } } - Log.Trace("PTables merge finished in {0} ([{1}] entries merged into {2}).", - watch.Elapsed, string.Join(", ", tables.Select(x => x.Count)), dumpedEntryCount); - return new PTable(outputFile, Guid.NewGuid(), depth: cacheDepth, skipIndexVerify: skipIndexVerify); } private static int GetIndexEntrySize(byte version) @@ -212,83 +222,216 @@ private static int GetIndexEntrySize(byte version) var fileSizeUpToIndexEntries = GetFileSizeUpToIndexEntries(numIndexEntries, version); var enumerators = tables.Select(table => new EnumerableTable(version, table, upgradeHash, existsAt, readRecord)).ToList(); - long dumpedEntryCount = 0; - using (var f = new FileStream(outputFile, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, - DefaultSequentialBufferSize, FileOptions.SequentialScan)) + try { - f.SetLength(fileSizeUpToIndexEntries); - f.Seek(0, SeekOrigin.Begin); - - using (var md5 = MD5.Create()) - using (var cs = new CryptoStream(f, md5, CryptoStreamMode.Write)) - using (var bs = new BufferedStream(cs, DefaultSequentialBufferSize)) + long dumpedEntryCount = 0; + using (var f = new FileStream(outputFile, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, + DefaultSequentialBufferSize, FileOptions.SequentialScan)) { - // WRITE HEADER - var headerBytes = new PTableHeader(version).AsByteArray(); - cs.Write(headerBytes, 0, headerBytes.Length); + f.SetLength(fileSizeUpToIndexEntries); + f.Seek(0, SeekOrigin.Begin); - // WRITE INDEX ENTRIES - var buffer = new byte[indexEntrySize]; - long indexEntry = 0L; - List midpoints = new List(); - var requiredMidpointCount = GetRequiredMidpointCountCached(numIndexEntries,version,cacheDepth); - var enum1 = enumerators[0]; - var enum2 = enumerators[1]; - bool available1 = enum1.MoveNext(); - bool available2 = enum2.MoveNext(); - IndexEntry current; - while (available1 || available2) + using (var md5 = MD5.Create()) + using (var cs = new CryptoStream(f, md5, CryptoStreamMode.Write)) + using (var bs = new BufferedStream(cs, DefaultSequentialBufferSize)) { - var entry1 = new IndexEntry(enum1.Current.Stream, enum1.Current.Version, enum1.Current.Position); - var entry2 = new IndexEntry(enum2.Current.Stream, enum2.Current.Version, enum2.Current.Position); + // WRITE HEADER + var headerBytes = new PTableHeader(version).AsByteArray(); + cs.Write(headerBytes, 0, headerBytes.Length); + + // WRITE INDEX ENTRIES + var buffer = new byte[indexEntrySize]; + long indexEntry = 0L; + List midpoints = new List(); + var requiredMidpointCount = GetRequiredMidpointCountCached(numIndexEntries, version, cacheDepth); + var enum1 = enumerators[0]; + var enum2 = enumerators[1]; + bool available1 = enum1.MoveNext(); + bool available2 = enum2.MoveNext(); + IndexEntry current; + while (available1 || available2) + { + var entry1 = new IndexEntry(enum1.Current.Stream, enum1.Current.Version, enum1.Current.Position); + var entry2 = new IndexEntry(enum2.Current.Stream, enum2.Current.Version, enum2.Current.Position); + + if (available1 && (!available2 || entry1.CompareTo(entry2) > 0)) + { + current = entry1; + available1 = enum1.MoveNext(); + } + else + { + current = entry2; + available2 = enum2.MoveNext(); + } - if (available1 && (!available2 || entry1.CompareTo(entry2) > 0)) + AppendRecordTo(bs, buffer, version, current, indexEntrySize); + if (version >= PTableVersions.IndexV4 && IsMidpointIndex(indexEntry, numIndexEntries, requiredMidpointCount)) + { + midpoints.Add(new Midpoint(new IndexEntryKey(current.Stream, current.Version), indexEntry)); + } + indexEntry++; + dumpedEntryCount++; + } + + //WRITE MIDPOINTS + if (version >= PTableVersions.IndexV4) { - current = entry1; - available1 = enum1.MoveNext(); + if (dumpedEntryCount != numIndexEntries) + { + //if index entries have been removed, compute the midpoints again + numIndexEntries = dumpedEntryCount; + requiredMidpointCount = GetRequiredMidpointCount(numIndexEntries, version, cacheDepth); + midpoints = ComputeMidpoints(bs, f, version, indexEntrySize, numIndexEntries, requiredMidpointCount, midpoints); + } + WriteMidpointsTo(bs, f, version, indexEntrySize, buffer, dumpedEntryCount, numIndexEntries, requiredMidpointCount, midpoints); } - else + + bs.Flush(); + cs.FlushFinalBlock(); + + f.SetLength(f.Position + MD5Size); + + // WRITE MD5 + var hash = md5.Hash; + f.Write(hash, 0, hash.Length); + f.FlushToDisk(); + } + } + Log.Trace("PTables merge finished in {0} ([{1}] entries merged into {2}).", + watch.Elapsed, string.Join(", ", tables.Select(x => x.Count)), dumpedEntryCount); + return new PTable(outputFile, Guid.NewGuid(), depth: cacheDepth, skipIndexVerify: skipIndexVerify); + } + finally + { + foreach (var enumerator in enumerators) + { + enumerator.Dispose(); + } + } + + } + + public static PTable Scavenged(PTable table, string outputFile, Func upgradeHash, + Func existsAt, Func> readRecord, byte version, out long spaceSaved, + int cacheDepth = 16, bool skipIndexVerify = false, CancellationToken ct = default(CancellationToken)) + { + Ensure.NotNull(table, "table"); + Ensure.NotNullOrEmpty(outputFile, "outputFile"); + Ensure.Nonnegative(cacheDepth, "cacheDepth"); + + var indexEntrySize = GetIndexEntrySize(version); + var numIndexEntries = table.Count; + + var fileSizeUpToIndexEntries = GetFileSizeUpToIndexEntries(numIndexEntries, version); + + Log.Trace("PTables scavenge started with {0} entries.", numIndexEntries); + var watch = Stopwatch.StartNew(); + long keptCount = 0L; + long droppedCount; + + try + { + using (var f = new FileStream(outputFile, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None, + DefaultSequentialBufferSize, FileOptions.SequentialScan)) + { + f.SetLength(fileSizeUpToIndexEntries); + f.Seek(0, SeekOrigin.Begin); + + using (var md5 = MD5.Create()) + using (var cs = new CryptoStream(f, md5, CryptoStreamMode.Write)) + using (var bs = new BufferedStream(cs, DefaultSequentialBufferSize)) + { + // WRITE HEADER + var headerBytes = new PTableHeader(version).AsByteArray(); + cs.Write(headerBytes, 0, headerBytes.Length); + + // WRITE SCAVENGED INDEX ENTRIES + var buffer = new byte[indexEntrySize]; + using (var enumerator = new EnumerableTable(version, table, upgradeHash, existsAt, readRecord)) { - current = entry2; - available2 = enum2.MoveNext(); + while (enumerator.MoveNext()) + { + ct.ThrowIfCancellationRequested(); + if (existsAt(enumerator.Current)) + { + AppendRecordTo(bs, buffer, version, enumerator.Current, indexEntrySize); + keptCount++; + } + } } - if (existsAt(current)) + // We calculate this as the EnumerableTable can silently drop entries too. + droppedCount = numIndexEntries - keptCount; + + var forceKeep = version > table.Version; + + if (droppedCount == 0 && !forceKeep) { - AppendRecordTo(bs, buffer, version, current, indexEntrySize); - if(version >= PTableVersions.IndexV4 && IsMidpointIndex(indexEntry,numIndexEntries,requiredMidpointCount)){ - midpoints.Add(new Midpoint(new IndexEntryKey(current.Stream,current.Version),indexEntry)); + Log.Trace( + "PTable scavenge finished in {0}. No entries removed so not keeping scavenged table.", + watch.Elapsed); + + try + { + bs.Close(); + File.Delete(outputFile); } - indexEntry++; - dumpedEntryCount += 1; + catch (Exception ex) + { + Log.ErrorException(ex, "Unable to delete unwanted scavenged PTable: {0}", outputFile); + } + + spaceSaved = 0; + return null; } - } - //WRITE MIDPOINTS - if(version >= PTableVersions.IndexV4){ - if(dumpedEntryCount!=numIndexEntries){ - //if index entries have been removed, compute the midpoints again - numIndexEntries = dumpedEntryCount; - requiredMidpointCount = GetRequiredMidpointCount(numIndexEntries, version, cacheDepth); - midpoints = ComputeMidpoints(bs,f,version,indexEntrySize,numIndexEntries, requiredMidpointCount,midpoints); + if (droppedCount == 0 && forceKeep) + { + Log.Trace("Keeping scavenged index even though it isn't smaller; version upgraded."); } - WriteMidpointsTo(bs,f,version,indexEntrySize,buffer,dumpedEntryCount,numIndexEntries,requiredMidpointCount,midpoints); - } - bs.Flush(); - cs.FlushFinalBlock(); + //CALCULATE AND WRITE MIDPOINTS + if (version >= PTableVersions.IndexV4) + { + var requiredMidpointCount = GetRequiredMidpointCount(keptCount, version, cacheDepth); + var midpoints = ComputeMidpoints(bs, f, version, indexEntrySize, keptCount, requiredMidpointCount, new List(), ct); + WriteMidpointsTo(bs, f, version, indexEntrySize, buffer, keptCount, keptCount, requiredMidpointCount, midpoints); + } - f.SetLength(f.Position + MD5Size); + bs.Flush(); + cs.FlushFinalBlock(); - // WRITE MD5 - var hash = md5.Hash; - f.Write(hash, 0, hash.Length); - f.FlushToDisk(); + f.FlushToDisk(); + f.SetLength(f.Position + MD5Size); + + // WRITE MD5 + var hash = md5.Hash; + f.Write(hash, 0, hash.Length); + f.FlushToDisk(); + } + } + + Log.Trace("PTable scavenge finished in {0} ({1} entries removed, {2} remaining).", watch.Elapsed, + droppedCount, keptCount); + var scavengedTable = new PTable(outputFile, Guid.NewGuid(), depth: cacheDepth, + skipIndexVerify: skipIndexVerify); + spaceSaved = table._size - scavengedTable._size; + return scavengedTable; + } + catch (Exception) + { + try + { + File.Delete(outputFile); } + catch (Exception ex) + { + Log.ErrorException(ex, "Unable to delete unwanted scavenged PTable: {0}", outputFile); + } + throw; } - Log.Trace("PTables merge finished in {0} ([{1}] entries merged into {2}).", - watch.Elapsed, string.Join(", ", tables.Select(x => x.Count)), dumpedEntryCount); - return new PTable(outputFile, Guid.NewGuid(), depth: cacheDepth, skipIndexVerify: skipIndexVerify); + } private static int GetMaxOf(List enumerators) @@ -324,7 +467,7 @@ private static void AppendRecordTo(Stream stream, byte[] buffer, byte version, I stream.Write(buffer, 0, indexEntrySize); } - private static List ComputeMidpoints(BufferedStream bs, FileStream fs, byte version, int indexEntrySize, long numIndexEntries, long requiredMidpointCount,List midpoints){ + private static List ComputeMidpoints(BufferedStream bs, FileStream fs, byte version, int indexEntrySize, long numIndexEntries, long requiredMidpointCount,List midpoints, CancellationToken ct = default(CancellationToken)){ int indexKeySize; if(version == PTableVersions.IndexV4) indexKeySize = IndexKeyV4Size; @@ -341,6 +484,9 @@ private static void AppendRecordTo(Stream stream, byte[] buffer, byte version, I IndexEntryKey previousKey = new IndexEntryKey(0,0); for(int k=0;k GetEnumerator() - { - return _enumerator; - } - public void Dispose() { if (_ptableEnumerator != null) diff --git a/src/EventStore.Core/Index/ScavengeResult.cs b/src/EventStore.Core/Index/ScavengeResult.cs new file mode 100644 index 00000000000..457b36fc432 --- /dev/null +++ b/src/EventStore.Core/Index/ScavengeResult.cs @@ -0,0 +1,34 @@ +namespace EventStore.Core.Index +{ + public class ScavengeResult + { + public readonly IndexMap ScavengedMap; + public readonly bool IsSuccess; + public readonly PTable OldTable; + public readonly PTable NewTable; + public readonly long SpaceSaved; + public readonly int Level; + public readonly int Index; + + private ScavengeResult(IndexMap scavengedMap, bool isSuccess, PTable oldTable, PTable newTable, long spaceSaved, int level, int index) + { + ScavengedMap = scavengedMap; + IsSuccess = isSuccess; + OldTable = oldTable; + NewTable = newTable; + SpaceSaved = spaceSaved; + Level = level; + Index = index; + } + + public static ScavengeResult Success(IndexMap scavengedMap, PTable oldTable, PTable newTable, long spaceSaved, int level, int index) + { + return new ScavengeResult(scavengedMap, true, oldTable ,newTable, spaceSaved, level, index); + } + + public static ScavengeResult Failed(PTable oldTable, int level, int index) + { + return new ScavengeResult(null, false, oldTable, null, 0, level, index); + } + } +} \ No newline at end of file diff --git a/src/EventStore.Core/Index/TableIndex.cs b/src/EventStore.Core/Index/TableIndex.cs index 876048acd6e..204d2c5f1b3 100644 --- a/src/EventStore.Core/Index/TableIndex.cs +++ b/src/EventStore.Core/Index/TableIndex.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Text; @@ -228,12 +229,7 @@ public void AddEntries(long commitPos, IList entries) _awaitingMemTables = newTables; if (_inMem) return; - if (!_backgroundRunning) - { - _backgroundRunningEvent.Reset(); - _backgroundRunning = true; - ThreadPool.QueueUserWorkItem(x => ReadOffQueue()); - } + TryProcessAwaitingTables(); if (_additionalReclaim) ThreadPool.QueueUserWorkItem(x => ReclaimMemoryIfNeeded(_awaitingMemTables)); @@ -241,6 +237,19 @@ public void AddEntries(long commitPos, IList entries) } } + private void TryProcessAwaitingTables() + { + lock(_awaitingTablesLock) + { + if (!_backgroundRunning) + { + _backgroundRunningEvent.Reset(); + _backgroundRunning = true; + ThreadPool.QueueUserWorkItem(x => ReadOffQueue()); + } + } + } + private void ReadOffQueue() { try @@ -254,10 +263,10 @@ private void ReadOffQueue() Log.Trace("Awaiting tables queue size is: {0}.", _awaitingMemTables.Count); if (_awaitingMemTables.Count == 1) { - _backgroundRunning = false; - _backgroundRunningEvent.Set(); + return; } + tableItem = _awaitingMemTables[_awaitingMemTables.Count - 1]; } @@ -266,21 +275,25 @@ private void ReadOffQueue() if (memtable != null) { memtable.MarkForConversion(); - ptable = PTable.FromMemtable(memtable, _fileNameProvider.GetFilenameNewTable(), _indexCacheDepth, _skipIndexVerify); + ptable = PTable.FromMemtable(memtable, _fileNameProvider.GetFilenameNewTable(), + _indexCacheDepth, _skipIndexVerify); } else - ptable = (PTable)tableItem.Table; + ptable = (PTable) tableItem.Table; var indexmapFile = Path.Combine(_directory, IndexMapFilename); MergeResult mergeResult; using (var reader = _tfReaderFactory()) { - mergeResult = _indexMap.AddPTable(ptable, tableItem.PrepareCheckpoint, tableItem.CommitCheckpoint, - (streamId, currentHash) => UpgradeHash(streamId, currentHash), - entry => reader.ExistsAt(entry.Position), - entry => ReadEntry(reader, entry.Position), _fileNameProvider, _ptableVersion, _indexCacheDepth, _skipIndexVerify); + mergeResult = _indexMap.AddPTable(ptable, tableItem.PrepareCheckpoint, + tableItem.CommitCheckpoint, + (streamId, currentHash) => UpgradeHash(streamId, currentHash), + entry => reader.ExistsAt(entry.Position), + entry => ReadEntry(reader, entry.Position), _fileNameProvider, _ptableVersion, + _indexCacheDepth, _skipIndexVerify); } + _indexMap = mergeResult.MergedMap; _indexMap.SaveToFile(indexmapFile); @@ -295,23 +308,135 @@ private void ReadOffQueue() // so if we have another PTable instance with same ID, // we need to kill that instance as we added ours already if (!ReferenceEquals(corrTable.Table, ptable) && corrTable.Table is PTable) - ((PTable)corrTable.Table).MarkForDestruction(); + ((PTable) corrTable.Table).MarkForDestruction(); Log.Trace("There are now {0} awaiting tables.", memTables.Count); _awaitingMemTables = memTables; } + mergeResult.ToDelete.ForEach(x => x.MarkForDestruction()); } } catch (FileBeingDeletedException exc) { - Log.ErrorException(exc, "Could not acquire chunk in TableIndex.ReadOffQueue. It is OK if node is shutting down."); + Log.ErrorException(exc, + "Could not acquire chunk in TableIndex.ReadOffQueue. It is OK if node is shutting down."); } catch (Exception exc) { Log.ErrorException(exc, "Error in TableIndex.ReadOffQueue"); throw; } + finally + { + lock (_awaitingTablesLock) + { + _backgroundRunning = false; + _backgroundRunningEvent.Set(); + } + } + } + + internal void WaitForBackgroundTasks() + { + if (!_backgroundRunningEvent.Wait(7000)) + { + throw new TimeoutException("Waiting for background tasks took too long."); + } + } + + public void Scavenge(IIndexScavengerLog log, CancellationToken ct) + { + GetExclusiveBackgroundTask(ct); + var sw = Stopwatch.StartNew(); + + try + { + Log.Info("Starting scavenge of TableIndex."); + ScavengeInternal(log, ct); + } + finally + { + lock (_awaitingTablesLock) + { + _backgroundRunning = false; + _backgroundRunningEvent.Set(); + + TryProcessAwaitingTables(); + } + + Log.Info("Completed scavenge of TableIndex. Elapsed: {0}", sw.Elapsed); + } + } + + private void ScavengeInternal(IIndexScavengerLog log, CancellationToken ct) + { + var toScavenge = _indexMap.InOrder().ToList(); + + foreach (var pTable in toScavenge) + { + var startNew = Stopwatch.StartNew(); + + try + { + ct.ThrowIfCancellationRequested(); + + using (var reader = _tfReaderFactory()) + { + var indexmapFile = Path.Combine(_directory, IndexMapFilename); + + var scavengeResult = _indexMap.Scavenge(pTable.Id, ct, + (streamId, currentHash) => UpgradeHash(streamId, currentHash), + entry => reader.ExistsAt(entry.Position), + entry => ReadEntry(reader, entry.Position), _fileNameProvider, _ptableVersion, + _indexCacheDepth, _skipIndexVerify); + + if (scavengeResult.IsSuccess) + { + _indexMap = scavengeResult.ScavengedMap; + _indexMap.SaveToFile(indexmapFile); + + scavengeResult.OldTable.MarkForDestruction(); + + var entriesDeleted = scavengeResult.OldTable.Count - scavengeResult.NewTable.Count; + log.IndexTableScavenged(scavengeResult.Level, scavengeResult.Index, startNew.Elapsed, entriesDeleted, scavengeResult.NewTable.Count, scavengeResult.SpaceSaved); + } + else + { + log.IndexTableNotScavenged(scavengeResult.Level, scavengeResult.Index, startNew.Elapsed, pTable.Count, ""); + } + } + } + catch (OperationCanceledException) + { + log.IndexTableNotScavenged(-1, -1, startNew.Elapsed, pTable.Count, "Scavenge cancelled"); + throw; + } + catch (Exception ex) + { + log.IndexTableNotScavenged(-1, -1, startNew.Elapsed, pTable.Count, ex.Message); + throw; + } + } + } + + private void GetExclusiveBackgroundTask(CancellationToken ct) + { + while (true) + { + lock (_awaitingTablesLock) + { + if (!_backgroundRunning) + { + _backgroundRunningEvent.Reset(); + _backgroundRunning = true; + return; + } + } + + Log.Info("Waiting for TableIndex background task to complete before starting scavenge."); + _backgroundRunningEvent.Wait(ct); + } } private static Tuple ReadEntry(TFReaderLease reader, long position) diff --git a/src/EventStore.Core/Services/SystemNames.cs b/src/EventStore.Core/Services/SystemNames.cs index 6edd396994b..7e481d34735 100644 --- a/src/EventStore.Core/Services/SystemNames.cs +++ b/src/EventStore.Core/Services/SystemNames.cs @@ -84,6 +84,7 @@ public static class SystemEventTypes public const string ScavengeCompleted = "$scavengeCompleted"; public const string ScavengeChunksCompleted = "$scavengeChunksCompleted"; public const string ScavengeMergeCompleted = "$scavengeMergeCompleted"; + public const string ScavengeIndexCompleted = "$scavengeIndexCompleted"; public static string StreamReferenceEventToStreamId(string eventType, byte[] data) { diff --git a/src/EventStore.Core/TransactionLog/Chunks/ITFChunkScavengerLog.cs b/src/EventStore.Core/TransactionLog/Chunks/ITFChunkScavengerLog.cs index 1a4628fec92..4bdbf552c4f 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/ITFChunkScavengerLog.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/ITFChunkScavengerLog.cs @@ -1,8 +1,9 @@ using System; +using EventStore.Core.Index; namespace EventStore.Core.TransactionLog.Chunks { - public interface ITFChunkScavengerLog + public interface ITFChunkScavengerLog : IIndexScavengerLog { string ScavengeId { get; } @@ -18,7 +19,7 @@ public interface ITFChunkScavengerLog void ScavengeCompleted(ScavengeResult result, string error, TimeSpan elapsed); } - + public enum ScavengeResult { Success, diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs index 62a90d07050..11a0ab95f4b 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs @@ -93,6 +93,8 @@ public Task Scavenge(bool alwaysKeepScavenged, bool mergeChunks, int startFromCh _scavengerLog.ScavengeStarted(); ScavengeInternal(alwaysKeepScavenged, mergeChunks, startFromChunk, ct); + + _tableIndex.Scavenge(_scavengerLog, ct); } catch (OperationCanceledException) { diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavengerLog.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavengerLog.cs index a4fae986ec1..529e533a793 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavengerLog.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavengerLog.cs @@ -137,6 +137,44 @@ public void ChunksNotMerged(int chunkStartNumber, int chunkEndNumber, TimeSpan e } + public void IndexTableScavenged(int level, int index, TimeSpan elapsed, long entriesDeleted, long entriesKept, + long spaceSaved) + { + Interlocked.Add(ref _spaceSaved, spaceSaved); + var evnt = new Event(Guid.NewGuid(), SystemEventTypes.ScavengeIndexCompleted, true, new Dictionary{ + {"scavengeId", _scavengeId}, + {"level", level}, + {"index", index}, + {"timeTaken", elapsed}, + {"entriesDeleted", entriesDeleted}, + {"entriesKept", entriesKept}, + {"spaceSaved", spaceSaved}, + {"wasScavenged", true}, + {"nodeEndpoint", _nodeId}, + {"errorMessage", ""} + }.ToJsonBytes(), null); + + WriteScavengeChunkCompletedEvent(_streamName, evnt, _retryAttempts); + } + + public void IndexTableNotScavenged(int level, int index, TimeSpan elapsed, long entriesKept, string errorMessage) + { + var evnt = new Event(Guid.NewGuid(), SystemEventTypes.ScavengeIndexCompleted, true, new Dictionary{ + {"scavengeId", _scavengeId}, + {"level", level}, + {"index", index}, + {"timeTaken", elapsed}, + {"entriesDeleted", 0}, + {"entriesKept", entriesKept}, + {"spaceSaved", 0}, + {"wasScavenged", false}, + {"nodeEndpoint", _nodeId}, + {"errorMessage", errorMessage} + }.ToJsonBytes(), null); + + WriteScavengeChunkCompletedEvent(_streamName, evnt, _retryAttempts); + } + private void WriteScavengeChunkCompletedEvent(string streamId, Event eventToWrite, int retryCount) { _ioDispatcher.WriteEvent(streamId, ExpectedVersion.Any, eventToWrite, SystemAccount.Principal, m => WriteScavengeChunkCompletedEventCompleted(m, streamId, eventToWrite, retryCount)); @@ -204,5 +242,6 @@ private void WriteScavengeDetailEventCompleted(ClientMessage.WriteEventsComplete WriteScavengeIndexEvent(linkToIndexEvent, _retryAttempts); } } + } } \ No newline at end of file