Skip to content

Commit

Permalink
wip: Add fsyncing persistence strategy for bloom filter
Browse files Browse the repository at this point in the history
todo: testing and more testing
  • Loading branch information
timothycoleman committed Feb 14, 2022
1 parent bfdb4db commit abc7823
Show file tree
Hide file tree
Showing 25 changed files with 986 additions and 416 deletions.
@@ -1,7 +1,7 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter.PersistentBloomFilters;
using EventStore.Core.Index.Hashes;
using NUnit.Framework;

Expand Down
@@ -1,4 +1,4 @@
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter.PersistentBloomFilters;
using NUnit.Framework;

namespace EventStore.Core.Tests.DataStructures {
Expand Down
@@ -1,11 +1,11 @@
using System;
using System.Collections.Generic;
using System.IO;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter.PersistentBloomFilters;
using NUnit.Framework;

namespace EventStore.Core.Tests.DataStructures {
public class memory_mapped_file_stream_bloom_filter : SpecificationWithDirectoryPerTestFixture {
public class persistent_stream_bloom_filter : SpecificationWithDirectoryPerTestFixture {
private static string GenerateCharset() {
var charset = "";
for (var c = 'a'; c <= 'z'; c++) {
Expand Down Expand Up @@ -44,14 +44,19 @@ public class memory_mapped_file_stream_bloom_filter : SpecificationWithDirectory
}

[TestFixture]
private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
private MemoryMappedFileStreamBloomFilter _filter;
private class with_fixed_size_filter : persistent_stream_bloom_filter {
private PersistentStreamBloomFilter _filter;
private string _path;

[SetUp]
public void SetUp() {
_path = GetTempFilePath();
_filter = new MemoryMappedFileStreamBloomFilter(_path, create: true, MemoryMappedFileBloomFilter.MinSizeKB * 1000, hasher: null);
_filter = new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: BloomFilterAccessor.MinSizeKB * 1000,
path: _path,
create: true),
hasher: null);
}

[TearDown]
Expand All @@ -63,8 +68,14 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
[Test]
public void can_close_and_reopen() {
_filter.Add("hello");
_filter.Flush();
_filter.Dispose();
using var newFilter = new MemoryMappedFileStreamBloomFilter(_path, create: false, MemoryMappedFileBloomFilter.MinSizeKB * 1000, hasher: null);
using var newFilter = new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: BloomFilterAccessor.MinSizeKB * 1000,
path: _path,
create: false),
hasher: null);
Assert.IsTrue(newFilter.MightContain("hello"));
}

Expand All @@ -82,7 +93,7 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
var numBits = binaryReader.ReadInt64();
Assert.AreEqual( 0x01, version);
Assert.AreEqual( 0, corruptionRebuildCount);
Assert.AreEqual( MemoryMappedFileBloomFilter.MinSizeKB * 1000 * 8, numBits);
Assert.AreEqual(BloomFilterAccessor.MinSizeKB * 1000 * 8, numBits);
}

[Test]
Expand All @@ -99,10 +110,15 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {

[Test, Combinatorial]
public void has_false_positives_with_probability_p(
[Values(MemoryMappedFileBloomFilter.MinSizeKB*1000,2*MemoryMappedFileBloomFilter.MinSizeKB*1000)] long size,
[Values(BloomFilterAccessor.MinSizeKB*1000,2* BloomFilterAccessor.MinSizeKB*1000)] long size,
[Values(0.001,0.02,0.05,0.1,0.2)] double p
) {
using var filter = new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, size, hasher: null);
using var filter = new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: size,
path: GetTempFilePath(),
create: true),
hasher: null);
var n = (int) filter.CalculateOptimalNumItems(p);

var random = new Random(123);
Expand Down Expand Up @@ -157,8 +173,13 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {

[Test, Category("LongRunning")]
public void always_returns_true_when_an_item_was_added([Range(10_000, 100_000, 13337)] long size) {
using var filter = new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, size, hasher: null);
var strings = GenerateRandomStrings((int)filter.CalculateOptimalNumItems(MemoryMappedFileBloomFilter.RecommendedFalsePositiveProbability), 100);
using var filter = new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: size,
path: GetTempFilePath(),
create: true),
hasher: null);
var strings = GenerateRandomStrings((int)filter.CalculateOptimalNumItems(PersistentBloomFilter.RecommendedFalsePositiveProbability), 100);

//no items added yet
foreach (var s in strings) {
Expand All @@ -180,25 +201,45 @@ private class with_fixed_size_filter : memory_mapped_file_stream_bloom_filter {
[Test]
public void throws_argument_out_of_range_exception_when_given_negative_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, -1, hasher: null));
new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: -1,
path: GetTempFilePath(),
create: true),
hasher: null));
}

[Test]
public void throws_argument_out_of_range_exception_when_given_zero_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, 0, hasher: null));
new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: 0,
path: GetTempFilePath(),
create: true),
hasher: null));
}

[Test]
public void throws_argument_out_of_range_exception_when_size_less_than_min_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, MemoryMappedFileBloomFilter.MinSizeKB * 1000 - 1, hasher: null));
new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: BloomFilterAccessor.MinSizeKB * 1000 - 1,
path: GetTempFilePath(),
create: true),
hasher: null));
}

[Test]
public void throws_argument_out_of_range_exception_when_size_greater_than_max_size() {
Assert.Throws<ArgumentOutOfRangeException>(() =>
new MemoryMappedFileStreamBloomFilter(GetTempFilePath(), create: true, MemoryMappedFileBloomFilter.MaxSizeKB * 1000 + 1, hasher: null));
new PersistentStreamBloomFilter(
new FileStreamPersistence(
size: BloomFilterAccessor.MaxSizeKB * 1000 + 1,
path: GetTempFilePath(),
create: true),
hasher: null));
}
}
}
@@ -1,7 +1,7 @@
using System;
using System.IO;
using System.Threading;
using EventStore.Core.DataStructures.ProbabilisticFilter.MemoryMappedFileBloomFilter;
using EventStore.Core.DataStructures.ProbabilisticFilter.PersistentBloomFilters;
using EventStore.Core.LogAbstraction;
using EventStore.Core.LogAbstraction.Common;
using EventStore.Core.TransactionLog.Checkpoint;
Expand Down Expand Up @@ -29,7 +29,7 @@ public class StreamExistenceFilterTests :

checkpointInterval ??= TimeSpan.FromMilliseconds(10);
var checkpointPath = Path.Combine(_fixture.Directory, $"{name}.chk");
var checkpoint = new MemoryMappedFileCheckpoint(checkpointPath, name, cached: true, initValue: -1);
var checkpoint = new FileCheckpoint(checkpointPath, name, cached: true, initValue: -1);
var filter = new StreamExistenceFilter(
directory: _fixture.Directory,
checkpoint: checkpoint,
Expand Down Expand Up @@ -77,8 +77,12 @@ public class StreamExistenceFilterTests :
var sut = GenSut();
sut.Initialize(new MockExistenceFilterInitializer("0", "1", "2"), 0);
Assert.Equal(2L, sut.CurrentCheckpoint);
// wait for flush so that we have something to truncate (or it will do nothing)
AssertEx.IsOrBecomesTrue(() => sut.CurrentCheckpointFlushed == 2, TimeSpan.FromSeconds(5));

// truncate works
sut.Dispose();
sut = GenSut();
sut.TruncateTo(1);
Assert.Equal(1L, sut.CurrentCheckpoint);

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit abc7823

Please sign in to comment.