Skip to content

Commit

Permalink
Merge pull request #1842 from EventStore/trigger-indexmerge-with-maxA…
Browse files Browse the repository at this point in the history
…utomaticMerge

Adds a maximum index level for automatically merging ptables
  • Loading branch information
jen20 committed Jan 28, 2019
2 parents 7d08c56 + a65a109 commit f25424d
Show file tree
Hide file tree
Showing 51 changed files with 814 additions and 110 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -104,3 +104,6 @@ test_results

# Visual Studio Code
src/.vscode/

#NCrunch
_NCrunch_*/
5 changes: 5 additions & 0 deletions src/EventStore.ClusterNode/ClusterNodeOptions.cs
Expand Up @@ -242,6 +242,9 @@ public class ClusterNodeOptions : IOptions

[ArgDescription(Opts.StructuredLogDescr, Opts.DbGroup)]
public bool StructuredLog { get; set; }

[ArgDescription(Opts.MaxAutoMergeIndexLevelDescr, Opts.DbGroup)]
public int MaxAutoMergeIndexLevel { get; set; }

public ClusterNodeOptions()
{
Expand Down Expand Up @@ -363,6 +366,8 @@ public ClusterNodeOptions()

ConnectionPendingSendBytesThreshold = Opts.ConnectionPendingSendBytesThresholdDefault;
ChunkInitialReaderCount = Opts.ChunkInitialReaderCountDefault;

MaxAutoMergeIndexLevel = Opts.MaxAutoMergeIndexLevelDefault;
}
}
}
3 changes: 2 additions & 1 deletion src/EventStore.ClusterNode/Program.cs
Expand Up @@ -218,7 +218,8 @@ private static ClusterVNode BuildNode(ClusterNodeOptions options)
.HavingReaderThreads(options.ReaderThreadsCount)
.WithConnectionPendingSendBytesThreshold(options.ConnectionPendingSendBytesThreshold)
.WithChunkInitialReaderCount(options.ChunkInitialReaderCount)
.WithInitializationThreads(options.InitializationThreads);
.WithInitializationThreads(options.InitializationThreads)
.WithMaxAutoMergeIndexLevel(options.MaxAutoMergeIndexLevel);

if(options.GossipSeed.Length > 0)
builder.WithGossipSeeds(options.GossipSeed);
Expand Down
@@ -0,0 +1,33 @@
using System;
using System.Linq;
using EventStore.Core.Index;
using NUnit.Framework;

namespace EventStore.Core.Tests.Index.AutoMergeLevelTests
{
public class when_max_auto_merge_level_is_reduced : when_max_auto_merge_level_is_set
{
public when_max_auto_merge_level_is_reduced():base(5)
{
}

[Test]
public void should_merge_levels_above_max_level()
{
AddTables(201); //gives 1 level 0, 1 level 3 and 6 level 5s
Assert.AreEqual(8,_result.MergedMap.InOrder().Count());
var filename = GetFilePathFor("changemaxlevel");
_result.MergedMap.SaveToFile(filename);
_result.MergedMap.Dispose(TimeSpan.FromMilliseconds(100));
_map.Dispose(TimeSpan.FromMilliseconds(100));
_map = IndexMap.FromFile(filename, maxAutoMergeLevel:3);
var (level, table)= _map.GetTableForManualMerge();
Assert.AreEqual(5, level);
_result = _map.AddPTable(table, _result.MergedMap.PrepareCheckpoint, _result.MergedMap.CommitCheckpoint, UpgradeHash, ExistsAt,
RecordExistsAt, _fileNameProvider, _ptableVersion,
level: level,
skipIndexVerify: _skipIndexVerify);
Assert.AreEqual(2,_result.MergedMap.InOrder().Count());
}
}
}
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Core.Index;
using EventStore.Core.Tests.Services.Storage.Transactions;
using NUnit.Framework;

namespace EventStore.Core.Tests.Index.AutoMergeLevelTests
{
[TestFixture]
public class when_max_auto_merge_level_is_set : SpecificationWithDirectoryPerTestFixture
{
private readonly int _maxAutoMergeLevel;
protected string _filename;
protected IndexMap _map;
protected byte _ptableVersion = 4;
protected MergeResult _result;
protected bool _skipIndexVerify = true;
protected GuidFilenameProvider _fileNameProvider;

public when_max_auto_merge_level_is_set(int maxAutoMergeLevel = 2)
{
_maxAutoMergeLevel = maxAutoMergeLevel;
}
[OneTimeSetUp]
public void Setup()
{
_filename = GetTempFilePath();
_fileNameProvider = new GuidFilenameProvider(PathName);
_map = IndexMap.FromFile(_filename, maxTablesPerLevel: 2, maxAutoMergeLevel:_maxAutoMergeLevel);

}

protected void AddTables(int count)
{
var memtable = new HashListMemTable(_ptableVersion, maxSize: 10);
memtable.Add(0, 1, 0);
var first = _map;
if (_result != null)
first = _result.MergedMap;
_result = first.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify),
10, 20, UpgradeHash, ExistsAt, RecordExistsAt, _fileNameProvider, _ptableVersion,
0, 0,skipIndexVerify:_skipIndexVerify);
for (int i = 3; i <= count * 2; i+=2)
{
_result = _result.MergedMap.AddPTable(
PTable.FromMemtable(memtable, GetTempFilePath(), skipIndexVerify: _skipIndexVerify),
i*10, (i+1)*10, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true),
_fileNameProvider, _ptableVersion, 0,0, skipIndexVerify: _skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());

}
}

[OneTimeTearDown]
public override void TestFixtureTearDown()
{
_result.ToDelete.ForEach(x=>x.MarkForDestruction());
_result.MergedMap.InOrder().ToList().ForEach(x => x.MarkForDestruction());
_result.MergedMap.Dispose(TimeSpan.FromMilliseconds(100));
_map.Dispose(TimeSpan.FromMilliseconds(100));
File.Delete(_filename);

base.TestFixtureTearDown();
}

protected Tuple<string, bool> RecordExistsAt(IndexEntry arg)
{
return Tuple.Create("", true);
}

protected bool ExistsAt(IndexEntry arg)
{
return true;
}

protected static ulong UpgradeHash(string stream, ulong hash)
{
return hash;
}
}
}
@@ -0,0 +1,35 @@
using System;
using System.Linq;
using NUnit.Framework;

namespace EventStore.Core.Tests.Index.AutoMergeLevelTests
{
public class when_no_tables_are_eligible_for_manual_merge: when_max_auto_merge_level_is_set
{
[SetUp]
public void Setup()
{
AddTables(8);
Assert.AreEqual(2, _result.MergedMap.InOrder().Count());
var (level, table)= _result.MergedMap.GetTableForManualMerge();
Assert.NotNull(table);

_result = _result.MergedMap.AddPTable(table, _result.MergedMap.PrepareCheckpoint, _result.MergedMap.CommitCheckpoint, UpgradeHash, ExistsAt,
RecordExistsAt, _fileNameProvider, _ptableVersion,
level: level,
skipIndexVerify: _skipIndexVerify);
_result.ToDelete.ForEach(x=>x.MarkForDestruction());
}
[Test]
public void should_not_return_table_for_merge()
{
Assert.AreEqual(1, _result.MergedMap.InOrder().Count());
AddTables(3); //adding 3 tables will cause an auto merge, but not enough to give us tables for manual merge
var (level, table)= _result.MergedMap.GetTableForManualMerge();
Assert.Null(table);

Assert.AreEqual(3,_result.MergedMap.InOrder().Count());

}
}
}
@@ -0,0 +1,19 @@
using System;
using System.Linq;
using NUnit.Framework;

namespace EventStore.Core.Tests.Index.AutoMergeLevelTests
{
public class when_no_tables_have_yet_reached_maximum_automerge_level: when_max_auto_merge_level_is_set
{
[Test]
public void should_not_return_table_for_merge()
{
AddTables(3);
Assert.AreEqual(2, _result.MergedMap.InOrder().Count());
var (level, table)= _result.MergedMap.GetTableForManualMerge();
Assert.AreEqual(1, level);
Assert.Null(table);
}
}
}
@@ -0,0 +1,24 @@
using System;
using System.Linq;
using NUnit.Framework;

namespace EventStore.Core.Tests.Index.AutoMergeLevelTests
{
public class when_tables_available_for_manual_merge : when_max_auto_merge_level_is_set
{
[Test]
public void should_merge_pending_tables_at_max_auto_merge_level()
{
AddTables(100);
Assert.AreEqual(25, _result.MergedMap.InOrder().Count());
var (level, table)= _result.MergedMap.GetTableForManualMerge();
Assert.AreEqual(2, level);
_result = _result.MergedMap.AddPTable(table, _result.MergedMap.PrepareCheckpoint, _result.MergedMap.CommitCheckpoint, UpgradeHash, ExistsAt,
RecordExistsAt, _fileNameProvider, _ptableVersion,
level: level,
skipIndexVerify: _skipIndexVerify);
Assert.AreEqual(1,_result.MergedMap.InOrder().Count());

}
}
}
Expand Up @@ -21,6 +21,7 @@ public class adding_four_items_to_empty_index_map_with_four_tables_per_level_cau
private MergeResult _result;
protected byte _ptableVersion = PTableVersions.IndexV1;
private bool _skipIndexVerify;
private int _maxAutoMergeIndexLevel = 4;

public adding_four_items_to_empty_index_map_with_four_tables_per_level_causes_merge(byte version, bool skipIndexVerify){
_ptableVersion = version;
Expand All @@ -41,22 +42,28 @@ public override void TestFixtureSetUp()

_result = _map.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify), 1, 2,
(streamId, hash) => hash,
_ => true, _ => new System.Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, skipIndexVerify: _skipIndexVerify);
_ => true,
_ => new System.Tuple<string, bool>("", true),
new GuidFilenameProvider(PathName),
_ptableVersion,
_maxAutoMergeIndexLevel,
0,
skipIndexVerify: _skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());

_result = _result.MergedMap.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify), 3, 4,
(streamId, hash) => hash,
_ => true, _ => new System.Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, skipIndexVerify: _skipIndexVerify);
_ => true, _ => new System.Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify: _skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());

_result = _result.MergedMap.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify), 4, 5,
(streamId, hash) => hash,
_ => true, _ => new System.Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, skipIndexVerify: _skipIndexVerify);
_ => true, _ => new System.Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify: _skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());

_result = _result.MergedMap.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify), 0, 1,
(streamId, hash) => hash,
_ => true, _ => new System.Tuple<string, bool>("", true), new FakeFilenameProvider(_mergeFile), _ptableVersion, skipIndexVerify: _skipIndexVerify);
_ => true, _ => new System.Tuple<string, bool>("", true), new FakeFilenameProvider(_mergeFile), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify: _skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());
}

Expand Down
Expand Up @@ -22,6 +22,7 @@ public class adding_four_items_to_empty_index_map_with_two_tables_per_level_caus
private MergeResult _result;
protected byte _ptableVersion = PTableVersions.IndexV1;
private bool _skipIndexVerify;
private int _maxAutoMergeIndexLevel = 4;

public adding_four_items_to_empty_index_map_with_two_tables_per_level_causes_double_merge(byte version, bool skipIndexVerify){
_ptableVersion = version;
Expand All @@ -41,16 +42,16 @@ public override void TestFixtureSetUp()
memtable.Add(0, 1, 0);

_result = _map.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify),
10, 20, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion,skipIndexVerify:_skipIndexVerify);
10, 20, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify:_skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());
_result = _result.MergedMap.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify),
20, 30, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion,skipIndexVerify:_skipIndexVerify);
20, 30, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify:_skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());
_result = _result.MergedMap.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify),
30, 40, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion,skipIndexVerify:_skipIndexVerify);
30, 40, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new GuidFilenameProvider(PathName), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify:_skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());
_result = _result.MergedMap.AddPTable(PTable.FromMemtable(memtable, GetTempFilePath(),skipIndexVerify:_skipIndexVerify),
50, 60, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new FakeFilenameProvider(_mergeFile + ".firstmerge", _mergeFile), _ptableVersion,skipIndexVerify:_skipIndexVerify);
50, 60, (streamId, hash) => hash, _ => true, _ => new Tuple<string, bool>("", true), new FakeFilenameProvider(_mergeFile + ".firstmerge", _mergeFile), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify:_skipIndexVerify);
_result.ToDelete.ForEach(x => x.MarkForDestruction());
}

Expand Down
Expand Up @@ -22,6 +22,7 @@ public class adding_item_to_empty_index_map: SpecificationWithDirectoryPerTestFi
private MergeResult _result;
protected byte _ptableVersion = PTableVersions.IndexV1;
private bool _skipIndexVerify;
private int _maxAutoMergeIndexLevel = 4;

public adding_item_to_empty_index_map(byte version, bool skipIndexVerify){
_ptableVersion = version;
Expand All @@ -41,7 +42,7 @@ public override void TestFixtureSetUp()
var memtable = new HashListMemTable(_ptableVersion, maxSize: 10);
memtable.Add(0, 1, 0);
var table = PTable.FromMemtable(memtable, _tablename,skipIndexVerify:_skipIndexVerify);
_result = _map.AddPTable(table, 7, 11, (streamId, hash) => hash, _ => true, _ => new System.Tuple<string, bool>("", true), new FakeFilenameProvider(_mergeFile), _ptableVersion,skipIndexVerify: _skipIndexVerify);
_result = _map.AddPTable(table, 7, 11, (streamId, hash) => hash, _ => true, _ => new System.Tuple<string, bool>("", true), new FakeFilenameProvider(_mergeFile), _ptableVersion, _maxAutoMergeIndexLevel, 0,skipIndexVerify: _skipIndexVerify);
table.MarkForDestruction();
}

Expand Down

0 comments on commit f25424d

Please sign in to comment.